バッチ処理実装時に考慮すべき事項

はじめに

メルペイバックエンドエンジニアの @r_yamaoka です。この記事は、Merpay Tech Openness Month 2022 の16日目の記事です。
私がつい最近まで所属していた加盟店管理業務を担うマイクロサービス群(以下、加盟店管理システム)では様々なバッチが稼働しています。本記事ではそれらの実装において過去に発生したトラブルやヒヤリハットから得た知見を共有したいと思います。

背景

本題に入る前に加盟店管理システムでどのような箇所にバッチ処理が採用されているかについて少し解説します。バッチ処理を採用するか否かの観点としては大きく下記2点があります。

  • 機能要件上バッチ処理を採用しなければならない
  • 非機能要件の都合で同期処理を採用できない

前者の例としては「配送業者との伝票情報連携」や「行政システムとの連携処理」というものがあり、これは連携先である配送業者や行政の業務の都合上、ある一定のウィンドウ(例えば1日分)をまとめて処理したり特定の時間帯に処理を実行してはいけない等といった制約によりバッチ処理を採用する必要があるケースです。

後者には「CSVによるデータ一括投入」や「大量のリソース作成処理」等といったものがあります。こちらは本来業務的な制約は存在しないものの、同期処理を行うと長時間レスポンスが返らないことによるUXの毀損やタイムアウトのようなシステム的不都合を招く恐れがあるためバッチによる非同期処理を採用するケースです。また、マイクロサービス特有の問題として複数のサービスにまたがったデータ整合性を担保するためにバッチが必要となるケースもあります(参考: https://engineering.mercari.com/blog/entry/20210923-7171b62de5/ )。

そしてこれらのバッチはKubernetesのCronJobによって起動を管理されており、大抵のものは毎分コンテナを起動することによるマイクロバッチ的な振る舞いをしています。

考慮が必要なポイント

OOMの防止

ここでは例としてJobsテーブルというものがあるとして、1レコードが1つのジョブに対応しておりそれをSELECTで複数取得後ループの中で処理をするというものを想定します。

CREATE TABLE Jobs (
  JobID STRING(MAX),
  Status STRING(MAX),
  Param1…
)

安直に実装すると、下記SQLのように「未処理のジョブを全て取得して処理する」という形になりますがこれは危険です。

SELECT *
FROM Jobs
WHERE Status = ‘Unprocessed’
ORDER BY CreatedAt ASC

何らかの理由により大量にジョブが登録されるとOOMが発生する可能性があり、そうなると何度バッチを起動してもOOMを繰り返し処理が全く進捗しなくなってしまいます。ジョブの取得処理には必ずLIMITを入れると共に、最大値でどの程度メモリを消費するか事前の検証が必要です。

データベース負荷

これはバッチ処理に限った話ではありませんが、前項目のようなSQLを追加・修正する場合は実行計画を取得しジョブの件数が増えてもデータベースに過剰な負荷がかからないかを確認する必要があります。

バッチ処理のスループットが多少低下しても然程緊急性の高い問題とはならないケースが多いと思われますが、APIやその他リアルタイム性が求められる処理に影響を与える可能性についても常に念頭におくべきです。

回復可能なエラーのハンドリング

gRPCのUnavailableやDeadline Exceededのような再度のリクエストによって成功が見込まれるエラーについては、リトライ処理を実装するとよいでしょう。これはバッチの処理内部で実装するリトライというよりは、個々のジョブ自体をリトライ可能にするという意味合いです。ジョブのステータス管理については、初期状態に戻したりリトライ中である旨に変更したりする等いくつか方策が考えられますが、加盟店管理システムでは実装の簡便さから前者を採用しています。

リトライの注意点としてはExponential BackoffとJitterでリトライによる高負荷を回避することと、何らかの予期しない理由によりリトライが成功し得ない状態に陥ったことを検出することです。例えば本来Invalid Argumentとなるべきエラーがリクエスト先のハンドリングミスによってUnavailableになってしまっている、等といったことがあり得ます。具体的な実装としてはジョブ毎にリトライ回数をカウントし一定数を超えたら失敗のステータスに遷移した上でエラー通知やアラートを発報するといった対策が必要です。

またリトライの前提として処理内容が冪等でなければなりませんが、仮にそれが不可能な場合はリトライを諦め全てのエラーを失敗とした上で運用での対処が必要になってしまいます。既に方々で言及されていますが、このような事態を避けるため全てのマイクロサービスのAPIは原則として冪等に実装されなければなりません。

回復不可能なエラーのハンドリング

Invalid ArgumentやNot Foundのように再度のリクエストでも成功が見込めないエラーについては当然ステータスを失敗に遷移させた上で過不足無くエラー通知を行う必要がありますが、加えてエラー発生以降でも可能な限り処理を継続する実装とするべきです。

例えばバッチの内部動作として下記SQLのように「未処理のジョブを古い方から100件取得し処理する」というものがあったとします。

SELECT *
FROM Jobs
WHERE Status = ‘Unprocessed’
ORDER BY CreatedAt ASC
LIMIT 100

この場合、30件目のジョブでエラーが起きた場合でもそこで処理を停止せず残り70件の処理を継続することが望ましいです。中断してもステータス遷移が適切であれば次回以降のバッチで最終的には処理が完了しますが、エラーの割合が多いと一時的にスループットが極端に低下する懸念があります。

突き抜けとジョブ滞留の発見

バッチを設計する際は処理時間を見積もりますが、時間の経過による利用増等により当初想定しないような長時間実行となってしまい後続で問題が起きる可能性があります(俗にいう突き抜け)。また表面的には正しく処理できているように見えてもスループットが足りず未処理のジョブが大量に溜まってしまうこともあります。

更にバッチ処理の最中にエラーが発生した場合は通知が行われますが(Sentryを利用しています)バッチそのものが何らかの理由によりスタックしてしまうと通知も行われずいつの間にか処理が停止したままになることもあり得るため、バッチの外部から監視して発見する必要があります。

手法としては未処理件数を直接監視するというやり方がまず考えられますが、外部から値を取得するための実装が必要であり少々コストの高いやり方となるため加盟店管理システムではバッチの起動回数を元に監視しています。

前述の通り、大概のバッチがKubernetesのCronJobで毎分起動する仕組みになっており多重起動を禁止しています。

spec:
  schedule: "* * * * *"
  concurrencyPolicy: Forbid
  jobTemplate:
    spec:
(略)
      template:
        spec:
          containers:
            - name: merpay-xxx-batch
              image: gcr.io/xxx-prod/xxx
              command: ["sh", "-c", "sleep 20 && /app/xxx-cli batch-create-xxx"]

これにより、もし処理が長引いたり処理がスタックしたりする場合は単位時間あたりのバッチ起動回数が減少することになるためDatadogで下記のようなクエリーを設定し監視しています。

sum(last_30m):sum:trace.command.batch_create_xxx.hits{env:production,cluster-name:xxx}.as_count() < 1

スケーラビリティとデータ整合性

前項目とも関連しますがこちらは解消という観点です。基本的なアプローチとしては以下のようなことが考えられます。

  • Podの性能強化
  • ミドルウェア・インフラの性能強化
  • 同一バッチ内での並行処理
  • 複数Podでの並行処理

前者2つの方法は所謂スケールアップやチューニングで、単純にCPU/メモリの割当を増やしたりより高速なディスクに置き換える、データベースのインデックスを最適化する等によりスループットを向上させます。適切に施せば一定の効果が望める手法ですが要求性能が右肩上がりの場合いずれ限界に達します。

後者2つの並行処理についてはどちらも本質的には同じことでスケールアウトによる解決を図ります。こちらは実装変更が必要というデメリットはありますが、適切に実装すれば大きなスケーラビリティを得られるのがメリットです。

但し平行処理を行う場合は同一のジョブが複数のプロセスで重複して実行されないための状態管理が必要で実装が複雑になってしまいます。現状の加盟店管理システムではまだ多少のスケールアップで対処可能な事例が中心であるため、YAGNIの精神に則りシンプルな直列処理の実装を保っているのが現状です(参考: https://engineering.mercari.com/blog/entry/20211217-163d5c4e4e/ イベントの状態管理)

予期せぬクラッシュ

完全なソフトウェアを実装することは容易ではないため、常に突然のプロセスクラッシュが起きることを前提とする必要があります。またコンテナ環境下ではアプリケーション側で状態を持つことは推奨されず、いつでもプリエンプションされたりクラウド基盤側で障害が起きたりすることを前提にして実装することが望ましいです。

前項目で挙げた参考記事でも少し触れていますが仮にスケーラビリティを得るために並行処理を実装し「処理中」のようなステータスを設けてジョブの排他制御をした場合、ステータス遷移できないままクラッシュすると未処理のままのジョブが発生する可能性があります。

具体的な対策としては、処理が冪等であるならばステータスと最終更新の時刻を元に一定期間「処理中」のままのジョブは再度実行する等といった方法が考えられます。もし、冪等でなく安全な再実行ができない場合は同様の条件でエラー通知した上で運用対処が必要となるでしょう。

終わりに

バッチ処理はAPIとは異なる複数の配慮が必要であり難易度の高い実装になるケースが多いように思います。実際のところ加盟店管理システムでも上に挙げたような実装を全て完全に行っているわけではなく要件、リスク、実装コストを総合的に判断しどこまで対策を行うか検討して折り合いを付けているため必ずしも理想の仕組みにはなっていません。

私事になってしまいますが、一ヶ月ほど前に加盟店管理システムの開発からこれまで以上にバッチ処理を実装する機会が多いチームへ異動となりました。ここに書いたようなこれまでの知見を活かしより質の高い決済システムの構築を行い、また更なる知見の獲得と皆様への共有ができればと思っています。

  • X
  • Facebook
  • linkedin
  • このエントリーをはてなブックマークに追加