Kubernetes Job의基本を超えて
KubernetesのJobやCronJobは、単純な単発タスクには非常に優れています。しかし、データ処理のニーズが拡大するにつれ、私たちは壁に突き当たりました。ステップAを実行し、次にBとCを並列でトリガーし、最終的にステップDで結果を集計するという依存関係の連鎖は、カスタムのBashスクリプトや脆弱な外部トリガーによる管理の手間を増大させ、メンテナンスの頭痛の種となりました。
Argo Workflowsはこのギャップを埋めてくれます。これにより、Kubernetesネイティブな単一のリソース内で複雑なロジックを定義できるようになります。すべてのステップをコンテナとして扱うことで、YAMLやDockerに慣れているチームに柔軟なフレームワークを提供します。Argoをマスターすることは、手動デプロイから、高度で自己修復機能を持つインフラ管理へと移行するための基礎となります。
なぜArgoに移行するのか?
- ネイティブな統合: KubernetesのCRD(カスタムリソース定義)であるため、既存のRBAC、Secrets、ConfigMapsを完璧に活用できます。
- DAGオーケストレーション: タスクを有効非巡回グラフ(DAG)として定義できます。これにより複雑な並列実行が可能になり、ETLの実行時間を50%以上短縮できる場合があります。
- スマートなデータ受け渡し: S3、GCS、Minioなどを使用して、Pod間での大容量ファイル(例:5GBのCSV)の受け渡しを自動化します。
- リアルタイムの可視化: 40ステップに及ぶパイプラインのデバッグも、UI上で失敗したノードを視覚的に追跡できれば、大幅にスピードアップします。
数分で完了するインストール
Argoは独自のネームスペースに隔離することをお勧めします。Helmチャートも利用可能ですが、テスト用にコントローラーとサーバーUIを素早く起動するには、クイックスタート用のマニフェストが最も簡単です。
# 専用のネームスペースを作成
kubectl create namespace argo
# コントローラーとサーバーをデプロイ
kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.5.0/install.yaml
Podが初期化されたら、ダッシュボードをポートフォワードしてコントローラーの状態を確認します。環境が最初のジョブ投入に対して準備できているか確認するため、私はすぐにこれを行います。
kubectl -n argo port-forward deployment/argo-server 2746:2746
https://localhost:2746 でダッシュボードにアクセスできます。本番環境では、最終的にOIDC認証を備えたIngressが必要になりますが、ローカル開発ではポートフォワードで完璧に動作します。
初めてのDAGを構築する
Argoの真骨頂は、有効非巡回グラフ(DAG)にあります。単なる線形なリストではなく、どのタスクが他のタスクに依存しているかを明示的に定義します。一般的なデータパイプラインでは、1つの「抽出(extract)」ステップがあり、それが3つの並列な「変換(transform)」タスクにデータを供給するような構成が考えられます。この構造により、必要なときだけ計算リソースを使用するように制御できます。
以下は、再利用可能な Workflow のテンプレートです。 dependencies キーがどのようにオーケストレーションロジックを処理しているかに注目してください。
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: data-pipeline-dag-
spec:
entrypoint: main-pipeline
templates:
- name: main-pipeline
dag:
tasks:
- name: extract-data
template: job-container
arguments:
parameters: [{name: command, value: "echo 10,000件のレコードを取得中..."}]
- name: transform-a
dependencies: [extract-data]
template: job-container
arguments:
parameters: [{name: command, value: "echo バッチAを処理中..."}]
- name: transform-b
dependencies: [extract-data]
template: job-container
arguments:
parameters: [{name: command, value: "echo バッチBを処理中..."}]
- name: load-data
dependencies: [transform-a, transform-b]
template: job-container
arguments:
parameters: [{name: command, value: "echo BigQueryに書き込み中..."}]
- name: job-container
inputs:
parameters:
- name: command
container:
image: alpine:latest
command: [sh, -c]
args: ["{{inputs.parameters.command}}"]
resources:
limits:
memory: "512Mi"
cpu: "500m"
この例では、load-data タスクは両方の変換(transform)が完了するまで待機します。もし transform-a が失敗した場合、パイプラインは停止し、破損したデータがウェアハウスに到達するのを防ぎます。
共有ストレージ問題の解決
ワークフロー内の各Podは、それぞれ異なるノードで実行される可能性があるため、ローカルディスクを共有することはできません。これは初心者にとってよくあるボトルネックです。Argoはこれを **アーティファクト(Artifacts)** を通じて解決します。S3バケットを設定することで、ステップAが出力した results.json をArgoが自動的にアップロードし、ステップBはそのコンテナが起動する前に、そのファイルを入力としてダウンロードします。
本番環境でのベストプラクティス
ワークフローを大規模に実行するには、単に正しいYAMLを書くだけでは不十分です。リソースの枯渇やネットワークの不安定さからクラスターを保護する必要があります。
1. リトライによるレジリエンス
ネットワークの一時的な瞬断やAPIのタイムアウト(429エラーなど)で、2時間かかるジョブを台無しにすべきではありません。指数バックオフを備えた retryStrategy を使用して、パイプラインを堅牢にしましょう。
retryStrategy:
limit: "5"
retryPolicy: "OnFailure"
backoff:
duration: "2m"
factor: "2"
2. リソースのガードレール
DAGが200個のPodを並列にトリガーすると、クラスター内の他のサービスを圧迫する可能性があります。すべてのテンプレートに必ず requests と limits を設定してください。さらに、ワークフローのspecで parallelism: 10 フィールドを使用して、同時実行Pod数を制限しましょう。
3. クリーンアップの自動化
正常終了したワークフローは、完了したPodを後に残し、APIサーバーを圧迫します。 ttlStrategy を使用して、24時間後に成功したワークフローを自動的に削除し、手動介入なしでネームスペースをクリーンに保ちましょう。
4. Pipeline as Code
手動での kubectl apply コマンドは、設定のドリフト(乖離)を招きます。テンプレートを Git で管理し、Argo CD のような GitOps ツールを使用して管理してください。これにより、データ変換ロジックへのすべての変更がレビュー可能になり、追跡可能になります。
Kubernetes上でのバッチジョブの自動化は、ボトルネックである必要はありません。Argo Workflowsを採用することで、複雑なデータロジックを処理するための構造化され、スケーラブルな方法を手に入れることができます。まずは数ステップのシーケンシャルな処理から始め、DAGやアーティファクトの全機能を活用して、真に自動化されたインフラを構築しましょう。

