こんにちは!!
私はメルカリでSREをしている k-oguma ( ktykogm ) です。
ちょうど1年くらい前にジョインしました。
よろしくお願いします!
今日は、タイトルの件で対応した方法をご紹介したいと思います。
それはある日突然やってきた
ある日、ETL作業 (データ分析基盤運用)の依頼がUSチームからやってきました。
要件は次のようなものでした。
- 1.4TB サイズの MySQL innodb tableを1つをBigQueryに上げる
- 約1年分。期間指定。
- 期限数日、なる早
- 対象のcoloumsは全て
- 期間指定以外のデータ加工は不要
…やっていき!
TL;DR
- 1日でTB越え (18億行以上) のMySQLのtableを BigQuery に LOAD することが出来た
- Digdag, Embulk, Shell scriptで一気にloopでretry等も対応して送れた
- 複雑ではないので応用が効きやすい
- 結論
- Digdagだけでも出来たかもだけど Digdag + Embulk は使いやすいし本当に良い
- やり方だけ見たい方は、 #Embulk の項目からお読みください
BigQueryへLOADさせる方法を考える
今回、巨大なtableを高速且つ信頼性を高く簡単にBQへLOADできる方法 を模索しました。
BigQueryにLOADさせるには、様々な方法があります。
Dataflow, Airflow, Google Cloud Composer, Digdag, Embulk, GCS (CSV, Avro, etc), etc
今回は、それほど難しいデータの抽出等の処理は必要ありません。
よって、DataflowやAirflow/Composer等ではなく、DigdagとEmbulkで簡単且つ迅速に対応出来そうです。
使用したのは、以下になります。
- Embulk
- Embulk plugins
- Digdag
- Digdag-slack
しかし、一気にこのサイズをBQにLOADすることは出来ませんし、一度にLOADさせる量が大きすぎると進行状況がなかなか見えないのも問題です。
進捗状況が見える + 失敗がすぐに検出できてretry*1等ができるようにすることも必要でした。
初期の検討
その対象table は以下のようなschemaでした。
+--------------+---------------------+------+-----+---------+----------------+ | Field | Type | Null | Key | Default | Extra | +--------------+---------------------+------+-----+---------+----------------+ | id | bigint(20) unsigned | NO | PRI | NULL | auto_increment | ...<snip> | data | text | YES | | NULL | | | created | datetime | NO | | NULL | | +--------------+---------------------+------+-----+---------+----------------+
そこで最初のプランは以下を検討しました。
id
を元に数万件を並列で処理させたものを for_range>
および for_each>
で回す
しかし、その方法には以下の問題がありました。
id
が連番ではなかった- auto_increment ですが、全てのRecordがそのまま残っているtableではなかった
- 他のtableからFKされているなどリレーションを持ったtableだったのも関係
- ばらつきが生じてBQのLimit (1日の更新は1000回まで https://cloud.google.com/bigquery/quotas )を超えてしまい、Errorになることもあった
- これでは速度が出ない
- auto_increment ですが、全てのRecordがそのまま残っているtableではなかった
for_range>
とfor_each>
組み合わせても速度が出ないid
を 細切りにしてfor_range>
に渡しても自動的にloopさせるのに難があったfor_range>
のfromとtoにnullが入るといった問題も発生
見直し
上記のやり方だと問題があることが分かったので、以下のように変更しました。
id
だと問題があるので、created
で日付を見つつ一定期間づつ並列で最速に回す- BQ Limit (1,000/day) に引っかからないけれど全期間一気に処理できるようにする
最初、created
ではなく、 id
にしたのは、そう…
INDEXがありません でした。
そこで、作業用のslaveのtableだけ created
に ADD INDEXしちゃいます。
(これらも出来るための作業用DB Slave)
+--------------+---------------------+------+-----+---------+----------------+ | Field | Type | Null | Key | Default | Extra | +--------------+---------------------+------+-----+---------+----------------+ | id | bigint(20) unsigned | NO | PRI | NULL | auto_increment | ...<snip> | data | text | YES | | NULL | | | created | datetime | NO | MUL | NULL | | +--------------+---------------------+------+-----+---------+----------------+
こうなりました。
これで、embulk-input-mysql pluginの WHERE句 にて created
を検索する時の時間を少なく出来ます。
Embulk
Embulkのliquid ファイルを以下のように用意しました。
これは、後に説明するDigdagから動かすためのEmbulk です。
XXXXXX-bulk.yml.liquid
in: type: mysql host: localhost user: XXXXX_user password: {{ env.MYSQL_PASS }} database: XXXXX incremental: true incremental_column: [created] table: XXXXX select: "*" where: "'{{ env.S }}' <= created AND created <= '{{ env.E }}'" options: {useLegacyDatetimeCode: false, serverTimezone: UTC} out: type: bigquery mode: append table: XXXXX auth_method: json_key json_keyfile: /path/to/your/directory/serviceAccount.json project: XXXXXX dataset: us_XXXXXXX compression: GZIP auto_create_table: true
Embulk 説明
- in:
where:
- ここでcreated の日付で範囲指定をして一回LOAD分の抽出をしています
incremental_column:
- ここでは対象が created ですが、WHERE の絞り込み対象のcolumn 名が異なる場合はそれに合わせてください
- out:
json_keyfile:
- serverAccount.json は、別途 GCP consoleで取得しておく必要があります
mode:
- append にした理由は、replaceだと上書きしてしまうからです
- 共通
default_timeset:
- 今回USだったので、Embulkの defaultである UTC でしたので省いています
Digdag
上記を動かすDigdag のDAG (Workflow) ファイルは、以下のように用意しました。
XXXXXX.dag
_export: plugin: repositories: - https://jitpack.io dependencies: - com.github.szyn:digdag-slack:0.1.2 webhook_url: https://hooks.slack.com/services/${ YOUR_WEBHOOK_TOKEN } workflow_name: slack ENV: develop _start_day: ${start_day} _end_day: ${end_day} timezone: UTC +setup: echo>: start ${session_time} +prepare: _retry: limit: 3 interval: 10 interval_type: exponential +repeat: _export: start: ${moment(_start_day).format("YYYY-MM-DD HH:mm:ss")} end : ${moment(_end_day) .format("YYYY-MM-DD HH:mm:ss")} loop>: ${moment(end).diff(moment(start), 'day') + 1 } _do: _export: start_date: ${moment(start).add(i, 'days').format('YYYY-MM-DD HH:mm:ss')} end_date: ${moment(start).add(i, 'days').format('YYYY-MM-DD 23:59:59')} +first_day: if>: ${i == 0} _do: sh>: S="${start}" E="${end_date}" embulk run XXXXXX-bulk.yml.liquid +not_first_day: if>: ${i != 0} _do: sh>: S="${start_date}" E="${end_date}" embulk run XXXXXX-bulk.yml.liquid _parallel: true _error: slack>: danger-template.yml +slack: slack>: good-template.yml +teardown: echo>: finish ${session_time}
Digdag 説明
webhook_url:
- Slack通知用に事前にWebhookのTOKENを取得して設定します
- Digdag-slack を使用して成功と失敗を通知されるようにしています
+repeat:
- これがWorkflow のJob本体です
- DATE_TIME format で開始日と終了日を得ます
moment()...
- Moment.js を参照してください。変更する場合はMoment.jsの記法に合わせる形です
loop>:
- この処理により、指定した期間の中で + 1 dayをloopさせて足し続けて長期間でも処理を回し続けられるようにしています
first_day:
- loopの初回だけ(
${i == 0}
)は、${moment(start).add(i, 'days')
で +1 day される前の日付 = ${start} から開始である必要があります
- loopの初回だけ(
Digdag呼び出し処理
2017-11-20 から 2018-12-17 まで実施の例です。
Dry-run
試すときは、echo print debugで良いですね!
$ START_DAY='2017-11-20'; for ((i=1; i<= 131; i++));do END_DAY=$(date --date="$START_DAY 2 days" +%Y-%m-%d);echo digdag run -p start_day=$START_DAY -p end_day=$END_DAY -a XXXXXX.dig ;START_DAY=$(date --date="$START_DAY 3 days" +%Y-%m-%d); done | head -5 digdag run -p start_day=2017-11-20 -p end_day=2017-11-22 -a XXXXXX.dig digdag run -p start_day=2017-11-23 -p end_day=2017-11-25 -a XXXXXX.dig digdag run -p start_day=2017-11-26 -p end_day=2017-11-28 -a XXXXXX.dig digdag run -p start_day=2017-11-29 -p end_day=2017-12-01 -a XXXXXX.dig digdag run -p start_day=2017-12-02 -p end_day=2017-12-04 -a XXXXXX.dig
$ START_DAY='2017-11-20'; for ((i=1; i<= 131; i++));do END_DAY=$(date --date="$START_DAY 2 days" +%Y-%m-%d);echo digdag run -p start_day=$START_DAY -p end_day=$END_DAY -a XXXXXX.dig ;START_DAY=$(date --date="$START_DAY 3 days" +%Y-%m-%d); done | tail -2 digdag run -p start_day=2018-12-12 -p end_day=2018-12-14 -a XXXXXX.dig digdag run -p start_day=2018-12-15 -p end_day=2018-12-17 -a XXXXXX.dig
いざ、実行
まず前提として、tmux 等や nohup + background処理にして長時間実行でも切断されないようにしておきます。
次に先ほどdry-runで試した処理を echo 無しで実行です!
$ START_DAY='2017-11-20'; for ((i=1; i<=131; i++));do END_DAY=$(date --date="$START_DAY 2 days" +%Y-%m-%d);digdag run -p start_day=$START_DAY -p end_day=$END_DAY -a XXXXXX.dig ;START_DAY=$(date --date="$START_DAY 3 days" +%Y-%m-%d);done
これで、1日ごとにBQへLOADするEmbulk処理を3日先までまとめて並列で実行しています。
そのまま帰宅して、翌日くらいに見てみましょう。
補足: もっと高速化させたいなら
例えば、上記のDidgag 説明にある loop>:
の + 1
の数値を増やして、こちらの END_DAY
や 最後の START_DAY
に入る n days
の数値を増やせば、Embulkの一度にLOADさせる対象を増やすことが出来てLoopが減るので速度が早まる可能性があります。
しかしその反動で、一回の処理に時間が掛かるようになるため、確認が遅くなって色々試しづらかったのもあったので、今回は 1 day * 3 づつLOAD させています。
終わったあとは
確認してみます。
% bq query 'SELECT min(created) as start_date, max(created) as end_date FROM [project.dataset.table]' Waiting on bqjob_r7bc1e8862af918df_00000167decc19a5_1 ... (0s) Current status: DONE +---------------------+---------------------+ | start_date | end_date | +---------------------+---------------------+ | 2017-11-20 23:05:45 | 2018-12-17 23:59:59 | +---------------------+---------------------+
% bq query 'SELECT COUNT(1) FROM [project.dataset.table]' Waiting on bqjob_r2b8104e26b860aad_00000167ded120dc_1 ... (5s) Current status: DONE +------------+ | f0_ | +------------+ | 1840266370 | +------------+
全部で最終的にBQにLOADさせたRecord数は 18億4,026万6370件
でした。
たったこれだけで完了です!!
簡単で早くて最高ですね!!
最後に
最後に宣伝ですが、 SRE Loungeというコミュニティの運営にも私は携わらせていただいています。
来年 1/18(金) 19:00から #7
が開催されます!
メルカリが会場スポンサーとしてサポートさせていただきます。
私も登壇させていただく予定ですので、お時間ご興味ある方はぜひ以下のリンクからお申し込みいただけると幸いです。
平成最後の年末ですね。今年は暖冬とは言われていますが
寒くなってきたので、みなさま風邪など引かないように!
それではまた!! ノシ
参考にしたURL
*1:失敗で中途半端なRecordが存在しうることと、重複する可能性があるのもダメなので、実際にDigdagの_retryとEmbulkのmode: appedがそういった問題を起こさないか(atomic性があるか)は検証しました。mode: append mode now expresses a transactional append, and mode: append_direct is one which is not transactional. とのことなので、Caused by: org.jruby.exceptions.RaiseException: (ConnectionFailed) Failed to open TCP connection to www.googleapis.com:443 (execution expired) などのErrorをわざと途中で発生させたり、再度同じ日付を繰り返し実行したりしてretryが正常に行われることを確認しました。