SQLAlchemyをPythonバッチで使う


【最終更新日】2023-05-02

PythonバッチスクリプトでSQLAlchemy ORM を使って複数のデータベースにデータを登録する方法について解説します。

Webアプリでデータベース処理コードを書くよりはバッチスクリプトでコーディングしたほうがテストもデバックも遥かにも容易で効率的です。 ※Flask-SQLAlchemyは使わず、SQLAlchemyを単独で使うことでバッチスクリプトでもFlask Webアプリの両方で動作するテーブル定義クラスを作成することができます。

当リポジトリではラズパイ4専用のアプリケーションインストーラーのテーブル作成スクリプトによりテーブルを作成するので、SQLAclchemyのcreate_all()関数でテーブルを生成することはありません。

【バッチスクリプトで2つのデータベースに登録】
【参考URL】主に(1)〜(9)の本家サイトの内容を参考に作成
1.バッチアプリケーシ用プロジェクト
プロジェクトのクラス構成
healthcare/
    ├── RegistHealthcareData.py           // データ登録バッチスクリプト
    ├── UpdateHealthcareData.py           // 登録済みデータ更新バッチスクリプト 
    ├── conf                              // データベース接続情報
    │   ├── db_healthcare.json            //  (1)健康管理データベース用
    │   └── db_sensors.json               //  (2)気象センサーデータベース用 
    ├── dao
    │   ├── __init__.py                   //   健康管理データベーススキーマ定義
    │   ├── blood_pressure.py             //   (1)血圧測定テーブル用データクラス
    │   ├── body_temperature.py           //   (2)体温測定テーブル用データクラス
    │   ├── nocturia_factors.py           //   (3)夜間頻尿要因テーブル用データクラス
    │   ├── person.py                     //   個人情報テーブル用データクラス
    │   ├── sleep_management.py           //   (4)睡眠管理テーブル用データクラス
    │   ├── walking_count.py              //   (5)歩数テーブル用データクラス
    │   └── weather_condition.py          //   (6)気象センサーデータベースの天候状態テーブル用テータクラス
    └── json_datas
        └── healthcare_data_20230314.json // データ登録用JSONファイル ※AndroidアプリからのPOSTデータと同等
2.データ登録バッチメインスクリプト
(1)データベース接続情報
(2) 変数定義: 各データベース情報ファイルパス, ログ出力フォーマット定義
# 健康管理デーベース接続情報: [DB] healthcare_db [PORT] 5432
DB_HEALTHCARE_CONF: str = os.path.join("conf", "db_healthcare.json")
# 気象センサーデータベース: [DB] sensors_pgdb [PORT] 5432
DB_SENSORSE_CONF: str = os.path.join("conf", "db_sensors.json")
# ログフォーマット
LOG_FMT = '%(asctime)s %(filename)s %(funcName)s %(levelname)s %(message)s'
# デバックログ有効
app_logger_debug: bool = True
(3) データベース情報ファイル(JSON)を辞書オブジェクトに変換する関数

ここでのポイントはhostname = socket.gethostname() としている箇所。

def get_conn_dict(filePath: str) -> dict:
    with open(filePath, 'r') as fp:
        db_conf: json = json.load(fp)
        hostname = socket.gethostname()  # == `hostname` コマンドで取得できるホスト名
        db_conf["host"] = db_conf["host"].format(hostname=hostname) # == ホスト名.local 
    return db_conf

下記 hostnameコマンドを実行した結果で db_conf["host"] = 'Dell-T7500.local' となります。

$ hostname
Dell-T7500
(4) JSONファイル読み込み関数 ※データ登録用JSONファイルを辞書オブジェクトに変換
def load_json(filePath: str) -> dict:
    with open(filePath, 'r') as fp:
        json_text = json.load(fp)  # 辞書オブジェクト
    return json_text
(5) メールアドレスに対応するID取得関数
def _get_personid(session: Session, email_address: str) -> Optional[int]:
    """
    メールアドレスに対応するPersion.idを取得する
    :email_address: メールアドレス
    """
    try:
        person_id: Optional[int]
        with Session_healthcare() as session:
            with session.begin():
                stmt: Select = select(Person).where(Person.email == email_address)
                person: Person = session.scalars(stmt).one()
                if person:
                    person_id = person.id
                else:
                    person_id = None
        return person_id
    except NoResultFound as notFound:
        app_logger.warning(f"NoResultFound: {notFound}")
        return None
    except Exception as excption:
        app_logger.error(f"Exception: {excption}")
        raise excption
(6) 健康管理データベース登録処理
def _insert_healthdata(sess: Session, person_id: int, measurement_day: str, data: Dict) -> None:
    # JSONキーチェック
    sleep_man = {}
    blood_press = {}
    nocturia_factors = {}
    walking_count = {}
    body_temper = {}
    try:
        # 健康管理データコンテナ
        healthcare_data: Dict = data["healthcareData"]
        # (1) 睡眠管理
        sleep_man: Dict = healthcare_data["sleepManagement"]
        # (2) 血圧測定
        blood_press: Dict = healthcare_data["bloodPressure"]
        # (3) 夜中トイレ回数要因
        nocturia_factors: Dict = healthcare_data["nocturiaFactors"]
        # (4) 歩数
        walking_count: Dict = healthcare_data["walkingCount"]
        # (5) 体温データ
        body_temper: Dict = healthcare_data["bodyTemperature"]
    except KeyError as err:
        app_logger.warning(err)
        exit(1)  # Flaskアプリ: abort(BadRequest.code, _set_errormessage("460,{err}"))

    # 主キー値を設定
    sleep_man["pid"] = person_id
    sleep_man["measurementDay"] = measurement_day
    blood_press["pid"] = person_id
    blood_press["measurementDay"] = measurement_day
    nocturia_factors["pid"] = person_id
    nocturia_factors["measurementDay"] = measurement_day
    walking_count["pid"] = person_id
    walking_count["measurementDay"] = measurement_day
    body_temper["pid"] = person_id
    body_temper["measurementDay"] = measurement_day
    # 登録用の各クラスにデータを設定
    sleepMan: SleepManagement = SleepManagement(**sleep_man)
    if app_logger_debug:
        app_logger.debug(sleepMan)
    bloodPressure: BloodPressure = BloodPressure(**blood_press)
    if app_logger_debug:
        app_logger.debug(bloodPressure)
    factors: NocturiaFactors = NocturiaFactors(**nocturia_factors)
    if app_logger_debug:
        app_logger.debug(factors)
    walking: WalkingCount = WalkingCount(**walking_count)
    if app_logger_debug:
        app_logger.debug(walking)
    bodyTemper: BodyTemperature = BodyTemperature(**body_temper)
    if app_logger_debug:
        app_logger.debug(bodyTemper)

    # 健康管理DB用セッションオブジェクト取得
    try:
        sess.begin()
        sess.add_all(
            [sleepMan, bloodPressure, factors, walking, bodyTemper]
        )
        sess.commit()
    except sqlalchemy.exc.IntegrityError as err:
        app_logger.warning(f"IntegrityError: {err.args}")
        sess.rollback()
        raise err
    except sqlalchemy.exc.SQLAlchemyError as err:
        app_logger.warning(err.args)
        sess.rollback()
        raise err
    finally:
        sess.close()
(7) 気象センサーデータベース登録処理
def _insert_weather(sess: Session,measurement_day: str, data: Dict) -> None:
    """
    天候状態の登録処理
    :param measurement_day: 測定日
    :data 登録用データ (必須)
    """
    try:
        # 天候データコンテナは必ずある
        weather_data: Dict = data["weatherData"]
        # 天候状態は必須項目
        weather_condition: Dict = weather_data["weatherCondition"]
        if app_logger_debug:
            app_logger.debug(f"weather_condition: {weather_condition}")
    except KeyError as err:
        app_logger.warning(err)
        return

    # 主キー設定
    weather_condition["measurementDay"] = measurement_day
    weather: WeatherCondition = WeatherCondition(**weather_condition)
    try:
        sess.begin()
        sess.add(weather)
        sess.commit()
    except sqlalchemy.exc.IntegrityError as err:
        app_logger.warning(f"IntegrityError: {err.args}")
        sess.rollback()
    except sqlalchemy.exc.SQLAlchemyError as err:
        app_logger.warning(err.args)
        sess.rollback()
    finally:
        sess.close()
(8) バッチメイン処理
if __name__ == '__main__':
    logging.basicConfig(format=LOG_FMT, level=logging.DEBUG)
    app_logger = logging.getLogger(__name__)

    parser: argparse.ArgumentParser = argparse.ArgumentParser()
    # JSONファイルパス: 必須
    # (例) ~/Documents/Healthcare/json/healthcare_data_20230313.json"
    parser.add_argument("--json-path", type=str, required=True,
                        help="Healthcare JSON file path.")
    args: argparse.Namespace = parser.parse_args()
    # JSONファイルロード
    healthcare_data: Dict = load_json(os.path.expanduser(args.json_path))
    # ■■  Flaskアプリでは ■■ 
    #  data: dict = json.loads(request.data) 
    if app_logger_debug:
        app_logger.debug(healthcare_data)

    # 健康管理データベース接続情報
    url_dict: dict = get_conn_dict(DB_HEALTHCARE_CONF)
    conn_url: URL = URL.create(**url_dict)
    engine_healthcare: sqlalchemy.Engine = create_engine(conn_url, echo=False)
    # 個人テーブルチェック用
    Session_healthcare = sessionmaker(bind=engine_healthcare)
    # 登録処理用セッション
    Cls_sess_healthcare: scoped_session = scoped_session(
        sessionmaker(bind=engine_healthcare)
    )
    app_logger.info(f"Cls_sess_healthcare: {Cls_sess_healthcare}")
    # 気象センサーデータベース接続情報
    url_dict: dict = get_conn_dict(DB_SENSORSE_CONF)
    conn_url: URL = URL.create(**url_dict)
    engine_sensors: sqlalchemy.Engine = create_engine(conn_url, echo=False)
    Cls_sess_sensors: scoped_session = scoped_session(
        sessionmaker(bind=engine_sensors)
    )
    app_logger.info(f"Cls_sess_sensors: {Cls_sess_sensors}")

    # メールアドレス取得
    emailAddress: str = healthcare_data["emailAddress"]
    # メールアドレスに対応する個人ID取得: 健康管理テーブルの主キー
    person_id: int = _get_personid(Session_healthcare(), emailAddress)
    if person_id is None:
        app_logger.warning("Person not found.")
        exit(0)

    # 測定日付: 健康管理テーブルと天候状態テーブルの主キー
    measurementDay: str = healthcare_data["measurementDay"]

    # 健康管理データベースの全テーブル一括登録
    try:
        _insert_healthdata(Cls_sess_healthcare(),
                           person_id, measurementDay, healthcare_data)
    except Exception:
        pass
    else:
        # 天候状態テーブル登録
        _insert_weather(Cls_sess_sensors(), measurementDay, healthcare_data)
3.データ更新バッチメインスクリプト
(1) 更新用のJSONチェック処理
※ 健康管理の更新データは各テーブル毎に任意なのでキーが無い場合はNoneを返却しエラーにしない。
def _has_dict_in_data(dict_key: str, data:Dict) -> Optional[Dict]:
    try:
        result: Dict = data[dict_key]
        return result
    except KeyError as err:
        # データ無し
        return None
(2) 健康管理データベース更新処理
def _update_healthdata(sess: Session, person_id: int, measurement_day: str, data: Dict) -> None:
    # 健康管理データコンテナ: 必須
    healthcare_data: Optional[Dict] = _has_dict_in_data("healthcareData", data)
    if healthcare_data is None:
        app_logger.error("Required healthcareData!")
        exit(1)

    # 更新用データは更新されたテーブルのデータのみが存在する
    update_table_count: int = 0
    # (1) 睡眠管理
    sleep_man: Optional[Dict] = _has_dict_in_data("sleepManagement", healthcare_data)
    if app_logger_debug:
        app_logger.debug(f"sleepManagement: {sleep_man}")
    if sleep_man is not None:
        update_table_count += 1

    # (2) 血圧測定
    blood_press: Optional[Dict] = _has_dict_in_data("bloodPressure", healthcare_data)
    if app_logger_debug:
        app_logger.debug(f"bloodPressure: {blood_press}")
    if blood_press is not None:
        update_table_count += 1

    # (3) 夜中トイレ回数要因
    nocturia_factors: Optional[Dict] = _has_dict_in_data("nocturiaFactors", healthcare_data)
    if app_logger_debug:
        app_logger.debug(f"nocturiaFactors: {nocturia_factors}")
    if nocturia_factors is not None:
        update_table_count += 1

    # (4) 歩数
    walking_count: Optional[Dict] = _has_dict_in_data("walkingCount", healthcare_data)
    if app_logger_debug:
        app_logger.debug(f"walkingCount: {walking_count}")
    if walking_count is not None:
        update_table_count += 1

    # (5) 体温データ ※現状テータを運用していないが主キーのみ追加
    body_temper: Optional[Dict] = _has_dict_in_data("bodyTemperature", healthcare_data)
    if app_logger_debug:
        app_logger.debug(f"bodyTemperature: {body_temper}")
    if body_temper is not None:
        update_table_count += 1

    if update_table_count == 0:
        app_logger.info("Update data is None!")
        return

    try:
        sess.begin()
        if sleep_man is not None:
            stmt = (
                sqlalchemy.update(SleepManagement).
                where(SleepManagement.pid==person_id, SleepManagement.measurementDay==measurement_day).
                values(**sleep_man)
            )
            sess.execute(stmt)
        if blood_press is not None:
            stmt = (
                sqlalchemy.update(BloodPressure).
                where(BloodPressure.pid==person_id, BloodPressure.measurementDay==measurement_day).
                values(**blood_press)
            )
            sess.execute(stmt)
        if nocturia_factors is not None:
            stmt = (
                sqlalchemy.update(NocturiaFactors).
                where(NocturiaFactors.pid==person_id, NocturiaFactors.measurementDay==measurement_day).
                values(**nocturia_factors)
            )
            sess.execute(stmt)
        if walking_count is not None:
            stmt = (
                sqlalchemy.update(WalkingCount).
                where(WalkingCount.pid==person_id, WalkingCount.measurementDay==measurement_day).
                values(**walking_count)
            )
            sess.execute(stmt)
        if body_temper is not None:
            stmt = (
                sqlalchemy.update(BodyTemperature).
                where(BodyTemperature.pid==person_id, BodyTemperature.measurementDay==measurement_day).
                values(**body_temper)
            )
            sess.execute(stmt)
        sess.commit()
        if app_logger_debug:
            app_logger.debug(f"Updated[HealthcareData]: Person.id: {person_id}, MeasuremtDay: {measurement_day}")
    except sqlalchemy.exc.SQLAlchemyError as err:
        sess.rollback()
        app_logger.warning(err.args)
    finally:
        sess.close()
(3) 気象センサーデータベース更新処理
def _update_weather(sess: Session, measurement_day: str, data: Dict) -> None:
    """
    天候状態の更新処理
    :sess Session
    :param measurement_day: 測定日
    :data 更新用データ (任意)
    """
    try:
        # 天候データコンテナは必須
        weather_data: Dict = data["weatherData"]
    except KeyError as err:
        app_logger.warning(err)
        return

    # 天候状態は任意
    weather_condition: Dict = _has_dict_in_data("weatherCondition", weather_data)
    if app_logger_debug:
        app_logger.debug(f"weather_condition: {weather_condition}")
    if weather_condition is None:
        # 更新データ無し
        return

    # 気象センサDB用セッションオブジェクト取得
    try:
        sess.begin()
        stmt = (
            sqlalchemy.update(WeatherCondition).
            where(WeatherCondition.measurementDay == measurement_day).
            values(**weather_condition)
        )
        sess.execute(stmt)
        sess.commit()
        if app_logger_debug:
            app_logger.debug(f"Updated[WeatherData]: MeasuremtDay: {measurement_day}")
    except sqlalchemy.exc.SQLAlchemyError as err:
        app_logger.warning(err.args)
        sess.rollback()
    finally:
        sess.close()
(4) バッチメイン処理 ※登録処理とほぼ内容が同じなのでコードのみ掲載し説明は省略
if __name__ == '__main__':
    logging.basicConfig(format=LOG_FMT, level=logging.DEBUG)
    app_logger = logging.getLogger(__name__)

    parser: argparse.ArgumentParser = argparse.ArgumentParser()
    # JSONファイルパス: 必須
    # (例) ~/Documents/Healthcare/json/healthcare_data_20230213.json"
    parser.add_argument("--json-path", type=str, required=True,
                        help="Healthcare JSON file path.")
    args: argparse.Namespace = parser.parse_args()
    # JSONファイルロード
    healthcare_data: Dict = load_json(os.path.expanduser(args.json_path))
    if app_logger_debug:
        app_logger.debug(healthcare_data)

    # 健康管理データベス接続情報
    url_dict: dict = get_conn_dict(DB_HEALTHCARE_CONF)
    conn_url: URL = URL.create(**url_dict)
    engine_healthcare: sqlalchemy.Engine = create_engine(conn_url, echo=True)
    # 個人テーブルチェック用
    Session_healthcare = sessionmaker(bind=engine_healthcare)
    # 登録処理用セッション
    Cls_sess_healthcare: scoped_session = scoped_session(
        sessionmaker(bind=engine_healthcare)
    )
    app_logger.info(f"Cls_sess_healthcare: {Cls_sess_healthcare}")
    # 気象センサーデータベース接続情報
    url_dict: dict = get_conn_dict(DB_SENSORSE_CONF)
    conn_url: URL = URL.create(**url_dict)
    engine_sensors: sqlalchemy.Engine = create_engine(conn_url, echo=True)
    Cls_sess_sensors: scoped_session = scoped_session(
        sessionmaker(bind=engine_sensors)
    )
    app_logger.info(f"Cls_sess_sensors: {Cls_sess_sensors}")

    # メールアドレス取得
    emailAddress: str = healthcare_data["emailAddress"]
    # メールアドレスに対応する個人ID取得: 健康管理テーブルの主キー
    person_id: int = _get_personid(emailAddress)
    if person_id is None:
        app_logger.warning("Person not found.")
        exit(0)

    # 測定日付: 健康管理テーブルと天候状態テーブルの主キー
    measurementDay: str = healthcare_data["measurementDay"]

    # 健康管理データ更新
    try:
        _update_healthdata(Cls_sess_healthcare(), person_id, measurementDay,
                           healthcare_data)
    except Exception:
        pass
    else:
        # 天候状態
        _update_weather(Cls_sess_sensors(), measurementDay, healthcare_data)
メニューページへ
戻る
Pythonバッチアプリのソースコードはこちら
https://github.com/pipito-yukio/personal_healthcare/tree/main/src/batch
サーバー側のFlaskアプリのソースコードはこちら
https://github.com/pipito-yukio/personal_healthcare/tree/main/src/webapp