default image

こんにちは、メルペイ DataPlatform チームの @syu_cream です。

突然なのですが皆さんは BigQuery は使っていますか?
うんうん、やっぱり使っていますよね。
メルカリ、メルペイでも KPI 分析や機械学習への応用、お客様からのお問い合わせに関わる調査、開発用ログへのクエリなど様々な用途で活用しています。
筆者が所属する DataPlatform チームでも様々なマイクロサービスと BigQuery を繋ぎこむのに一役買ってたりもします。

メルペイにおける大規模バッチ処理

メルペイにおける大規模バッチ処理

GCPでStreamなデータパイプライン始めました

GCPでStreamなデータパイプライン始めました

この記事では、そんな BigQuery が持つ便利であるがあまり触れられる機会がない外部テーブルと、最近追加された機能である Hive パーティショニングレイアウトのサポートについて触れていきます。

いかにして BigQuery でクエリできるようにするか

先述の通り、メルカリ及びメルペイでは BigQuery が多様なシーンで様々なメンバーが利用しています。
となると「 BigQuery 上で SQL でクエリできること」を実現することの価値が非常に高いと言えます。

BigQuery 上でデータをクエリできるようにするには、典型的には BigQuery 上にデータを読み込みます。
この機能は CLI や API 、ブラウザにてローカルファイルもしくは GCS から、 Avro や JSON など様々な形式のファイルを読み込め、かつ無料で使えることもあり大変重宝します。
また BigQuery はストレージ管理もしてくれているわけですが、Capacitor というクエリでの読み出しや集計が高速になる傾向があるカラムナフォーマットでデータを保存してくれます。

cloud.google.com

この操作は BigQuery Data Transfer Service に任せたり、 Cloud Composer を使って複雑なワークフローに組み込んでも良いでしょう。

この便利な BigQuery のデータ読み込みなのですが、以下のような課題も存在します。

  • 毎分以上の頻度で読み込みジョブを発行する事は不可能
  • 読み込み対象のデータの収集が遅れた際に、 BigQuery への読み込みが漏れるデータが発生する可能性がある
  • 読み込みジョブが成功したかのモニタリングやリトライすることを考慮する必要がある

最初の課題は BigQuery の読み込みジョブの制約 から来ています。
読み込みジョブは「1 日あたりのテーブルあたり読み込みジョブ数 – 1,000(失敗を含む)」との制限が存在するため、毎分のように高頻度でテーブルを更新したい条件はこの機能では実現できなくなります。
この課題に対しては ストリーミング挿入 を実施するのが適していると思われますが、ストリーミング挿入を行うアプリケーションの運用が発生し、スキーマを更新したいときのオペレーションが複雑になります。

二番目の課題は BigQuery に読み込むデータと BigQuery の読み込みジョブに厳格な依存関係が存在しない場合に課題になってきます。
BigQuery の読み込みジョブが独立してスケジューリングされていて、かつ読み込み対象のデータが揃っていることを保証する手立てがなければ、まだ読み込むべきデータが揃っていない状態で BigQuery への読み込みが行われて結果 BigQuery 上からは一部データが不足しているように見えるかも知れません。
この課題に対しては Cloud Composer の DAG で BigQuery の読み込みジョブに対応して読み込み対象データを揃えるタスクに依存関係を与えるなどの対処法があります。

最後の課題は運用上の課題です。
BigQuery の読み込みジョブは常に成功する訳ではないのでモニタリングの仕組みやリトライを行える仕組みにした方が良いでしょう。
この課題は BigQuery Data Transfer Service, Cloud Composer などを用いている場合はそれほど気にする必要はないでしょうが、独自に CLI や API 経由で読み込みジョブを実行するようなアプリケーションを実装している場合は考慮した方が良さそうです。

上記の通り、 BigQuery にはデータ読み込みやストリーミング挿入などリッチな機能があるものの、同時に幾つかの課題も存在します。
他に BigQuery 上でクエリ可能にする良い手段はあるでしょうか。本記事で紹介する外部テーブルの利用はその解決策の一つです。

BigQuery の外部テーブル

外部テーブル概要

BigQuery は GCP 上の幾つかのデータを BigQuery 上でテーブルに見立ててクエリ可能にする機能を持っています。
これらのテーブルにクエリする時、クエリを実行する人には実際に BigQuery 上に読み込まれたテーブルとそれほど差異を体感せずに参照することができます。

cloud.google.com

先述の読み込みジョブを実行する際、先んじて GCS に読み込み対象のファイルをアップロードするケースが多々あると思います。
このようなワークロードにおいて、 GCS 上のオブジェクトに対する外部テーブルを作成することで、読み込みジョブを実行することなく BigQuery からクエリできるようになります。
この場合読み込みジョブのような 1 日の実行回数などの制限はないため、なるべく早く収集したデータをクエリしたいようなケースで役立ちます。
また、読み込みジョブに関連して複雑なワークフローを組んでいる場合、検証のために BigQuery に読み込まれたテーブルとそれの元になるデータソースの比較を BigQuery 上で実現することもできます。

この BigQuery の外部テーブルなのですが、複数のデータソースへのクエリが可能、クエリ実行速度が高速であることから presto を触っているかのような体験になりそうです。
であれば Hive パーティション風にスキャンする GCS 上のオブジェクトの範囲を狭められることを期待したくなってしまいます。
加えて外部テーブルは参照先リソースへの課金が BigQuery とは独立して発生するので、スキャン範囲の選択はコスト低減のためにも重要です。
実は最近 BigQuery でのサポートが開始されて実現可能になりました!

外部テーブルと Hive パーティショニングレイアウト

BigQuery の外部テーブルにおける Hive パーティショニングレイアウトのサポートは本記事執筆段階ではプレリリースの機能です。
リリースノート によると 2019 年 10 月 31 日よりサポートを開始したようです。

cloud.google.com

この機能は見ての通り、 Hive パーティションのように = でディレクトリ名がセパレートされているようなパスに対してパーティション指定を行えます。
これを用いて、たとえば日時でパーティショニングされており特定日時だけにクエリ実行したい場合は適切なパーティションキーを指定することで GCS 上のスキャン範囲を狭めることができます。
なお、この機能は BigQuery ネイティブのテーブルのパーティションとは異なり、あくまで知っているパターンと指定されたパーティションキーに配慮して GCS へのリクエストを行っているもので、存在しないパーティションを参照しようとすると GCS のエラーが返るなど挙動の違いが多々あります。l-

BigQuery の外部テーブルと Hive パーティショニング機能を実際に使ってみる

Hive パーティショニングレイアウトを認識させるための操作はおなじみの bq コマンドで手軽に行うことができます。
またパーティションキーの型も指定が面倒であれば自動で解決させることも可能です。

ここでは実際に、 GCS 上に Hive パーティショニングされたパスに Apache Avro 形式のファイルを置いて外部テーブルを作成した上で BigQuery からクエリしてみます。
なお、以下で現れる GCP のリソース名はダミーのものに差し替えています。

サンプルの Avro ファイルのスキーマは適当に以下のように用意します。

{
"doc": "A weather reading.",
"name": "Weather",
"namespace": "test",
"type": "record",
"fields": [
{"name": "station", "type": "string"},
{"name": "time", "type": "long"},
{"name": "temp", "type": "int"}
]
}

avro-tools で、上記スキーマに従った適当なファイルを用意し、 dt=20200101 という適当なパーティションが切られた GCS 上のパスにファイルをアップロードします。

 $ java -jar ~/avro-tools-1.8.2.jar random --schema-file schema01.avsc --count 1000 weather01.avro 2>/dev/null
$ gsutil cp weather01.avro gs://merpay_dataplatform_syucream_test/logs/dt=20200101/
...

bq mkdef で Hive パーティショニングされたテーブルの定義ファイルを作成して、それを元に bq mk で外部テーブルを作成します。

 $ bq mkdef --source_format=AVRO --hive_partitioning_mode=AUTO --hive_partitioning_source_uri_prefix='gs://merpay_dataplatform_syucream_test/logs/' 'gs://merpay_dataplatform_syucream_test/logs/*' > sche
ma
$ bq mk --external_table_definition=./schema merpay-dataplatform-test-tmp:merpay_dataplatform_syucream_test.weather
Table 'merpay-dataplatform-test-tmp:merpay_dataplatform_syucream_test.weather' successfully created.

無事に外部テーブルが認識され、クエリも実行できています!
また存在しないパーティションを参照しようとするとエラーになります。

 $ bq show merpay-dataplatform-test-tmp:merpay_dataplatform_syucream_test.weather
merpay-dataplatform-test-tmp:merpay_dataplatform_syucream_test.weather
Last modified               Schema                 Type     Total URIs   Expiration   Labels
----------------- ------------------------------- ---------- ------------ ------------ --------
10 Jan 17:29:47   |- station: string (required)   EXTERNAL   1
|- time: integer (required)
|- temp: integer (required)
|- dt: integer
$ bq query --project_id merpay-dataplatform-test-tmp --nouse_legacy_sql 'select * from merpay-dataplatform-test-tmp.merpay_dataplatform_syucream_test.weather WHERE dt = 20200101 LIMIT 3'
Waiting on bqjob_r505a33872f01e8ca_000001700e2b143d_1 ... (0s) Current status: DONE
+------------+----------------------+-------------+----------+
|  station   |         time         |    temp     |    dt    |
+------------+----------------------+-------------+----------+
| rwdadkocv  | -4488018877210651649 | -1292683068 | 20200101 |
| ubidxrikpa |  2635535744841427830 |    84294490 | 20200101 |
| asjbs      | -1662947693450027161 | -1518057760 | 20200101 |
+------------+----------------------+-------------+----------+
$ bq query --project_id merpay-dataplatform-test-tmp --nouse_legacy_sql 'select * from merpay-dataplatform-test-tmp.merpay_dataplatform_syucream_test.weather WHERE dt = 20200102 LIMIT 3'
BigQuery error in query operation: Error processing job ...

この状態で dt=20200102 というパーティションが追加されたらどうなるでしょうか。
実は GCS にオブジェクトが追加されればすぐに BigQuery から参照可能です。
パーティションの明示的な追加などBigQuery の外部テーブルの更新などは要求されず、なるべく早くクエリできるようにしたい要件が合っても対応しやすいです。

 $ gsutil cp weather01.avro gs://merpay_dataplatform_syucream_test/logs/dt=20200102/
...
$ bq query --project_id merpay-dataplatform-test-tmp --nouse_legacy_sql 'select * from merpay-dataplatform-test-tmp.merpay_dataplatform_syucream_test.weather WHERE dt = 20200102 LIMIT 3'
Waiting on bqjob_r4f59b0e63ab224e_000001700e2eb161_1 ... (0s) Current status: DONE
+------------+----------------------+-------------+----------+
|  station   |         time         |    temp     |    dt    |
+------------+----------------------+-------------+----------+
| rwdadkocv  | -4488018877210651649 | -1292683068 | 20200102 |
| ubidxrikpa |  2635535744841427830 |    84294490 | 20200102 |
| asjbs      | -1662947693450027161 | -1518057760 | 20200102 |
+------------+----------------------+-------------+----------+

BigQuery の外部テーブルとスキーマ更新

ちなみに外部テーブルではスキーマ更新が発生した場合はどうなるのでしょうか?
先述の Avro スキーマに互換性が取れるような、オプショナルなフィールド追加の変更を行った後で dt=20200103 パーティションにアップロードしてみます。

{
"doc": "A weather reading.",
"name": "Weather",
"namespace": "test",
"type": "record",
"fields": [
{"name": "station", "type": "string"},
{"name": "time", "type": "long"},
{"name": "temp", "type": "int"},
{"name": "name", "type": ["null", "string"], "default": null}
]
}
$ java -jar ~/avro-tools-1.8.2.jar random --schema-file schema02.avsc --count 1000 weather02.avro 2>/dev/nul
$ gsutil cp weather02.avro gs://merpay_dataplatform_syucream_test/logs/dt=20200103/    

この場合でも既知のカラムにはクエリを実行可能です。
ですが、残念ながら追加したカラムは参照されていません。
これは BigQuery の外部テーブルが認識しているスキーマが GCS 上のオブジェクトと齟齬が生まれているためです。

 $ bq query --project_id merpay-dataplatform-test-tmp --nouse_legacy_sql 'select * from merpay-dataplatform-test-tmp.merpay_dataplatform_syucream_test.weather WHERE dt = 20200103 LIMIT 3'
Waiting on bqjob_r2af36b37898eb0fa_000001700e307a57_1 ... (1s) Current status: DONE
+-----------------+----------------------+------------+----------+
|     station     |         time         |    temp    |    dt    |
+-----------------+----------------------+------------+----------+
| tuw             |  3797085577026903696 |  887728355 | 20200103 |
| italpagneiy     | -5827774653228662992 | 1718460251 | 20200103 |
| badsjraldbkvjjj |  3556097700364350281 | 1083287675 | 20200103 |
+-----------------+----------------------+------------+----------+
$ bq show merpay-dataplatform-test-tmp:merpay_dataplatform_syucream_test.weather
Table merpay-dataplatform-test-tmp:syucream_test.weather
Last modified               Schema                 Type     Total URIs   Expiration   Labels
----------------- ------------------------------- ---------- ------------ ------------ --------
04 Feb 12:07:43   |- station: string (required)   EXTERNAL   1
|- time: integer (required)
|- temp: integer (required)
|- dt: integer

この場合は BigQuery の外部テーブルの更新を行うことで Avro ファイルの最新のスキーマを元にスキーマ更新してもらえます。
これは CLI では bq update コマンドより実行できます。

 $  bq update --external_table_definition=./schema merpay-dataplatform-test-tmp:merpay_dataplatform_syucream_test.weather
Table 'merpay-dataplatform-test-tmp:merpay_dataplatform_syucream_test.weather' successfully updated.
$ bq show merpay-dataplatform-test-tmp:merpay_dataplatform_syucream_test.weather
Table merpay-dataplatform-test-tmp:merpay_dataplatform_syucream_test.weather
Last modified               Schema                 Type     Total URIs   Expiration   Labels
----------------- ------------------------------- ---------- ------------ ------------ --------
04 Feb 12:16:39   |- station: string (required)   EXTERNAL   1
|- time: integer (required)
|- temp: integer (required)
|- name: string
|- dt: integer

外部テーブルの更新後は無事に追加されたカラムもクエリで参照できるようになりました。
Avro のスキーマ更新以前にアップロードされたファイルにおける追加のカラムに関しては nullable であることからも NULL 値として返ってきます。

 $ bq query --project_id merpay-dataplatform-test-tmp --nouse_legacy_sql 'select * from merpay-dataplatform-test-tmp.merpay_dataplatform_syucream_test.weather WHERE dt = 20200103 LIMIT 3'
Waiting on bqjob_r3f34623c8011086e_000001700e33b395_1 ... (0s) Current status: DONE
+-----------------+----------------------+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
|     station     |         time         |    temp    |                                                                                                                          name                                                                                                                          |    dt    |
+-----------------+----------------------+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
| tuw             |  3797085577026903696 |  887728355 | hhmolpckewoooccdewnthnrywiltggqmxokdyuwpnholdcxouutkbwtfxpoqocvdsaxwwlgjjsqmlinuvasdsybqcskyjoobbndyviyxpnwuhcndisgodrcydprvpxhwyyqwwlmrgvwgxsnixjyewwkybkgucufhxebsitvjciwknelcfardvocutamxvrcqqcrxmcmoinoerhynaxpcnenuetsrxonjflxadangeofjkgsurdiire | 20200103 |
| italpagneiy     | -5827774653228662992 | 1718460251 | vh                                                                                                                                                                                                                                                     | 20200103 |
| badsjraldbkvjjj |  3556097700364350281 | 1083287675 |                                                                                                                                                                                                                                                        | 20200103 |
+-----------------+----------------------+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
$ bq query --project_id merpay-dataplatform-test-tmp --nouse_legacy_sql 'select * from merpay-dataplatform-test-tmp.merpay_dataplatform_syucream_test.weather WHERE dt = 20200101 LIMIT 3'
Waiting on bqjob_r71ab0283b34fc4c9_000001700e33e951_1 ... (0s) Current status: DONE
+------------+----------------------+-------------+------+----------+
|  station   |         time         |    temp     | name |    dt    |
+------------+----------------------+-------------+------+----------+
| rwdadkocv  | -4488018877210651649 | -1292683068 | NULL | 20200101 |
| ubidxrikpa |  2635535744841427830 |    84294490 | NULL | 20200101 |
| asjbs      | -1662947693450027161 | -1518057760 | NULL | 20200101 |
+------------+----------------------+-------------+------+----------+

外部テーブルの運用面の話

ここまで紹介してきた外部テーブル機能ですが、実際に運用していく上で留意すべき点が幾つかあります。
ここでは筆者が試験運用を開始している現在気にしている点をあげてみます。

外部テーブル自動更新

スキーマ更新が発生した際に外部テーブルを手動で更新するのは面倒です。
メルカリ、メルペイでは数多くのマイクロサービスが存在し、それらから流入するログのスキーマ更新に人力で対応していくのは非常に困難と言えます。

理想的には AWS における AWS Glue のように GCS のオブジェクトをクロールして、外部テーブルの更新が必要な場合は自動的に行ってほしくなります。
とはいえ類似するサービスは現状存在しないので、今回は我々は Cloud Composer 上の Airflow の DAG に自動クローリング&外部テーブル更新を任せることにしました。
我々の GCS 上のデータレイクは典型的に以下のようなルールに従ってディレクトリが分割されています。

prefix/microservice_name/data_kind_name/{hive partition keys...}/*.avro

パスの先頭数階層を参照すれば、 BigQuery の外部テーブルが決まる構成になっており、更により深い階層では日付などでパーティショニングされています。
今回用意した Airflow DAG では、指定の階層分ディレクトリを再帰的に取得して、このパターンにマッチしたディレクトリの一覧を取得した後、 本記事で触れてきた bq mkdef, bq mk, bq update に相当する処理と API 呼び出しを実行するようにしました。
この DAG を daily 間隔で実行することにより、スキーマ更新があった場合でも遅くとも 1 日経過すれば BigQuery 上で参照可能にしています。

BigQuery 読み込みやカラムナフォーマットの利用

外部テーブルを利用する頻度が増えてくると、 GCS に対する課金やパフォーマンスが気になるシーンが出てくるかと思われます。

前提として、 BigQuery のドキュメントで推奨されている 通り高速で低コストであることを求めるなら BigQuery への読み込みを行うのが良いでしょう。
BigQuery の読み込みを行い、ストレージの管理を BigQuery に任せることは、パフォーマンス等はもちろんデータ再配置など運用上の

BigQuery への読み込みを行わない場合は GCS に配置するファイルのフォーマットを Parquet や ORC などカラムナフォーマットに変更することも、 GCS の課金 や高速化に重要です。
この場合でも BigQuery の読み込みを行い BigQuery ネイティブのテーブルにするより課金やパフォーマンスの面では遅れを取りますが、 Avro など行志向のファイルフォーマットでデータを保存してクエリし続けるよりは格段に良いはずです。

アクセス制御について

GCS データソースにおいて BigQuery の外部テーブルを利用するには、実は BigQuery とは別に GCS のオブジェクトへの get 権限が求められます
GCS では IAM の設定がバケットもしくはオブジェクト単位で付与することになり、ディレクトリ単位で権限管理したい時に課題になってきます。
特に我々のケースでは現在、マイクロサービスやログの種類ごとに GCS のディレクトリを分割しており、例えばあるマイクロサービスに関わるログはそのマイクロサービス関係者にしかアクセスさせたくないケースなどが存在します。

実はこの課題は最近ベータ版になった新機能、 Cloud IAM Conditions で解決できそうです。

cloud.google.com

Cloud IAM Conditions では GCS のディレクトリベースの権限管理を、 startsWith() を用いてオブジェクトのパスのプレフィックスがマッチするかどうかの条件付き権限付与で実現できます。

そもそも外部テーブルと Hive パーティショニングレイアウトの組み合わせを使うべきか問題

最後に本記事の趣旨を覆すようになりかつ個人の意見が多分に含まれますが、この機能はオンプレで Hive を利用してきた人々が BigQuery に段階的に移行しやすくする目的のためのものかなと思います。
GCP の Customer Engineer の方の記事で以下のような内容も見かけられます。

medium.com

既存の資産があまりなく BigQuery ネイティブの機能で事足りるワークロードを持つ人々は、なるべく BigQuery で完結させるよう心がけ、本記事で挙げたような課題や不足する部分を補うのみに徹するべきなのかも知れません。

おわりに

BigQuery とその周辺の幾つかの新機能の紹介、いかがでしたでしょうか。
筆者としては、これらにより柔軟なデータ基盤の構築ができるようになると実感しています。

とはいえ紹介した Hive パーティショニングと Cloud IAM Conditions はまだ正式リリースされていない機能でもあり利用シーンは限定すべきだと考えられます。
現に筆者の所属するチームではまだこれらの機能は試験的に運用している段階であり、ミッションクリティカルな用途には用いていません。
しばらくは試験運用を続け、どう活用するか更に検討しながら導入していこうかと考えている次第です。

余談ですが、メルペイでは来月 3/17(火)に Merpay Tech Fest という技術系カンファレンスを開催する予定です!
筆者が所属するメルペイ DataPlatform チームでも登壇や対話可能なブースの出典などを行おうと思いますので、ご興味がある方はぜひ!ご参加くださいませ!

events.merpay.com