NATSによる高性能マイクロサービス:RESTのボトルネックを超えて

Programming tutorial - IT technology blog
Programming tutorial - IT technology blog

マイクロサービス通信のボトルネック

モノリスを分解することは、いわゆる「分散スパゲッティ」状態に陥るまでは勝利のように感じられます。多くのチームは、馴染みがあるという理由でRESTやgRPCから始めます。これらは外部APIには適していますが、内部のすべてのやり取りをこれらに依存すると、密結合な脆弱なネットワークが生み出されます。サービスAがHTTP経由でサービスBを呼び出し、サービスBが停滞すると、サービスAもハングアップします。これが、わずかな遅延がシステム全体のブラックアウトに繋がる仕組みです。

以前、ある決済プラットフォームの監査を行った際、注文サービスが同期的なRESTコールを介して在庫、通知、配送の処理を待機していました。通知サービスの200ミリ秒のラグが連鎖反応し、ユーザーには5秒のタイムアウトとして現れました。セール期間中にトラフィックが15%増加した際、この連鎖反応によってサイト全体がオフラインになりました。その瞬間、私たちは同期呼び出しが資産ではなく、負債であることに気づいたのです。

同期結合の隠れたコスト

標準的なHTTP通信では、サービス同士がお互いのことを知りすぎる必要があります。小さなJSONペイロードを移動させるためだけに、ロードバランサーやサービスディスカバリ用のサイドカー、そして強力なリトライロジックが必要になります。このインフラにより、1ホップあたり約10〜50ミリ秒のレイテンシが加わります。 RabbitMQという解決策もありますが、管理が非常に大変です。Kafkaは強力ですが、多くの場合オーバーキルです。サービス間のシグナリングのためだけにフルクラスターを運用するのは、手紙を1通届けるために大型トラックを使うようなものです。

私たちには、より高速な何かが必要です。軽量で、毎秒数百万のメッセージを処理でき、複数の通信パターンを標準でサポートしているシステムが必要です。NATSはこのギャップに完璧にフィットします。

NATS:10マイクロ秒の神経系

NATSは、単一の20MBバイナリとして配布されるクラウドネイティブなメッセージングシステムです。アーキテクチャの「中枢神経系」として機能します。デフォルトでディスクへの永続化を重視するKafkaとは異なり、NATSはメモリ優先です。これにより、10マイクロ秒という極めて低いレイテンシを実現しています。NATSは、ほぼすべてのバックエンドシナリオをカバーする3つの主要なパターンを処理します:

  • Pub/Sub: イベント駆動型フローのための、ファンアウト(拡散)型非同期メッセージング。
  • Request-Reply: 超高速な非同期基盤の上に構築された、同期スタイルのロジック。
  • JetStream: 1バイトのデータも失うことが許されない場合のための、組み込みの永続化レイヤー。

ハンズオン:NATS駆動システムの構築

準備に必要なのはDockerとPythonだけです。今回はnats-pyライブラリを使用しますが、ロジックはGo、Node.js、Javaでも同様です。

1. NATSサーバーの起動

Dockerコマンド1つで、JetStreamを有効にしたサーバーを起動します。これにより、コアメッセージングと永続化レイヤーの両方が即座に利用可能になります。

docker run -d --name nats-main -p 4222:4222 -p 8222:8222 nats:latest -js

2. パターン1:Pub/Subによるデカップリング

Pub/Subを使用すると、サービスは誰が受信しているかを気にすることなくイベントをブロードキャストできます。ウェルカムメールの送信や検索インデックスの更新などの副作用を処理するのに最適な方法です。

サブスクライバー(リスナー):

import asyncio
from nats.aio.client import Client as NATS

async def run():
    nc = NATS()
    await nc.connect("nats://localhost:4222")

    async def message_handler(msg):
        print(f"'{msg.subject}' でイベントを受信しました: {msg.data.decode()}")

    # ユーザー作成イベントをリッスン
    await nc.subscribe("user.created", cb=message_handler)
    print("イベントを待機中...")

    while True:
        await asyncio.sleep(1)

if __name__ == '__main__':
    asyncio.run(run())

パブリッシャー:

import asyncio
from nats.aio.client import Client as NATS

async def run():
    nc = NATS()
    await nc.connect("nats://localhost:4222")

    # 送信して終了(Fire and forget)
    await nc.publish("user.created", b'{"id": 101, "user": "tech_editor"}')
    print("イベントをブロードキャストしました")
    await nc.close()

if __name__ == '__main__':
    asyncio.run(run())

3. パターン2:高速Request-Reply

NATSは、単一の持続的なTCP接続を再利用することで、HTTPよりも高速なRequest-Replyを実現します。レスポンス用の「reply-to」サブジェクトを動的に作成するため、複雑なロードバランサーの設定は不要です。

レスポンダー:

async def run():
    nc = NATS()
    await nc.connect("nats://localhost:4222")

    async def handle_request(msg):
        print(f"クエリを受信しました: {msg.data.decode()}")
        await nc.publish(msg.reply, b"在庫ステータス: OK")

    await nc.subscribe("inventory.check", cb=handle_request)

4. パターン3:JetStreamによる配信保証

コアなNATSは「送信して終了(fire and forget)」です。ブロードキャスト中にサービスが停止していると、メッセージを逃してしまいます。JetStreamは永続化レイヤーを追加することでこれを解決します。私はこれをフィンテックプロジェクトで秒間50,000トランザクションの処理に使用しました。コンシューマーがクラッシュしても、中断した場所から正確に再開できました。

信頼性の高い処理の例:

async def run():
    nc = NATS()
    await nc.connect("nats://localhost:4222")
    js = nc.jetstream()

    # 24時間メッセージを保持するストリームを定義
    await js.add_stream(name="SALES", subjects=["sales.*"])

    # 確認応答(ack)付きでパブリッシュ
    ack = await js.publish("sales.new", b'Invoice #999')
    print(f"JetStreamに保存されました。シーケンス: {ack.seq}")

    # 高負荷ワークロード向けのプル型コンシューマー
    sub = await js.pull_subscribe("sales.new", "invoice-processor")
    msgs = await sub.fetch(1)
    for msg in msgs:
        print(f"処理中: {msg.data.decode()}")
        await msg.ack() # 完了したことをNATSに通知

アーキテクチャに関する洞察

NATSへの切り替えは、メンタルモデルを変化させます。「どのエンドポイントを叩くべきか?」と考えるのをやめ、「今、どんなイベントが発生したか?」と考えるようになります。

スマートなサブジェクト設計

NATSは、`orders.us.east.created`のようなドット区切りの階層を使用します。ワイルドカード(1レベルには`*`、それ以下すべてには`>`)を使用して、データを効率的にルーティングできます。監視ツールは、パブリッシャーのコードを1行も変更することなく、`orders.>.created`をサブスクライブすることで、世界中の全リージョンの新規注文を追跡できます。

キューグループによるスケーリング

ワーカーのインスタンスを5つ実行している場合、すべてのインスタンスが同じメールを処理してほしくないでしょう。NATSのキューグループ(Queue Groups)はこれを自動的に処理します。キュー名を指定してサブスクライブすると、NATSは利用可能なすべてのメンバー間でメッセージをロードバランスします。

# NATSは各メッセージに対して'billing-service'グループ内のワーカーを1つ選択します
await nc.subscribe("payments.process", queue="billing-service", cb=handler)

最後に

マイクロサービスは高速でデカップルされているべきであり、同期的なオーバーヘッドに悩まされるべきではありません。通信をNATSに移行することで、サービスディスカバリの複雑さや、直接的なHTTPリンクの脆弱性を取り除くことができます。NATSは、小さなエッジデバイスからグローバルクラスターまで、同じシンプルなAPIでスケールします。

ログがタイムアウトエラーで溢れているなら、内部のRESTコールの1つをNATSのRequest-Replyパターンに置き換えてみてください。レイテンシの即座の低下と、システム安定性の劇的な向上を実感できるはずです。

Share: