スケーラブルなELT:2,500行のSQLをdbtとPostgresにリプレースした方法

Database tutorial - IT technology blog
Database tutorial - IT technology blog

午前2時14分の呼び出し:ストアドプロシージャが壊れる時

午前2時14分、PagerDutyのアラートは物理的な衝撃として襲ってきます。3晩連続でタイムアウトした、PostgreSQLの2,500行に及ぶレガシーなストアドプロシージャを凝視しています。ロジックは、一時テーブルやネストされたループ、暗黙の依存関係が入り混じり、混沌としています。たった1つの修正ミスがレポート層全体を壊しかねないため、チームの誰もが手を出したがらない状態でした。

MySQL、PostgreSQL、MongoDBを扱ってきた経験から、すべてのツールにはそれぞれ適した役割があることを学びました。構造化された分析変換において、クエリプランナを適切に考慮すれば、PostgreSQLは非常に強力なツールとなります。ボトルネックは通常、データベース自体ではなく、ワークフローにあります。私たちは重要なビジネスロジックをブラックボックスの中に隠してきました。これを解決するには、dbt (data build tool)を使用したプロフェッショナルなELT(抽出、ロード、変換)アプローチへと移行する必要があります。

5分以内にdbtをPostgresに接続する

コーヒーを一杯飲み終える前に、dbtをPostgresインスタンスと通信させることができます。dbtをオーケストレーション層として考えてみてください。データベースの上に位置し、モジュール化されたSQLを記述することで、dbtがそれをコンパイルし、正しい順序で実行してくれます。

まずはPostgresアダプターを取得することから始めましょう。依存関係の競合を避けるため、私は常に仮想環境を使用しています:

python3 -m venv dbt-env
source dbt-env/bin/activate
pip install dbt-postgres

次に、プロジェクトを初期化します。これにより、データモデルを整理するためのディレクトリ構造が作成されます:

dbt init analytics_pipeline

設定はprofiles.yml(通常は~/.dbt/にあります)で行います。このファイルは、dbtに認証方法を指示します。本番環境では、パスワードを直接記述せず、環境変数を使用してください。

analytics_pipeline:
  outputs:
    dev:
      type: postgres
      threads: 4
      host: localhost
      port: 5432
      user: transform_user
      pass: "{{ env_var('DB_PASSWORD') }}"
      dbname: analytics_db
      schema: analytics_main
  target: dev

dbt debugを実行してみましょう。ターミナルに緑色のテキストが表示されれば、モノリシックなスクリプトの作成を卒業し、モデルの構築を開始する準備は完了です。

モジュール型モデリングへの転換

dbtの本質は、すべての.sqlファイルをモデルとして扱うことです。モデルは単なるSELECT文です。ロジックを記述すれば、dbtがCREATE TABLE ASCREATE VIEW ASなどの定型的なコードを自動的に処理してくれます。

依存関係の管理は、かつては悩みの種でした。今では、ref()関数がそれを解決してくれます。どのテーブルを先に構築すべきか推測する代わりに、モデル名を直接参照します。これにより、dbtが実行時に追跡する明確なリネージ(系譜)が作成されます。

eコマースの生データに対するステージングモデル(models/staging/stg_orders.sql)を考えてみましょう:

with raw_orders as (
    select * from {{ source('raw_data', 'orders') }}
)

select
    id as order_id,
    user_id,
    order_date,
    lower(status) as order_status -- 生の文字列を標準化
from raw_orders

最終的な変換(models/marts/fct_orders.sql)では、それらのステージング済みファイルからデータを取得するだけです:

with orders as (
    select * from {{ ref('stg_orders') }}
),

order_payments as (
    select * from {{ ref('stg_payments') }}
)

select
    orders.order_id,
    orders.user_id,
    order_payments.amount
from orders
left join order_payments using (order_id)

舞台裏では、dbtが有向非巡回グラフ(DAG)を構築します。これにより、fct_ordersが開始される前にstg_ordersが確実に完了するようになります。もう5つの異なるPythonスクリプトの実行順序を手動でスケジュールする必要はありません。

テスト:ダッシュボードの不具合を終わらせる

モジュール化されたコードは、戦いの半分に過ぎません。実際にアラートを静かに保つのはテストです。以前は、ダッシュボードの月次経常収益(MRR)が115%も過大表示されて初めて、主キーの重複に気づくという有様でした。dbtを使えば、ビルドフェーズでこれらのエラーをキャッチできます。

テストはschema.ymlファイルで定義します:

version: 2

models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['placed', 'shipped', 'completed', 'returned']

dbt testを実行することで、データの整合性が検証されます。レコードにNULLのIDやサポートされていないステータスが含まれている場合、パイプラインは停止します。これにより、破損したデータが本番のBIツールに到達するのを防ぎます。

実際にアラートを静かに保つのはテストです。dbt docs generateを実行することで、Webベースのインターフェースを作成できます。これにより、データの全リネージとテーブルの説明が表示され、コードを読み解くことなくプロダクトマネージャーにデータの出所を簡単に説明できるようになります。

本番環境に向けた貴重な教訓

Postgresをスケールさせるには、単にクリーンなSQLを書くだけでは不十分です。3つのレガシーシステムをこのスタックに移行した結果、次の4つの戦略が不可欠であるとわかりました:

  1. 増分モデル(Incremental Models)の活用: 5億行のテーブルを一から再構築するのはやめましょう。incrementalマテリアライゼーションを使用して、新しいデータのみを追加します。これにより、実行時間を45分から120秒に短縮できました。
  2. マテリアライゼーション戦略: 開発時はデフォルトでviewsを使用してください。即座に構築できます。BIツールのパフォーマンスが低下し始めたときだけtablesに切り替えます。
  3. 生データの分離: raw(生)ソースデータは専用のスキーマに保持します。dbtユーザーにはrawへの読み取り専用権限と、analyticsへのフル権限を与えます。この単純な障壁が、ソースデータに対する誤ったDROP TABLEの惨劇を防ぎます。
  4. 実行後フック(Post-Run Hooks): dbtの完了後に、Postgresテーブルに対して自動的にANALYZEを実行します。これによりクエリプランナを最適化し、データ量が増えても結合(JOIN)の速度を維持できます。

プロフェッショナルなデータエンジニアリングとは、SQLにソフトウェア開発の規律を適用することです。バージョン管理、自動テスト、モジュール化は、もはやアプリ開発者だけのものではありません。ストアドプロシージャを廃止して以来、私のストレスレベルは大幅に低下しました。もしあなたが今でもPostgresの依存関係を手動で管理しているなら、必要以上に苦労していると言えるでしょう。

Share: