この記事は、Merpay Advent Calendar 2022 の10日目の記事です。
こんにちは。メルペイ Data Management TeamのData Managerのhyrrot(@hyrrot)です。
メルカリグループでは、社員がデータに基づく意思決定を行えるようにするために、Google BigQueryを使って構築したデータウェアハウスを管理・運用しています。様々なデータソースからBigQueryにデータを取り込んでから、dbt(data build tool)を利用してデータウェアハウスに取り込まれたデータを変換し、利用者がスムーズにデータを利用できるようにしています。
本記事では、こちらのdbtを実行するシステムをどのように設計・実装したかについて説明します。
dbtに限らず、何かを定期的に実行するシステムをGCP (Google Cloud Platform)で実装したい方にとって、下記の情報がお役に立つかもしれません。
dbtを実行する基盤を作る上での考慮点
dbtを実行する基盤を作る上での考慮点として、以下のようなものがありました。
- 利用者に新鮮なデータをお届けするために、dbtのデータ変換処理を定期的・自動的に実行したい。
- BigQueryがメルカリグループ管理のGCP組織内にあるため、ネットワークの局所性や認証・認可の容易性などを考慮して、BigQueryと同じGCP組織内でdbtを実行したい。
- dbtの実行は数時間に及ぶことがあるので、その終了までに基盤の都合でタイムアウトされないようにしたい。
- 処理の失敗時のリトライなどの必要に応じて、定期的に実行されている処理を手動で再実行したい。
- GCPサービスに対する知見はある程度チーム内にあったが、Airflowなどのワークフローエンジンや、そのマネージドサービスであるCloud Composerなどの知見がなかった。ワークフローエンジンを習熟する時間をかけずに、動作するものを作成したい。
設計と実装
上記を考慮の上で、以下の図の仕組みを構築しました。Cloud SchedulerをトリガーとしてCloud Buildのビルドを起動し、そのビルドがdbtが含まれるDockerイメージをpullし、そのイメージに含まれるdbtを実行することで、BigQueryのデータの変換を行います。
なぜCloud Build?
Cloud BuildはCI/CDのプラットフォームであり、ビルド/テスト/デプロイが行われることを意図して作られているものです。しかし、これを使ってdbtを実行することで、いくつかの利点を享受することができます。
- タイムアウト時間が最長で24時間に設定できるので、ジョブの実行が数時間かかる場合でもタイムアウトされることがない。(たとえば、同様にコンテナを実行できるCloud Runのタイムアウトは1時間を超えることができない。)
- サーバーレスのシステムであり、dbtコンテナを実際に動作させる基盤の運用が必要ないほか、処理が実行されていない時間に課金されることはない。
- 複数の依存関係がある処理群を、複数の「ステップ」として定義し、互いに依存関係がない処理は並列に実行して、処理全体の時間の短縮につなげることができる。
また、メルカリグループでは、すでに多数のチームがCloud Buildを利用しているためメリットを享受できませんが、Cloud Buildは120分/日のビルドを無料で実行できるので、小規模な組織でバッチ処理を行うプラットフォームをお探しの方にとって、このメリットが魅力的であるかもしれません。
処理の流れ
あらかじめ、(E) Cloud Storageのチーム管理下のバケットに、dbtのDockerイメージ上でdbt処理を実行するCloud Buildのビルド構成ファイル(ここでは、dbt.yaml)を作成しておきます。dbt.yamlの内容は以下のようなものとなります。
steps:
- id: dbt_run
name: gcr.io/some-project-name/artifact-registry/dbt:latest
wait_for: ["-"]
entrypoint: "bash"
args:
- "-c"
- |
if [[ -z "${_RUN_DBT_RUN}" ]]; then
exit 0
fi
dbt run --selector "${_SELECTOR}"
- id: dbt_test
name: gcr.io/some-project-name/artifact-registry/dbt:latest
wait_for: ["dbt_run"]
entrypoint: "bash"
args:
- "-c"
- |
if [[ -z "${_RUN_DBT_TEST}" ]]; then
exit 0
fi
dbt test --selector "${_SELECTOR}"
このビルド構成をCloud Buildで実行すると、以下のような動作となります。
- もしパラメータ
_RUN_DBT_RUN
が空文字列でない場合、dbtのセレクタに_SELECTOR
の値を指定してdbt run
を実行 - 上記が正常終了後、もしパラメータ
_RUN_DBT_TEST
が空文字列でない場合、dbtのセレクタに_SELECTOR
の値を指定してdbt test
を実行
次に、(A) Cloud Schedulerのジョブは、設定された時間に(B) Pub/Subに以下の情報を含むメッセージを送信します。
- dbt.yamlの(F) Cloud Storage上のパス
- dbt.yaml内のパラメータ(
_RUN_DBT_RUN
,_RUN_DBT_TEST
,_SELECTOR
)の置換
実際のメッセージ内容は以下のようになります。
{"cloud_build_yaml_path": "dbt.yaml",
"substitutions": {
"_SELECTOR": "daily",
"_RUN_DBT_RUN": "1",
"_RUN_DBT_TEST": "1"
}
}
ここで送信された(B) Pub/Subのメッセージによって、(C) Cloud FunctionsのFunctionが起動されます。このFunctionは以下のような実装になっています。
import base64
from google.cloud.devtools import cloudbuild_v1
import json
def main(event, context):
project_id = "YOUR_PROJECT_ID"
payload_dict = json.loads(base64.b64decode(event['data']).decode('utf-8'))
substitution_params = ",".join([f"{k}={v}" for k, v in payload_dict["substitutions"].items()])
bootstrap_script = """
gsutil cp gs://some-yaml-store-bucket/{cloud_build_yaml_path} ./cloudbuild.yaml
gcloud builds submit \
--async \
--no-source \
--project={project_id} \
--config=cloudbuild.yaml \
--substitutions={substitution_params}
""".format(
cloud_build_yaml_path=payload_dict["cloud_build_yaml_path"],
substitution_params=substitution_params,
project_id=project_id
)
bootstrap_build = {
"steps":[
{
"name": "gcr.io/cloud-builders/gcloud",
"entrypoint": "bash",
"args": [
"-e",
"-x",
"-c",
bootstrap_script
]
}
]
}
build_client = cloudbuild_v1.CloudBuildClient()
build = cloudbuild_v1.Build(bootstrap_build)
build_result = build_client.create_build(build=build, project_id=project_id)
このFunctionsは、別のCloud Buildのビルド (F) を起動する Cloud Buildのビルド(D)をCloud BuildのAPIを直接呼び出して作成します。ここで作成されるビルド(D)は以下のような処理を行います。
- (E) Cloud StorageからCloud Buildビルド構成ファイルをダウンロード
- (B)Pub/Subから送られた、パラメータ置換の組み合わせを指定して、
gcloud builds submit
コマンドを利用してCloud Buildのビルド (F)を起動
起動されたビルド(F)は、Artifact Registry (G) からdbtランタイムとモデルが入ったイメージをダウンロードし、そのイメージ内でdbtを実行し、BigQuery(H)でデータを変換します。
この仕組みのポイント
上記で紹介したdbtの実行基盤によって、以下のようなメリットを享受することができています。
- (A) Cloud Schedulerのジョブ管理画面から、定期実行と同様の構成の手動再実行を容易に行える。
- (B) Cloud Pub/Subにパブリッシュするメッセージを書き換えて、手動でパブリッシュすることで、任意のパラメータに対しての処理の実行を、この基盤の再デプロイを伴わずに行うことができる。
たとえば、再実行の際に「定期実行時と同じ_SELECTOR
の値で、dbt run
を実行せずに、dbt test
だけを実行する」という場合は、以下のように書き換えたメッセージを(B) Pub/Subにパブリッシュするだけとなる。{"cloud_build_yaml_path": "dbt.yaml", "substitutions": { "_SELECTOR": "daily", "_RUN_DBT_RUN": "", # 長さ0の文字列にする "_RUN_DBT_TEST": "1" } }
補足
(C) Cloud Functions が直接(F) Cloud Buildのビルドを呼び出していないのはなぜ?
Cloud FunctionsでCloud Buildを実行する場合、内部でgcloud builds submit
コマンドを直接利用できないため、Cloud Buildのクライアントライブラリを利用してCloud Buildを起動することになります。
その際に、このライブラリのパラメータとして、Cloud Buildのビルド構成ファイルの内容を指定するのですが、このパラメータのスキーマが gcloud builds submit
で送るビルド構成ファイルのものと互換性がありません。このスキーマを相互に変換するのは簡単ではないことも調査の結果わかりました。
Cloud Buildのビルド構成ファイルは一般に、gcloud builds submit
コマンドで実行可能なyamlファイルとして配布されており、この基盤で扱うビルド構成もそれに合わせた形としておきたいところです。
上記の理由で、gcloud builds submit
コマンドを利用してCloud Buildのビルドを起動するために、そのためだけの別のビルドを別に用意するという設計にしました。
(A) Cloud Schedulerが (D) Cloud Buildを直接呼んでもよかったのでは?
Cloud Schedulerは、Pub/Subにメッセージをパブリッシュするだけでなく、httpエンドポイントにリクエストを送るジョブを定義することができます。これを利用し、Cloud BuildのAPIを呼び、直接ビルドを作成することも可能です。
今回そのようにせず、Pub/Sub経由にしたのは、上の基盤を再デプロイすることなく、任意のパラメータでCloud Buildのビルドを起動する機能が必要だったためです。
Pub/SubからCloud Buildをトリガーする機能があり、これを利用するとCloud Functionsは不要になりますが、最終的には利用しませんでした。この機能はトリガーのたびにGitHubなどのソースコードリポジトリからコードをcloneする処理になっており、今回の用途で使ってしまうと、たとえばGitHubが落ちたときに本来実行されてほしい処理が実行されなくなる問題があります。結果としてCloud FunctionsからCloud Buildをトリガーする方法で処理を実現することにしました。
また、後日、Pub/Subが同一のメッセージを二重に配送してしまうことがあり、その場合に後続のCloud Buildが二重に実行されてしまう可能性があることに気づきました。その対応を、Cloud FunctionsのFunctionにそのための機能を追加するだけの、少ない変更で行うことができたことも大きなメリットでした。
まとめと今後
Cloud BuildをはじめとしたGCPのサービスを利用して、ワークフローエンジンを別途用意せずに処理を定期実行するシステムを作り、それを使ってdbtを定期実行して、データ利用者に新鮮なデータをお届けすることができるようになりました。
当面はこの構成を利用してデータサービスを提供することができていましたが、サービスに対する様々な追加の要求に応えている中で、最近では現状のシステムで限界も見えてきました。
たとえば、Cloud Buildは、あるステップが失敗すると、そこで問答無用でビルド全体を失敗として終了させます。「あるステップが失敗した後でも、その後続のステップは実行させ、その上で最終的にビルドを失敗させる」という制御を行いたい場合があったのですが、この処理をスクリプトとしてステップの中に実装して対処していました。
このような、追加の要件にともなう別の処理を追加するたびにビルド構成ファイルのビルドステップ数が増えることになり、構成ファイルが肥大化し、メンテナビリティが低下していきました。
それに伴って、チームではワークフローエンジンを用いるなど、新しい基盤への乗り換えを検討しているところです。
最後に
GCP上で、何かを定期的に実行したい方にとってお役に立てますとありがたく存じます。
明日の記事は @resotto さんです。引き続きお楽しみください。