有名テック企業の技術ブログを、ひとつのフィードで。
フィード
35件
はじめに こんにちは、データ基盤ブロックの平本(@cisetn)です。 本記事では、ZOZOTOWNのリアルタイムデータ連携基盤の中核であるETL層を作り直した事例を紹介します。対象はオンプレミスのSQL ServerからBigQueryへリアルタイムにデータを連携する基盤です。そのETL層をGoで実装したプラグイン(実行基盤はFluent Bit)で再設計しました。 ZOZOのリアルタイム連携基盤は2020年に一度紹介記事を公開していますが、それ以降、段階的にアーキテクチャを見直してきました。本記事はその中でもETL層の再設計にフォーカスします。 想定読者は、リアルタイム連携基盤やストリーミング処理基盤の設計・運用に関わる方です。 本記事で扱うこと、扱わないことは次のとおりです。 扱う:ZOZOのリアルタイム連携の全体像、今回リプレイスした基盤の背景・設計・実装 扱わない:BigQuery側のテーブル設計、SQL Server側のChange Tracking設定、利用側(BI・分析クエリ等) 目次 はじめに 目次 ZOZOのリアルタイムデータ連携の全体像 これまでの変遷 リプレイスに至った背景 顕在化してきた課題 新基盤アーキテクチャ 設計の軸 技術選定:Fluent Bit + Goプラグイン 全体構成 大量のデータをリアルタイムで捌くために考えたこと 新基盤の構成 INPUT内部:取得とエンコードを分けた OUTPUT内部:送信とACK確認を分けた 結果 今後の展望:Change Data Captureへの移行 まとめ ZOZOのリアルタイムデータ連携の全体像 本題の前に、ZOZOにおけるリアルタイム連携の全体像を軽く俯瞰しておきます。本記事のテーマがあくまで「その中のひとつ」であることを共有するためです。 ZOZOではデータソースが多岐にわたります。オンプレミスのものもあれば、クラウド上のものもあり、MySQL、SQL Server、DynamoDBなどさまざまです。当然、差分を検知する手段もソースに応じて変わりますし、連携の実現方式も1つではありません。 マネージド / SaaSで済むケース:例えばMySQL → BigQueryであればDatastreamを利用する 専用のパイプラインを組む必要があるケース:例えばDynamoDB → BigQueryのように、対応するマネージドサービスがない場合は、別途データ連携のパイプラインを構築する必要がある 結果として、ZOZOのリアルタイム連携基盤は複数系統に分かれて共存しています。本記事で扱うのは、そのうちオンプレ SQL Server → BigQueryの系統です。本番環境(prd)で約400のテーブルを連携対象としており、新規の連携依頼も日々発生するため、データ基盤の運用において比重の大きな系統となっています。SQL ServerのChange Tracking機能で変更を検知し、プラグインで取得したレコードをPub/Sub経由でBigQueryに流しています。 これまでの変遷 実は、本記事で扱う系統は今回が初めてのリプレイスではありません。以下の変遷を経ています。 時期 アーキテクチャ 主目的 2020 Qlik Replicate→ fluentd + Dataflow→ BigQuery 安定性向上 + コスト削減 2024 fluentd + BigQuery Subscription(Dataflow を廃止) コスト削減 2025 プラグインによる ETL 層の再設計+ BigQuery Subscription 効率改善(メモリ・スループット・コスト) 2024年には、ストリーム処理層のDataflowを廃止し、Pub/SubのBigQuery Subscriptionに置き換えるリプレイスが行われました。このフェーズの主目的はコスト削減です。 そして今回、ETL層をプラグインで再設計したのが本記事のテーマです。詳細な背景と目標は次章で述べますが、結果として、コスト削減・メモリ効率の改善・スループット向上・運用課題の解消といった効果につながりました(数値は末尾)。 リプレイスに至った背景 誤解のないよう先に述べておくと、旧基盤の設計が「悪かった」わけではありません。2020年当時、ZOZOのデータ基盤はまさに拡大していくフェーズにあり、リアルタイム連携の需要も増え始めたばかりでした。そうした状況では、プラグインが豊富なfluentdとDataflowのように既存のツールを組み合わせて素早く構築できる構成は合理的な選択だったかと思います。実際、信頼性(データ欠損が起きないこと)はチェックポイント機構などによって担保できており、長く運用されてきました。チェックポイント機構は、処理済みのChange TrackingバージョンをBigQueryに保持する仕組みです。Pod再起動時はそこから再開できます。 顕在化してきた課題 一方で、運用を続け、データ量や利用要件が増えていく中で、効率の側面でいくつかの課題が徐々に顕在化してきました。 メモリ効率:結果セットを一括でメモリに載せる実装のため、メモリ使用量がデータ量に比例して増加する構造でした。大量更新時のOOMを避けるためには「ピーク時のデータ量」を見越した大きなメモリを常時確保しておく必要があり、データ量が増えるにつれてリソース見積もりの難しさが目立つようになってきました。 コスト:上記のメモリ確保がそのままコストに直結します。メモリがトランザクション単位のデータ量に比例する構造であるかぎり、「ピーク時のデータ量」の見積もりを下回るとOOM直行となります。そのため運用上の工夫(時間帯別のスケーリング等)では本質的な改善が難しく、リソースの常時確保によるコスト増を抱え続けるしかありませんでした。 性能:逐次処理ベースの実装のため、1トランザクションあたりの規模が大きいテーブルでは、リアルタイム性を保ちにくい場面もありました。 運用:依存していたコンテナイメージがEOLを迎えており、継続利用にリスクがありました。加えて、内部状態の可視性が低く、障害発生時の原因特定にも時間がかかる状況でした。 一言でまとめると、各所でガタが出始めており、信頼性を維持したまま効率(メモリ・スループット・コスト)の側面を改善するため、リプレイスを検討するタイミングに来ていた、ということです。 新基盤アーキテクチャ 設計の軸 新基盤の設計指針はシンプルで、キャパシティプランニングの軸を「ピーク時のデータ量」から「単位時間あたりの処理量」に変えることに尽きます。信頼性(データ欠損が起きないこと)は旧基盤からチェックポイント機構によって担保されており、新基盤でもそのまま引き継いでいます。そのため本記事のテーマは信頼性を維持したまま、効率(メモリ・スループット・コスト)をどう改善したかです。 技術選定:Fluent Bit + Goプラグイン 今回のリプレイスは、前フェーズ(2024年のDataflow撤廃 + BigQuery Subscriptionへの切り替え)の延長線上にあります。前フェーズでDataflow関連の費用がまるごと不要になり大きなコスト削減は既に達成済みで、下流(Pub/Sub HubとBigQuery Subscription)も整理されている状態でした。一方でETL層はfluentdベースのまま残っており、メモリ効率とスループットの面で課題が顕在化していたため、今回はその続きとしてETL 層の中身を作り直すことにしました。下流はそのまま踏襲し、ソース側(Change Tracking設定)にも手を加えません。 このスコープと、既存のPub/Sub Hub構成・BigQueryテーブル設計を維持する制約のもとで、マネージドCDCサービスやOSSのCDCミドルウェアの活用も検討しました。ただし我々のケースでは、既存テーブル設計とPub/Sub Hubへの直接出力をそのまま組み合わせ続けられる選択肢を見つけられず、プラグインとして実装する形に決めました。 採用したのはFluent Bit + Goプラグインです。決め手は次のとおりでした。 既存基盤がfluentdベースで運用されていたため、Fluent Bitへの移行が素直:プラグインモデル・設定構造・デプロイ手順といった運用ノウハウがそのまま活きる INPUT(Change Tracking取得)とOUTPUT(Pub/Sub送信)の挙動を自分たちで細かく調整できる。後述の非同期ACK並列確認のような最適化も、プラグインとして自前で書いているからこそ仕込める Fluent BitのBuffer・バックプレッシャー機構をそのまま活用できる Goプラグイン公式サポートにより、後述する並列処理をgoroutineとchannelで素直に書ける 全体構成 以下の図は主要コンポーネントのみを示した簡略図です。 ETL層(Fluent Bit + Goプラグイン)はGKE上で動作します。プラグインはデータ取得(INPUT)とPub/Subへの送信(OUTPUT)の2つで構成されており、それぞれの実装の詳細は次章で扱います。 大量のデータをリアルタイムで捌くために考えたこと 新基盤の設計で常に意識していたのは、「大量のデータをいかにリアルタイムで捌くか」という問いでした。データ量が増えてもパイプラインが詰まらず、メモリ消費がデータ量に比例しない構造をどう実装するかを検討しました。前章で述べた「単位時間あたりの処理量を軸にする」方針を、Fluent Bitのパイプライン上に乗せて具体化していった話を、本章で紹介します。 なお、Fluent Bitのパイプライン構造の全体像については、公式ドキュメントもあわせてご覧ください。 新基盤の構成 Fluent Bitのパイプライン構造はINPUT → Filter → Buffer → Router → OUTPUTという形です。新基盤ではこのうちINPUTとOUTPUTをGoプラグインで実装しました。チャンク単位の処理やバックプレッシャーといったBuffer周りの機構はFluent Bit Engineが標準で備えています。そのためプラグイン側はINPUTとOUTPUTの"箱の中"の設計に集中できました。 設計の出発点として、データ取得から送信までの各処理を「どこがボトルネックになるか」で整理し、並列化方針を決めました。 処理 特性 並列化方針 CT取得(クエリ → カーソル) I/O bound(DB側) 単一スレッド(DBがボトルネック) エンコード CPU bound Worker数で並列化 Pub/Sub Publish I/O bound(NW) 非同期APIで並列化 ACK確認 I/O bound(NW待ち) 別Workerプールで並列化 CPU boundとI/O boundを別レーンに分け、それぞれを独立した並列度で動かす設計です。以下、INPUT内部・OUTPUT内部の順で紹介します。 INPUT内部:取得とエンコードを分けた INPUT内部の設計では、メモリとCPUを独立した軸として扱えるようにしました。 メモリの設計:結果セット全体を展開せず、カーソルで小分けに読み進める方式を採用。1回のクエリで読むレコード数 RecordsPerChunk をプラグインの設定で指定でき、本番では10,000件/チャンク CPUの設計:取得処理とエンコード処理を別レーンに分け、エンコードは複数のWorkerで並列実行 取得とエンコードの間に中間キュー(jobs queue)を挟むことで、取得側はエンコードの完了を待たずに次のチャンクを先行投入できます。キュー容量がゼロだと直列に戻ってしまうため、本実装ではjobs queueの容量をWorker数の5倍に設定しています。 この構造のもとで、同時にレコード形式でメモリに乗るチャンク数はNumWorkers × 6個で頭打ちになります。内訳は「jobs queue上の最大NumWorkers × 5個 + 各Workerが処理中の1個」です。 同時メモリ上のレコード数 = RecordsPerChunk × (jobs queue + 処理中 Worker) = RecordsPerChunk × (NumWorkers × 5 + NumWorkers) = RecordsPerChunk × NumWorkers × 6 = 10,000 × NumWorkers × 6 例えばNumWorkers = 2なら、データ量に関わらず常に約12万レコード分のメモリしか確保しなくて済みます。100万件規模のトランザクションが流れてきても、結果セット全体を一括ロードしてしまう旧基盤と違ってOOMにはなりません。 なお、Fluent Bit上でカーソル方式を実装するときには工夫が必要でした。Fluent BitはINPUTに対して定期的に「データをちょうだい」と呼び出してくる構造になっており、素朴に書くと毎回新規にクエリを発行してしまいます。それでは結果セットが毎回頭から読み直されてしまうため、カーソル状態をプラグイン側に持ち越し、呼び出しごとに「続きから」読み進めるようにしました。 OUTPUT内部:送信とACK確認を分けた OUTPUT内部では、送信処理とACK確認処理を別レーンに分離しました。Pub/SubのPublishは同期的に書くと「送信 → ACK待ち → 次へ」と直列化してしまい、ACK待ちのネットワークI/Oが支配的になります。これだとスループットがACKレイテンシに律速されてしまうため、両者を分離して並列化する方針を取りました。 送信側:非同期APIを呼んで即座にFuture相当の結果を受け取り、次へ進む。送信そのものは止まらない 確認側:受け取ったFutureのACK確認専用のWorkerプールを設け、複数並列で確認する 各メッセージが独立したACKタイムアウトを持つようになり、1件の遅延が後続全