この記事は、Merpay Tech Openness Month 2023 の11日目の記事です。
こんにちは。メルペイのデータマネージャー@katsukitです。
本日は、現在メルペイで取り組んでいる非エンジニアのためのデータ集計環境についてご紹介します。
はじめに
データ活用には可視化、分析、調査、ML、CRMなど、さまざまな場面があると思います。エンジニアはもとよりデータアナリスト、マーケター、プロジェクトマネージャーなどと利用するユーザーもさまざまです。
これらの利用シーンで使用するデータにはお客さまのデータを取り扱うこともあり、データの管理をしっかりとやる必要があります。
一方で、お客さまへのアプローチまでスピード感が求められるマーケティングやCRM配信など、現場にデータ抽出・作成を委ねているデータ活用では、データガバナンスの維持が難しく、現場全体に統制されたデータ管理体制を構築する必要があると思います。
このような、現場にデータ抽出・作成を委ねるデータ活用に対し、データガバナンスの向上を目的とした取り組みの一つをご紹介したいと思います。
データ管理上の課題
マーケティング、CRM配信など関係者が多く、現場に必要なデータ抽出やデータ作成を委ねているデータ活用では、データの作成手段やルールがさまざまでデータ管理上の統制が難しいという問題があります。
データ管理を統制するために社内のデータ基盤を利用する事も考えられますが、関係者のコミュニケーションやシステムの実装・リリースが伴うので、一定の時間が必要なこともあり、スピード感が求められるデータ作成には適しません。
そこで、データ抽出要件からデータ作成まで、現場の非エンジニアに委ねるべきところは委ね、スピード感を維持する一方で、データ管理を統制するための、簡易的なデータの集計環境とルールを提供し、データガバナンス上の問題を改善する取り組みを行っています。
簡易的なデータ集計環境
非エンジニアがCRM配信などで利用するために提供しているデータ集計環境は、以下のような構成とフローになっています。
データの抽出とデータロードはBigQueryのScheduled Queryで行います。
データ基盤により集計された各マイクロサービスのデータ、もしくは加工された中間データをデータソースとして、Scheduled Queryにより、データ抽出・加工を行います。
実行するクエリや、結果データの保存先やスケジュールなどのデータ作成に関するメタ情報はGitHubで管理し、データ作成情報の履歴管理と承認プロセスを提供します。
クエリやデータ作成情報のGitHubリポジトリへのマージをトリガーに、GitHub Actionsを起動し、Scheduled Queryを登録もしくは更新を行います。
上記により、ユーザーは基本的にGitHubだけを利用し、Scheduled Queryを登録・データ作成までを実現することができます。
Scheduled Queryによる簡易的なデータ集計
Scheduled QueryはBigQueryの1機能で、クエリの定期的な実行をスケジュールすることができる機能です。BigQueryのGUIコンソールでも利用可能で、BigQueryのデータを抽出できるユーザーは簡単に利用することができます。
CRM配信関連のデータ作成では、これまでこのScheduled Queryを多用していたこともあり、当環境でも採用しています。
以下にScheduled Queryの利用の仕方についてご紹介します。
クエリのスケジュール登録/更新
Scheduled Queryの登録・更新はコンソールでの利用の他に、bqコマンド、API、Java、Pythonが利用できますが、Scheduled Queryに利用できる設定内容に差があります。例えば、クエリの実行開始時間や終了時間を設定する場合には、bqコマンドではできず、APIやJava/Pythonを利用する必要があります。当環境はPythonで実装しています。
Pythonで作成する場合は、google-cloud-bigquery-datatransfer
ライブラリを使用します。
実装する際は、BigQueryのガイドラインにあるScheduled Queryの設定内容では、仕様の詳細まではわからないので、Pythonライブラリのドキュメントで確認したほうがよいと思います。
Scheduled Queryの登録・更新時の主な設定情報は以下の通りです。
パラメータ | 型 | 説明 |
---|---|---|
destination_dataset_id | String | 結果保存先データセット |
display_name | String | スケジュールの名称 |
params | Struct(protobuf) dictionaryも可 |
実行内容詳細 |
├ query | String | 実行対象のクエリ |
├ destination_table_name_template | String | 作成テーブル名 |
├ write_disposition | String | テーブル書込方法 WRITE_TRUNCATE/WRITE_APPEND |
├ partitioning_field | String | パーティション対象のfield名 |
schedule | String | スケジュール |
schedule_options | ScheduleOptions | スケジュール詳細 |
├ start_time | Timestamp | 開始時間 |
├ end_time | Timestamp | 終了時間 |
service_account_name | String | 実行サービスアカウント |
またコード例を以下に示します。
* 以下は上位のTransferConfigという抽象クラスで初期化処理を実装している例になります
* paramsはjsonで受け取っている例になります
登録:
from google.cloud import bigquery_datatransfer
from google.protobuf import field_mask_pb2
transfer_client = bigquery_datatransfer.DataTransferServiceClient()
class CreateTransferConfig(TransferConfig):
def __init__(self, config):
super().__init__(config)
def execute(self):
parent = transfer_client.common_project_path(self.project_id)
schedule_options = bigquery_datatransfer.ScheduleOptions(
start_time=start_time,
end_time=end_time
)
transfer_config = bigquery_datatransfer.TransferConfig(
destination_dataset_id=self.target_dataset,
display_name=self.display_name,
data_source_id="scheduled_query",
params=json.loads(self.params),
schedule=self.schedule,
schedule_options=schedule_options
)
transfer_config = transfer_client.create_transfer_config(
bigquery_datatransfer.CreateTransferConfigRequest(
parent=parent,
transfer_config=transfer_config,
service_account_name=self.service_account_name,
)
)
更新:
class UpdateTransferConfig(TransferConfig):
def __init__(self, config):
super().__init__(config)
def execute(self):
schedule_options = bigquery_datatransfer.ScheduleOptions(
start_time=start_time,
end_time=end_time
)
transfer_config = bigquery_datatransfer.TransferConfig(
name=self.resource_name,
destination_dataset_id=self.target_dataset,
display_name=self.display_name,
params=json.loads(self.params),
schedule=self.schedule,
schedule_options=schedule_options
)
transfer_config = transfer_client.update_transfer_config(
{
"transfer_config": transfer_config,
"update_mask": field_mask_pb2.FieldMask(
paths=["params",
"destination_dataset_id",
"display_name",
"schedule",
"schedule_options",
"service_account_name"
]
),
"service_account_name": self.service_account_name,
}
)
更新時は、FieldMaskで更新対象を指定します。
テーブルの更新仕様
テーブル更新方法はparams内のwrite_disposition
で設定できます。
設定できるのは WRITE_TRUNCATE (上書き) もしくは WRITE_APPEND (追加)になります。
取り込み時間でのパーティション分割に設定することで実行毎の履歴データとして保存することができます。指定は以下のように設定します。
"destination_table_name_template": "table_name${run_date}"
このとき、partitioning_field
には何も設定しないようにしてください。
なお、suffixテーブルとして作成したい場合は、以下のように設定します。
"destination_table_name_template": "table_name_{run_time|"%Y%m%d"}"
Scheduled Queryのバックフィル実行時の冪等性を考えて、実行クエリには実行日時にScheduled Queryで利用できるクエリパラメータ@run_time / @run_dateを利用するようにします。
SQL例:
-- 実行日以前のユーザー登録を抽出
SELECT
user_id
, registered_at
FROM
`<project>.<dataset>.<table>`
WHERE
date(registered_at) <= @run_date
クエリ管理とデータのメタ管理
クエリやデータの作成情報はGitHubで管理します。
しかし、非エンジニアにとってはGitの利用は馴染みがないことが多く、ハードルが高いため、利用を促すために極力簡易化する必要があります。
GitHubを利用するためのツールはいろいろありますが、できるだけWeb上でできるようにGitHub自体の機能を利用しています。
データの作成情報は、Scheduled Queryに必要なパラメータの他に、データオーナーや作成したテーブルの有効期限などを設定します。
カンパニー、プロジェクト/サービス毎にデータの作成情報をまとめ、データが必要な業務やプロジェクトと、データの作成情報が紐づくように管理します。
管理している情報は以下の通りです。
- 実行クエリ
- データオーナー
- 作成データの説明
- データ(テーブル)の有効期限
- CRM関連データ(配信内容や配信名称)
- 実行スケジュール(開始日・終了日含む)
- データ(テーブル)の更新仕様(上書き/追加、パーティションの有無など)
管理する情報は、以下のようにクエリとデータ作成情報に分け、ファイルで管理します。データ作成情報はYAMLで構成しています。
クエリファイル例:
SELECT
user_id
, registered_at
FROM
`<project>.<dataset>.<table>`
WHERE
date(registered_at) <= @run_date
データ作成情報ファイル例:
delivery_name: campaign
delivery_schedule: every 24 hours
delivery_type: demo_delivery
description: "デモ"
partition_field: date
write_disposition: WRITE_TRUNCATE
GitHubのIssue FormとGitHub Actionsの連動
上記情報のGitHubへのアップロードは、GitのcommitやpushなどGit操作の知識が必要になりますが、これをGitHub Issue FormとGitHub Actionsを利用して自動化することで、簡易化を実現しています。
GitHub Issue Form
GitHubのIssue Formは、これまでの自由入力なIssueに対してリッチな入力フォームを作成することができる機能になります。テンプレートにより、ユーザーに設定してほしい項目を構造化し、簡単なワークフローを作成することができます。
なお、執筆時点ではbeta版となっており、変更される可能性があるので、ご注意ください。
Issue Formのテンプレートは、マークダウンで記述するIssueテンプレートと同様に.github/ISSUE_TEMPLATE
配下にYAMLで記述します。
以下のような記述式でテキストエリアやドロップダウンなど構成することができます。
構成できる入力タイプは以下のものです。
- markdown
- input
- textarea
- dropdown
- checkboxes
必須チェックといった簡単な入力チェックも可能です。
詳細についてはこちらのガイドラインをご参照ください。
以下が設定例になります。
name: Request to create deliveries
description: Request to create delivery data for CRM
title: "[Request]: "
labels: ['request delivery']
body:
- type: markdown
attributes:
value: |
CRM向け配信対象データの作成クエリの登録
- type: dropdown
id: company
attributes:
label: Company Name
description: 配信データを作成するカンパニー
options:
- mercari
- merpay
validations:
required: true
- type: input
id: service_name
attributes:
label: Service Name
description: 配信データを作成するサービス名もしくはプロジェクト名
placeholder: e.g. creditdesign
validations:
required: true
- type: input
id: delivery_type
attributes:
label: Delivery Type
description:
placeholder: e.g.
validations:
required: true
- type: input
id: delivery_name
attributes:
label: Delivery Name
description:
placeholder: e.g.
validations:
required: true
- type: textarea
id: delivery_description
attributes:
label: Delivery Description
description:
placeholder: e.g.
validations:
required: true
- type: input
id: delivery_schedule
attributes:
label: Delivery Schedule
description: 実行スケジュール(UTC)
placeholder: e.g. every 24 hour
- type: input
id: start_time
attributes:
label: Start Time
description: 開始日時(UTC)
placeholder: e.g. YYYY-mm-DD HH:MM:SS
- type: input
id: end_time
attributes:
label: End Time
description: 終了日時(UTC)
placeholder: e.g. YYYY-mm-DD HH:MM:SS
- type: textarea
id: query
attributes:
label: Query
description:
placeholder: e.g. select * from A
validations:
required: true
- type: dropdown
id: write_disposition
attributes:
label: Write Disposition
description:
options:
- WRITE_TRUNCATE
- WRITE_APPEND
validations:
required: true
- type: input
id: partition_field
attributes:
label: Partition Field
description:
- type: dropdown
id: ingestion_time_partitioned
attributes:
label: Ingestion Time Partitioned
description: 取り込み時間パーティションの設定
options:
- INGETION_TIME_PARTITIONED
上記を表示すると以下のようなフォームになります。
このIssue Formで作成された入力フォームで必要な情報を入力し、submitするだけで、必要なファイル作成とPullRequestまで自動生成する仕組みを提供しています。
作成されたPullRequestを承認者が問題ないか確認し、マージするワークフローを経ることでクエリの一定の品質を担保します。
さらにPullRequestのマージをトリガーに、自動的にScheduled Queryを登録・更新し、Scheduled Queryがデータを作成します。
このようにユーザーはIssue Formの入力と承認ワークフローを経るだけで、定期的なデータ作成を実現できるようになっています。
自動生成は後述するGitHub Actionsで実現しています。
GitHub Actions
GitHub ActionsはGitHubが提供するCI/CDです。
GitHubのリソースを直接ビルド、テスト、デプロイが可能で、YAMLにより容易にワークフローを生成することができます。
今回は、このGitHub Actionsの仕組みを活用し、GitHubにpushされたファイルを基にデータ作成までの自動化を実現しています。
今回作成したGitHub Actionsの主なワークフローは以下の通りです。
- GitHub Issueの内容をもとにファイルの作成、コミット、PullRequestを作る
- PullRequestのマージによりScheduled Queryを作成する
PullRequestのマージ時のワークフローの大きな流れは以下のようになっています。
GitHub ActionsはワークフローをYAML形式で記述し、.github/workflows
内に保存することで実行できるようになります。
起動タイミングは以下のようにon
要素に記述します。上記のワークフローは以下のように記述しています。
Issue作成:
on:
issues:
types: ['opened']
issue_comment:
types: ['created']
Issue_comment
も設定しているのは、Issueの内容を修正し、再度PullRequestを作成したいときに、コメントにrebuild please
としたときに再度ワークフローを起動するようにしているためです。
関係のないIssueが作成されるケースがあるので、Issueにラベルをつけて、該当ラベルのときだけ起動するよう条件を指定するようにしています。
PullRequestマージ時:
on:
push:
branches:
- main
paths:
- 'deliveries/**'
上記はmainブランチにマージされたときに起動する記述になります。
リポジトリにはデータ作成情報のファイル以外にも保存するファイルがあるので、該当ディレクトリ配下の変更時だけ起動するようにpaths
を指定しています。
ワークフローの各処理は jobs
要素内のsteps
要素に処理を記述します。
BQの操作には、BQの操作アカウントでまず認証・認可が必要になります。
以下はWorkload Identity で認証するステップの例です。
- id: auth
name: Authenticate
uses: google-github-actions/auth@v0
with:
workload_identity_provider: ${{ steps.settings.outputs.wip }}
service_account: ${{ steps.settings.outputs.sa }}
複数のスケジュールが一度に登録された場合に複数のジョブに分けてそれぞれ実行されるようにするために matrix strategies
を利用します。
以下の例では、実行の単位となる親ディレクトリのJSON配列service_df
分だけジョブが分割され、それぞれのジョブでステップが実行されます。
jobs:
check:
runs-on: ubuntu-latest
outputs:
service_df: ${{ steps.diff.outputs.service_df }}
steps:
...
needs: check
if: ${{ needs.check.outputs.service_df != '' }}
strategy:
matrix:
diff: ${{fromJson(needs.check.outputs.service_df)}}
steps:
...
GitHub Actionsの仕様詳細を知りたい場合は、こちらをご参照ください。
上記のGitHub Actionsのワークフローにより自動実行されることで、利用者はGitの操作やScheduled Queryの登録を意識しないで済むようになり、Scheduled Queryの登録やデータ作成上のルールを統一し、データ作成を一元管理することが可能になります。
おわりに
今回は非エンジニアのためのデータ集計環境の取り組みについて紹介させていただきました。
当環境で、データ作成の自動化、クエリの管理手段、承認プロセスやワークフローを非エンジニアを含むデータ利用者に提供することで、オペレーションのミス、情報管理上のリスクや思わぬ事故を極力減らし、防ぐことができる、と考えています。
今後は、Scheduled Queryの誤登録を防ぐための入力チェックの強化や、Scheduled Query登録時や実行時の通知機能の実装を検討中です。
今回の記事が読者のみなさんにとって少しでも有益なものになれば幸いです。
明日の記事は @mikichinさんです。引き続きお楽しみください。