同期通信の罠
マイクロサービスを構築し始めた当初、私はあらゆる課題に対してREST APIを解決策としていました。注文サービスが在庫サービスを必要とする場合、POSTリクエストを送信して応答を待つという仕組みです。数百ユーザー程度なら問題ありませんでしたが、同時リクエストが5,000件に達した時、脆い積み木が崩れるようにシステムがダウンしました。下流サービスのたった1つの遅いデータベースクエリが15秒のタイムアウトを引き起こし、チェックアウトフロー全体を麻痺させたのです。すべてが密結合すぎました。
私はすぐに、ウェルカムメールの送信やPDF請求書の作成といった操作には即時のレスポンスが不要であることに気づきました。顧客が必要なのは、注文が受理されたという確認だけです。これらのタスクをイベント駆動型モデルに移行することでサービスを疎結合にし、APIのレスポンスタイムを80%以上削減することに成功しました。
標準的なPub/Subが信頼性テストに合格しない理由
多くの開発者は、その速さからRedisのPub/Subを使い始めます。確かに速いですが、それは「送りっぱなし(fire and forget)」でもあります。メッセージの送信中にコンシューマーサービスが10秒間のデプロイのために再起動した場合、そのデータは永遠に失われます。履歴もセーフティネットも存在しません。
Apache Kafkaは強力な代替手段ですが、Kafkaクラスターの管理は専任の担当者が必要なほど大変です。90%のプロジェクトにとって、それはオーバーキル(過剰)です。Redis Streamsはこのギャップを完璧に埋めてくれます。永続的なアペンド専用ログと「コンシューマーグループ」の力を提供します。これにより、複数のワーカーで負荷を分散でき、1つのワーカーがクラッシュしても、別のワーカーが中断した場所から正確に再開できます。これらすべてが、すでに運用しているRedisインスタンスで実現可能なのです。
環境構築
Redis 5.0以上が必要です。ローカルマシンを異なるバージョンで汚さないよう、Dockerを使用することをお勧めします。
# Redis 7を起動
docker run --name redis-streams -p 6379:6379 -d redis:7-alpine
次に、Pythonクライアントをインストールします。依存関係を隔離するために仮想環境の使用を推奨します。
pip install redis
パイプラインの構築
信頼性の高いシステムの設計は、単にデータを移動させるだけではありません。オフセットの管理や失敗への優雅な対応が重要です。実装を見ていきましょう。
1. プロデューサー:イベントの送出
プロデューサーを入力係と考えてください。データを上書きするのではなく、XADDを使用してストリームの末尾に新しいレコードを追加します。
import redis
import time
# decode_responses=True によりバイト列を自動的に文字列に変換します
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def create_order(order_id, amount):
event = {
"order_id": order_id,
"amount": amount,
"status": "created"
}
# '*' はタイムスタンプに基づいて一意のIDを生成します (例: 1678912345-0)
msg_id = r.xadd("order_stream", event)
print(f"[注文サービス] {order_id} を作成しました。ストリームID: {msg_id}")
if __name__ == "__main__":
for i in range(1, 6):
create_order(f"ORD-100{i}", 25.50 * i)
time.sleep(0.5)
2. コンシューマーグループの初期化
ワーカーが開始する前に、グループを定義する必要があります。これによりRedisはどのメッセージが確認(ACK)されたかを追跡できます。$ 記号を使用すると、古い履歴を無視して新しいメッセージから開始するようにグループに指示できます。
# ターミナルでこれを実行してグループを初期化します
redis-cli XGROUP CREATE order_stream inventory_group $ MKSTREAM
3. コンシューマー:堅牢な処理
コンシューマーは XREADGROUP を使用します。真の魔法は XACK で起こります。確認前にコードでエラーが発生した場合、メッセージは「保留エントリリスト(PEL)」に残り、消失することなくリトライを待機します。
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
GROUP = "inventory_group"
WORKER_ID = "worker_a"
def process_inventory():
print(f"[{WORKER_ID}] 注文を待機中...")
while True:
# '>' は、他のワーカーに一度も配信されていないメッセージを取得することを意味します
response = r.xreadgroup(GROUP, WORKER_ID, {"order_stream": ">"}, count=1, block=2000)
if not response:
continue
for stream, messages in response:
for msg_id, data in messages:
try:
print(f"[{WORKER_ID}] {data['order_id']} の在庫を差し引いています...")
# ここでデータベースの更新が行われると想定してください
# 完了したことをRedisに通知
r.xack("order_stream", GROUP, msg_id)
except Exception as e:
print(f"処理エラー: {e}")
if __name__ == "__main__":
process_inventory()
運用のヘルスチェック
ログだけを信頼してはいけません。本番環境では、コンシューマーが遅れていないかを知る必要があります。Redisは内部の状態を確認するための XINFO を提供しています。
ラグの監視
XINFO GROUPS order_stream を実行し、「lag」フィールドを確認してください。その数値が着実に上昇している場合は、トラフィックを処理するためにより多くのワーカーインスタンスを起動する必要があるという明確な信号です。
放棄されたメッセージの救出
処理の途中でワーカーが停止したらどうなるでしょうか?メッセージは保留エントリリスト(PEL)に留まります。これらは XPENDING を使って見つけることができます。一般的なパターンは、XCLAIM を使用してこれらの古い保留メッセージを奪い取り、正常なワーカーに引き渡す「デッドレター」スクリプトを用意することです。
脆弱なRESTの連鎖からログベースのストリームに切り替えることで、アーキテクチャは一変します。サービスは、注文を1つも落とすことなく、独立して失敗、再起動、スケーリングが可能になります。あなたはもう単なるスクリプトを書いているのではありません。現実世界の過酷な環境に耐えるように設計された、レジリエント(回復力のある)なシステムを構築しているのです。

