永続化、集約、イベント
1 コマンドで一貫させるべき状態変更とドメインイベントを別トランザクションで保存すると、障害やリトライのたびに「状態だけ進んだ」「イベントだけ二重に出た」が起きうる。Kamae では集約境界・楽観的ロック・アウトボックスをセットで設計する。
状態の型と純粋遷移は 状態遷移 と ドメインモデリング が前提。ORM への落とし込みは ORM アダプター、外部呼び出しのリトライは インフラの耐障害性 と整合させる。
ここでの集約の定義
Section titled “ここでの集約の定義”DDD の用語をそのまま全テーブルに当てはめるのではなく、1 コマンドで一貫させたい不変条件の単位として集約を切る。
Kamae Python における集約は、次の単位である:
- 1 つの判別状態共用体(
TaxiRequest = Waiting | EnRoute | ...) - その共用体を変更する純粋遷移関数
- それらの遷移が発行するドメインイベント
- コマンドごとの 1 つの一貫性境界
集約ルートは状態共用体を所有するアイデンティティである。タクシーの例では、request_id が TaxiRequest 集約ルートを識別する。
すべてのデータベーステーブルを集約としてモデル化しない。1 コマンドで一貫性を保つ必要があるビジネス不変条件ごとに 1 ルートを優先する。
境界の内側と外側
Section titled “境界の内側と外側”| 1 集約内 | 外側 / 別集約 |
|---|---|
| 同一ルートの状態バリアント | 独立したライフサイクルを持つ Passenger、Driver、Payment、Invoice |
| ルート状態が参照する値オブジェクト | ルックアップにのみ使われ、同一コマンドで変更されない外部キー |
| ルートの遷移が発行するイベント | 別ルートの状態変化を記述するイベント |
2 つのルートがすべてのコマンドで一緒に変わる必要があるなら、境界を小さくモデル化しすぎている可能性がある。その場合はマージするか、結果整合性を受け入れる。
# Inside TaxiRequest aggregatetype TaxiRequest = Annotated[ Waiting | EnRoute | InTrip | Completed | Cancelled, Field(discriminator="kind"),]
# Separate aggregate: do not mutate inside assign_driver_use_caseclass DriverAvailability(DomainModel): driver_id: UUID is_available: bool集約横断ルールは、アプリケーションレイヤーのオーケストレーション、サガ、またはリアクティブハンドラーに属する。単一ルートの純粋遷移の内側には置かない。
1 ユースケース、1 集約、1 一貫性境界
Section titled “1 ユースケース、1 集約、1 一貫性境界”デフォルトルール:
HTTP/queue command -> use case (application) -> load one aggregate state -> authorize -> pure transition -> build domain events -> repository.save(state, events) # single TXユースケースは、プロジェクトに明示的で文書化された例外がない限り、1 トランザクションで 2 つの集約ルートを更新してはならない。2 つのルートを整合させる必要があるときは、次を優先する:
- 第 2 集約向けのドメインイベント + ハンドラー(結果整合性)
- 補償ステップ付きのプロセスマネージャー / サガ
- 真の不変条件が原子性を要求するときの単一集約の再設計
トランザクションの所有者
Section titled “トランザクションの所有者”リポジトリアダプターが save(...) のデータベーストランザクションを所有すべきだ。ユースケースはビジネス上の順序を所有し、アダプターはコミット/ロールバックを所有する。
ポートメソッドにトランザクションの所有権を文書化する。パラメータは 永続化、集約、イベント の正規ポートと一致する:
class RequestStore(Protocol): async def save_en_route(...) -> None: """Persist state and outbox rows atomically.
Opens the transaction, writes aggregate state, inserts events/outbox records, and commits. Raises on infrastructure failure or version conflict. """ ...テストが依然として原子性セマンティクスを強制するインメモリフェイクを使う場合を除き、save_state と insert_events を別々の公開リポジトリメソッドに分割しない。
VersionConflict をユースケースで Err にマップする — エラーハンドリング を参照。
楽観的 vs 悲観的並行性
Section titled “楽観的 vs 悲観的並行性”| 戦略 | 使うとき | リポジトリシグナル |
|---|---|---|
| 楽観的(デフォルト) | ほとんどのライフサイクル遷移。競合は稀またはリトライ可能 | expected_version、条件付き UPDATE、一意制約 |
| 悲観的 | 在庫、残高、座席ホールド、強い競合 | SELECT ... FOR UPDATE、行ロック、シリアライザブル分離 |
楽観的ロックは frozen 状態モデルと相性が良い。バージョンを読み込み、純粋遷移を適用し、expected_version で保存する。
悲観的ロックはアダプターに属する。SQL ロックの詳細を純粋遷移関数に漏らさない。
不変条件: アプリケーション vs データベース
Section titled “不変条件: アプリケーション vs データベース”両方のレイヤーを維持する:
- 純粋遷移は型と関数が明確に表現できるルールを強制する。
- データベース制約は並行性下でも存続すべきルールを強制する(
UNIQUE、CHECK、外部キー、非負金額)。
アプリケーションチェックは良い Err 値を生成する。2 つのコマンドが競合するとき、データベース制約はバックストップである。
集約サイズの指針
Section titled “集約サイズの指針”小さく始める。良い集約は:
- 明確なルート ID を持つ
- 小さな状態共用体を持つ
- 1 回のリポジトリ呼び出しで読み込み・保存できる
- 自身の履歴を記述するイベントを発行する
次のときに集約を分割する:
- 読み込み/保存が重くなりすぎる
- 無関係なライフサイクルが 1 つの blob モデルを共有している
- 異なるコマンドが異なる一貫性戦略を必要とする
アウトボックスと冪等性の詳細は、後述の「アウトボックスリレーと at-least-once 配信」および「リトライを冪等にする」を参照する。
リポジトリプロトコルは小さく保つ
Section titled “リポジトリプロトコルは小さく保つ”楽観的ロック、冪等性、イベント永続化向けの正規 RequestResolver と RequestStore 定義:
リポジトリプロトコルは ORM の都合ではなくユースケースのニーズを表現すべきだ。広い CRUD 操作への依存を呼び出し側から防ぐ必要があるときは、読み取りと書き込みのインターフェースを分割する。
class RequestResolver(Protocol): async def find_waiting(self, request_id: UUID) -> Waiting | None: ...
class RequestStore(Protocol): async def save_en_route( self, state: EnRoute, events: tuple[DriverAssigned, ...], *, expected_version: int, idempotency_key: str, ) -> None: ...アダプターは内部で SQLAlchemy、SQLModel、asyncpg、psycopg、Django ORM などを使える。そのツールのモデル形状をデフォルトでドメイン API にしてはならない。ORM エンティティと Pydantic ドメイン状態間のマッパー実装は ORM アダプター を読む。
楽観的ロック
Section titled “楽観的ロック”チェックリスト対応(12.1、12.4): 状態とともにバージョンを読み込み、純粋遷移を適用し、expected_version で保存する。データベース UPDATE は条件付きにすべきだ。
状態とバージョン列
Section titled “状態とバージョン列”永続化集約行に単調増加の version を含める(またはデータベースが並行性下で一意性を保証する場合のみ updated_at トークンから導出。稀)。
class Waiting(DomainModel): kind: Literal["waiting"] = "waiting" request_id: UUID tenant_id: UUID passenger_id: UUID created_at: datetime version: int # starts at 1 on create; increment on each successful saveバージョンチェック付きリポジトリ保存
Section titled “バージョンチェック付きリポジトリ保存”class VersionConflict(Exception): def __init__(self, aggregate_id: UUID, expected: int, actual: int | None) -> None: self.aggregate_id = aggregate_id self.expected = expected self.actual = actual
async def save_en_route( conn: asyncpg.Connection, state: EnRoute, events: tuple[DriverAssigned, ...], *, expected_version: int, idempotency_key: str,) -> None: async with conn.transaction(): row = await conn.fetchrow( """ UPDATE taxi_requests SET kind = $2, driver_id = $3, assigned_at = $4, version = version + 1 WHERE request_id = $1 AND version = $5 AND tenant_id = $6 RETURNING version """, state.request_id, state.kind, state.driver_id, state.assigned_at, expected_version, state.tenant_id, ) if row is None: current = await conn.fetchval( "SELECT version FROM taxi_requests WHERE request_id = $1", state.request_id, ) raise VersionConflict(state.request_id, expected_version, current)
for event in events: await insert_outbox_event(conn, event, idempotency_key=idempotency_key)VersionConflict をユースケースで Err にマップする。クライアントは新しい読み取りでリトライできる。リポジトリ内で盲目的にリトライしない。
ユースケースフロー
Section titled “ユースケースフロー”waiting = await resolver.find_waiting(request_id)if waiting is None: return Err(RequestNotFound(...))
en_route, events = assign_driver(waiting, driver_id, now=utc_now())
try: await store.save_en_route( en_route, events, expected_version=waiting.version, idempotency_key=idempotency_key, )except VersionConflict: return Err(ConcurrentModification(request_id=request_id))
return Ok(en_route)在庫や残高ホールド向けの悲観的ロック(SELECT … FOR UPDATE)はアダプターに属する。永続化、集約、イベント を読む。
トランザクションコンテキストマネージャー
Section titled “トランザクションコンテキストマネージャー”リポジトリアダプターがトランザクションを所有する。例外下でもコミット/ロールバックが正しいよう、ドライバー固有のコンテキストマネージャーを使う。
asyncpg
Section titled “asyncpg”import asyncpg
class AsyncpgUnitOfWork: def __init__(self, pool: asyncpg.Pool) -> None: self._pool = pool self._conn: asyncpg.Connection | None = None self._tx: asyncpg.transaction.Transaction | None = None
async def __aenter__(self) -> asyncpg.Connection: self._conn = await self._pool.acquire() self._tx = self._conn.transaction() await self._tx.start() return self._conn
async def __aexit__(self, exc_type, exc, tb) -> None: assert self._conn is not None and self._tx is not None try: if exc_type is None: await self._tx.commit() else: await self._tx.rollback() finally: await self._pool.release(self._conn)
async def save_with_outbox(pool: asyncpg.Pool, state: EnRoute, events: tuple[DriverAssigned, ...], *, expected_version: int) -> None: async with AsyncpgUnitOfWork(pool) as conn: await save_en_route(conn, state, events, expected_version=expected_version, idempotency_key=...)psycopg 3
Section titled “psycopg 3”from psycopg import AsyncConnectionfrom psycopg.rows import dict_row
async def save_with_outbox_psycopg(conn: AsyncConnection, state: EnRoute, events: tuple[DriverAssigned, ...], *, expected_version: int) -> None: async with conn.transaction(): async with conn.cursor(row_factory=dict_row) as cur: await cur.execute( """ UPDATE taxi_requests SET kind = %(kind)s, driver_id = %(driver_id)s, version = version + 1 WHERE request_id = %(request_id)s AND version = %(expected_version)s RETURNING version """, {**state.model_dump(mode="python"), "expected_version": expected_version}, ) if cur.rowcount != 1: raise VersionConflict(...) for event in events: await insert_outbox_event_psycopg(conn, event)1 つの async with conn.transaction() ブロックが状態更新とアウトボックス挿入を包む。その間にコミットしない。
状態とイベントを原子性で永続化する
Section titled “状態とイベントを原子性で永続化する”遷移がドメインイベントを発行するとき、集約状態とアウトボックス/イベント行を同一トランザクションで書く。呼び出し側が状態とイベントを別々に保存できる API は避ける。
async with transaction: await update_request_state(state, expected_version=expected_version) await insert_outbox_events(events)アウトボックスワーカーはコミット後にイベントを公開できる。トランザクション内、または状態コミット前の直接公開は、重複または欠落通知のリスクがある。
アウトボックスリレーと at-least-once 配信
Section titled “アウトボックスリレーと at-least-once 配信”メッセージブローカーは通常 at-least-once 配信を提供する。冪等コンシューマーと公開側の重複排除を前提に設計する。
アウトボックステーブル形状
Section titled “アウトボックステーブル形状”class OutboxRow(BaseModel): id: UUID aggregate_id: UUID event_name: str event_version: int payload: dict[str, object] idempotency_key: str created_at: datetime published_at: datetime | None = Noneワーカーパターン
Section titled “ワーカーパターン”loop: SELECT ... FROM outbox WHERE published_at IS NULL ORDER BY created_at LIMIT N FOR UPDATE SKIP LOCKED publish each row to broker UPDATE outbox SET published_at = now() WHERE id = ...保証:
- 状態とアウトボックス行は一緒にコミット — コンシューマーは未コミット状態のイベントを見ない。
- コミット後に公開 — ワーカーはコミット済み行のみ読む。
- at-least-once 公開 — 公開後、
published_at更新前にクラッシュすると重複配信。コンシューマーはevent_idで重複排除。 event_id一意 — アウトボックスまたはコンシューマー受信箱テーブルにUNIQUE(event_id)を挿入。- 冪等ハンドラー — 副作用の前に
INSERT INTO processed_events (event_id) ON CONFLICT DO NOTHING。
async def relay_outbox_batch(conn: asyncpg.Connection, publisher: EventPublisher) -> int: rows = await conn.fetch( """ SELECT id, payload, event_id FROM outbox WHERE published_at IS NULL ORDER BY created_at LIMIT 50 FOR UPDATE SKIP LOCKED """ ) count = 0 for row in rows: await publisher.publish(row["payload"]) await conn.execute( "UPDATE outbox SET published_at = now() WHERE id = $1", row["id"], ) count += 1 return count公開失敗はバックオフでリトライ(infrastructure-resilience.md)。保持方針が要求するまでアウトボックス行を削除しない。
データベースに重要な不変条件をミラーする
Section titled “データベースに重要な不変条件をミラーする”データベースが強制できる不変条件にはデータベース制約を使う: 一意性、テナント所有外部キー、非負残高、有効ライフサイクル状態、冪等性キー、イベント一意性。
良いエラーとドメインの明確さのためアプリケーションチェックは依然として必要だが、並行性下では不十分である。
ALTER TABLE taxi_requests ADD CONSTRAINT taxi_requests_version_positive CHECK (version > 0);
CREATE UNIQUE INDEX outbox_event_id_unique ON outbox (event_id);
CREATE TABLE command_idempotency ( idempotency_key TEXT PRIMARY KEY, aggregate_id UUID NOT NULL, response_hash TEXT, created_at TIMESTAMPTZ NOT NULL DEFAULT now());リトライを冪等にする
Section titled “リトライを冪等にする”コマンド、イベントハンドラー、Webhook、アウトボックスリレー、外部呼び出しは、リトライ時に金額、在庫、ライフサイクル遷移、通知を二重適用してはならない。
冪等性キー、重複排除レコード、一意制約、イベント ID、またはインフラで利用可能な exactly-once 処理保証を使う。リポジトリまたはハンドラープロトコルは冪等性キーが入る場所を示すべきだ。
async def save_en_route(..., idempotency_key: str) -> None: async with conn.transaction(): existing = await conn.fetchrow( "SELECT response_hash FROM command_idempotency WHERE idempotency_key = $1", idempotency_key, ) if existing is not None: return # prior attempt succeeded; return cached response if needed
await _do_save(...) await conn.execute( "INSERT INTO command_idempotency (idempotency_key, aggregate_id) VALUES ($1, $2)", idempotency_key, state.request_id, )永続化イベントにバージョンを付ける
Section titled “永続化イベントにバージョンを付ける”イベントは長寿命の契約である。イベント名/型、バージョン、イベント ID、発生タイムスタンプ、集約 ID、明示的単位と精度のペイロードを含める。
class DriverAssigned(DomainModel): event_name: Literal["driver_assigned"] = "driver_assigned" event_version: Literal[1] = 1 event_id: UUID event_at: datetime aggregate_id: UUID driver_id: UUID passenger_id: UUID非同期に保存または消費されるとき、ペイロードを変更する前に後方互換の逆シリアライズ計画を定義する。
イベントスキーマ進化
Section titled “イベントスキーマ進化”チェックリスト対応(12.6): 保存されるイベントには event_name + event_version と文書化されたマイグレーション経路が必要である。
バージョニングルール
Section titled “バージョニングルール”| 変更 | 戦略 | コンシューマー側 |
|---|---|---|
| オプショナルフィールド追加 | event_version を上げる。新フィールドにデフォルトまたは None | extra="ignore" のバージョン付き DTO でパースする古いコンシューマーは未知フィールドを無視 |
| 必須フィールド追加 | 新 event_version のみ。古い行を遡及しない | コンシューマーは event_version で分岐またはアップキャスター |
| フィールド改名 | 新バージョン。読み取り時に v1 → v2 アップキャスター | リプレイジョブはドメインハンドラー前にアップキャスター |
| フィールド削除 | 発行停止。古いバージョンは逆シリアライズ継続 | イベントカタログに tombstone 文書 |
| 意味変更(単位、列挙) | 新 event_name またはバージョン。意味を上書きしない | 明示的破壊的変更注記 |
消費時のアップキャスター
Section titled “消費時のアップキャスター”DriverAssignedAdapter = TypeAdapter(DriverAssigned)
def parse_driver_assigned(raw: dict[str, object]) -> DriverAssigned: version = raw.get("event_version", 1) if version == 1: return DriverAssignedAdapter.validate_python(raw) if version == 2: dto = DriverAssignedV2Adapter.validate_python(raw) return DriverAssigned( event_id=dto.event_id, event_at=dto.event_at, aggregate_id=dto.aggregate_id, driver_id=dto.driver_id, passenger_id=dto.passenger_id, ) raise UnsupportedEventVersion(event_name="driver_assigned", version=version)デュアルライト / デュアルリード期間
Section titled “デュアルライト / デュアルリード期間”ライブトラフィックを移行するとき:
- v1 と v2 の両方を受け入れるコンシューマーをデプロイ。
- v2(または移行中は両方)を発行するプロデューサーをデプロイ。
- 必要ならオフラインジョブで履歴アウトボックス/アーカイブ行をバックフィル。
- v1 トラフィックがゼロであるメトリクスを確認した後のみ v1 サポートを削除。
新フィールドに PII を含めるときは PII と観測経路の保護 に合わせる。保持とマスキングレビューが必要である。
レビュー観点
Section titled “レビュー観点”1 つのユースケースがトランザクション境界を所有しているか — High
Section titled “1 つのユースケースがトランザクション境界を所有しているか — High”単一ユースケースがアトミックな作業単位を調整せず、無関係な複数の呼び出し元から状態保存・イベント発行・メッセージ発行を担うワークフローを指摘する。
リトライと重複コマンドは境界で冪等か — High
Section titled “リトライと重複コマンドは境界で冪等か — High”テストデータ と照合する。冪等キーや重複排除レコードなしに同じ遷移を二回適用しうるコマンドハンドラやコンシューマを指摘する。
リトライと重複配信は冪等か — High
Section titled “リトライと重複配信は冪等か — High”冪等キーや重複排除レコードなしに、金額・在庫・ライフサイクル遷移・通知を二重適用しうるコマンド、イベントハンドラ、アウトボックスプロセッサ、外部呼び出しを指摘する。
状態とドメインイベントはアトミックに永続化されるか — High
Section titled “状態とドメインイベントはアトミックに永続化されるか — High”トランザクションやアウトボックスパターンなしに、集約状態の保存とイベントの発行・挿入を別操作で行っているユースケースを指摘する。
競合書き込みに楽観的並行性は扱われているか — High
Section titled “競合書き込みに楽観的並行性は扱われているか — High”残高、ライフサイクル状態、在庫、その他高競合集約の load/modify/save に、バージョンチェック、CAS セマンティクス、同等の DB 制約がない箇所を指摘する。
ゼロ行更新とバージョン不一致は ConcurrentModification のような型付きエラーにマップし、黙って成功させてはならない。
集約の不変条件はルート経由でのみ変更されるか — High
Section titled “集約の不変条件はルート経由でのみ変更されるか — High”集約ルートの遷移関数や型付き状態モデルを迂回して、子エンティティやライフサイクル状態を変更しているコードを指摘する。
DB 制約は重要な不変条件を反映しているか — Medium
Section titled “DB 制約は重要な不変条件を反映しているか — Medium”一意性、テナント所有権、非負残高、有効ライフサイクル状態、外部キー存在など、データベースが強制できるのにアプリケーションチェックだけに頼っている永続化を指摘する。
イベントは永続化アダプター外で生成されるか — Medium
Section titled “イベントは永続化アダプター外で生成されるか — Medium”ユースケースまたはドメイン層が供給したイベントを永続化するのではなく、リポジトリ内でビジネスイベントを発明している箇所を指摘する。
リポジトリプロトコルはドメインのニーズを表現しているか — Medium
Section titled “リポジトリプロトコルはドメインのニーズを表現しているか — Medium”ユースケースが実際に必要とする小さなインターフェースではなく、ORM の CRUD を写した大きなリポジトリプロトコルを指摘する。
悲観的ロックはスコープが適切で正当化されているか — Medium
Section titled “悲観的ロックはスコープが適切で正当化されているか — Medium”楽観的並行性や DB 制約で足りるのに、特に await をまたぐ広い・長時間ロックを指摘する。ロックスコープが不明瞭、またはドメイン不変条件がまだ競合しうる場合はエスカレートする。
永続化イベントはバージョン管理されているか — Medium
Section titled “永続化イベントはバージョン管理されているか — Medium”イベントを非同期に保存・消費するのに、明示的なイベント型・バージョン、スキーマ進化戦略、後方互換デシリアライズのないイベントペイロードを指摘する。
集約横断の調整は明示的か — Medium
Section titled “集約横断の調整は明示的か — Medium”2 つの集約ルートをメモリ上で変更し、呼び出し元の両方永続化に頼っているユースケースやリポジトリを指摘する。イベント、saga、スナップショット、文書化された単一トランザクション戦略のいずれかを提案する。