Pythonのバックグラウンドタスク:CeleryとRedisによる本番環境向けガイド

Programming tutorial - IT technology blog
Programming tutorial - IT technology blog

午前2時のメルトダウン:なぜアプリが停止するのか

火曜日の午前2時、私のポケベルが鳴りました。3分以内に150件ものPagerDutyのアラートが受信トレイに溢れました。Webサーバーは504 Gateway Timeoutで息絶え、私たちのプラットフォームは実質的に動作不能な状態に陥っていました。

原因は、ユーザーが年初来のデータを50MBのPDFとしてエクスポートできる「単純な」機能でした。ある大口の企業クライアントが大規模なエクスポートを実行したことで、Webワーカーが120秒間ブロックされました。ワーカープールは10個しかなかったため、数件の同時リクエストが発生しただけでシステム全体のリソースが枯渇し、サイトがダウンしてしまったのです。

成長は、最終的にあらゆるアーキテクチャのボトルネックを露呈させます。リクエスト・レスポンス・サイクル内ですべてを処理しようとすれば、アプリはいずれ限界を迎えます。トランザクショナルメールの送信、画像のリサイズ、サードパーティAPIとの同期といったタスクで、ユーザーをローディング・スピナーの前に釘付けにすべきではありません。バックグラウンドタスクをマスターすることは、脆弱なプロトタイプと、回復力の高い本番システムとの違いです。

タスクを処理する3つの方法(そして、なぜ多くが失敗するのか)

処理をバックグラウンドに移動する必要がある場合、通常3つの選択肢があります。間違った選択は、データの損失やメンテナンス不可能なコードにつながる可能性があります。

1. 同期アプローチ(ブロッキング)

ほとんどの開発者はここから始めます。関数を呼び出し、ユーザーは待機し、レスポンスが返されます。15ミリ秒のデータベース検索であれば問題ありませんが、3秒かかるAWS S3へのアップロードでは悲劇となります。20個のWebワーカーがある状態で、21人のユーザーが同時にアップロードを実行すると、21人目のユーザーはワーカーが空くまで動けなくなります。

2. スレッディングとマルチプロセッシング

Pythonのthreadingmultiprocessingモジュールを使えば、別スレッドでタスクを起動できます。これにより初期レスポンスは速くなりますが、本番環境では一種の賭けになります。デプロイのためにWebサーバーを再起動すると、実行中のすべてのタスクが瞬時に消失します。また、再試行メカニズムがなく、WebサーバーのRAMを使い果たしてインスタンス全体をクラッシュさせるリスクもあります。

3. 分散タスクキュー(Celeryモデル)

これが業界標準です。タスクをメッセージとしてパッケージ化し、「Broker」(Redisなど)に投入します。独立した「Worker」プロセスがそのブローカーを監視し、それぞれのペースでジョブを実行します。ワーカーがクラッシュしても別のワーカーが引き継ぎ、APIコールが失敗した場合は5分間の冷却期間後に自動で再試行できます。この関心の分離により、バックグラウンドでどれほど重い処理が行われていても、ユーザーインターフェースを軽快に保つことができます。

Celery + Redisスタック:トレードオフ

無料のアーキテクチャなど存在しません。Celeryは強力ですが、アクティブな監視が必要な動的コンポーネントをインフラに追加することになります。

メリット

  • 独立したスケーリング: Webサーバーは軽量で安価なインスタンスに抑えつつ、CeleryワーカーをCPU性能の高いマシンで実行できます。
  • 信頼性: Redisはメッセージ配信においてミリ秒未満のレイテンシを提供します。タスクの途中でワーカーが停止しても、Celeryはジョブを自動的に再キューイングするように設定可能です。
  • リアルタイム監視: Flowerのようなダッシュボードを使用して、成功率を追跡し、30秒の閾値を超えて実行されているタスクを特定できます。
  • 精密なスケジューリング: Celery Beatは分散型cronジョブのように機能します。午前3時のデータベース整理や週次の請求レポートに最適です。

課題

  • 運用オーバーヘッド: Redisインスタンスと複数のワーカープロセスの管理責任が生じます。
  • シリアル化の制限: 複雑なDjangoやSQLAlchemyオブジェクトをタスクに直接渡すことはできません。代わりにプライマリキー(ID)を渡し、ワーカー内でデータを再取得して整合性を確保する必要があります。
  • デバッグの遅延: コードが別のプロセスで実行されるため、Webアプリ内にブレークポイントを置いてもワーカーの実行をキャッチすることはできません。

本番グレードの実装

ほとんどのPythonプロジェクトには、Celery + Redisの組み合わせをお勧めします。RabbitMQの方が複雑なルーティングに適していますが、Redisの方がメンテナンスが簡単で、小規模なキューではメモリ消費も少なく、キャッシュ用として既にスタックに含まれている可能性が高いからです。

このアーキテクチャでは、RedisがBroker(キュー)およびResult Backend(タスクの最終ステータスが保存される場所)として機能し、Celeryが実行ロジックを処理します。

ステップ1:インストール

環境をクリーンに保つため、RedisにはDockerを使用します。これにより、チームの全員がまったく同じバージョンを実行できます。

# Redisコンテナを起動
docker run -d -p 6379:6379 redis:7-alpine

# コアライブラリをインストール
pip install celery redis

ステップ2:ワーカーの設定

tasks.pyを作成します。このファイルでCeleryにブローカーの場所を教え、バックグラウンドジョブのロジックを定義します。

import time
from celery import Celery

app = Celery('prod_app', 
             broker='redis://localhost:6379/0', 
             backend='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3)
def process_video_upload(self, video_id):
    try:
        print(f"[Worker] ビデオ {video_id} を処理中...")
        # CPU負荷の高いトランスコーディングをシミュレート
        time.sleep(10) 
        return f"ビデオ {video_id} の処理が正常に完了しました"
    except Exception as exc:
        # 指数関数的バックオフで再試行:60秒、120秒、240秒
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

ステップ3:非同期タスクの実行

Webフレームワーク(FlaskやFastAPIなど)で、.delay()を使用して処理をオフロードします。以下はapp.pyでのシミュレーションです。

from tasks import process_video_upload

def handle_upload_request(video_id):
    print("[App] ビデオのメタデータがデータベースに保存されました。")
    
    # 重い処理をオフロード!この呼び出しには約2ミリ秒かかります。
    process_video_upload.delay(video_id) 
    
    print("[App] ユーザーに 202 Accepted を返します。")

if __name__ == "__main__":
    handle_upload_request("vid_99")

ステップ4:インフラの稼働

ワーカーがリッスンを開始するまで、アプリケーションはタスクを処理しません。新しいターミナルを開いてプロセスを開始します。

# infoレベルのログ出力でワーカーを起動
celery -A tasks worker --loglevel=info --concurrency=4

python app.pyを実行するとレスポンスが即座に返され、ワーカーのターミナルが10秒間の処理タスクを切り離して処理します。これが、高負荷時でも応答性の高いUIを維持する方法です。

本番環境に不可欠なガードレール

Celeryの設定は簡単ですが、大規模に運用するには規律が必要です。高トラフィックのクラスター管理から学んだ4つの教訓を紹介します。

  • 可視性タイムアウト(Visibility Timeout): タスクがRedisのvisibility_timeout(デフォルトは1時間)よりも長く実行されると、Redisはワーカーがクラッシュしたと判断し、別のワーカーにタスクを配信します。2時間のデータ移行を行う場合は、無限ループを避けるためにこの設定を増やしてください。
  • ignore_resultによる最適化: すべてのタスクの結果をRedisに書き戻すと、不要なI/Oが発生します。戻り値が必要ない場合は、@app.task(ignore_result=True)を使用してRedisのメモリ使用量を削減しましょう。
  • スマートな再試行: すぐに再試行してはいけません。サードパーティAPIがダウンしている場合、500ミリ秒で復旧することはありません。失敗している依存先への負荷を減らすため、指数関数的バックオフを使用してください。
  • べき等性の設計: すべてのタスクは2回実行される可能性があると想定してください。トランザクションIDにUNIQUE制約をかけるなどデータベースの制約を利用し、「顧客への請求」タスクが2回実行されても二重請求にならないようにします。

メインプロセスから重いロジックを切り離すことは、単なるパフォーマンスの改善ではなく、システムの安定性のための基本的な要件です。CeleryとRedisを統合することで、午前2時のトラフィック急増時でもアプリの高速性と信頼性を維持できるようになります。

Share: