ユーザーを待たせるのはもうやめよう:RabbitMQタスクキューによるPythonアプリのスケーリング

Programming tutorial - IT technology blog
Programming tutorial - IT technology blog

同期処理のボトルネック:504 Gateway Nightmare(悪夢)

2021年当時、私はトラフィックの多いECサイトを管理するチームの一員でした。私たちは一見シンプルに見える機能をリリースしました。それは、ユーザーが過去3年間の全購入履歴のPDF請求書をエクスポートできるというものです。テスト中、5人や10人のユーザーであれば完璧に動作していました。私たちは万全の体制だと思っていました。

そしてブラックフライデーがやってきました。突然、同時接続ユーザー数が4,500人を超えました。通常なら100ミリ秒でリクエストを処理していたウェブワーカーが、30秒かかるPDF生成タスクに突然乗っ取られてしまったのです。APIノードのCPU使用率は数分で98%に急上昇しました。サイトは這うように遅くなり、ロードバランサーは紙吹雪のように504 Gateway Timeoutを返し始めました。トラフィックが足りなくて失敗したのではなく、ウェブサーバーが一度に多くのことをやろうとしすぎたために失敗したのです。

「ちょっと待って」問題:なぜウェブサーバーは行き詰まるのか

結論から言うと、DjangoやFastAPIのようなほとんどのウェブフレームワークは、重い処理ではなく速度を重視して設計されています。これらは厳格なリクエスト/レスポンス ライフサイクルに基づいて動作します。画像処理やメールの一括送信のような長時間実行されるタスクをそのサイクルに直接投入すると、ワーカープロセスが完全にブロックされてしまいます。

Pythonでは、CPUバウンドなタスクは特にリソースを消費します。ワーカーがPDF生成のための数値計算に追われていると、新しい着信接続のリスニングを停止してしまいます。これによりキューに大量のバックアップが発生します。レイテンシが増大し、最終的にはシステム全体がその重みに耐えきれず崩壊します。私たちに必要だったのは、ユーザーに「リクエストを受け付けました。1分後にメールを確認してください」と伝え、次の顧客のためにウェブワーカーを即座に解放する方法でした。

ツールの選択:スレッド、Redis、それともRabbitMQ?

メインスレッドからこれらのタスクを切り離すために、主に3つの方法を検討しました。

  • マルチスレッド: これは手っ取り早い解決策ですが、大規模な管理には不向きです。サーバーが再起動すると、保留中のタスクがすべて失われます。また、エラーが発生した際の進捗追跡や再試行を処理する簡単な方法もありません。
  • Redis (RQを使用): Redisは非常に高速で、数百万個のシンプルなジョブに最適です。時折ウェルカムメールを送信する程度であれば、Redisとrqライブラリで十分です。しかし、インメモリ・データストアであるため、ミッションクリティカルな財務データに必要な高度な配信保証が欠けています。
  • RabbitMQ: これはAMQPプロトコルを使用する専用のメッセージブローカーです。信頼性を重視して設計されています。メッセージをディスクに保存し、ワーカーの確認応答(Acknowledgment)をネイティブに処理し、Redisでは追加プラグインなしでは対応できない複雑なルーティングパターンをサポートしています。

解決策:PythonによるRabbitMQの実装

耐久性と水平スケーリングのテストの結果、RabbitMQが明確な勝者となりました。それ以来、私はこの構成を3つの本番環境にデプロイしてきましたが、依然として非常に安定しています。これにより、負荷の高い処理をユーザー向けのAPIから完全に分離できます。

1. DockerでRabbitMQを起動する

Erlangの依存関係を手動でインストールして時間を無駄にしないでください。Dockerなら数秒で起動できます。

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

これにより、管理プラグイン付きでRabbitMQが起動します。localhost:15672(デフォルト:guest/guest)でキューをリアルタイムに確認できます。

2. プロデューサー:タスクのディスパッチ

pikaライブラリを使用すると、タスクの完了を待たずにウェブハンドラーからタスクを送信できます。使用するロジックは以下の通りです。

import pika
import json

def send_pdf_request(user_id, report_data):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 'durable=True' はRabbitMQの再起動後もタスクを保持することを保証します
    channel.queue_declare(queue='pdf_tasks', durable=True)

    message = {'user_id': user_id, 'data': report_data}

    channel.basic_publish(
        exchange='',
        routing_key='pdf_tasks',
        body=json.dumps(message),
        properties=pika.BasicProperties(
            delivery_mode=2,  # 永続メッセージ
        )
    )
    print(f" [x] ユーザー {user_id} のリクエストをディスパッチしました")
    connection.close()

3. コンシューマー:バックグラウンドでの処理

コンシューマーは別のサービスとして実行されます。ウェブサーバーの速度を維持したまま、メッセージをリッスンし、負荷の高い処理を実行します。

import pika
import time
import json

def callback(ch, method, properties, body):
    data = json.loads(body)
    print(f" [x] ID {data['user_id']} のPDFを生成中...")
    
    # 10秒の処理ジョブをシミュレート
    time.sleep(10) 
    
    print(" [x] タスク完了!")
    # 手動ACKにより、ワーカーがクラッシュしてもタスクが失われないようにします
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='pdf_tasks', durable=True)

# 各ワーカーに一度に1つのタスクのみを割り当てる
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='pdf_tasks', on_message_callback=callback)

print(' [*] タスクを待機しています。終了するには CTRL+C を押してください。')
channel.start_consuming()

本番環境から得た苦労の教訓

タスクをキューに移動するには、考え方の転換が必要です。1日10万件以上のタスクを管理して学んだ3つの教訓を紹介します。

1. 耐久性(Durability)はセーフティネット

RabbitMQが再起動した際、durable=Truedelivery_mode=2を設定していないと、キューは消滅します。保留中のすべての請求書リクエストはどうなるでしょうか? 消えてしまいます。本番環境でこれらの設定を怠ることは、データ損失を招く原因となります。常に永続性の設定を確認してください。

2. Auto-ACK(自動確認応答)を信用しない

デフォルトでは、RabbitMQはタスクがワーカーに送信された直後に完了したと見なすことがあります。もしそのワーカーが処理の途中でクラッシュしたら、メッセージは永遠に失われます。関数の最後で手動の確認応答(ch.basic_ack)を使用してください。これにより、ワーカーが予期せず停止した場合でも、RabbitMQがタスクを再キューイングするように強制できます。

3. キューの深さを監視する

タスクをバックグラウンドに移動すると、HTTPエラーによる即時のフィードバックが得られなくなります。監視が必要です。RabbitMQ管理UIを使用して「Ready」カウントを追跡してください。その数値が1,000を超えて減らないようなら、負荷を処理するためにコンシューマーのインスタンスをさらに5つ増設する時期です。

最後に

RabbitMQへの切り替えにより、私たちの脆弱なモノリスは回復力のあるシステムに生まれ変わりました。トラフィックの急増を恐れることはなくなり、プレッシャーに合わせてコンシューマーをスケールさせるだけで済むようになりました。もしユーザーが1秒以上ローディング・アニメーションを見つめているなら、もう待たせるのはやめて、キューを導入すべき時かもしれません。

Share: