PythonバッチスクリプトでSQLAlchemy ORM を使って複数のデータベースにデータを登録する方法について解説します。
Webアプリでデータベース処理コードを書くよりはバッチスクリプトでコーディングしたほうがテストもデバックも遥かにも容易で効率的です。 ※Flask-SQLAlchemyは使わず、SQLAlchemyを単独で使うことでバッチスクリプトでもFlask Webアプリの両方で動作するテーブル定義クラスを作成することができます。
当リポジトリではラズパイ4専用のアプリケーションインストーラーのテーブル作成スクリプトによりテーブルを作成するので、SQLAclchemyのcreate_all()関数でテーブルを生成することはありません。
healthcare/
├── RegistHealthcareData.py // データ登録バッチスクリプト
├── UpdateHealthcareData.py // 登録済みデータ更新バッチスクリプト
├── conf // データベース接続情報
│ ├── db_healthcare.json // (1)健康管理データベース用
│ └── db_sensors.json // (2)気象センサーデータベース用
├── dao
│ ├── __init__.py // 健康管理データベーススキーマ定義
│ ├── blood_pressure.py // (1)血圧測定テーブル用データクラス
│ ├── body_temperature.py // (2)体温測定テーブル用データクラス
│ ├── nocturia_factors.py // (3)夜間頻尿要因テーブル用データクラス
│ ├── person.py // 個人情報テーブル用データクラス
│ ├── sleep_management.py // (4)睡眠管理テーブル用データクラス
│ ├── walking_count.py // (5)歩数テーブル用データクラス
│ └── weather_condition.py // (6)気象センサーデータベースの天候状態テーブル用テータクラス
└── json_datas
└── healthcare_data_20230314.json // データ登録用JSONファイル ※AndroidアプリからのPOSTデータと同等
{
"drivername": "postgresql+psycopg2",
"host": "{hostname}.local",
"port": "5432",
"database": "healthcare_db",
"username": "developer",
"password": "developer"
}
{
"drivername": "postgresql+psycopg2",
"host": "{hostname}.local",
"port": "5432",
"database": "sensors_pgdb",
"username": "developer",
"password": "developer"
}
# 健康管理デーベース接続情報: [DB] healthcare_db [PORT] 5432
DB_HEALTHCARE_CONF: str = os.path.join("conf", "db_healthcare.json")
# 気象センサーデータベース: [DB] sensors_pgdb [PORT] 5432
DB_SENSORSE_CONF: str = os.path.join("conf", "db_sensors.json")
# ログフォーマット
LOG_FMT = '%(asctime)s %(filename)s %(funcName)s %(levelname)s %(message)s'
# デバックログ有効
app_logger_debug: bool = True
ここでのポイントはhostname = socket.gethostname() としている箇所。
def get_conn_dict(filePath: str) -> dict:
with open(filePath, 'r') as fp:
db_conf: json = json.load(fp)
hostname = socket.gethostname() # == `hostname` コマンドで取得できるホスト名
db_conf["host"] = db_conf["host"].format(hostname=hostname) # == ホスト名.local
return db_conf
下記 hostnameコマンドを実行した結果で db_conf["host"] = 'Dell-T7500.local' となります。
$ hostname
Dell-T7500
def load_json(filePath: str) -> dict:
with open(filePath, 'r') as fp:
json_text = json.load(fp) # 辞書オブジェクト
return json_text
def _get_personid(session: Session, email_address: str) -> Optional[int]:
"""
メールアドレスに対応するPersion.idを取得する
:email_address: メールアドレス
"""
try:
person_id: Optional[int]
with Session_healthcare() as session:
with session.begin():
stmt: Select = select(Person).where(Person.email == email_address)
person: Person = session.scalars(stmt).one()
if person:
person_id = person.id
else:
person_id = None
return person_id
except NoResultFound as notFound:
app_logger.warning(f"NoResultFound: {notFound}")
return None
except Exception as excption:
app_logger.error(f"Exception: {excption}")
raise excption
def _insert_healthdata(sess: Session, person_id: int, measurement_day: str, data: Dict) -> None:
# JSONキーチェック
sleep_man = {}
blood_press = {}
nocturia_factors = {}
walking_count = {}
body_temper = {}
try:
# 健康管理データコンテナ
healthcare_data: Dict = data["healthcareData"]
# (1) 睡眠管理
sleep_man: Dict = healthcare_data["sleepManagement"]
# (2) 血圧測定
blood_press: Dict = healthcare_data["bloodPressure"]
# (3) 夜中トイレ回数要因
nocturia_factors: Dict = healthcare_data["nocturiaFactors"]
# (4) 歩数
walking_count: Dict = healthcare_data["walkingCount"]
# (5) 体温データ
body_temper: Dict = healthcare_data["bodyTemperature"]
except KeyError as err:
app_logger.warning(err)
exit(1) # Flaskアプリ: abort(BadRequest.code, _set_errormessage("460,{err}"))
# 主キー値を設定
sleep_man["pid"] = person_id
sleep_man["measurementDay"] = measurement_day
blood_press["pid"] = person_id
blood_press["measurementDay"] = measurement_day
nocturia_factors["pid"] = person_id
nocturia_factors["measurementDay"] = measurement_day
walking_count["pid"] = person_id
walking_count["measurementDay"] = measurement_day
body_temper["pid"] = person_id
body_temper["measurementDay"] = measurement_day
# 登録用の各クラスにデータを設定
sleepMan: SleepManagement = SleepManagement(**sleep_man)
if app_logger_debug:
app_logger.debug(sleepMan)
bloodPressure: BloodPressure = BloodPressure(**blood_press)
if app_logger_debug:
app_logger.debug(bloodPressure)
factors: NocturiaFactors = NocturiaFactors(**nocturia_factors)
if app_logger_debug:
app_logger.debug(factors)
walking: WalkingCount = WalkingCount(**walking_count)
if app_logger_debug:
app_logger.debug(walking)
bodyTemper: BodyTemperature = BodyTemperature(**body_temper)
if app_logger_debug:
app_logger.debug(bodyTemper)
# 健康管理DB用セッションオブジェクト取得
try:
sess.begin()
sess.add_all(
[sleepMan, bloodPressure, factors, walking, bodyTemper]
)
sess.commit()
except sqlalchemy.exc.IntegrityError as err:
app_logger.warning(f"IntegrityError: {err.args}")
sess.rollback()
raise err
except sqlalchemy.exc.SQLAlchemyError as err:
app_logger.warning(err.args)
sess.rollback()
raise err
finally:
sess.close()
def _insert_weather(sess: Session,measurement_day: str, data: Dict) -> None:
"""
天候状態の登録処理
:param measurement_day: 測定日
:data 登録用データ (必須)
"""
try:
# 天候データコンテナは必ずある
weather_data: Dict = data["weatherData"]
# 天候状態は必須項目
weather_condition: Dict = weather_data["weatherCondition"]
if app_logger_debug:
app_logger.debug(f"weather_condition: {weather_condition}")
except KeyError as err:
app_logger.warning(err)
return
# 主キー設定
weather_condition["measurementDay"] = measurement_day
weather: WeatherCondition = WeatherCondition(**weather_condition)
try:
sess.begin()
sess.add(weather)
sess.commit()
except sqlalchemy.exc.IntegrityError as err:
app_logger.warning(f"IntegrityError: {err.args}")
sess.rollback()
except sqlalchemy.exc.SQLAlchemyError as err:
app_logger.warning(err.args)
sess.rollback()
finally:
sess.close()
if __name__ == '__main__':
logging.basicConfig(format=LOG_FMT, level=logging.DEBUG)
app_logger = logging.getLogger(__name__)
parser: argparse.ArgumentParser = argparse.ArgumentParser()
# JSONファイルパス: 必須
# (例) ~/Documents/Healthcare/json/healthcare_data_20230313.json"
parser.add_argument("--json-path", type=str, required=True,
help="Healthcare JSON file path.")
args: argparse.Namespace = parser.parse_args()
# JSONファイルロード
healthcare_data: Dict = load_json(os.path.expanduser(args.json_path))
# ■■ Flaskアプリでは ■■
# data: dict = json.loads(request.data)
if app_logger_debug:
app_logger.debug(healthcare_data)
# 健康管理データベース接続情報
url_dict: dict = get_conn_dict(DB_HEALTHCARE_CONF)
conn_url: URL = URL.create(**url_dict)
engine_healthcare: sqlalchemy.Engine = create_engine(conn_url, echo=False)
# 個人テーブルチェック用
Session_healthcare = sessionmaker(bind=engine_healthcare)
# 登録処理用セッション
Cls_sess_healthcare: scoped_session = scoped_session(
sessionmaker(bind=engine_healthcare)
)
app_logger.info(f"Cls_sess_healthcare: {Cls_sess_healthcare}")
# 気象センサーデータベース接続情報
url_dict: dict = get_conn_dict(DB_SENSORSE_CONF)
conn_url: URL = URL.create(**url_dict)
engine_sensors: sqlalchemy.Engine = create_engine(conn_url, echo=False)
Cls_sess_sensors: scoped_session = scoped_session(
sessionmaker(bind=engine_sensors)
)
app_logger.info(f"Cls_sess_sensors: {Cls_sess_sensors}")
# メールアドレス取得
emailAddress: str = healthcare_data["emailAddress"]
# メールアドレスに対応する個人ID取得: 健康管理テーブルの主キー
person_id: int = _get_personid(Session_healthcare(), emailAddress)
if person_id is None:
app_logger.warning("Person not found.")
exit(0)
# 測定日付: 健康管理テーブルと天候状態テーブルの主キー
measurementDay: str = healthcare_data["measurementDay"]
# 健康管理データベースの全テーブル一括登録
try:
_insert_healthdata(Cls_sess_healthcare(),
person_id, measurementDay, healthcare_data)
except Exception:
pass
else:
# 天候状態テーブル登録
_insert_weather(Cls_sess_sensors(), measurementDay, healthcare_data)
def _has_dict_in_data(dict_key: str, data:Dict) -> Optional[Dict]:
try:
result: Dict = data[dict_key]
return result
except KeyError as err:
# データ無し
return None
def _update_healthdata(sess: Session, person_id: int, measurement_day: str, data: Dict) -> None:
# 健康管理データコンテナ: 必須
healthcare_data: Optional[Dict] = _has_dict_in_data("healthcareData", data)
if healthcare_data is None:
app_logger.error("Required healthcareData!")
exit(1)
# 更新用データは更新されたテーブルのデータのみが存在する
update_table_count: int = 0
# (1) 睡眠管理
sleep_man: Optional[Dict] = _has_dict_in_data("sleepManagement", healthcare_data)
if app_logger_debug:
app_logger.debug(f"sleepManagement: {sleep_man}")
if sleep_man is not None:
update_table_count += 1
# (2) 血圧測定
blood_press: Optional[Dict] = _has_dict_in_data("bloodPressure", healthcare_data)
if app_logger_debug:
app_logger.debug(f"bloodPressure: {blood_press}")
if blood_press is not None:
update_table_count += 1
# (3) 夜中トイレ回数要因
nocturia_factors: Optional[Dict] = _has_dict_in_data("nocturiaFactors", healthcare_data)
if app_logger_debug:
app_logger.debug(f"nocturiaFactors: {nocturia_factors}")
if nocturia_factors is not None:
update_table_count += 1
# (4) 歩数
walking_count: Optional[Dict] = _has_dict_in_data("walkingCount", healthcare_data)
if app_logger_debug:
app_logger.debug(f"walkingCount: {walking_count}")
if walking_count is not None:
update_table_count += 1
# (5) 体温データ ※現状テータを運用していないが主キーのみ追加
body_temper: Optional[Dict] = _has_dict_in_data("bodyTemperature", healthcare_data)
if app_logger_debug:
app_logger.debug(f"bodyTemperature: {body_temper}")
if body_temper is not None:
update_table_count += 1
if update_table_count == 0:
app_logger.info("Update data is None!")
return
try:
sess.begin()
if sleep_man is not None:
stmt = (
sqlalchemy.update(SleepManagement).
where(SleepManagement.pid==person_id, SleepManagement.measurementDay==measurement_day).
values(**sleep_man)
)
sess.execute(stmt)
if blood_press is not None:
stmt = (
sqlalchemy.update(BloodPressure).
where(BloodPressure.pid==person_id, BloodPressure.measurementDay==measurement_day).
values(**blood_press)
)
sess.execute(stmt)
if nocturia_factors is not None:
stmt = (
sqlalchemy.update(NocturiaFactors).
where(NocturiaFactors.pid==person_id, NocturiaFactors.measurementDay==measurement_day).
values(**nocturia_factors)
)
sess.execute(stmt)
if walking_count is not None:
stmt = (
sqlalchemy.update(WalkingCount).
where(WalkingCount.pid==person_id, WalkingCount.measurementDay==measurement_day).
values(**walking_count)
)
sess.execute(stmt)
if body_temper is not None:
stmt = (
sqlalchemy.update(BodyTemperature).
where(BodyTemperature.pid==person_id, BodyTemperature.measurementDay==measurement_day).
values(**body_temper)
)
sess.execute(stmt)
sess.commit()
if app_logger_debug:
app_logger.debug(f"Updated[HealthcareData]: Person.id: {person_id}, MeasuremtDay: {measurement_day}")
except sqlalchemy.exc.SQLAlchemyError as err:
sess.rollback()
app_logger.warning(err.args)
finally:
sess.close()
def _update_weather(sess: Session, measurement_day: str, data: Dict) -> None:
"""
天候状態の更新処理
:sess Session
:param measurement_day: 測定日
:data 更新用データ (任意)
"""
try:
# 天候データコンテナは必須
weather_data: Dict = data["weatherData"]
except KeyError as err:
app_logger.warning(err)
return
# 天候状態は任意
weather_condition: Dict = _has_dict_in_data("weatherCondition", weather_data)
if app_logger_debug:
app_logger.debug(f"weather_condition: {weather_condition}")
if weather_condition is None:
# 更新データ無し
return
# 気象センサDB用セッションオブジェクト取得
try:
sess.begin()
stmt = (
sqlalchemy.update(WeatherCondition).
where(WeatherCondition.measurementDay == measurement_day).
values(**weather_condition)
)
sess.execute(stmt)
sess.commit()
if app_logger_debug:
app_logger.debug(f"Updated[WeatherData]: MeasuremtDay: {measurement_day}")
except sqlalchemy.exc.SQLAlchemyError as err:
app_logger.warning(err.args)
sess.rollback()
finally:
sess.close()
if __name__ == '__main__':
logging.basicConfig(format=LOG_FMT, level=logging.DEBUG)
app_logger = logging.getLogger(__name__)
parser: argparse.ArgumentParser = argparse.ArgumentParser()
# JSONファイルパス: 必須
# (例) ~/Documents/Healthcare/json/healthcare_data_20230213.json"
parser.add_argument("--json-path", type=str, required=True,
help="Healthcare JSON file path.")
args: argparse.Namespace = parser.parse_args()
# JSONファイルロード
healthcare_data: Dict = load_json(os.path.expanduser(args.json_path))
if app_logger_debug:
app_logger.debug(healthcare_data)
# 健康管理データベス接続情報
url_dict: dict = get_conn_dict(DB_HEALTHCARE_CONF)
conn_url: URL = URL.create(**url_dict)
engine_healthcare: sqlalchemy.Engine = create_engine(conn_url, echo=True)
# 個人テーブルチェック用
Session_healthcare = sessionmaker(bind=engine_healthcare)
# 登録処理用セッション
Cls_sess_healthcare: scoped_session = scoped_session(
sessionmaker(bind=engine_healthcare)
)
app_logger.info(f"Cls_sess_healthcare: {Cls_sess_healthcare}")
# 気象センサーデータベース接続情報
url_dict: dict = get_conn_dict(DB_SENSORSE_CONF)
conn_url: URL = URL.create(**url_dict)
engine_sensors: sqlalchemy.Engine = create_engine(conn_url, echo=True)
Cls_sess_sensors: scoped_session = scoped_session(
sessionmaker(bind=engine_sensors)
)
app_logger.info(f"Cls_sess_sensors: {Cls_sess_sensors}")
# メールアドレス取得
emailAddress: str = healthcare_data["emailAddress"]
# メールアドレスに対応する個人ID取得: 健康管理テーブルの主キー
person_id: int = _get_personid(emailAddress)
if person_id is None:
app_logger.warning("Person not found.")
exit(0)
# 測定日付: 健康管理テーブルと天候状態テーブルの主キー
measurementDay: str = healthcare_data["measurementDay"]
# 健康管理データ更新
try:
_update_healthdata(Cls_sess_healthcare(), person_id, measurementDay,
healthcare_data)
except Exception:
pass
else:
# 天候状態
_update_weather(Cls_sess_sensors(), measurementDay, healthcare_data)