午前2時14分の呼び出し
PagerDutyがただ鳴ったのではありません。悲鳴を上げたのです。開発者が50GBのデータセットをS3バケットにプッシュしましたが、私たちの脆弱な処理スクリプトがそれをキャッチできませんでした。後続のサービスはアイドル状態のまま、本番パイプラインは停止。私はその後の90分間、ターミナルにコマンドを手動で打ち込み続けました。本来ならマシンが数秒で処理すべき仕事です。その夜、私はcronジョブや運に頼ることはできないと悟りました。環境にリアルタイムで反応するシステムが必要だったのです。
Argo Eventsは、外部シグナルとKubernetesのアクションの間の架け橋となります。20種類以上のソースに基づいて、K8sオブジェクト、Argo Workflows、またはサーバーレス関数をトリガーする堅牢なイベント駆動型フレームワークです。これを18ヶ月前に導入して以来、私たちのチームではデータパイプラインの再起動に関する手動介入が90%減少しました。
クイックスタート:5分で稼働させる
複雑なロジックに入る前に、コントローラーを動かしてみましょう。Kubernetesクラスター(v1.24以上)とkubectlへのアクセス権が必要です。
1. Argo Eventsのインストール
まず、イベントインフラを隔離するための専用名前空間を作成します。これにより、イベントソースとセンサーのライフサイクルを管理します。
kubectl create namespace argo-events
kubectl apply -n argo-events -f https://raw.githubusercontent.com/argoproj/argo-events/stable/manifests/install.yaml
# Validating Admission Controllerは、YAMLのエラーを早期に発見するのに役立ちます
kubectl apply -n argo-events -f https://raw.githubusercontent.com/argoproj/argo-events/stable/manifests/install-validating-webhook.yaml
2. Event Busのデプロイ
Event Busは、このセットアップにおける神経系のようなものだと考えてください。ソースとアクションの間でメッセージを転送します。ここでは高可用性のためにNATS Jetstreamを使用します。
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
name: default
namespace: argo-events
spec:
nats:
native:
replicas: 3
これをkubectl apply -f eventbus.yamlで適用します。これで、クラスターは外部シグナルを受信する準備が整いました。
イベント自動化の3つの柱
手動運用から脱却するには、Argo Eventsがどのようにロジックを構成しているかを理解する必要があります。それは、EventSource、Sensor、Triggerという3つの主要コンポーネントに依存しています。
EventSource:リスナー
EventSourceは、外部の世界を監視する常駐ポッドです。GitHubのプルリクエスト、S3に現れたファイル、Kafkaトピック内のメッセージなど、EventSourceがそのシグナルをキャッチします。その後、そのシグナルをCloudEventとしてフォーマットし、Event Busに送ります。
Sensor:ロジックエンジン
Sensorは「脳」の役割を果たします。Event Busをリッスンし、アクションが必要かどうかを判断します。ここでは複雑な依存関係を定義できます。例えば、「S3へのアップロードが発生し、かつWebhookがメタデータの有効性を確認した場合のみ」プロセスをトリガーするといった設定が可能です。
Trigger:最終的なアクション
Triggerは、実際に作成したいリソースを定義します。多くのチームはArgo Workflowの起動に使用しますが、標準のKubernetes Jobを作成したり、汎用HTTPリクエストを介してSlack通知を送信したりすることも可能です。
実践シナリオ:S3データパイプラインの自動化
CSVファイルが特定のバケットに届くたびにデータを検証する必要があると想像してください。これを大規模に手動でチェックするのは不可能です。代わりに、数行の設定でフロー全体を自動化できます。
例:S3 EventSource
この設定は、特定のバケットでs3:ObjectCreated:Putイベントを監視します。ファイルアップロードから数ミリ秒以内に反応します。
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: aws-s3-event-source
spec:
s3:
example-bucket:
bucket: my-data-science-input
endpoint: s3.amazonaws.com
events:
- s3:ObjectCreated:Put
region: us-east-1
accessKey:
name: aws-secret
key: accessKey
secretKey:
name: aws-secret
key: secretKey
Sensorのロジック
次に、そのS3イベントをWorkflowに接続します。dataFiltersを使用して、.csvで終わるファイルのみを処理し、ログや一時ファイルは無視するようにします。
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: s3-sensor
spec:
template:
serviceAccountName: argo-events-sa
dependencies:
- name: s3-dep
eventSourceName: aws-s3-event-source
eventName: example-bucket
filters:
data:
- path: "notification.s3.object.key"
type: string
comparator: ".*\\.csv$"
triggers:
- template:
name: s3-workflow-trigger
k8s:
operation: create
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: s3-processing-job-
spec:
entrypoint: process
templates:
- name: process
container:
image: my-docker-repo/processor:latest
parameters:
- src:
dependencyName: s3-dep
dataKey: notification.s3.object.key
dest: spec.arguments.parameters.0.value
実戦で磨かれたプロダクション運用のヒント
ローカルの実験環境からプロダクション環境へ移行するには、マインドセットの切り替えが必要です。イベントが秒間100件も飛んでくるようになると、小さな設定ミスが重大な障害につながります。
1. RBACは「静かなる暗殺者」
SensorがWorkflowを起動しない場合、原因のほとんどは権限不足です。Sensorポッドには、リソースをcreateおよびpatchする権限を持つServiceAccountが必要です。私は、workflows.argoproj.ioに対するRBACルールが欠落していただけの「サイレント」な失敗のトラブルシューティングに、数えきれないほどの時間を費やしてきました。
2. パイプラインのデバッグ
常に最初にEventSourceのログを確認してください。そこにJSONペイロードが届いていなければ、クラスターはAWSやGitHubからのシグナルをそもそも受信できていません。Sensorのロジックを微調整し始める前に、kubectl logs -f [eventsource-pod-name]を使用してハンドシェイクを確認しましょう。
3. べき等性の強制
分散システムにおいて「正確に一度(exactly-once)」の配信は神話です。イベントはいずれ必ず二度配信されます。ワークフローは複数回実行しても安全である必要があります。私はコンテナ内にプリフライトチェックを含め、特定のevent-idやファイル名がすでに追跡データベースに記録されていないかを確認するようにしています。
4. イベントストームの防止
1,000件のファイルアップロードが突発的に発生すると、1,000個のワークフローが同時に生成され、クラスターが簡単にクラッシュしてしまいます。Sensorのfiltersセクションを使用してスコープを絞りましょう。さらに、名前空間にリソースクォータを設定し、イベントの急増によってコアアプリケーションサービスのCPUやメモリが枯渇しないようにしてください。
最後に
Argo Eventsの採用は、私のチームのインフラ管理方法を根本から変えました。私たちは受動的な「消火活動」をやめ、自己修復システムの構築に注力できるようになりました。S3、GitHub、Webhookを直接Kubernetesに結びつけることで、人間のボトルネックを排除できます。最初のYAML設定には労力が必要ですが、その見返りは、あなたが眠っている間も働き続けるシステムです。

