午前2時のPagerDutyアラート
2021年のこと、私は本番環境のトラブルに見舞われ、長い夜を過ごしていました。Sparkジョブが重要なS3パーティションの一部を誤って上書きしてしまったのです。従来のデータレイクはフォルダをパーティションとして扱うため、「元に戻す」ボタンは存在しませんでした。コールドバックアップからのリストアが唯一の希望でした。複数のライターが同じプレフィックスに対して書き込みを行っており、ACID準拠がなかったため、私たちの「信頼できる唯一の情報源(Source of Truth)」は、事実上コイン投げのような不確かな状態でした。
私たちのデータチームは、破損したパーティションの手動修正や、「小さなファイル(small file)」問題によるパフォーマンス低下のトラブルシューティングだけで、毎週16時間も費やしていました。AWS S3上で Hiveテーブルフォーマットを使用した標準的なデータレイクを運用していましたが、初期の頃は機能していたものの、ペタバイト規模に達すると、その亀裂は大きな溝へと変わりました。
従来のデータレイクが大規模環境で破綻する理由
問題はS3ではなく、Hiveテーブルフォーマットにあります。Hiveの世界では、テーブルは単なるディレクトリの集合体にすぎません。カラムの型を変更したい場合は?通常、データセット全体を書き直す必要があります。GDPRの要求で1行だけ削除する必要がある場合は?ファイル全体を読み込み、フィルタリングして、再びディスクに書き戻さなければなりません。
技術的負債は通常、次の3つの形で現れます:
- ディレクトリベースのパーティショニング: 到着の遅れたレコードに2019年の
event_dateが含まれている場合、システムはその配置に苦労するか、また一つ非効率な極小ディレクトリを作成してしまいます。 - アトミック性の欠去: 書き込みが失敗すると「孤立した(orphan)」ファイルが残ります。その後のクエリがこのゴミを拾ってしまい、サイレントなデータ破損につながります。
- スキーマの硬直性:
user_idをcustomer_uuidにリネームするだけで、1週間がかりの移行作業が必要でした。多くのチームは諦めて、分かりにくいカラム名のまま運用を続けています。
競合:Hive vs. Delta vs. Iceberg
解決策を探す中で、3つの候補が浮上しました. Hiveは捨てるべき遺産でした。Delta Lakeは強力ですが、当時はDatabricksのエコシステムに縛られすぎているように感じました。そこで出会ったのがApache Icebergです。
もともとNetflixで大規模なデータを扱うために開発されたIcebergは、ディレクトリ構造を完全に無視します。その代わりに、個々のデータファイルを「マニフェストファイル」で追跡します。このアーキテクチャの転換は非常に重要です。これにより、SQLデータベースのような信頼性を、安価で無限のストレージであるS3にもたらすことができるのです。
何百ものテーブルを移行して学んだ教訓の一つは、ソースデータがクリーンであることはほぼないということです。スキーマテストやマッピングのためにCSVサンプルをJSONに素早く変換する必要があるときは、toolcraft.app/ja/tools/data/csv-to-jsonを使用しています。ブラウザ上で完結するため、機密性の高いフィールドがマシン外に出ることはありません。
モダン・レイクハウスの構築
今日から始めるなら、SparkとIcebergの組み合わせがゴールドスタンダードです。成熟しており、ドキュメントも充実しています。移行戦略を処理するためのセッション設定例を以下に示します。
1. Sparkセッションのセットアップ
from pyspark.sql import SparkSession
# SparkをIcebergカタログに向ける
spark = SparkSession.builder \
.appName("Iceberg移行") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.local.type", "hadoop") \
.config("spark.sql.catalog.local.warehouse", "/tmp/iceberg_warehouse") \
.getOrCreate()
2. 隠れたパーティショニング:キラー機能
Icebergには隠れたパーティショニング(Hidden Partitioning)という機能があります。timestampから手動でdayカラムを作成する必要はありません。エンジンがバックグラウンドで変換を処理してくれるため、冗長なカラムは不要になります。
# 余分なカラムを追加せずに、日単位でパーティショニングされたテーブルを作成する
spark.sql("""
CREATE TABLE local.db.events (
id bigint,
event_time timestamp,
level string,
message string
)
USING iceberg
PARTITIONED BY (days(event_time))
""")
悩みのない進化
かつては、カラムのリネームといえば12時間の移行ジョブを意味していました。Icebergなら、メタデータの変更だけで済みます。データファイルには一切触れないため、ミリ秒単位で完了します。
-- 即時変更、ダウンタイムなし
ALTER TABLE local.db.events RENAME COLUMN message TO event_details;
ALTER TABLE local.db.events ADD COLUMN correlation_id string;
このメタデータ層が、古いデータファイルを新しいスキーマにオンザフライでマッピングします。この機能一つだけで、昨年私たちのチームは何十時間もの単純作業を削減できました。
タイムトラベル:デバッグの超能力
ステークホルダーから「昨日の収益がなぜ違って見えるのか?」と聞かれたところを想像してみてください。従来のレイクでは推測するしかありません。Icebergなら、先週の火曜日午後2時時点の状態を正確にクエリできます。
-- スナップショットの履歴を表示
SELECT * FROM local.db.events.snapshots;
-- 過去にタイムトラベル
SELECT * FROM local.db.events FOR TIMESTAMP AS OF (current_timestamp() - INTERVAL 2 DAYS);
これは実質的に、ペタバイト規模のデータに対するgit revertです。バッチジョブが50,000件の破損レコードを書き込んでしまっても、ファイルを探し回る必要はありません。直前のスナップショットにロールバックするだけです。
現実的な進め方
すべてを一度に移行しようとしないでください。まずは最も「信頼性の低い」テーブル、つまりパイプラインを頻繁に壊しているテーブルから始めましょう。HiveとIcebergの両方に7日間書き込みを行い、カウントが一致するかを確認する「シャドウマイグレーション」を実施してください。
定期的なメンテナンスルーチンで、パフォーマンスを最高の状態に保ちましょう。Icebergは多くの小さなメタデータファイルを作成するため、定期的に削除する必要があります。
# 古いスナップショットを無効化してS3の容量を回収する
spark.sql("CALL local.system.expire_snapshots('db.events', timestamp '2026-05-20 00:00:00')")
# 小さなファイルを効率的な128MBのチャンクに集約する
spark.sql("CALL local.system.rewrite_data_files('db.events')")
Icebergへの移行は単なる技術的なアップグレードではなく、週末を取り戻すためのものです。ACIDの保証があるおかげで、私のオンコール(呼び出し)は劇的に静かになりました。もしあなたが今も壊れたS3パーティションと格闘しているなら、今こそその出血を止める時です。

