アプリのクラッシュを止める:Web Streams API導入6ヶ月のプロダクション・レビュー

Programming tutorial - IT technology blog
Programming tutorial - IT technology blog

1GBのボトルネック:本番環境での現実

6ヶ月前、私たちのログ分析ツールは壁に突き当たりました。500MBのJSONエクスポートをパースしようとするたびに、プロセスが「out of memory(メモリ不足)」エラーでクラッシュしていたのです。私たちは「すべてを読み込んでから処理する」という古い考え方に囚いました。パイプライン全体をWeb Streams APIに移行することで、ツールは一変しました。ファイルが5MBであれ5GBであれ、メモリ使用量は1GB近くからわずか40MBへと激減したのです。

「連続的な流れ(continuous flow)」という考え方への転換は、単なるマイクロ最適化ではありません。現代のスケーラブルなアプリケーションにとって、それは生存戦略です。このレビューでは、なぜWeb StreamsがNode.jsとブラウザの両方において、データ処理のゴールドスタンダード(標準)となっているのかを探ります。

バケツがいっぱいになるのを待つのはやめよう

従来のデータ処理はBuffer(バッファ)に依存しています。fs.readFile()response.json()を呼び出すとき、システムに対してすべてのバイトが到着するのを待ち、それをメモリという「バケツ」に放り込み、それからハンドルを渡すよう指示していることになります。これは小さな設定ファイルなら機能しますが、2GBの動画ファイルや100万行のCSVでは破綻します。

Web Streams APIは、データをバケツではなく「パイプ」として扱います。データが流れ込み、小さなチャンク(塊)を処理し、すぐに次へと渡します。Node.js v16.5.0以降、このAPIはグローバルに利用可能になり、Chrome、Firefox、Safari、そしてサーバーサイドでコードを等価に扱う(isomorphic)ことが可能になりました。

パフォーマンスの概要

  • メモリフットプリント: 従来の手法はファイルサイズに比例してスケールしますが、Web Streamsはメモリ使用量を低く一定に保ちます。
  • 最初のアクションまでのスピード: 100MBのダウンロード完了を待つ代わりに、データが到着した瞬間に最初の1行目のレンダリングを開始できます。
  • エコシステムの統合: Node固有のストリーム(require('stream'))を意識する必要はありません。Web Streamsは、すべてのモダンなランタイムで使用されるグローバルのReadableStreamおよびWritableStreamコンストラクタを使用します。

トレードオフ:パフォーマンス vs 複雑さ

適切なツールを選択するには、摩擦点(課題)を正直に見つめる必要があります。高トラフィックな環境で運用した結果の分析は以下の通りです。

メリット

バックプレッシャー(Backpressure)は最も重要な機能です。データソースがプロセッサよりも速い場合、ストリームは自動的にソースに一時停止を伝えます。これにより、システムの処理が追いつくまでの間にRAMが膨れ上がるを防ぎます。さらに、パイピング機構(Piping Mechanism)により、ロジックを宣言的に記述できます。readable.pipeThrough(transform).pipeTo(writable)のようなチェーンは、わずか数行でデータのライフサイクルを明確にマッピングします。

課題

チャンク単位で考えることはメンタルモデルの転換を必要とします。配列を用いた単純なasync/awaitパターンに慣れていると、ストリームの構文は最初は冗長に感じるでしょう。エラーハンドリングにも規律が求められます。パイプラインの途中で失敗した場合、メモリリークを防ぐために明示的なクリーンアップが必要です。また、サポートは広がっていますが、一部の古いnpmパッケージは依然として古いNodeストリームを想定しているため、小さなラッパーユーティリティが必要になる場合があります。

本番環境で使える戦略

Web Streamsを最大限に活用するには、ネイティブAPIを優先し、不要な抽象化を避けてください。Internet Explorer 11をサポートする必要がない限り、重いポリフィルはスキップしましょう。

  1. ネイティブのFetchを使用する: モダンなNode.jsやブラウザでは、fetch()response.bodyから直接ReadableStreamを取得できます。
  2. ロジックを分離する: 圧縮、暗号化、パースなどの重い処理にはTransformStreamを使用します。
  3. クリーンアップを監査する: ストリームのロジックは常にtry...finallyブロックで囲むか、AbortControllerを使用して、ネットワーク障害時にリソースが確実に解放されるようにします。

実践的な実装:巨大なCSVのストリーミング処理

具体的なシナリオを見てみましょう。巨大なCSVを取得し、一行ずつJSONに変換して結果をログに出力するとします。「古い」やり方では、おそらくユーザーのブラウザがフリーズしてしまいます。こちらがストリーミングによるアプローチです。

1. データソース

// データをストリームとして取得
const response = await fetch('https://api.itfromzero.com/huge-data.csv');
const readableStream = response.body;

2. 変換ロジック

バイト列をテキストに変換し、そのテキストを個々の行に分割する必要があります。組み込みのTextDecoderStreamとカスタムのトランスフォーマーを組み合わせることができます。

let partial = '';
const lineSplitter = new TransformStream({
  transform(chunk, controller) {
    partial += chunk;
    const lines = partial.split('\n');
    partial = lines.pop(); // 次のチャンクのために不完全な行を保存

    for (const line of lines) {
      controller.enqueue(line);
    }
  },
  flush(controller) {
    if (partial) controller.enqueue(partial);
  }
});

3. パイプライン

ここで効率化のメリットが発揮されます。コンポーネントを接続し、システムを流れるデータを処理します。

await readableStream
  .pipeThrough(new TextDecoderStream())
  .pipeThrough(lineSplitter)
  .pipeTo(new WritableStream({
    write(line) {
      // メモリ内には常に1行分しか存在しない
      console.log('行を処理中:', line);
    },
    close() {
      console.log('ストリームが完了しました。');
    },
    abort(err) {
      console.error('ストリームが失敗しました:', err);
    }
  }));

最終的な結論

Web Streamsをマスターしたことで、データ集約型のツールの構築方法が変わりました。関心事は「どれだけのRAMを用意できるか?」から「どれだけ効率的にデータを移動できるか?」へと移りました。ファイルアップローダー、リアルタイムダッシュボード、あるいはログプロセッサを構築しているのであれば、今日からこのAPIを使い始めてください。インフラ、そしてユーザーがその違いを実感するはずです。

次のステップとして、MDNのTransformStreamのドキュメントを読み込んでみてください。これはAPIの中で最も柔軟な部分であり、カスタムの高性能パイプラインを構築するための鍵となります。

Share: