Apache Kafkaによるリアルタイムデータ処理:本番運用6ヶ月レビュー

Programming tutorial - IT technology blog
Programming tutorial - IT technology blog

Kafkaと過ごした6ヶ月:最初から知っておきたかったこと

チームがバッチETLパイプラインをApache Kafkaに置き換えようと提案したとき、正直なところ懐疑的だった。当時は1日あたり約200万イベントを処理していた——1時間ごとの遅延に悩むには十分な規模だったが、分散ストリーミング基盤の運用コストを正当化するほどではないと感じていた。

6ヶ月後、その懐疑心はすっかり消えた。Kafkaは単にレイテンシの問題を解決しただけでなく、データフローそのものの考え方を変えた。実際にどんな道のりだったか、包み隠さず振り返る。

クイックスタート:5分でKafkaを起動する

まず何より、ローカルでKafkaを動かして実際に触れるようにしよう。Docker Composeが圧倒的に手っ取り早い。

# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
docker-compose up -d

# 最初のトピックを作成する
docker exec -it <kafka-container> kafka-topics \
  --bootstrap-server localhost:9092 \
  --create --topic user-events \
  --partitions 3 \
  --replication-factor 1

# 作成されたか確認する
docker exec -it <kafka-container> kafka-topics \
  --bootstrap-server localhost:9092 \
  --list

では最初のメッセージを送信して、受信してみよう:

# プロデューサー(ターミナル1)
docker exec -it <kafka-container> kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic user-events

# コンシューマー(ターミナル2)
docker exec -it <kafka-container> kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic user-events \
  --from-beginning

ターミナル1で何か入力すると、数ミリ秒以内にターミナル2に表示される。これがKafkaのすべての根幹となるループだ——それが大規模スケールになっても変わらない。

深掘り:Kafkaの実際の仕組み

ほとんどのチュートリアルは内部構造を完全に省略している——だから本番環境のKafkaに驚かされる人が多い。真剣にデプロイする前に必要なメンタルモデルを説明しよう。

トピック、パーティション、オフセット

トピックは単なるイベントの名前付きログだ。イベントは順番に追記され、ディスクに永続保存される。メッセージキューとは違い、Kafkaはコンシューム後にメッセージを削除しない。それがKafkaを単なるブローカーではなくログたらしめている所以だ。

トピックはパーティションに分割される。各パーティションは順序付きの不変なレコード列だ。3つのパーティションとコンシューマーグループ内の3つのコンシューマーがあれば、各コンシューマーはちょうど1つのパーティションを担当する。パーティション数が多いほど並列性が高まる——これがKafkaの水平スケールの仕組みだ。

パーティション内の各メッセージにはオフセット——連番の整数——がある。コンシューマーは自分のオフセットを管理するため、いつでもイベントを再生・スキップ・再処理できる。消費したメッセージが永遠に消えるシステムから来た私には、初めて使ったとき魔法のように感じた。

PythonでのプロデューサーとコンシューマーAPI

本番環境ではconfluent-kafkaライブラリを使っている。公式Cライブラリをラップしており、高スループットワークロードではkafka-pythonより約3〜5倍高速——毎秒約5万メッセージを超えると差が明確に現れる。

pip install confluent-kafka
# producer.py
from confluent_kafka import Producer
import json
import time

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    if err:
        print(f'配信失敗: {err}')
    else:
        print(f'{msg.topic()} [{msg.partition()}] オフセット {msg.offset()} に配信完了')

for i in range(100):
    event = {
        'user_id': f'user_{i % 10}',
        'action': 'page_view',
        'timestamp': time.time()
    }
    producer.produce(
        'user-events',
        key=event['user_id'],  # 同じキーは常に同じパーティションへ
        value=json.dumps(event).encode('utf-8'),
        callback=delivery_report
    )
    producer.poll(0)

producer.flush()
# consumer.py
from confluent_kafka import Consumer
import json

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'analytics-service',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe(['user-events'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'コンシューマーエラー: {msg.error()}')
            continue

        event = json.loads(msg.value().decode('utf-8'))
        print(f"処理中: user={event['user_id']}, action={event['action']}")
finally:
    consumer.close()

痛い目を見て学んだこと:意味のあるコンシューマーグループIDを必ず設定すること。2つのサービスが誤って同じグループIDを共有すると、パーティションを分け合ってしまい、それぞれが半分のイベントを取りこぼす。サイレントなデータロスは最悪の障害だ——エラーも、アラートも出ず、3日後にダッシュボードの数値がおかしいと気づくだけだ。

応用:本格的なパイプラインを構築する

プロデューサーとコンシューマーが1つずつあるのはデモに過ぎない。本番パイプラインは複数のステージを持つ——そこでKafkaの設計思想が活きてくる。

マルチステージ処理のためのトピック連鎖

各トピックがパイプラインの1ステージになる:

  • raw-events — アプリケーションから入ってくるすべてのイベント
  • validated-events — スキーマ検証と重複排除後
  • enriched-events — ユーザープロファイルデータとの結合後
  • analytics-aggregates — 最終的に算出されたメトリクス

マイクロサービスは1つのトピックからコンシュームし、次のトピックにプロデュースする。独立したスケーリング、明確な障害境界、そして他のステージに触れることなく任意のステージを再生できる柔軟性が手に入る。

水平スケーリングのためのコンシューマーグループ

# 同じgroup.idで複数インスタンスを起動してスケールアウトする
# Kafkaはアクティブなコンシューマー間でパーティションを自動的に再分配する

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'analytics-service',  # 同じグループ = 負荷分散
    'auto.offset.reset': 'latest',
    'enable.auto.commit': False  # Exactly-onceセマンティクスのため手動コミット
})

consumer.subscribe(['validated-events'])

while True:
    msg = consumer.poll(1.0)
    if msg and not msg.error():
        process_event(json.loads(msg.value()))
        consumer.commit(asynchronous=False)  # 処理成功後にコミット

ステートフル処理のためのKafka Streams

「直近5分間のユーザーごとのイベント数」のような集計にはステートフルなストリーム処理が必要だ。JVM上ではKafka Streamsが自然な選択肢だが、Pythonの場合はFaustがうまく機能する:

pip install faust
import faust

app = faust.App('analytics', broker='kafka://localhost:9092')

events_topic = app.topic('validated-events')

# ウィンドウテーブル:5分間のウィンドウでユーザーごとのイベント数をカウント
user_event_counts = app.Table(
    'user-event-counts',
    default=int,
).tumbling(300.0)  # 5分間のタンブリングウィンドウ

@app.agent(events_topic)
async def process(events):
    async for event in events.group_by(lambda e: e['user_id']):
        user_event_counts[event['user_id']] += 1
        print(f"ユーザー {event['user_id']}: {user_event_counts[event['user_id']].current()} イベント")

本番運用から得た実践的なヒント

Kafkaを動かすのは簡単な部分だ。負荷がかかった状態でも深夜3時のアラートなしに安定稼働させること——そこが本当の難しさだ。以下のヒントは、すべて痛い経験から学んだものだ。

パーティション数は一方通行のドア

後からパーティションを増やすことはできるが、減らすことはできない。さらに悪いことに、パーティションを増やすとキーベースの順序が崩れる——同じキーのメッセージが異なるパーティションに振り分けられる可能性が出てくる。パーティション数は最初によく考えること。実用的な目安として、想定ピーク時のコンシューマー数の2〜3倍を目標にしよう。毎秒約1万イベントを処理する中規模トピックなら、50パーティションで通常は十分だ。

スループットではなくラグを監視する

スループットは処理速度を示す。ラグはどれだけ遅れているかを示す。プロデューサーが速ければ、コンシューマーは優れたスループットを示していても実は遅れていることがある。ラグアラートを設定しよう——私はBurrowを使っているが、組み込みのkafka-consumer-groups CLIでも急場しのぎには使える:

kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --describe \
  --group analytics-service

LAGの列が増え続けているなら、コンシューマーを追加する必要がある。パーティション数を上限として、コンシューマーを増やすことができる。

リプレイウィンドウに基づいてリテンションを設定する

Kafkaのデフォルトリテンションは7日間だ。自問しよう:深刻なインシデント時、どこまで遡って再生する必要があるか?ホットパスなら3日あれば十分だ。重要なトピックは30日を設定し、トピックレベルで構成する:

kafka-configs \
  --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name user-events \
  --alter \
  --add-config retention.ms=2592000000  # 30日をミリ秒で指定

本番環境ではSchema Registryを使う

開発環境ではプレーンJSONで十分だ。しかし本番環境では、スキーマドリフトがいずれパイプラインを壊す——上流でフィールドがリネームされると、下流のコンシューマーがサイレントに壊れる。AvroまたはProtobufを使ったConfluent Schema Registryは、ブローカーレベルで互換性を強制する。セットアップは確かに手間がかかる。しかし私はこれのおかげで、少なくとも3回は深刻なデータロスを引き起こしかねない破壊的変更を未然に防ぐことができた。

リバランシングを恐れず、ただし適切に対処する

コンシューマーがグループに参加または離脱すると、Kafkaはリバランスをトリガーする。グループ全体で数秒間処理が停止する。手動オフセットコミットを行っている場合は、パーティションが再割り当てされるon_revokeコールバック内でコミットすること——これを省略すると、リバランスのたびに同じイベントを再処理することになる:

from confluent_kafka import Consumer, TopicPartition

def on_revoke(consumer, partitions):
    # パーティションが取り上げられる前にコミットする
    consumer.commit(asynchronous=False)

consumer.subscribe(['user-events'], on_revoke=on_revoke)

Kafkaはあなたのユースケースに適しているか?

Kafkaは高スループットのイベントストリーミング、耐久性のあるログ、そして大規模なサービス間の疎結合に真価を発揮する。シンプルなタスクキューには過剰装備だ——RedisやRabbitMQの方が、はるかに少ない運用コストでニーズを満たせる。また、Kafkaは本格的な運用作業を要求する:分散状態の管理、レプリケーションファクターの決定、リテンションポリシーの設計、コンシューマーラグの監視——すべてに注意が必要だ。

6ヶ月後の結論:複数のサービスにまたがって毎秒数万イベントを処理しているなら、Kafkaの学習コストは十分に見合う。毎分数百イベント程度なら、よりシンプルなものから始めて、痛みが明らかになってから移行すればいい。Kafkaはそのときもまだ存在しているし——本当に必要なときに使えば、さらに価値を実感できるはずだ。

Share: