非エンジニアのためのデータ集計環境について

この記事は、Merpay Tech Openness Month 2023 の11日目の記事です。

こんにちは。メルペイのデータマネージャー@katsukitです。
本日は、現在メルペイで取り組んでいる非エンジニアのためのデータ集計環境についてご紹介します。

はじめに

データ活用には可視化、分析、調査、ML、CRMなど、さまざまな場面があると思います。エンジニアはもとよりデータアナリスト、マーケター、プロジェクトマネージャーなどと利用するユーザーもさまざまです。

これらの利用シーンで使用するデータにはお客さまのデータを取り扱うこともあり、データの管理をしっかりとやる必要があります。

一方で、お客さまへのアプローチまでスピード感が求められるマーケティングやCRM配信など、現場にデータ抽出・作成を委ねているデータ活用では、データガバナンスの維持が難しく、現場全体に統制されたデータ管理体制を構築する必要があると思います。

このような、現場にデータ抽出・作成を委ねるデータ活用に対し、データガバナンスの向上を目的とした取り組みの一つをご紹介したいと思います。

データ管理上の課題

マーケティング、CRM配信など関係者が多く、現場に必要なデータ抽出やデータ作成を委ねているデータ活用では、データの作成手段やルールがさまざまでデータ管理上の統制が難しいという問題があります。

データ管理を統制するために社内のデータ基盤を利用する事も考えられますが、関係者のコミュニケーションやシステムの実装・リリースが伴うので、一定の時間が必要なこともあり、スピード感が求められるデータ作成には適しません。

そこで、データ抽出要件からデータ作成まで、現場の非エンジニアに委ねるべきところは委ね、スピード感を維持する一方で、データ管理を統制するための、簡易的なデータの集計環境とルールを提供し、データガバナンス上の問題を改善する取り組みを行っています。

簡易的なデータ集計環境

非エンジニアがCRM配信などで利用するために提供しているデータ集計環境は、以下のような構成とフローになっています。

データの抽出とデータロードはBigQueryのScheduled Queryで行います。

データ基盤により集計された各マイクロサービスのデータ、もしくは加工された中間データをデータソースとして、Scheduled Queryにより、データ抽出・加工を行います。

実行するクエリや、結果データの保存先やスケジュールなどのデータ作成に関するメタ情報はGitHubで管理し、データ作成情報の履歴管理と承認プロセスを提供します。

クエリやデータ作成情報のGitHubリポジトリへのマージをトリガーに、GitHub Actionsを起動し、Scheduled Queryを登録もしくは更新を行います。

上記により、ユーザーは基本的にGitHubだけを利用し、Scheduled Queryを登録・データ作成までを実現することができます。

Scheduled Queryによる簡易的なデータ集計

Scheduled QueryはBigQueryの1機能で、クエリの定期的な実行をスケジュールすることができる機能です。BigQueryのGUIコンソールでも利用可能で、BigQueryのデータを抽出できるユーザーは簡単に利用することができます。

CRM配信関連のデータ作成では、これまでこのScheduled Queryを多用していたこともあり、当環境でも採用しています。

以下にScheduled Queryの利用の仕方についてご紹介します。

クエリのスケジュール登録/更新

Scheduled Queryの登録・更新はコンソールでの利用の他に、bqコマンド、API、Java、Pythonが利用できますが、Scheduled Queryに利用できる設定内容に差があります。例えば、クエリの実行開始時間や終了時間を設定する場合には、bqコマンドではできず、APIやJava/Pythonを利用する必要があります。当環境はPythonで実装しています。

Pythonで作成する場合は、google-cloud-bigquery-datatransferライブラリを使用します。

実装する際は、BigQueryのガイドラインにあるScheduled Queryの設定内容では、仕様の詳細まではわからないので、Pythonライブラリのドキュメントで確認したほうがよいと思います。

Scheduled Queryの登録・更新時の主な設定情報は以下の通りです。

パラメータ 説明
destination_dataset_id String 結果保存先データセット
display_name String スケジュールの名称
params Struct(protobuf)
dictionaryも可
実行内容詳細
├ query String 実行対象のクエリ
├ destination_table_name_template String 作成テーブル名
├ write_disposition String テーブル書込方法
WRITE_TRUNCATE/WRITE_APPEND
├ partitioning_field String パーティション対象のfield名
schedule String スケジュール
schedule_options ScheduleOptions スケジュール詳細
├ start_time Timestamp 開始時間
├ end_time Timestamp 終了時間
service_account_name String 実行サービスアカウント

またコード例を以下に示します。
* 以下は上位のTransferConfigという抽象クラスで初期化処理を実装している例になります
* paramsはjsonで受け取っている例になります

登録:

from google.cloud import bigquery_datatransfer
from google.protobuf import field_mask_pb2

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

class CreateTransferConfig(TransferConfig):

    def __init__(self, config):
        super().__init__(config)

    def execute(self):
        parent = transfer_client.common_project_path(self.project_id)

        schedule_options = bigquery_datatransfer.ScheduleOptions(
                    start_time=start_time,
                    end_time=end_time
                    )

        transfer_config = bigquery_datatransfer.TransferConfig(
                destination_dataset_id=self.target_dataset,
                display_name=self.display_name,
                data_source_id="scheduled_query",
                params=json.loads(self.params),
                schedule=self.schedule,
                schedule_options=schedule_options
                )

        transfer_config = transfer_client.create_transfer_config(
            bigquery_datatransfer.CreateTransferConfigRequest(
                parent=parent,
                transfer_config=transfer_config,
                service_account_name=self.service_account_name,
            )
        )

更新:

class UpdateTransferConfig(TransferConfig):

    def __init__(self, config):
        super().__init__(config)

    def execute(self):
        schedule_options = bigquery_datatransfer.ScheduleOptions(
                    start_time=start_time,
                    end_time=end_time
                    )

        transfer_config = bigquery_datatransfer.TransferConfig(
                name=self.resource_name,
                destination_dataset_id=self.target_dataset,
                display_name=self.display_name,
                params=json.loads(self.params),
                schedule=self.schedule,
                schedule_options=schedule_options
                )

        transfer_config = transfer_client.update_transfer_config(
                {
                    "transfer_config": transfer_config,
                    "update_mask": field_mask_pb2.FieldMask(
                        paths=["params",
                               "destination_dataset_id",
                               "display_name",
                               "schedule",
                               "schedule_options",
                               "service_account_name"
                               ]
                    ),
                    "service_account_name": self.service_account_name,
                }
)

更新時は、FieldMaskで更新対象を指定します。

テーブルの更新仕様

テーブル更新方法はparams内のwrite_disposition で設定できます。
設定できるのは WRITE_TRUNCATE (上書き) もしくは WRITE_APPEND (追加)になります。

取り込み時間でのパーティション分割に設定することで実行毎の履歴データとして保存することができます。指定は以下のように設定します。

"destination_table_name_template": "table_name${run_date}"

このとき、partitioning_fieldには何も設定しないようにしてください。

なお、suffixテーブルとして作成したい場合は、以下のように設定します。

"destination_table_name_template": "table_name_{run_time|"%Y%m%d"}"

Scheduled Queryのバックフィル実行時の冪等性を考えて、実行クエリには実行日時にScheduled Queryで利用できるクエリパラメータ@run_time / @run_dateを利用するようにします。

SQL例:

-- 実行日以前のユーザー登録を抽出
SELECT
  user_id
  , registered_at
FROM
  `<project>.<dataset>.<table>`
WHERE
  date(registered_at) <= @run_date

クエリ管理とデータのメタ管理

クエリやデータの作成情報はGitHubで管理します。
しかし、非エンジニアにとってはGitの利用は馴染みがないことが多く、ハードルが高いため、利用を促すために極力簡易化する必要があります。

GitHubを利用するためのツールはいろいろありますが、できるだけWeb上でできるようにGitHub自体の機能を利用しています。

データの作成情報は、Scheduled Queryに必要なパラメータの他に、データオーナーや作成したテーブルの有効期限などを設定します。

カンパニー、プロジェクト/サービス毎にデータの作成情報をまとめ、データが必要な業務やプロジェクトと、データの作成情報が紐づくように管理します。

管理している情報は以下の通りです。

  • 実行クエリ
  • データオーナー
  • 作成データの説明
  • データ(テーブル)の有効期限
  • CRM関連データ(配信内容や配信名称)
  • 実行スケジュール(開始日・終了日含む)
  • データ(テーブル)の更新仕様(上書き/追加、パーティションの有無など)

管理する情報は、以下のようにクエリとデータ作成情報に分け、ファイルで管理します。データ作成情報はYAMLで構成しています。

クエリファイル例:

SELECT
  user_id
  , registered_at
FROM
  `<project>.<dataset>.<table>`
WHERE
  date(registered_at) <= @run_date

データ作成情報ファイル例:

delivery_name: campaign
delivery_schedule: every 24 hours
delivery_type: demo_delivery
description: "デモ"
partition_field: date
write_disposition: WRITE_TRUNCATE

GitHubのIssue FormとGitHub Actionsの連動

上記情報のGitHubへのアップロードは、GitのcommitやpushなどGit操作の知識が必要になりますが、これをGitHub Issue FormとGitHub Actionsを利用して自動化することで、簡易化を実現しています。

GitHub Issue Form

GitHubのIssue Formは、これまでの自由入力なIssueに対してリッチな入力フォームを作成することができる機能になります。テンプレートにより、ユーザーに設定してほしい項目を構造化し、簡単なワークフローを作成することができます。
なお、執筆時点ではbeta版となっており、変更される可能性があるので、ご注意ください。

Issue Formのテンプレートは、マークダウンで記述するIssueテンプレートと同様に.github/ISSUE_TEMPLATE 配下にYAMLで記述します。

以下のような記述式でテキストエリアやドロップダウンなど構成することができます。
構成できる入力タイプは以下のものです。

  • markdown
  • input
  • textarea
  • dropdown
  • checkboxes

必須チェックといった簡単な入力チェックも可能です。
詳細についてはこちらのガイドラインをご参照ください。

以下が設定例になります。

name: Request to create deliveries
description: Request to create delivery data for CRM
title: "[Request]: "
labels: ['request delivery']

body:
  - type: markdown
    attributes:
      value: |
        CRM向け配信対象データの作成クエリの登録
  - type: dropdown
    id: company
    attributes:
      label: Company Name
      description: 配信データを作成するカンパニー
      options:
        - mercari
        - merpay
    validations:
      required: true
  - type: input
    id: service_name
    attributes:
      label: Service Name
      description: 配信データを作成するサービス名もしくはプロジェクト名
      placeholder: e.g. creditdesign
    validations:
      required: true
  - type: input
    id: delivery_type
    attributes:
      label: Delivery Type
      description: 
      placeholder: e.g. 
    validations:
      required: true
  - type: input
    id: delivery_name
    attributes:
      label: Delivery Name
      description: 
      placeholder: e.g. 
    validations:
      required: true
  - type: textarea
    id: delivery_description
    attributes:
      label: Delivery Description
      description: 
      placeholder: e.g. 
    validations:
      required: true
  - type: input
    id: delivery_schedule
    attributes:
      label: Delivery Schedule
      description: 実行スケジュール(UTC)
      placeholder: e.g. every 24 hour
  - type: input
    id: start_time
    attributes:
      label: Start Time
      description: 開始日時(UTC)
      placeholder: e.g. YYYY-mm-DD HH:MM:SS
  - type: input
    id: end_time
    attributes:
      label: End Time
      description: 終了日時(UTC)
      placeholder: e.g. YYYY-mm-DD HH:MM:SS
  - type: textarea
    id: query
    attributes:
      label: Query
      description: 
      placeholder: e.g. select * from A
    validations:
      required: true
  - type: dropdown
    id: write_disposition
    attributes:
      label: Write Disposition
      description: 
      options:
        - WRITE_TRUNCATE
        - WRITE_APPEND
    validations:
      required: true
  - type: input
    id: partition_field
    attributes:
      label: Partition Field
      description: 
  - type: dropdown
    id: ingestion_time_partitioned
    attributes:
      label: Ingestion Time Partitioned
      description: 取り込み時間パーティションの設定
      options:
        - INGETION_TIME_PARTITIONED

上記を表示すると以下のようなフォームになります。

このIssue Formで作成された入力フォームで必要な情報を入力し、submitするだけで、必要なファイル作成とPullRequestまで自動生成する仕組みを提供しています。

作成されたPullRequestを承認者が問題ないか確認し、マージするワークフローを経ることでクエリの一定の品質を担保します。

さらにPullRequestのマージをトリガーに、自動的にScheduled Queryを登録・更新し、Scheduled Queryがデータを作成します。

このようにユーザーはIssue Formの入力と承認ワークフローを経るだけで、定期的なデータ作成を実現できるようになっています。

自動生成は後述するGitHub Actionsで実現しています。

GitHub Actions

GitHub ActionsはGitHubが提供するCI/CDです。
GitHubのリソースを直接ビルド、テスト、デプロイが可能で、YAMLにより容易にワークフローを生成することができます。

今回は、このGitHub Actionsの仕組みを活用し、GitHubにpushされたファイルを基にデータ作成までの自動化を実現しています。

今回作成したGitHub Actionsの主なワークフローは以下の通りです。

  • GitHub Issueの内容をもとにファイルの作成、コミット、PullRequestを作る
  • PullRequestのマージによりScheduled Queryを作成する

PullRequestのマージ時のワークフローの大きな流れは以下のようになっています。

GitHub ActionsはワークフローをYAML形式で記述し、.github/workflows内に保存することで実行できるようになります。

起動タイミングは以下のようにon要素に記述します。上記のワークフローは以下のように記述しています。

Issue作成:

on:
  issues:
    types: ['opened']
  issue_comment:
    types: ['created']

Issue_commentも設定しているのは、Issueの内容を修正し、再度PullRequestを作成したいときに、コメントにrebuild pleaseとしたときに再度ワークフローを起動するようにしているためです。

関係のないIssueが作成されるケースがあるので、Issueにラベルをつけて、該当ラベルのときだけ起動するよう条件を指定するようにしています。

PullRequestマージ時:

on:
  push:
    branches:
      - main
    paths:
      - 'deliveries/**'

上記はmainブランチにマージされたときに起動する記述になります。
リポジトリにはデータ作成情報のファイル以外にも保存するファイルがあるので、該当ディレクトリ配下の変更時だけ起動するようにpathsを指定しています。

ワークフローの各処理は jobs要素内のsteps要素に処理を記述します。
BQの操作には、BQの操作アカウントでまず認証・認可が必要になります。
以下はWorkload Identity で認証するステップの例です。

      - id: auth
        name: Authenticate
        uses: google-github-actions/auth@v0
        with:
          workload_identity_provider: ${{ steps.settings.outputs.wip }}
          service_account: ${{ steps.settings.outputs.sa }}

複数のスケジュールが一度に登録された場合に複数のジョブに分けてそれぞれ実行されるようにするために matrix strategiesを利用します。

以下の例では、実行の単位となる親ディレクトリのJSON配列service_df分だけジョブが分割され、それぞれのジョブでステップが実行されます。

jobs:
  check:
    runs-on: ubuntu-latest
    outputs:
      service_df: ${{ steps.diff.outputs.service_df }}
    steps:

...

  needs: check
  if: ${{ needs.check.outputs.service_df != '' }}
  strategy:
      matrix:
        diff: ${{fromJson(needs.check.outputs.service_df)}}
  steps:

...

GitHub Actionsの仕様詳細を知りたい場合は、こちらをご参照ください。

上記のGitHub Actionsのワークフローにより自動実行されることで、利用者はGitの操作やScheduled Queryの登録を意識しないで済むようになり、Scheduled Queryの登録やデータ作成上のルールを統一し、データ作成を一元管理することが可能になります。

おわりに

今回は非エンジニアのためのデータ集計環境の取り組みについて紹介させていただきました。

当環境で、データ作成の自動化、クエリの管理手段、承認プロセスやワークフローを非エンジニアを含むデータ利用者に提供することで、オペレーションのミス、情報管理上のリスクや思わぬ事故を極力減らし、防ぐことができる、と考えています。

今後は、Scheduled Queryの誤登録を防ぐための入力チェックの強化や、Scheduled Query登録時や実行時の通知機能の実装を検討中です。

今回の記事が読者のみなさんにとって少しでも有益なものになれば幸いです。

明日の記事は @mikichinさんです。引き続きお楽しみください。

  • Twitter
  • Facebook
  • linkedin
  • このエントリーをはてなブックマークに追加