この記事は、Merpay Tech Openness Month 2023 の7日目の記事です。
はじめに
こんにちは。メルコイン Payment Platform チームの @sapuri です。
メルコインではマイクロサービスアーキテクチャを採用しており、お客さまによりアプリの操作が行われると、それぞれのマイクロサービスを横断してリクエストが処理されます。
メルコインの Payment Platform は、お客さまの残高の管理や各種帳簿の作成などの決済事業のための基盤となる仕組みを提供しています。
そのなかで、Payment Service は決済トランザクションを管理するサービスとして、下位層のサービスが提供する各種決済手段を利用して、上位層のサービスが共通して利用できる決済 API を提供しています。
この記事ではマイクロサービスアーキテクチャにおける分散トランザクション管理の課題を説明して、Payment Service で運用されている管理手法を簡単にご紹介します。
分散トランザクションの課題
分散トランザクションとは、複数のノードや複数のデータベースをまたがって実行されるトランザクションのことを指します。
マイクロサービスアーキテクチャでは各サービスが独自のデータベースを持つため、複数のサービスにまたがるトランザクションを行う場合、アプリケーションは単純にローカルトランザクション (ACID) を使用することができません。
そのため、各サービスのデータの整合性をどのようにして保つのかが課題になります。
例えば、メルコイン口座の残高とメルペイのポイントを使ってビットコインを購入する場合を想定してみます。
この場合、決済処理としてざっくり次のような処理を行うことになります。
- 取引データを作成する
- メルコイン口座の残高を減らす
- メルペイのポイントを消費する
- ビットコイン残高を増やす
- 取引データを更新する
- 決済の結果を通知する
これは全ての処理が成功するとした場合のシーケンスです。
しかしながら、実際にはネットワークや依存先のサービスの障害などによってエラーが発生することがあるため、それぞれの処理が失敗した場合を想定しなければなりません。
途中で処理が失敗した場合、例えば次のような状態が発生する可能性があります。
- 決済が失敗したのにメルコイン残高が減っている
- 決済が失敗したのにメルコイン残高とポイントが消費されている
- ビットコイン残高が増えない(もしくは増えたかどうかわからない)
- ビットコインとの交換が行われたが、取引が完了していないことになっている
- 決済は成功したが結果が通知されず、サブスクライバーの処理が実行されない
このように、サービス間のデータ整合性を保つためにどのようなハンドリングをすべきか、どのようにロールバックを実現するかなどについて適切な設計を考える必要があります。
分散トランザクション管理手法: Saga パターン
メルコインの Payment Service は、複数のマイクロサービスにまたがる決済トランザクションを処理するために Saga パターンを採用しています。
Saga は複数サービス間のデータの整合性を維持するためのトランザクション管理手法です。
これは、トランザクション処理に数分、数時間、あるいは数日かかるような LLT (Long Lived Transactions) に対する問題解決のために考案されたアプローチです。
操作するリソースごとのサブトランザクションにトランザクションを分解し、それらを独立に処理することでデータを長期間ロックする必要がなくなります。
各サブトランザクションは独立してコミットされるため、単純にロールバックを実行することはできません。
そのため、サブトランザクションによるリソースの変更を取り消すようなトランザクション(補償トランザクション)を実行することによってロールバックを行います。
このようにして、データを長期間ロックすることなく補償トランザクションによってトランザクションの最終的な整合性(結果整合性)を担保します。
前章のユースケースを Saga パターンで実装する場合、次のようにリソースの操作をサブトランザクションとして分割し、それらを取り消す補償トランザクションを設計します。
例えばビットコイン残高を増やすトランザクションが失敗してそれ以上処理を進められなくなった場合、それまでに行ったリソースの操作である「メルコイン残高の減少」と「ポイントの消費」を取り消すトランザクションを順に実行し、最後に取引データの状態を失敗として更新します。
ここでは単純に表現するために直接残高を増減させているかのように書いていますが、実際には TCC パターンのようにリソースの操作ごとに「仮押さえ」と「確定」の二段階の処理に分割しています。
これにより、ロールバック時は仮押さえしたものを解放するだけなので、履歴が汚れるなどの副作用を発生させずに補償処理を実現することができています。
ここで、「では補償トランザクションや確定処理のトランザクションが失敗したときはどうするのか?」と思う方もいるかもしれません。
この問題については、成功するまでリトライし続ける仕組みを用意して最終的に必ず成功させるように実装することで解決できます。
そのためには他のサービスのデータの状態を気にせずにリトライできるように、各サービスは冪等性を持った API を提供する必要があります。
また、Saga パターンには「コレオグラフィ」と「オーケストレーション」の2つのアプローチがありますが、Payment Service ではオーケストレーションのアプローチを採用しています。
オーケストレーションベースの Saga を実現するためには、各サブトランザクションや補償トランザクションを登録して実行するためのインターフェース、そしてそれらを調整するコーディネーターとしてのツールが必要になります。
分散トランザクション管理ツールの選定
オーケストレーションベース Saga を実現するためのツールとして、GCP Workflows があります。
GCP Workflows ではビジネスロジックとフローの定義が分かれており、フローの定義は YAML ファイルに記述します。
一方、Uber が提供する OSS である Cadence は、コードベースのワークフローというコンセプトで、ワークフローを管理するためのイベントソーシングに基づくオーケストレーターを提供します。
ここでのワークフローとはアプリケーションのビジネスロジックの主要な単位であり、状態を持ち長期間実行されるコードの定義を意味します。
提供されている SDK を使用することで、ビジネスロジックを含む関数としてワークフローを記述することができるため、通常のプログラミングと似たような開発体験を実現できます。
しかしながら、Cadence は Spanner をサポートしておらず、OSS の性質上、自社でのデプロイとメンテナンスが必要となります。(Cadence の内部を理解している専門家が必要になる)
また、Temporal は Cadence と同じくコードベースのワークフロー管理を提供します。
Temporal は Cloud Native Computing Foundation に加入しているプロジェクトで、Cadence をベースに開発されました。
このようないくつかの既存のツールを調査した結果、メルコインでは Cadence と Temporal を参考にした独自のワークフローコーディネーターを開発することになりました。
この独自のコーディネーターの詳しい仕組みについてはこの記事では割愛しますが、アプリケーションは提供される SDK を使うことで Cadence と似たインターフェースでワークフローを管理できます。
実装
ワークフローは複数の独立したコミット (アクティビティ) で成り立っており、それらを手続き的にコードで表現します。
Payment Service を例にすると、大まかに次のようなコードでワークフローを記述します。
type PaymentService struct {
manager *workflow.Manager
}
type CreateExchangeRequest struct {
IdempotencyKey string
// ...
}
type Exchange struct {
ID string
Status int64
// ...
}
func (s *PaymentService) CreateExchange(ctx context.Context, req *CreateExchangeRequest) (ex *Exchange, _ error) {
exe, err := s.manager.Workflow(s.createExchangeWorkflow, req).Execute(ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute workflow: %w", err)
}
if err := exe.Get(ctx, &ex); err != nil {
return nil, fmt.Errorf("failed to get result: %w", err)
}
return ex, nil
}
func (s *PaymentService) createExchangeWorkflow(ctx context.Context, params *CreateExchangeRequest) (*Exchange, error) {
ex, err := s.createExchangeActivity(ctx, params)
if err != nil {
return nil, fmt.Errorf("failed to create exchange: %w", err)
}
saga := workflow.NewSaga(s.manager)
if err := s.executeAuthorizeActivities(ctx, saga, ex.ID); err != nil {
if !isCompletableError(err) {
return nil, fmt.Errorf("returned a non-completable error: %w", err)
}
if cerr := saga.Execute(ctx, func(e execution.Execution) error {
return e.Wait(ctx)
}); cerr != nil {
return nil, fmt.Errorf("failed to execute compensation activities: %w, orig_err: %v", cerr, err)
}
if err := s.manager.Activity(s.markExchangeAsFailedActivity, ex.ID).ExecuteGet(ctx, &ex); err != nil {
return nil, fmt.Errorf("failed to mark exchange as failed: %w", err)
}
return nil, fmt.Errorf("failed to authorize exchange: %w", err)
}
if err := s.manager.Activity(s.markExchangeAsAuthorizedActivity, ex.ID).ExecuteGet(ctx, &ex); err != nil {
return nil, fmt.Errorf("failed to mark exchange as authorized: %w", err)
}
if err := s.manager.ChildWorkflow(s.captureExchangeWorkflow, ex).ExecuteGet(ctx, &ex); err != nil {
return nil, fmt.Errorf("failed to execute child workflow: %w", err)
}
return ex, nil
}
func (s *PaymentService) captureExchangeWorkflow(ctx context.Context, ex *Exchange) (*Exchange, error) {
if err := s.executeCaptureActivities(ctx, ex.ID); err != nil {
return nil, fmt.Errorf("failed to capture exchange: %w", err)
}
if err := s.manager.Activity(s.markExchangeAsCapturedActivity, ex.ID).ExecuteGet(ctx, &ex); err != nil {
return nil, fmt.Errorf("failed to mark exchange as captured: %w", err)
}
return ex, nil
}
func (s *PaymentService) executeAuthorizeActivities(ctx context.Context, saga *workflow.Saga, id string) error {
if err := s.manager.Activity(s.authorizeBalanceExchangeActivity, id, uint64(1000)).ExecuteWait(ctx); err != nil {
return fmt.Errorf("failed to authorize balance exchange: %w", err)
}
saga.AddCompensation(s.cancelBalanceExchangeActivity, id)
if err := s.manager.Activity(s.authorizeMerpayPaymentChargeActivity, id, uint64(500)).ExecuteWait(ctx); err != nil {
return fmt.Errorf("failed to authorize merpay payment charge: %w", err)
}
saga.AddCompensation(s.cancelMerpayPaymentChargeActivity, id)
return nil
}
func (s *PaymentService) executeCaptureActivities(ctx context.Context, id string) error {
if err := s.manager.Activity(s.captureBalanceExchangeActivity, id).ExecuteWait(ctx); err != nil {
return fmt.Errorf("failed to capture balance exchange: %w", err)
}
if err := s.manager.Activity(s.captureMerpayPaymentChargeActivity, id).ExecuteWait(ctx); err != nil {
return fmt.Errorf("failed to capture merpay payment charge: %w", err)
}
return nil
}
アクティビティが失敗した場合は、そのエラーが「完了可能エラー」なのかどうかによって二種類のハンドリングに分かれます。
- 完了可能エラー
- ワークフローを失敗として完了する。必要に応じて補償トランザクションを実行してワークフローを終了する。
- 前章で述べたように、補償トランザクションや確定処理のトランザクションはこのエラーを返却することはありません。
- 例:
- 予期されたエラー
- 残高不足エラーや利用制限によるエラーなど
- 予期されたエラー
- その他のエラー
- Recovery worker によってワークフローがリトライされる。
- 例:
- 一時的なエラー
- 通信の遅延によるタイムアウトなど
- 予期しないエラー
- サービスのバグなどによる Internal エラー
- 一時的なエラー
Recovery worker によって、ワークフローは成功するか、あるいは完了可能エラーが発生するまで一定間隔で無限にリトライされ続けます。
そのため、アプリケーションはそれぞれのアクティビティを冪等にし、ワークフローで実行されるアクティビティの実行順序が決定的になるように実装します。
アクティビティが完了可能エラーによって終了した場合は、補償トランザクションでそれまでに成功したアクティビティによるリソース変更を取り消すためのアクティビティを実行します。
このように、ワークフローによって成功すべきリクエストは最終的には必ず成功し、マイクロサービスをまたいだトランザクションにおいてもデータ整合性を実現することができます。
おわりに
この記事では、マイクロサービスアーキテクチャにおける分散トランザクションをワークフローコーディネーターを用いたオーケストレーションベースの Saga によって管理する手法を簡単にご紹介しました。
今回紹介した SDK は、メルコインだけでなくメルペイのいくつかのサービスにも導入される予定で、開発を支援するためにこの SDK に特化した静的解析ツールも開発して運用しています。
この他にも、メルペイ・メルコインでは決済データの不整合によるリスクを回避するために自動的にリコンサイルを行う仕組みを導入しています。
興味のある方はこちらの記事もご覧ください。
マイクロサービスにおけるリコンサイルの話 | メルカリエンジニアリング