信頼性の高いマイクロサービス:Node.jsによるTransactional Outboxパターンの実装

Programming tutorial - IT technology blog
Programming tutorial - IT technology blog

午前2時の悪夢:なぜ「二重書き込み(Dual Write)」が静かなる暗殺者なのか

午前2時、PagerDutyのアラートが鳴り響きました。注文処理システムが行き詰まっていたのです。データベース上では15,000ドルの顧客決済が成功しているにもかかわらず、配送サービス側は沈黙したまま。’OrderCreated’イベントが一度も届いていなかったのです。私たちは、データベースを更新した直後に broker.publish() を呼び出すという、標準的でナイーブな手法を採用していました。これが二重書き込み(Dual Write)であり、データ不整合の元凶です。

分散アーキテクチャにおいて、SQLデータベースとメッセージブローカー(RabbitMQKafkaなど)という2つの独立したシステムが、単一のユニットとして成功または失敗することを保証することはできません。データベースのコミットが完了しても、メッセージがブローカーに届く前にネットワークの瞬断が発生すれば、データは同期されません DBには存在するが、他のマイクロサービスからは決して見えない「幽霊」レコードが生成されてしまうのです。

これを解決するには、Transactional Outboxパターンへと移行する必要があります。

イベント駆動型通信のアプローチ比較

データベースを更新し、他のサービスに通知する必要がある場合、一般的に2つの選択肢があります。

1. 二重書き込み(リスクの高い方法)

async function createOrder(orderData) {
  await db.orders.create(orderData); // ステップ 1: DB保存
  await broker.publish('order_created', orderData); // ステップ 2: ブローカーへの発行
}

ステップ2が失敗した場合、ステップ1を簡単にロールバックすることはできません。これによりデータアイランドが発生し、スタック全体のビジネスロジックが破綻します。

2. Transactional Outbox(レジリエントな方法)

ブローカーを直接叩く代わりに、メッセージを専用 of 「Outbox」テーブルに保存します。重要なのは、これがビジネスロジックと同じデータベーストランザクション内で行われる点です。その後、別のバックグラウンドプロセスがこのテーブルを読み取り、発行処理を行います。これにより、注文が保存されれば、イベントも確実に保存されることが保証されます。

Outboxパターンのトレードオフ

どんなアーキテクチャも銀の弾丸ではありません。毎秒1,000リクエスト以上を処理する本番環境において、このパターンは信頼性のゴールドスタンダードですが、新たなオーバーヘッドも発生します。

  • メリット:
    • 厳密な原子性(Atomicity): ビジネスの状態とイベントの保存が、一蓮托生で成功または失敗します。
    • レジリエンス: ブローカー(Kafka/RabbitMQ)がメンテナンス等でオフラインになっても、メッセージはDB内で安全に保持されます。
    • 配信の保証: この構成により、下流のコンシューマーに対して「少なくとも1回(at-least-once)」の配信が保証されます。
  • デメリット:
    • 運用上の遅延: DBのコミットからメッセージがブローカーに届くまでに、わずかな遅延(通常50msから500ms)が発生します。
    • 構成要素の増加: バックグラウンドワーカーの管理と、Outboxテーブルの肥大化を監視する必要があります。
    • 重複処理の考慮: 配信が保証される反面、稀に発生する重複配信を処理するために、コンシューマー側がべき等(idempotent)である必要があります。

技術スタック

今回の実装では、以下を使用します:

  • Node.js: APIおよびリレーワーカー用のランタイム。
  • PostgreSQL: 強固なACIDトランザクションをサポートするメインストレージ。
  • pg: Node.js用の実績あるPostgreSQLクライアント。
  • メッセージブローカー: RabbitMQやKafkaなど。

ステップバイステップの実装

ステップ1:Outboxテーブルのスキーマ

まず、内部キューとして機能するテーブルを作成します。柔軟なイベント構造に対応するため、ペイロードには JSONB を使用します。

CREATE TABLE outbox (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  aggregate_type TEXT NOT NULL,
  aggregate_id TEXT NOT NULL,
  type TEXT NOT NULL,
  payload JSONB NOT NULL,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
  processed_at TIMESTAMP WITH TIME ZONE
);

-- パフォーマンス用インデックス:未処理のメッセージを素早く検索
CREATE INDEX idx_outbox_unprocessed ON outbox (created_at) WHERE processed_at IS NULL;

ステップ2:Node.jsでのアトミックな保存

ビジネスロジックとOutboxへの挿入を単一のトランザクションでラップします。注文の保存に失敗すれば、イベントがOutboxに入ることはありません。イベントの保存に失敗すれば、注文はロールバックされます。「全か無か」です。

const { Pool } = require('pg');
const pool = new Pool();

async function createOrder(order) {
  const client = await pool.connect();

  try {
    await client.query('BEGIN');

    // 1. 実際のビジネスデータを保存
    const orderQuery = 'INSERT INTO orders (id, total, status) VALUES ($1, $2, $3)';
    await client.query(orderQuery, [order.id, order.total, 'PENDING']);

    // 2. 同じトランザクション内でイベントをキューイング
    const outboxQuery = `
      INSERT INTO outbox (aggregate_type, aggregate_id, type, payload)
      VALUES ($1, $2, $3, $4)
    `;
    const eventPayload = { orderId: order.id, total: order.total };
    await client.query(outboxQuery, ['Order', order.id, 'OrderCreated', JSON.stringify(eventPayload)]);

    await client.query('COMMIT');
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  } finally {
    client.release();
  }
}

ステップ3:リレーワーカー

次に、Postgresからブローカーへメッセージを移動させるワーカーが必要です。ここでは FOR UPDATE SKIP LOCKED を使用します。これにより、複数のワーカーインスタンスが同時に同じメッセージを取得するのを防ぐことができ、スケーリングにおいて非常に重要です。

async function relayMessages() {
  const client = await pool.connect();

  try {
    // 他のワーカーが参照できないよう10件のメッセージをロック
    const selectQuery = `
      SELECT id, type, payload 
      FROM outbox 
      WHERE processed_at IS NULL 
      ORDER BY created_at ASC 
      LIMIT 10 
      FOR UPDATE SKIP LOCKED
    `;
    
    const res = await client.query(selectQuery);

    for (const row of res.rows) {
      try {
        await broker.publish(row.type, row.payload);

        // 発行成功後に完了マークを付ける
        await client.query('UPDATE outbox SET processed_at = NOW() WHERE id = $1', [row.id]);
      } catch (publishError) {
        // ブローカーがダウンしている場合は、次のポーリングのためにレコードを残す
        console.error(`${row.id} の発行に失敗しました`);
      }
    }
  } finally {
    client.release();
  }
}

// 2秒ごとにポーリング
setInterval(relayMessages, 2000);

「少なくとも1回」の現実への対処

Outboxパターンは、メッセージが送信されることを保証します。しかし、ブローカーがメッセージを受け取った直後、かつワーカーがデータベースを更新する「前」にネットワーク障害が発生する可能性があります。この場合、ワーカーは再試行し、重複したメッセージを送信することになります。

そのため、下流のサービスはべき等(idempotent)である必要があります。処理を行う前に、一意の event_idorder_id を確認するようにしてください。既にそのIDを処理済みであれば、単にメッセージを承認(Acknowledge)し、何もせずに終了すべきです。

まとめ

二重書き込み(Dual Writes)からの脱却は、バックエンドエンジニアにとって大きな節目となります。これにより、膨大な種類のレースコンディションが排除され、トラフィックのピーク時におけるデータ損失を防ぐことができます。ポーリングは優れた出発点ですが、大規模なシステムでは最終的に Change Data Capture (CDC) ツール(Debeziumなど)への移行を検討することになるでしょう。CDCツールはPostgresのWrite-Ahead Log (WAL)を監視し、さらに低いレイテンシを実現します。しかし、ほとんどのアプリケーションにとって、ポーリング方式は夜に枕を高くして眠るのに十分な効果を発揮します。

Share: