データベースへのポーリングはやめよう:DebeziumとKafkaによるCDC実践ガイド

Database tutorial - IT technology blog
Database tutorial - IT technology blog

データ同期の悩み

ElasticsearchのインデックスやRedisのキャッシュを、本番環境のメインデータベースと同期させる作業を何度経験したか分かりません。キャリアの初期, 私は「二重書き込み(Dual Write)」パターンに頼っていました。アプリケーションがデータベースに書き込み、その直後にキャッシュを更新しようとする仕組みです。理想的な環境ではうまく機能しますが、現実はそう甘くありません。ネットワークの瞬断などで2回目の書き込みに失敗すると、「データのドリフト(不整合)」が発生します。これは追跡が非常に困難で、悪夢のような問題です。

ポーリングも試しました。60秒ごとに updated_at > last_sync_time となるレコードを取得するcronジョブを設定したのです。シンプルではありますが、この方法はリソースを大量に消費します。AWSの控えめなt3.mediumインスタンスでも、データに変更がない時でさえ、頻繁なポーリングクエリによってCPU使用率が15〜20%もスパイクすることがあります。さらに、ポーリングは物理削除(ハードデリート)を検知できません。行が消えてしまえば、クエリでそれを見つけることができず、システムの他の部分に削除を伝えることができないのです。

データベースを「箱」ではなく「ストリーム」として捉える

多くの人が犯す間違いは、データベースを静的なコンテナのように扱うことです。データが格納されるのを待ち、それを後から引き出そうとします。しかし、内部的には、現代のあらゆるデータベースは変更内容をログファイルに書き出し続けています。MySQLなら Binlog、PostgreSQLなら Write-Ahead Log (WAL) です。

Change Data Capture (CDC) は、この考え方を逆転させます。データベースにデータを問い合わせるのではなく、これらの内部ログを「リッスン(傍受)」するのです。ここで活躍するのが Debezium です。これは Apache Kafka にプラグインとして組み込めるオープンソースツールで、データベースの生のバイトデータを, クリーンで継続的な JSON イベントのストリームに変換してくれます。これは、1日に1回ポストを確認しに行くのと、玄関にライブカメラを設置して常に監視しているのとの違いのようなものです。

パイプラインの構築

実用的なCDCパイプラインを構築するには、3つの柱が必要です。ソースとなるデータベース、メッセージのバックボーンとなる Apache Kafka、そして Kafka Connect 内で動作する Debezium です。ローカル開発には Docker Compose が最適です。エコシステム全体を数秒で立ち上げることができます。

version: '3.8'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:2.4
    ports:
     - 2181:2181

  kafka:
    image: quay.io/debezium/kafka:2.4
    ports:
     - 9092:9092
    depends_on:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181

  mysql:
    image: quay.io/debezium/example-mysql:2.4
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw

  connect:
    image: quay.io/debezium/connect:2.4
    ports:
     - 8083:8083
    depends_on:
     - kafka
     - mysql
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses

docker-compose up -d で起動しましょう。この特定の MySQL イメージは、バイナリログが有効な状態で事前設定されているため、面倒な手動設定の手間を省くことができます。

Debezium コネクタの設定

既存の本番環境 MySQL インスタンスに対して Debezium を使用する場合は、my.cnf ファイルを調整する必要があります。Debezium は、各行で何が変更されたかを正確に把握するために、ROW レベルのロギングを必要とします。これがないと、SQL ステートメントしか見ることができず、データの状態を再構築するには不十分です。

[mysqld]
server-id         = 223344
log_bin           = mysql-bin
binlog_format     = ROW
binlog_row_image  = FULL
expire_logs_days  = 7

データベースの準備ができたら、Debezium に監視を開始するよう指示します。これは、Kafka Connect の REST API に JSON 設定を POST することで行います。シェルスクリプトでバージョン管理がしやすいため、私は curl を使うのが好みです。

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "production_db",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schemahistory.inventory"
  }
}'

topic.prefix に注目してください。この文字列がすべての Kafka トピック名のプレフィックスになります。例えば、customers という名前のテーブルは、production_db.inventory.customers というトピックにデータをストリーミングします。

ストリームの動作確認

コネクタが起動すると、Debezium はまず現在のデータの「スナップショット」を作成します。その後、ストリーミングモードに移行します。組み込みのコンソールコンシューマーを使用して Kafka トピックを購読することで、この様子をリアルタイムで観察できます。

docker exec -it kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic production_db.inventory.customers \
    --from-beginning

MySQL で UPDATE クエリを実行してみてください。ほぼ瞬時に、ターミナルに JSON オブジェクトが表示されるはずです。Debezium の素晴らしい点は、ペイロードの構造にあります。変更前(before)と変更後(after)の行のスナップショットを提供してくれるのです。これにより、データベースに再度問い合わせることなく、どのフィールドが変更されたかを非常に簡単に特定できます。

現場で学んだ教訓

CDC は強力ですが、万能薬ではありません。テーブルが数百に増えると、Kafka トピックの管理が主要な業務になります。トピックの作成を自動化し、厳格な保持(retention)ポリシーを設定する必要があるでしょう。そうしないと、Kafka のディスク容量は予想以上の速さで枯渇します。

また、コネクタのヘルスチェックも欠かせません。Debezium が想定していない突然のスキーマ変更によって、コネクタが停止するのを何度も見てきました。常に /status エンドポイントを監視してください。ステータスが FAILED になると同期が止まり、ラグが蓄積し始めます。秒間10,000イベントを処理するような高トラフィックな環境では、数分間のダウンタイムでも膨大なバックログが発生する可能性があります。

Share: