SQLAlchemyのRawクエリーを使う


【最終更新日】2023-05-03

PythonバッチスクリプトでSQLAlchemyのRawクエリーを使って登録済みデータを取得しJSON化する方法について解説します。

※著者は残念ながらSQLAlchemyに関してはその道のエキスパートでないため検索処理はORM標準の方法ではなくRAWクエリーを直接実行する方法を採用しています。 本リポジトリは健康管理データの可視化が目的なので開発に割く工数にはメリハリをつけて実装しています。ORM標準での検索方法を学習するコストもかかります。

【参考URL】
【SQLAlchemyのソース】
1.検索バッチアプリケーション用プロジェクト
プロジェクトのクラス構成
healthcare/
    ├── GetHealthcareDataWithSelector.py  // 登録済みデータ取得バッチスクリプト
    ├── conf                              // データベース接続情報
    │   ├── db_healthcare.json            //  (1)健康管理データベース用
    │   └── db_sensors.json               //  (2)気象センサーデータベース用 
    └── dao
         ├── __init__.py                  //  健康管理データベーススキーマ定義
         └── queries.py                   //  RAW クエリを実行して辞書オブジェクトを生成するクラス
2.RAW クエリを実行して辞書オブジェクトを生成するクラス
(1) datetime.date型, datetime.time型をAndroidアプリ用の文字列に変換する関数
def _datetime_to_str(src_dict: dict) -> Dict:
    """
    ソース辞書の中のdatetime.date型とdatetime.time型のデータを文字列型に変換する
    :param src_dict: ソース辞書
    :return: 変換後の新たな辞書
    """
    conv_dict = {}
    for key, val in zip(src_dict.keys(), src_dict.values()):
        if isinstance(val, date):
            # "年(4桁)-月(2桁)-日(2桁)'
            # (例)  {date} 2023-03-01 -> {str} '2023-03-01'
            conv_dict[key] = val.isoformat()
        elif isinstance(val, time):
            # "時:分" ※秒は不要: Androidアプリ側の精度が分まで
            # (例) {time} 05:55:00 -> {str} '05:55'
            conv_dict[key] = val.strftime("%H:%M")
        else:
            conv_dict[key] = val
    return conv_dict
【テーブルの時刻フィールドをAndroidアプリ用文字列に変換】
(2) SELECT項目を辞書オブジェクトに変換するためのコンテナクラス

名前付きフィールドを持つタプルのサブクラスを作成するとテーブルから取得した値を簡単に辞書オブジェクトに変換できます。 ポイントはタプルのフィールド名をJSONのプロパティ名に一致させるだけです。

class Selector:
    # 1.健康管理データベース
    # 1-1.睡眠管理データ
    _SleepManagement = namedtuple('SleepManagement', [
        'wakeupTime','sleepScore','sleepingTime','deepSleepingTime']
    )
    # 1-2.血圧測定データ
    _BloodPressure = namedtuple('BloodPressure', [
        'morningMeasurementTime','morningMax','morningMin','morningPulseRate',
        'eveningMeasurementTime', 'eveningMax', 'eveningMin', 'eveningPulseRate']
    )
    # 1-3.頻尿要因データ
    _NocturiaFactors = namedtuple('NocturiaFactors', [
        'midnightToiletVisits','hasCoffee','hasTea','hasAlcohol','hasNutritionDrink',
        'hasSportsDrink','hasDiuretic','takeMedicine','takeBathing','conditionMemo']
    )
    # 1-4.歩数データ
    _WalkingCount = namedtuple('WalkingCount', ['counts'])
    # 1-5.体温管理データ
    _BodyTemperature = namedtuple('_BodyTemperature', ['measurementTime', 'temperature'])

    # 2.気象データベース.天候データ
    _WeatherCondition = namedtuple('WeatherCondition', ['condition'])
(3) 健康管理データベース検索クエリ定義
_QRY_GET_HEALTHCARE: str = """
SELECT
  sm.measurement_day
  ,wakeup_time --必ず入力する
  ,sleep_score --任意
  ,sleeping_time --任意
  ,deep_sleeping_time --任意
  ,morning_measurement_time --任意
  ,morning_max --任意
  ,morning_min --任意
  ,morning_pulse_rate --任意
  ,evening_measurement_time--任意
  ,evening_max --任意
  ,evening_min --任意
  ,evening_pulse_rate --任意
  ,midnight_toilet_visits --必ず入力する
  ,has_coffee
  ,has_tea
  ,has_alcohol
  ,has_nutrition_drink
  ,has_sports_drink
  ,has_diuretic
  ,take_medicine
  ,take_bathing
  ,condition_memo --任意
  ,counts --必ず入力する
  ,measurement_time --BLEデバイス取得(任意)
  ,temperature -- BLEデバイス取得(任意)
FROM
  bodyhealth.person p
  INNER JOIN bodyhealth.sleep_management sm ON p.id = sm.pid
  INNER JOIN bodyhealth.blood_pressure bp ON p.id = bp.pid
  INNER JOIN bodyhealth.nocturia_factors nf ON p.id = nf.pid
  INNER JOIN bodyhealth.walking_count wc ON p.id = wc.pid
  INNER JOIN bodyhealth.body_temperature bt ON p.id = bt.pid
WHERE    _QRY_GET_HEALTHCARE: str = """
SELECT
  sm.measurement_day
  ,wakeup_time --必ず入力する
  ,sleep_score --任意
  ,sleeping_time --任意
  ,deep_sleeping_time --任意
  ,morning_measurement_time --任意
  ,morning_max --任意
  ,morning_min --任意
  ,morning_pulse_rate --任意
  ,evening_measurement_time--任意
  ,evening_max --任意
  ,evening_min --任意
  ,evening_pulse_rate --任意
  ,midnight_toilet_visits --必ず入力する
  ,has_coffee
  ,has_tea
  ,has_alcohol
  ,has_nutrition_drink
  ,has_sports_drink
  ,has_diuretic
  ,take_medicine
  ,take_bathing
  ,condition_memo --任意
  ,counts --必ず入力する
  ,measurement_time --BLEデバイス取得(任意)
  ,temperature -- BLEデバイス取得(任意)
FROM
  bodyhealth.person p
  INNER JOIN bodyhealth.sleep_management sm ON p.id = sm.pid
  INNER JOIN bodyhealth.blood_pressure bp ON p.id = bp.pid
  INNER JOIN bodyhealth.nocturia_factors nf ON p.id = nf.pid
  INNER JOIN bodyhealth.walking_count wc ON p.id = wc.pid
  INNER JOIN bodyhealth.body_temperature bt ON p.id = bt.pid
WHERE
  email=:emailAddress                -- params={"emailAddress": "user1@example.com",}
  AND
  sm.measurement_day=:measurementDay -- params={"measurementDay": "2023-04-01"}
  AND  
  bp.measurement_day=:measurementDay
  AND 
  nf.measurement_day=:measurementDay
  AND
  wc.measurement_day=:measurementDay
  AND
  bt.measurement_day=:measurementDay
"""
【SELECT項目とテーブル用データクラス関連図】
(4) 気象センサーデータベースの天候状態テーブル検索クエリ定義
    _QRY_GET_WEATHER: str = """
SELECT condition FROM weather.weather_condition WHERE measurement_day=:measurementDay
"""
(5) コンストラクタ
    __init__(self, sess_healthcare, sess_sensors, logger: Logger=None):
        # 健康管理DB用セッションオブジェクト
        self.sess_healthcare: scoped_session = sess_healthcare
        # 気象センサーDB用セッションオブジェクト
        self.sess_sensors: scoped_session = sess_sensors
        self.app_logger = app_logger
(6) 健康管理データベース検索結果取得関数
    def get_healthcare_asdict(self, email: str, measurement: str) -> Optional[dict]:
        """
        健康管理DBから指定された主キー項目(メールアドレス,測定日付)のデータの辞書オブジェクトを取得する
        :param email: メールアドレス
        :param measurement: 測定日付
        :return: 健康管理データの辞書オブジェクト, 存在しない場合はNone
        """
        # パラメータ辞書生成: 主キー
        params = {"emailAddress": email, "measurementDay": measurement} # ■ A-1
        try:
            rs: Result = self.sess_healthcare.execute(text(self._QRY_GET_HEALTHCARE), params) # ■ A-2
            row = None
            if rs:
                row = rs.fetchone()                                     # ■ A-3
            self.sess_healthcare.commit()
            if row is None:
                return None

            # 取得したTupleデータを位置引数で引き渡す
            # 先頭の測定日付はスキップする
            # 睡眠管理データ: 4項目
            sleepman = self._SleepManagement(*row[1:5])    # ■ A-4 (1)
            # 血圧データ: 8項目
            bloodpress = self._BloodPressure(*row[5:13])   # ■ A-4 (2)
            # 頻尿要因データ: 10項目
            factors = self._NocturiaFactors(*row[13:23])   # ■ A-4 (3)
            # 歩数データ: 1項目
            walkingcnt = self._WalkingCount(*row[23:24])   # ■ A-4 (4)
            # 体温データ: 2項目
            bodytemper = self._BodyTemperature(*row[24:])  # ■ A-4 (5)
            # ■ A-5 時刻データが含まれるデータ変換
            sleepman_dict: Dict = _datetime_to_str(sleepman._asdict())     # ■ A-5 (1)
            bloodpress_dict: Dict = _datetime_to_str(bloodpress._asdict()) # ■ A-5 (2)
            bodytemper_dict: Dict = _datetime_to_str(bodytemper._asdict()) # ■ A-5 (3)
            # ■ A-6 それ以外は namedtupleからDictオブジェクトに変換する
            factors_dict: Dict = factors._asdict()
            walkingcnt_dict: Dict = walkingcnt._asdict()
            # ■ A-7 各辞書オブジェクトを健康管理データコンテナー用辞書に格納する
            container_dict: Dict = {"sleepManagement": sleepman_dict,
                                    "bloodPressure": bloodpress_dict,
                                    "nocturiaFactors": factors_dict,
                                    "walkingCount": walkingcnt_dict,
                                    "bodyTemperature": bodytemper_dict}
        except sqlalchemy.exc.SQLAlchemyError as err:
            self.sess_healthcare.rollback()
            if self.app_logger:
                self.app_logger.warning(err.args)
            return None
        finally:
            self.sess_healthcare.close()

        # ■ A-8 コンテナーオブジェクトを健康管理データ用辞書に格納する
        return {"healthcareData": container_dict}
【戻り値】デバックコンソールの値 ※見やすくするするため改行を入れています
{'healthcareData': {
  'sleepManagement': {
    'wakeupTime': '05:30', 'sleepScore': 86, 'sleepingTime': '06:50', 'deepSleepingTime': '00:50'
  }, 
  'bloodPressure': {
    'morningMeasurementTime': '06:40', 'morningMax': 122, 'morningMin': 70, 'morningPulseRate': 62,
    'eveningMeasurementTime': '22:10', 'eveningMax': 101, 'eveningMin': 63, 'eveningPulseRate': 67
  }, 
  'nocturiaFactors': {
    'midnightToiletVisits': 1, 'hasCoffee': False, 'hasTea': False, 'hasAlcohol': False, 
    'hasNutritionDrink': True, 'hasSportsDrink': False, 'hasDiuretic': False, 
    'takeMedicine': True, 'takeBathing': True, 'conditionMemo': None
  }, 
  'walkingCount': {
    'counts': 8460
  }, 
  'bodyTemperature': {
    'measurementTime': None, 'temperature': None
  }
}
(6) 気象センサーデータベース検索処理
    def get_weather_asdict(self, measurement: str) -> Optional[dict]:
        """
        指定された日付(主キー)の天候を取得し辞書オブジェクトとして返却する
        :param measurement_day: 日付
        :return: 天候データの辞書オブジェクト, 存在しない場合はNone
        """
        params = {"measurementDay": measurement}
        try:
            rs: Result = self.sess_sensors.execute(text(self._QRY_GET_WEATHER), params)
            row = None
            if rs:
                row = rs.fetchone()
            self.sess_sensors.commit()    
            if row is None:
                return None

            weather_condition = self._WeatherCondition(*row)
            container_dict = {"weatherCondition": weather_condition._asdict()}
        except SQLAlchemyError as err:
            self.sess_sensors.rollback()
            if self.logger:
                self.logger.warning(err.args)
            return None
        finally:
            self.sess_sensors.close()

        return container_dict
【戻り値】デバックコンソールの値
container_dict= {dict:1} {'weatherCondition': {'condition': '曇り'}}
3.登録済みデータ検索メインクラス
import argparse
import logging
from typing import Dict, Optional
import json
import os
import socket

import sqlalchemyimport argparse
import logging
from typing import Dict, Optional
import json
import os
import socket

import sqlalchemy
from sqlalchemy.engine.url import URL
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

from dao.queries import Selector

DB_HEALTHCARE_CONF: str = os.path.join("conf", "db_healthcare.json")
DB_SENSORS_CONF: str = os.path.join("conf", "db_sensors.json")
SAVE_JSON = os.path.join("json_datas", 'out_healthcare_data.json')
LOG_FMT = '%(asctime)s %(filename)s %(funcName)s %(levelname)s %(message)s'
# デバックログ有効
app_logger_debug: bool = True


def save_text(filePath: str, text) -> None:
    with open(filePath, 'w') as fp:
        fp.write(text)


def get_conn_dict(filePath: str) -> dict:
    with open(filePath, 'r') as fp:
        db_conf: json = json.load(fp)
        hostname = socket.gethostname()
        # host in /etc/hosts
        db_conf["host"] = db_conf["host"].format(hostname=hostname)
    return db_conf


if __name__ == '__main__':
    logging.basicConfig(format=LOG_FMT, level=logging.DEBUG)
    app_logger = logging.getLogger(__name__)
    # ■ 1 コマンドラインオプション
    parser: argparse.ArgumentParser = argparse.ArgumentParser()
    # ■ 1-1 検索主キーを取得
    # (1) メールアドレス (例) user1@examples.com
    parser.add_argument("--mail-address", type=str, required=True,
                        help="Healthcare Databae Person mailAddress.")
    # (2) 測定日付 (例) 2023-03-14
    parser.add_argument("--measurement-day", type=str, required=True,
                        help="測定日付 yyyy-MM-dd.")
    # JSON保存パス
    parser.add_argument("--save-path", type=str,
                        help="保存パス ~/Documents/Healthcare/json/output")
    args: argparse.Namespace = parser.parse_args()
    emailAddress: str = args.mail_address
    measurementDay: str = args.measurement_day
    # 保存ファイル名
    jsonName: str = f"out_healthcare_data_{measurementDay}.json"
    # 保存パスは任意
    savePath: Optional[str] = args.save_path
    saveJsonFile = None
    if savePath is not None:
        saveJsonFile = os.path.join(os.path.expanduser(savePath), jsonName)
    else:
        # プロジェクト内のjson_datas
        saveJsonFile = os.path.join("json_datas", jsonName)
    app_logger.info(f"saveJsonFile: {saveJsonFile}")

    # 1-1. 健康管理データベース
    conn_dict: dict = get_conn_dict(DB_HEALTHCARE_CONF)
    conn_url: URL = URL.create(**conn_dict)
    engine_healthcare: sqlalchemy.Engine = create_engine(conn_url, echo=False)
    # Flaskアプリで使うscoped_sessionに合わせるためバッチでもscoped_sessionクラスを生成する
    Cls_sess_healthcare: scoped_session = scoped_session(
        sessionmaker(bind=engine_healthcare)
    )
    app_logger.info(f"Cls_sess_healthcare: {Cls_sess_healthcare}")

    # 1-2. 気象センサデータベース
    conn_sensors: dict = get_conn_dict(DB_SENSORS_CONF)
    conn_url: URL = URL.create(**conn_sensors)
    engine_sensors: sqlalchemy.Engine = create_engine(conn_url, echo=False)
    Cls_sess_sensors: scoped_session = scoped_session(
        sessionmaker(bind=engine_sensors)
    )
    app_logger.info(f"Cls_sess_sensors: {Cls_sess_sensors}")

    # 健康管理DBセッションオブジェクト生成
    sess_healthcare = Cls_sess_healthcare()
    app_logger.info(f"sess_healthcare: {sess_healthcare}")
    # 気象センサーDBセッションオブジェクト生成
    sess_sensors = Cls_sess_sensors()
    app_logger.info(f"sess_sensors: {sess_sensors}")
    # ■ 2 登録済みデータ検索クラスの検索オブジェクトを生成する
    selector = Selector(sess_healthcare, sess_sensors, logger=app_logger)
    # ■ 3 検索オブジェクトの健康管理データ(辞書オブジェクト)取得関数を呼び出す
    healthcare_dict: Dict = selector.get_healthcare_asdict(emailAddress, measurementDay)
    if app_logger_debug:
        app_logger.debug(f"Healthcare: {healthcare_dict}")
    # ■ 4 健康管理データ辞書オブジェクトが存在すれば天候状態を取得する
    if healthcare_dict:
        # ■ 4-1 レスポンス用に入力パラメータのメールアドレスと測定日付を辞書オブジェクトに追加する
        healthcare_dict["emailAddress"] = emailAddress
        healthcare_dict["measurementDay"] = measurementDay

        # ■ 4-2 検索オブジェクトの天候状態データ取得関数を呼び出す
        weather_dict: Dict = selector.get_weather_asdict(measurementDay)
        if app_logger_debug:
            app_logger.debug(f"Weather: {weather_dict}")
        if weather_dict:
            # ■ 4-3 天候状態データを天候データにラップして辞書オブジェクトに追加
            healthcare_dict["weatherData"] = weather_dict
        else:
            # 天候がなければ未設定
            # ※登録時に気象センサーデータベースに障害があった場合に可能性が有るが
            # 通常ここにくることは想定していない
            healthcare_dict["weatherData"] = None

        # ■ 5 FlaskアプリのOKレスポンスと同じ辞書オブジェクトを生成
        resp_obj: Dict = {
            "data": healthcare_dict,
            "status": {"code": 0, "message": "OK"}
        }
        # ■ 5-1. 辞書オブジェクトをJSON形式文字列に変換
        # 日本語が含まれるため: ensure_ascii=False
        json_str = json.dumps(resp_obj, indent=2, ensure_ascii=False)
        # ■ 5-2. JSON文字列をファイル保存 ※保存されるJSONはFlaskアプリのレスポンスと同一
        save_text(SAVE_JSON, json_str)
【出力内容】JSONファイル ※Flaskアプリが返却するJSONレスポンスと同一になります
{
  "data": {
    "healthcareData": {
      "sleepManagement": {
        "wakeupTime": "05:30",
        "sleepScore": 86,
        "sleepingTime": "06:50",
        "deepSleepingTime": "00:50"
      },
      "bloodPressure": {
        "morningMeasurementTime": "06:40",
        "morningMax": 122,
        "morningMin": 70,
        "morningPulseRate": 62,
        "eveningMeasurementTime": "22:10",
        "eveningMax": 101,
        "eveningMin": 63,
        "eveningPulseRate": 67
      },
      "nocturiaFactors": {
        "midnightToiletVisits": 1,
        "hasCoffee": false,
        "hasTea": false,
        "hasAlcohol": false,
        "hasNutritionDrink": true,
        "hasSportsDrink": false,
        "hasDiuretic": false,
        "takeMedicine": true,
        "takeBathing": true,
        "conditionMemo": null
      },
      "walkingCount": {
        "counts": 8460
      },
      "bodyTemperature": {
        "measurementTime": null,
        "temperature": null
      }
    },
    "emailAddress": "user1@examples.com",
    "measurementDay": "2023-03-14",
    "weatherData": {
      "weatherCondition": {
        "condition": "曇り"
      }
    }
  },
  "status": {
    "code": 0,
    "message": "OK"
  }
}
メニューページへ
戻る
Pythonバッチアプリのソースコードはこちら
https://github.com/pipito-yukio/personal_healthcare/tree/main/src/batch
サーバー側のFlaskアプリのソースコードはこちら
https://github.com/pipito-yukio/personal_healthcare/tree/main/src/webapp