新卒エンジニアが Airflow のバグを発見してからコントリビュートするまで

この記事は、Merpay Advent Calendar 2023 の9日目の記事です。

こんにちは。今年の春に新卒でメルペイに入社し、Credit Platform Team でバックエンドエンジニアをしている@champonです。Credit Platform Team では主に ML(いわゆるAI与信) を用いた与信枠の算出を行っていますが、その中でも自分はワークフローエンジンである Airflow を用いたデータパイプラインの開発・運用を行っています。
今回は、業務中に Airflow のバグを見つけてからその原因を調査し、実際にコントリビュートするまでの過程をお話したいと思います。

Airflow とは

まず簡単に、Airflow について説明します。
Airflow とは、ワークフローエンジンの一種であり、Apache Software Foundation が管理する OSS です。
DAG と呼ばれる有向非巡回グラフの形式でワークフローを定義し、それぞれのノードは Task と呼ばれるワークフロー処理の構成要素となっています。
Task には、Airflow から提供されている様々な Operator を使用することができ、例えば BashOperator や PythonOperator などがあり、それぞれ Bash コマンドや Python プログラムを実行できます。
また、Amazon Web Service (AWS) や Google Cloud Platform (GCP) のサービス・プロバイダも公開されているため、クラウドサービス上のデータを容易に扱うことができます。
自分のチームでは、GCP 上の Cloud Composer で Airflow 環境を構築し、BigQuery や Dataflow と連携しながらデータパイプラインとしてメルペイの与信枠計算の一部を管理しています。

予期せぬエラーの発生

QA Engineer によるテスト実施中に、Dataflow を使っている Task で以下のエラーが発生しはじめました。

Exception: Google Cloud Dataflow job <xxx> is in an unexpected terminal state: JOB_STATE_DONE, expected terminal state: JOB_STATE_DONE

直訳すると、「予期していた終着状態は JOB_STATE_DONE でしたが、Google Cloud Dataflow job が予期せぬ終着状態 JOB_STATE_DONE となりました」でしょうか。
明らかに筋が通っていないこちらの1文を読んで、もしかしたら Airflow 側に何かバグがあるかもなと思い、Airflow のソースコードを探ることにしました。

エラーの原因調査

こういうときはまず、該当箇所の直近 commit を見ることにします。
スタックトレースもエラーメッセージと一緒に出力されていたため、それを頼りに該当ファイルにたどり着きました。
このファイルの最新 commit を見てみると、PR #34217 が merge されていることがわかりました。
さらに深掘ってみると、どうやら apache-airflow-providers-google==10.9.0 のリリースに入った変更で、expected_terminal_state という引数を DataflowHook に加える対応のようです。

この expected_terminal_state というのは、こちらで議論されており、Dataflow job が完了したとみなすステートをユーザーが設定できるというものです。
(Airflow には Dataflow job のステートがいくつか定義されており (※1)、どれを job 完了状態とみなすか、といったもの)

話を戻しますが、この PR #34217 の変更を見てみると、ちょうどエラー発生箇所に変更が加えられていました。
また、念のため Cloud Composer の package 一覧を確認したところ、該当環境の apache-airflow-providers-google のバージョンが 10.9.0 となっていたので、原因はこちらで間違いなさそうです。

gcloud composer environments list-packages <your environment> –project <your project> –location <your location>

原因はわかったので、対症療法としてバージョンを 10.8.0 に落とせばエラーをなくすことができますが、せっかくなので自分で直すことにしました。

(※1) https://github.com/apache/airflow/blob/providers-google/10.9.0/airflow/providers/google/cloud/hooks/dataflow.py#L130-L141

Issue, PR の作成

とりあえず Issue を出しました。
Issue テンプレートの下部に “Are you willing to submit PR?” という文とともにチェックボックスが添えてあったので、チェックをして PR 作成に取り掛かります。

修正箇所は前述の通り、expected_terminal_state の挙動によるものと思われます(正確には、DataflowJobsController の check_dataflow_job_state メソッド (※2))。

特に、expected_terminal_state = None (デフォルト値) のときに考慮漏れがありました。
expected_terminal_state がデフォルトのときに関係するコードを次に抜き出します(今回は Dataflow のバッチ処理なのでストリーミング処理に関係するコードは省きます)。

AWAITING_STATES = {
    JOB_STATE_RUNNING,
    JOB_STATE_PENDING,
    JOB_STATE_QUEUED,
    JOB_STATE_CANCELLING,
    JOB_STATE_DRAINING,
    JOB_STATE_STOPPED,
}

def _check_dataflow_job_state(self, job) -> bool:
    current_state = job["currentState"]

    if self._expected_terminal_state is None:
        self._expected_terminal_state = DataflowJobStatus.JOB_STATE_DONE

    if not self._wait_until_finished and current_state == self._expected_terminal_state:
        return True

    if current_state in DataflowJobStatus.AWAITING_STATES:
        return self._wait_until_finished is False

    raise Exception(
        f"Google Cloud Dataflow job {job['name']} is in an unexpected terminal state: {current_state}, "
        f"expected terminal state: {self._expected_terminal_state}"
    )

ここで、wait_until_finished という要素が新たに登場します。
このパラメータは expected_terminal_state が導入される以前から存在したもので、簡単に言うと “Dataflow job が終了するまで処理を待機するかどうか” のフラグです。
これを踏まえて上記のコードを解釈すると、例えば次の全てを満たす状態のときに Exception が返ってしまうことがわかります。

  • wait_until_finished = True
  • current_state = DataflowJobStatus.JOB_STATE_DONE
  • expected_terminal_state = DataflowJobStatus.JOB_STATE_DONE

ここでようやく、今回のエラー発生時の状態にたどり着きました。
後は修正するだけです。
if not self._wait_until_finished and current_state == self._expected_terminal_state の分岐処理を以下のように変更します。

if current_state == self._expected_terminal_state:
    if self._expected_terminal_state == DataflowJobStatus.JOB_STATE_RUNNING:
        return not self._wait_until_finished
    return True

wait_until_finished の条件が悪さをしていたので、expected_terminal_state が DataflowJobStatus.JOB_STATE_RUNNING のときの分岐を増やし、それ以外の場合は current_state == self._expected_terminal_state であれば True となるようにしました。
詳細は修正 PRを御覧ください。

(※2) Helper method to check the state of one job in dataflow for this task if job failed raise exception: https://github.com/apache/airflow/blob/providers-google/10.9.0/airflow/providers/google/cloud/hooks/dataflow.py#L389-L433

余談: このような実装になった原因

このようなエラーが引き起こされた原因として、wait_until_finished と expected_terminal_state という似たようなパラメータが共存することが大いに関係あると考えられます。
どちらも Dataflow job の完了状態を考慮する必要があるため、完了判定条件がより複雑になってしまったことが考えられます。
また、wait_until_finished = True は、expected_terminal_state = DataflowJobStatus.JOB_STATE_DONE と実質同じ意味なのかなと考えており、将来的には wait_until_finished を廃止することでより簡潔な実装になるのかなと思いました(一応 PR 内のコメントで提案しておきました (※3))。

(※3) https://github.com/apache/airflow/pull/34785#discussion_r1348054361

まとめ

今回は、Airflow におけるバグ発見から PR を作成するまでの課程を、自分の思考を振り返りながら記事にしました。
その後、無事 apache-airflow-providers-google==10.12.0 にてリリースされたので、今後は同様のエラーが起こることはないはずです。
普段は OSS 等へコントリビュートはあまりしない(バグ見つけたら Issue 書くか、時間があったら PR 出すくらい)ですが、久々に結構楽しめたので、今後もちょくちょく Issue 見つつ手伝えそうであればコントリビュートしていこうかなと思いました。

明日の記事は @ryuyama さんです。引き続きお楽しみください。

  • X
  • Facebook
  • linkedin
  • このエントリーをはてなブックマークに追加