コンテンツにスキップ
検索語を入力してください

    並行性と非同期

    Kamae Python は純粋ドメイン遷移を同期のまま保ち、I/O はユースケースとアダプターで async 化する。この分離がなければ、イベントループ上で重い CPU 処理やブロック ORM が同時リクエスト全体を止め、ビジネスルールの単体テストも asyncio に引きずられる。

    ユースケースの配線は アプリケーション配線、ロックとトランザクションの境界は 状態遷移永続化、集約、イベント と照合する。

    レイヤー並行スタイル理由
    ドメイン遷移同期純粋関数隠れたスケジューリングなし。単体テストが容易
    ユースケースポートが非同期なら async defループをブロックせず I/O をオーケストレーション
    リポジトリ / HTTP アダプター非同期ドライバー付き async defasyncpghttpx、aiobotocore など
    CPU バウンド作業ProcessPoolExecutor またはワーカーキューGIL のため Python スレッドは CPU を並列化しない
    # assign_driver_use_case — full flow in state-transitions.md
    waiting = await resolver.find_waiting(request_id)
    if waiting is None:
    return Err(RequestNotFound(request_id=request_id))
    en_route = assign_driver(waiting, driver_id, now) # sync; runs on the event loop
    await store.save_en_route(en_route, ...)
    return Ok(en_route)

    完全なユースケースは 状態遷移 を参照。

    遷移は呼び出し元スレッドのイベントループ上で実行される。処理が軽ければ問題ないが、重い CPU 処理を同期で走らせるとイベントループを塞ぐ。

    CPython の Global Interpreter Lock は、プロセスごとに一度に 1 スレッドだけが Python バイトコードを実行できるようにする。その含意は次のとおりである。

    • asyncio はタスクが I/O 待ちのときに優れる。1 コルーチンがソケットを await している間、他が実行される。
    • threading は非同期非対応のI/O バウンドブロッキングライブラリ(一部 DB ドライバー、レガシー SDK)に役立つ。CPU バウンドの Python ループは高速化しない。
    • multiprocessing / ProcessPoolExecutor は、CPU バウンドな Python 作業に対する既定の選択肢である。例として、画像処理、大規模集計、大きなペイロードの暗号処理、純 Python の ML 推論がある。

    async def ユースケース内で長い CPU バウンド Python 関数を直接実行しない。イベントループ全体をブロックし、同時リクエストをすべて停滞させる。

    CPU バウンドドメイン作業のオフロード

    Section titled “CPU バウンドドメイン作業のオフロード”

    ドメイン関数は同期のまま保つ。ユースケースまたはインフラエッジからスケジュールする。

    import asyncio
    from concurrent.futures import ProcessPoolExecutor
    _executor = ProcessPoolExecutor(max_workers=4)
    async def resize_proof_image_use_case(
    store: ImageStore,
    image_id: UUID,
    max_edge_px: int,
    ) -> Result[ResizedImage, ResizeError]:
    raw = await store.load_bytes(image_id)
    loop = asyncio.get_running_loop()
    try:
    resized = await loop.run_in_executor(
    _executor,
    resize_image_bytes, # sync; CPU-bound; picklable top-level function
    raw,
    max_edge_px,
    )
    except ImageTooLarge as exc:
    return Err(ResizeError.too_large(image_id, str(exc)))
    await store.save_resized(image_id, resized)
    return Ok(resized)

    指針:

    • ワーカープロセスにはプレーンデータ(bytes、プリミティブ、frozen Pydantic モデル)を渡す。開いた接続、ORM セッション、ロックは渡さない。
    • multiprocessing を POSIX と Windows で使うとき、ワーカー関数はトップレベルで picklable であるべき。
    • ジョブが長い、リトライが必要、プロセス再起動後も存続すべきときは専用ワーカーサービス(Celery、RQ、ARQ、SQS コンシューマー)を優先する。
    • ネイティブ拡張(Pillow、numpy、一部暗号ライブラリ)内では GIL が解放される。公式に文書化された上限がある場合に限り、ネイティブ関数への同期呼び出しをイベントループ上で許容してよい。判断前にプロファイルする。
    アプローチ向いている用途避けるとき
    asyncio のみネットワーク I/O、非同期 DBコルーチン内の CPU 重い Python ループ
    デフォルトプール付き threading / run_in_executorレガシー SDK のブロッキング I/OCPU バウンド Python(GIL 競合)
    ProcessPoolExecutorCPU バウンド純 Python共有可変状態や開いた DB ハンドルが必要な関数
    外部ワーカーキュー長いジョブ、リトライ、バックプレッシャーキューオーバーヘッドが支配的なサブミリ秒作業

    ドメインレイヤーと非同期でないコード

    Section titled “ドメインレイヤーと非同期でないコード”

    ドメインモジュールは asyncio なしで import できるべきだ。ルールは次のとおり。

    1. 純粋遷移はプレーンな def 関数。
    2. domain.py 内で asyncio.runget_event_loopawait を呼ばない。
    3. ポートが同期と非同期の両方を露出する必要があるなら、アプリケーション境界では1 つの非同期ポートを優先し、同期アダプターはインフラエッジでのみ asyncio.to_thread で実装する。ドメインコードではしない。
    4. 時刻と乱数は引数として注入する(now: datetimerng: Random)。環境グローバルから読まない。同じ関数をワーカーとテストで実行できる。

    フレームワークエントリポイント

    Section titled “フレームワークエントリポイント”

    エグゼキュータとプロセスプールはコンポジションルート(FastAPI lifespan、Celery アプリファクトリー)で配線し、ドメインパッケージのモジュールレベル副作用にはしない。

    @asynccontextmanager
    async def lifespan(app: FastAPI):
    app.state.image_executor = ProcessPoolExecutor(max_workers=4)
    yield
    app.state.image_executor.shutdown(wait=True)

    ポート配線は アプリケーション配線、遅いワーカー周りのタイムアウトとリトライは インフラの耐障害性 を読む。

    CPU バウンドのドメイン処理はイベントループ外か — High

    Section titled “CPU バウンドのドメイン処理はイベントループ外か — High”

    asyncio.to_thread、エグゼキューター、明示的同期境界なしに、async def ハンドラやユースケース内のブロック ORM、ファイル I/O、重いパース、CPU バウンドループを指摘する。

    ロックとセッションは正しくスコープされているか — High

    Section titled “ロックとセッションは正しくスコープされているか — High”

    所有権やトランザクション境界が不明瞭なまま、並行タスク間で共有される DB セッション、ORM アイデンティティマップ、ロックを指摘する。

    await/ロック相互作用は エラーハンドリング永続化、集約、イベント と照合する。

    ドメインコードで共有可変状態を避けているか — Medium

    Section titled “ドメインコードで共有可変状態を避けているか — Medium”

    明示引数やポートでテスト可能にできるのに、遷移やユースケースが使うモジュールレベルの可変キャッシュ、グローバル、シングルトンを指摘する。

    プロセス/スレッドプールはスコープが適切で正当化されているか — Medium

    Section titled “プロセス/スレッドプールはスコープが適切で正当化されているか — Medium”

    小さな純粋遷移への広い ProcessPoolExecutor、ライフサイクル管理なしのリクエストごとプール作成を指摘する。