FlexTemplateを活用した柔軟なデータ処理

この記事は、Merpay Tech Openness Month 2020 の16日目の記事です。
こんにちは。メルペイのソリューションチームのorfeonです。
ソリューションチームは社内で共通の技術的な課題を見つけて解決するソリューションを提供しています。
今回はメルペイでデータ加工・処理を汎用的に行うためのソリューションとして開発したFlexTemplateを活用したツールと、FlexTemplateの役割について紹介します。

データ加工・処理の課題

メルペイでは日々様々なデータが大量に生成されています。こうした大量の様々なデータを迅速に活用する上では課題も出てきます。

  • マイクロサービスをまたいだデータ活用

マイクロサービスではサービスごとにデータベースを管理し、サービスが管理するデータは通常そのサービスが提供するAPI経由で取得利用することになります。しかし機能によっては複数サービスを跨いで大量のデータを結合する処理が必要なケースも出てきます。こうした場合に各マイクロサービスで必要なデータごとにそれらを取得・書き出して結合・加工する(場合によっては大規模データをサポートする)APIを用意するのは開発工数的に厳しい場合もあります。

  • データ処理の重複開発

BigQueryへのクエリ結果をSpannerに保存したい、Spannerへのクエリ結果を別のSpannerのテーブルに保存・削除したいなど、サービスの開発で共通するデータ加工パターンが存在します。こうしたよく似た処理を各サービスで都度個別に開発してしまうと微妙に違うよく似た機能をたくさん作されてしまいます。

Cloud Dataflow によるデータ処理

こうした課題を解決するために活用する手段の一つとしてCloud Dataflowが挙げられます。
Cloud DataflowはApache Beam SDKで記述したデータ処理Pipelineをフルマネージドな環境で実行することができるサービスです。
フルマネージドなのでインフラ管理も不要で、大規模で重いデータ処理であっても必要なタイミングでサーバが立ち上がるため、通常より低コスト、短期間で処理を実行できます。

Dataflow Template

Cloud DataflowのPipelineを再利用しやすい形で提供するための機能がDataflow Templateです。
BigQueryへのクエリ結果をSpannerに保存するなど、よくあるデータ処理パターンをDataflowのPipelineとして開発してTemplateとしてGCSに登録しておくことができます。
このTemplateでは実行時にパラメータを指定して処理内容を変更することができます。
登録されたTemplateを利用する場合はこのTemplateのGCSパスと共にBigQueryのクエリやデータの宛先となるSpannerテーブルなどのパラメータを指定してDataflowのREST APIを実行することで、簡単にPipelineを実行することができます。

メルペイ社内でも過去の記事で紹介したようにこれまで様々なTemplateを作って活用してきました。
Googleからも様々なTemplateが公開されており、よくある標準的な処理であればここから必要なTemplateを見つけられることも多いと思います。

Templateの問題点

Dataflow Templateには制限があります。実行時にパラメータを指定して処理内容を変更できると書きましたが実行時にPipelineの構成(処理のグラフ構造)自体を変える事はできません。あくまでTemplate開発時に作った構成の上で、データ読み出しや宛先などのパラメータを差し替えて利用する形になります。
またBeamSDKで提供されているデータ読み書きのための各種コンポーネントで全てのパラメータがTemplateで実行時に指定できるようサポートされている訳ではありません。

例えばGoogle公式が提供するTemplateではSpannerのデータをGCSにAvroファイルとして保存するTemplateがあるのですが、SDKのSpannerデータ読み込み用のモジュールがTemplate用のパラメータをサポートしていないため、Template実行時にクエリをパラメータとして指定することはできません(全テーブルのデータが保存されることになります)。
こうした場合、Template実行時にパラメータを指定できるよう自前の読み込み処理を追加したTemplateを開発する必要があります。

またちょっとした追加出力、例えばSpannerのクエリ結果をGCSだけでなく同時にDatastoreなど別のデータベースサービスにも保存したいような場合が出てきてもPipelineの構成が変わるため別々にTemplateを開発して提供する必要があります。
個別開発となるため、細かいユースケースに対応したTemplateを提供するのが難しいといった課題がありました。

FlexTemplate

FlexTemplateはパラメータに加えてPipelineの構成もTemplate実行時に変更することができるCloud DataflowのTemplateの機能です。

これまではDataflowのPipelineの開発者が手元の開発環境やCI環境でPipelineをTemplateに変換してGCSに登録し、利用者がこのTemplateにDataflowAPIを利用してパラメータを指定してDataflow Jobを起動していました。

FlexTemplateを利用すると、これまで開発者やCI環境で実行していたTemplateを登録するステップが、利用者がPipelineを起動するタイミングで実行されるようになります。
これにより利用者がPipelineを実行するタイミングでPipelineの構造自体を変更できるようになりました。

FlexTemplateの活用例としてはBeamSQLを使ったTemplateが挙げられます。
BeamSQLは指定されたSQLテキストを解析してBeamにおけるMapReduce処理に変換することでSQLで指定した処理をDataflowのPipelineとして実行することができます。
FlexTemplateでBeamSQLを利用する場合はTemplateの実行時のパラメータとしてSQLテキストを指定すると、そのSQLに沿ったPipelineが構成されて処理が実行されます。
BeamSQLではJOINをサポートしているので、異なるDatabaseサービス間のデータの結合を利用者がDataflow上でSQLを使って簡単に実現できるようになりました。

FlexTemplateを活用したデータ処理ツール

FlexTemplateの機能を使うことでDataflowのPipeline開発者は実行時に処理内容を柔軟に変更できるTemplateを開発できるようになりました。
そこでこのFlexTemplateの機能をフルに活かして、これまで目的別に個別に作っていたTemplateの処理を一般化して単一のTemplateで実現できるようにしました。
主な機能は以下の通りです。

  • データ処理を設定ファイルとして定義してプログラミング無しで簡単に実行できる
  • 異なるGCPプロジェクト、データベース/ストレージを跨いでデータ処理できる
    • 入出力先としてGCPの主要なサービスに対応する
    • BeamSQLによる複数データソースの結合・加工処理対応
  • ローカル実行対応
    • 入出力先としてクラウド上サービスだけでなくEmulatorへの読み書きもサポート

ここで紹介したFlexTemplateによるツールは現在OSS化を進めており、実装や細かい機能の話はOSS公開時の記事に譲るとして、今回はFlexTemplateによってどれくらい柔軟にPipelineを作れるかをイメージしてもらうために、FlexTemplateで開発したツールの実際の使い方を見ていきます。

複数サービスを跨いだデータ処理実行例

複数のGCSプロジェクト、データソース(BigQueryとSpanner)にまたがるデータを結合・加工して結果を二つのSpannerのテーブルに保存する例を紹介します。
このケースは実際に社内で利用され数千万のレコードを処理しています。

利用するにはまず以下のような処理内容をJSON形式で定義したConfigファイルを作成します。

{
  "sources": [
    {
      "name": "BigQueryInput",
      "module": "bigquery",
      "parameters": {
        "query": "SELECT ... FROM `xxx`"
      }
    },
    {
      "name": "SpannerInput1",
      "module": "spanner",
      "parameters": {
        "projectId": "project1",
        "instanceId": "instance1",
        "databaseId": "database1",
        "table": "table1",
        "columns": "id, column1, column2"
      }
    },
    {
      "name": "SpannerInput2",
      "module": "spanner",
      "parameters": {
        "projectId": "project2",
        "instanceId": "instance2",
        "databaseId": "database2",
        "query": "SELECT * FROM table2 WHERE ..."
      }
    }
  ],
  "transforms": [
    {
      "name": "beamsql",
      "module": "beamsql",
      "inputs": [
        "BigQueryInput",
        "SpannerInput1",
        "SpannerInput2"
      ],
      "parameters": {
        "sql": "SELECT .. FROM BigQueryInput LEFT JOIN SpannerInput1 ON BigQueryInput.id = SpannerInput1.id LEFT JOIN SpannerInput2 ON BigQueryInput.id = SpannerInput2.id"
      }
    }
  ],
  "sinks": [
    {
      "name": "SpannerOutput1",
      "module": "spanner",
      "input": "beamsql",
      "parameters": {
        "projectId": "project3",
        "instanceId": "instance3",
        "databaseId": "database3",
        "table": "table3"
      }
    },
    {
      "name": "SpannerOutput2",
      "module": "spanner",
      "input": "beamsql",
      "parameters": {
        "projectId": "project4",
        "instanceId": "instance4",
        "databaseId": "database4",
        "table": "table4"
      }
    }
  ]
}

Configではsources,transforms,sinksの3種類のモジュールを組み合わせて処理内容を定義します。sourcesはデータの取得を、transformsはデータの処理を、sinksはデータの出力をそれぞれ扱います。

この例ではsourcesとしてBigQueryへのクエリとSpannerへのクエリ、SpannerのTableデータの読み込みの3つのsourceを指定しています。
transformsではBeamSQLモジュールを呼び出して3つのデータソースから読み込んだデータを結合、加工するためのSQLを指定しています。
sinksではBeamSQLのクエリ結果を同時に別々のSpannerTableに保存するように指定しています。

このConfigファイルを指定してTemplateからJobを起動すると、TemplateでConfigファイルをパースして処理ステップを構築し、以下のようなPipelineが実行されます。

苦労したところ

当初、FlexTemplateではBeamSQLによるデータ変換を処理の中心におき、各種データベース間の全てのデータ変換ではBeamSQLで利用されているRowクラスを経由するようにしていました。こうすることでたくさんあるデータベースサービス間のデータフォーマットやスキーマ変換処理の開発を最小限にすることができ、全体的な構成をとてもシンプルにすることができました。

しかし開発を進めていくと、複雑なデータを大量に扱う場合はRow型への変換処理が間に挟まることで変換やシリアライズの負荷が無視できないほど大きいことがわかりました。例えばネストした構造を持つ1000カラムのレコードを5億件ほどBigQueryからParquetに書き出す検証を行ったところ、Rowクラスを経由する場合としない場合で数十倍、実行時間やコストに差が出ました。

社内でのユースケースでは大量のデータを扱うことが多いのでこの問題は無視することができず、結局データ変換でハブとなるクラスは置かずに、Pipeline上で必要になるまでデータ変換は行わないような構成に作り直しました。その結果、開発しないといけないデータ変換処理は増えましたが、実行時の不要なデータ変換を省略できて処理性能は大幅に改善しました。

おわりに

本記事ではCloud DataflowのFlexTemplateを使うことで柔軟なPipelineを提供できることを紹介しました。またFlexTemplateを活用して開発した、設定ファイルで処理を定義してデータ処理パイプラインを簡単に実行できるツールを紹介しました。
ここで紹介したツールはOSSとして公開するよう準備を進めています。このツールはまだBatch処理のみの対応ですが、Streaming処理対応など機能追加も進めています。
OSS公開した際にはまた詳細を紹介する記事を出したいと思います。

  • X
  • Facebook
  • linkedin
  • このエントリーをはてなブックマークに追加