FlaskアプリでFlask-SQLAlchemyを使わず単独でSQLAlchemyを使う方法を紹介します。
Flask-SQLAlchemyを使ってしまうとデータベース処理の開発・テストがWebアプリ依存のものになってしまい開発効率が低下します。実際のシステムではWebアプリケーション以外にバッチシステムもたくさんあります。
Healthcare/
├── healthcare
│ ├── __init__.py // Flaskアプリケーション生成・初期化, データベースセッション取得等
│ ├── dao // テーブル定義クラス
│ │ ├── __init__.py // 健康管理データベースのスキーマ定義
│ │ ├── blood_pressure.py // (健康) 血圧測定テーブルクラス
│ │ ├── body_temperature.py // (健康) 体温測定テーブルクラス
│ │ ├── nocturia_factors.py // (健康) 夜間頻尿要因テーブルクラス
│ │ ├── person.py // (健康) 個人情報テーブルクラス
│ │ ├── queries.py // (健康・気象) 登録済みデータ検索クラス
│ │ ├── sleep_management.py // (健康) 睡眠管理テーブルクラス
│ │ ├── walking_count.py // (健康) 歩数管理テーブルクラス
│ │ └── weather_condition.py // (気象) 天候状態テーブルクラス
│ ├── log
│ │ ├── __init__.py
│ │ ├── logconf_main.json
│ │ └── logsetting.py
│ ├── util
│ │ ├── __init__.py
│ │ ├── dateutil.py
│ │ ├── file_util.py
│ │ └── image_util.py
│ └── views
│ ├── __init__.py
│ └── app_main.py // リクエスト処理メインクラス
├── run.py // デバックモードでFlask, 本番モードでwaitressを起動するpythonスタートアップスクリプト
└── start.sh // 仮想環境に入りpythonスタートアップスクリプトを実行するシェルスクリプト
FLASK_PROD_PORT=12920
/etc/systemd/system/webapp-healthcare.service ※start.shを起動するシステムサービス
[Unit]
Description=Flask webapp Healthcare service
After=postgres-12-docker.service
[Service]
Type=idle
# FLASK_PROD_PORT
EnvironmentFile=/etc/default/webapp-healthcare
ExecStart=/home/pi/Healthcare/start.sh prod >/dev/null # ■ 本番環境で起動
User=pi
[Install]
WantedBy=multi-user.target
#!/bin/bash
# ./start.sh -> development # スクリプト引数がなければ開発環境で起動
# ./start.sh prod | production ->production # スクリプト引数が "production" なら本番環境で起動
env_mode="development"
if [ $# -eq 0 ]; then
:
else
if [[ "$1" = "prod" || "$1" = "production" ]]; then
env_mode="production"
fi
fi
host_name="$(/bin/cat /etc/hostname)" # ■ 1-1 ホスト名取得
IP_HOST_ORG="${host_name}.local" # ADD host suffix ".local" # ■ 1-2 Webアプリ用のホスト名決定
export IP_HOST="${IP_HOST_ORG,,}" # to lowercase # ■ 2-1 環境変数: Webアプリ用ホスト名 (小文字)
export FLASK_ENV=$env_mode # ■ 2-2 環境変数: FLASK_ENV (Flaskアプリケーション用)
echo "$IP_HOST with $FLASK_ENV"
EXEC_PATH=
if [ -n "$PATH_HEALTHCARE" ]; then
EXEC_PATH=$PATH_HEALTHCARE # ■ 3-1 開発環境では開発PCの環境変数(PATH_HEALTHCARE)を設定
else
EXEC_PATH="$HOME/Healthcare" # ■ 3-2 本番環境(ラズパイ) /home/pi/Healthcare
fi
echo "$EXEC_PATH"
. $HOME/py_venv/raspi4_apps/bin/activate # ■ 4-1 Python仮想環境に入る
python $EXEC_PATH/run.py # ■ 4-2 Pythonスタートアップスクリプト実行
deactivate
import json
import logging
import os
import socket
import uuid
import sqlalchemy
from flask import Flask
from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL
from sqlalchemy.orm import scoped_session, sessionmaker
from healthcare.log import logsetting
def getdict_forurl(filePath: str) -> dict:
with open(filePath, 'r') as fp:
db_conf: json = json.load(fp)
hostname = socket.gethostname()
# host in /etc/hosts
db_conf["host"] = db_conf["host"].format(hostname=hostname)
return db_conf
# PostgreSQL connection information json file.
CONF_PATH: str = os.path.expanduser("~/bin/conf")
DB_HEALTHCARE_CONF: str = os.path.join(CONF_PATH, "db_healthcare.json")
DB_SENSORS_CONF: str = os.path.join(CONF_PATH, "db_sensors.json")
app: Flask = Flask(__name__) # ■ 1
# ロガーを本アプリ用のものに設定する
app_logger: logging.Logger = logsetting.get_logger("app_main")
app_logger_debug: bool = (app_logger.getEffectiveLevel() <= logging.DEBUG)
app.config.from_object("healthcare.config")
# セッション用の秘密キー
app.secret_key = uuid.uuid4().bytes
# サーバホストとセッションのドメインが一致しないとブラウザにセッションIDが設定されない
IP_HOST: str = os.environ.get("IP_HOST", "localhost")
FLASK_PROD_PORT: str = os.environ.get("FLASK_PROD_PORT", "8080")
has_prod: bool = os.environ.get("FLASK_ENV", "development") == "production"
SERVER_HOST: str
if has_prod:
# Production mode
SERVER_HOST = IP_HOST + ":" + FLASK_PROD_PORT # ■ 2-1 本番環境用ホスト
else:
SERVER_HOST = IP_HOST + ":5000" # ■ 2-2 開発環境用ホスト
app_logger.info("SERVER_HOST: {}".format(SERVER_HOST))
app.config["SERVER_NAME"] = SERVER_HOST # ■ 3-2
app.config["APPLICATION_ROOT"] = "/healthcare" # ■ 3-2
# use flask jsonify with japanese message
app.config["JSON_AS_ASCII"] = False # ■ 4
# ... (3) データベースセッション取得処理 ...
# Application main program
from healthcare.views import app_main
# SQLAlchemy engine
# 1.健康管理データベース: postgresql[5433]
conn_dict: dict = getdict_forurl(DB_HEALTHCARE_CONF)
conn_url: URL = URL.create(**conn_dict)
app_logger.info(f"Healthcare database URL: {conn_url}")
engine_healthcare: sqlalchemy.Engine = create_engine(conn_url, echo=False)
# 個人テーブルチェック用
Session_healthcare = sessionmaker(bind=engine_healthcare) # ■ 1
app_logger.info(f"Session_healthcare: {Session_healthcare}")
# セッションクラス定義
Cls_sess_healthcare: sqlalchemy.orm.scoping.scoped_session = scoped_session( # ■ 2
sessionmaker(bind=engine_healthcare)
)
app_logger.info(f"Cls_sess_healthcare: {Cls_sess_healthcare}")
# 2.気象センサーデータベース: postgresql[5432]
conn_dict: dict = getdict_forurl(DB_SENSORS_CONF)
conn_url: URL = URL.create(**conn_dict)
app_logger.info(f"Sensors database URL: {conn_url}")
engine_sensors: sqlalchemy.Engine = create_engine(conn_url, echo=False)
Cls_sess_sensors: sqlalchemy.orm.scoping.scoped_session = scoped_session( # ■ 3
sessionmaker(bind=engine_sensors)
)
app_logger.info(f"Cls_sess_sensors: {Cls_sess_sensors}")
if app_logger_debug:
app_logger.debug(f"{app.config}")
import os
from healthcare import app, app_logger # ■ app を参照しているので上記(1)__init__.pyが先に実行される
"""
This module load after app(==__init__.py)
"""
if __name__ == "__main__":
has_prod = os.environ.get("FLASK_ENV") == "production" # ■ 1
# app config SERVER_NAME
srv_host = app.config["SERVER_NAME"] # ■ 2-1 ホスト情報
srv_hosts = srv_host.split(":") # ■ 2-2 ホスト名とポート番号に分解
host, port = srv_hosts[0], srv_hosts[1]
app_logger.info("run.py in host: {}, port: {}".format(host, port))
if has_prod:
# Production mode
try:
# Prerequisites: pip install waitress
from waitress import serve
app_logger.info("Production start.")
# console log for Reqeust suppress: _quiet=True
serve(app, host=host, port=port, _quiet=True) # ■ 3[A]
except ImportError:
# Production with flask,debug False
app_logger.info("Development start, without debug.")
app.run(host=host, port=port, debug=False) # ■ 3[C]
else:
# Development mode
app_logger.info("Development start, with debug.")
app.run(host=host, port=port, debug=True) # ■ 3[B]
import json
from typing import Dict, Optional, Union
import sqlalchemy
from flask import Response, abort, g, jsonify, make_response, request
from sqlalchemy import Select, select
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm import Session, scoped_session, sessionmaker
from sqlalchemy.orm.exc import NoResultFound
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
HTTPException, InternalServerError, NotFound)
from healthcare import (Cls_sess_healthcare, Cls_sess_sensors,
Session_healthcare, app, app_logger, app_logger_debug)
from healthcare.dao.blood_pressure import BloodPressure
from healthcare.dao.body_temperature import BodyTemperature
from healthcare.dao.nocturia_factors import NocturiaFactors
from healthcare.dao.person import Person
from healthcare.dao.queries import Selector
from healthcare.dao.sleep_management import SleepManagement
from healthcare.dao.walking_count import WalkingCount
from healthcare.dao.weather_condition import WeatherCondition
MSG_DESCRIPTION: str = "error_message"
ABORT_DICT_BLANK_MESSAGE: Dict[str, str] = {MSG_DESCRIPTION: ""}
# アプリケーションルートパス
APP_ROOT: str = app.config["APPLICATION_ROOT"]
データベース接続オブジェクトの取得とクリーンアップ方法 ※上記【参考URL】(2) のサンプルコード参照
from flask import g
def get_db():
if 'db' not in g:
g.db = connect_to_database()
return g.db
@app.teardown_appcontext
def teardown_db(exception): # ■ レスポンス返却前に必ず呼び出しされる
db = g.pop('db', None)
if db is not None:
db.close()
def get_healthcare_session() -> scoped_session:
"""
健康管理DB用セッションオブジェクトを取得する
"""
if 'healthcare_session' not in g:
# 健康管理DB用セッションオブジェクト生成
g.healthcare_session = Cls_sess_healthcare()
if app_logger_debug:
app_logger.debug(f"g.healthcare_session:{g.healthcare_session}")
return g.healthcare_session
def get_sensors_session() -> scoped_session:
"""
気象センサDB用セッションオブジェクトを取得する
"""
if 'sensors_session' not in g:
# 気象センサDB用セッションオブジェクト生成
g.sensors_session = Cls_sess_sensors()
if app_logger_debug:
app_logger.debug(f"g.sensors_session:{g.sensors_session}")
return g.sensors_session
@app.teardown_appcontext
def close_sessions(exception=None) -> None:
"""
各データベース用セッションのクリーンアップ
"""
# 健康管理DB用セッション
sess: scoped_session = g.pop('healthcare_session', None) # ■ 1-1 gオブジェクトからセッションオブジェクトを取得
app_logger.debug(f"healthcare_session:{sess}")
if sess is not None: # ■ 1-2 セッションオブジェクトが存在
# クラスのremoveメソッド呼び出し
Cls_sess_healthcare.remove() # ■ 2
# 気象センサDB用セッション
sess: scoped_session = g.pop('sensors_session', None)
app_logger.debug(f"sensors_session:{sess}")
if sess is not None:
Cls_sess_sensors.remove()
def _check_postdata(request):
# リクエストヘッダーチェック
if "application/json" not in request.headers["Content-Type"]:
abort(BadRequest.code, _set_errormessage("450,Bad request Content-Type."))
# 登録用データ取得
data: dict = json.loads(request.data)
if app_logger_debug:
app_logger.debug(data)
# メールアドレスチェック
emailAddress = data.get("emailAddress", None)
if emailAddress is None:
abort(BadRequest.code, _set_errormessage("462,Required EmailAddress."))
# 測定日付チェック
measurementDay = data.get("measurementDay", None)
if measurementDay is None:
abort(BadRequest.code, _set_errormessage("463,Required MeasurementDay."))
# PersonテーブルにemailAddressが存在するか
personId = _get_personid(emailAddress)
if personId is None:
abort(BadRequest.code, _set_errormessage("461,User is not found."))
return personId, emailAddress, measurementDay, data
def make_register_success(email_address: str, measurement_day: str) -> Response:
"""
登録処理OKレスポンス
:param email_address: メールアドレス
:param measurement_day 測定日付
:return: Response
"""
resp_obj: Dict = {
"status":
{"code": 0, "message": "OK"},
"data": {
"emailAddress": email_address,
"measurementDay": measurement_day
}
}
return _make_respose(resp_obj, 200)
def make_getdata_success(json_dict: Dict) -> Response:
"""
データ取得処理OKレスポンス
:param json_dict: JSON出力用辞書
:return: Response
"""
resp_obj: Dict = {
"status": {
"code": 0, "message": "OK"},
"data": json_dict
}
return _make_respose(resp_obj, 200)
def _set_errormessage(message: str) -> Dict:
ABORT_DICT_BLANK_MESSAGE[MSG_DESCRIPTION] = message
return ABORT_DICT_BLANK_MESSAGE
def _make_respose(resp_obj: Dict, resp_code) -> Response:
response = make_response(jsonify(resp_obj), resp_code)
response.headers["Content-Type"] = "application/json"
return response
@app.errorhandler(BadRequest.code)
@app.errorhandler(Forbidden.code)
@app.errorhandler(NotFound.code)
@app.errorhandler(Conflict.code) # IntegrityError (登録済み)
@app.errorhandler(InternalServerError.code)
def error_handler(error: HTTPException) -> Response:
app_logger.warning(f"error_type:{type(error)}, {error}")
resp_obj: Dict[str, Dict[str, Union[int, str]]] = {
"status": {"code": error.code, "message": error.description["error_message"]}
}
return _make_respose(resp_obj, error.code)
@app.route(APP_ROOT + "/register", methods=["POST"])
def register():
"""
健康管理データ(必須)と天候データ(必須)の登録
"""
if app_logger_debug:
app_logger.debug(request.path)
# 登録データチェック
personId, emailAddress, measurementDay, data = _check_postdata(request)
# 健康管理データ登録 (必須)
_insert_healthdata(personId, measurementDay, data)
# 気象データ登録 (必須) ※気象センサーデータベース
_insert_weather(measurementDay, data)
# ここまでエラーがなければOKレスポンス
return make_register_success(emailAddress, measurementDay)
@app.route(APP_ROOT + "/update", methods=["POST"])
def update():
"""
健康管理データ(任意)または天候データ(任意)の更新
"""
if app_logger_debug:
app_logger.debug(request.path)
# 更新データチェック
personId, emailAddress, measurementDay, data = _check_postdata(request)
# 健康管理データの更新
_update_healthdata(personId, measurementDay, data)
# 天候状態(気象データベース)の更新
_update_weather(measurementDay, data)
# ここまでエラーがなければOKレスポンス
return make_register_success(emailAddress, measurementDay)
@app.route(APP_ROOT + "/getcurrentdata", methods=["GET"])
def getcurrentdata():
"""メールアドレスと測定日付から健康管理データを取得するリクエスト
:param: request parameter: ?emailAddress=user1@examples.com&measurementDay=2023-0-03
:return: JSON形式(健康管理DBから取得したデータ + 気象センサーDBから取得した天候)
"""
if app_logger_debug:
app_logger.debug(request.path)
app_logger.debug(request.args.to_dict())
# 1.リクエストパラメータチェック
# 1-1.メールアドレス
emailAddress = request.args.get("emailAddress")
if emailAddress is None:
abort(BadRequest.code, _set_errormessage("462,Required EmailAddress."))
# 1-2.測定日付
measurementDay = request.args.get("measurementDay")
if emailAddress is None:
abort(BadRequest.code, _set_errormessage("463,Required MeasurementDay."))
# PersonテーブルにemailAddressが存在するか
personId = _get_personid(emailAddress)
if personId is None:
abort(BadRequest.code, _set_errormessage("461,User is not found."))
# 健康管理DBと気象センサーDBからデータ取得する
sess_healthcare: scoped_session = get_healthcare_session()
sess_sensors: scoped_session = get_sensors_session()
selector = Selector(sess_healthcare, sess_sensors, logger=app_logger)
# 健康管理データ取得
healthcare_dict: Dict = selector.get_healthcare_asdict(emailAddress, measurementDay)
if app_logger_debug:
app_logger.debug(f"Healthcare: {healthcare_dict}")
if healthcare_dict:
healthcare_dict["emailAddress"] = emailAddress
healthcare_dict["measurementDay"] = measurementDay
# 天気状態取得
weather_dict = selector.get_weather_asdict(measurementDay)
if app_logger_debug:
app_logger.debug(f"Weather: {weather_dict}")
if weather_dict:
healthcare_dict["weatherData"] = weather_dict
else:
# 天候がなければ未設定
healthcare_dict["weatherData"] = None
return make_getdata_success(healthcare_dict)
else:
abort(NotFound.code, _set_errormessage("Data is not found."))