内製ワークフローエンジンの設計とメルカリでの活用事例

こんにちは。メルペイでソフトウェアエンジニアをしている @sapuri です。この記事は Merpay & Mercoin Tech Openness Month 2026 の 9日目の記事です。

はじめに

本記事は、2026年4月27日の Background Job Talk 〜 Temporal 活用と独自実装の舞台裏編〜 で発表した「内製ワークフローエンジンの設計とメルカリでの活用事例」を記事化したものです。

マイクロサービスアーキテクチャのような分散システムでは、複数のサービスにまたがる処理のデータ整合性をどう保つか、いわゆる分散トランザクションの扱いが大きな課題となります。

メルカリでは、この課題を Saga パターンによる結果整合性で解決するために、自社でワークフローエンジンを開発して運用しています。

このワークフローエンジンは、もともとメルコインの決済基盤における分散トランザクション管理のために開発したものです。メルペイの Payment Service で得た知見も取り入れながら設計し、現在はメルカリグループ内の複数のユースケースで利用が広がっています。

この記事では、内製に至った背景とワークフローエンジンの具体的な設計、社内での活用事例について紹介します。

分散トランザクション管理の課題と Saga パターン

メルカリでは主にマイクロサービスアーキテクチャを採用しています。

そのため、お客さまがアプリで1つの操作をすると、そのリクエストは基本的に複数のサービスをまたいで処理されます。

例えば、メルカリアプリからビットコインを購入するときの決済リクエストでは、取引データの作成、メルコインの日本円残高の減算、メルペイのポイントの減算、ビットコイン残高の加算、取引データの更新といった複数の処理が関わります。

この決済リクエストは、1つのトランザクションとして扱う必要があります。つまり、一連の処理をすべて成功させるか、すべて失敗させるかのどちらかに寄せる必要があります。

しかし、各サービスがそれぞれデータベースを持っているため、単純にロールバックすることはできません。この点を考慮せずに実装すると、エラーのタイミングによってデータの不整合が発生します。

例えば、このような不整合が起こりえます。

  • 決済が失敗したのにメルコインの日本円残高が減っている
  • 残高は減ったがビットコイン残高が加算されない
  • ビットコインと交換できているのに取引が完了扱いになっていない

また、Two-Phase Commit のような分散トランザクションでは長期間リソースをロックするため、サービスの可用性が下がる可能性があります。

そのため、メルカリでは結果整合性のアプローチで、このような分散トランザクションを解決しています。

Saga

この結果整合性を実現するためのアーキテクチャの1つとして、Saga というパターンがあります。

Saga は、トランザクションを複数の小さなトランザクションに分割して順次実行することで長時間のロックを不要にします。途中でリトライ不可能なエラーが出た場合は、成功済みの処理に対する補償トランザクションを逆順で実行します。

先ほどの暗号資産購入の例で、途中のビットコイン残高を増やす処理でリトライできないエラーが発生した場合を考えます。

この場合、この時点までに成功した処理を取り消す補償トランザクションを逆順に実行します。すでにメルコインの残高とメルペイのポイントが減らされているので、まずポイントを戻し、その次にメルコイン残高を戻し、最後に取引データを失敗として更新します。

このように実装することで、途中のどこで失敗しても結果整合性を保って処理を完了させることができます。

このあたりの話は以前の記事でも紹介しているので、興味のある方はそちらもぜひご覧ください。
メルコイン決済基盤における分散トランザクション管理 | メルカリエンジニアリング

ワークフローエンジンの検討

実装の方針が決まったので、実際に Saga パターンを実装するためにワークフローエンジンの導入を検討しました。

主に検討したツールは、GCP WorkflowsCadenceTemporal です。メルカリでは主に GCP を使ってサービスを構築しているため、まず GCP Workflows を検討しました。ただ、各処理を HTTP のエンドポイントとして実装する必要があり、ユニットテストがやりにくいという懸念がありました。また、YAML ではなく Go のコードでワークフローを記述したいという要望もありました。

Cadence と Temporal も検討しましたが、メルカリでは Cloud Spanner をメインに使っているため、Spanner に対応していなかったことから採用できませんでした。また、Temporal はシステムの規模が大きく、仕組みも比較的複雑なため、運用面にも不安がありました。

このように、既存ツールでは要件を満たせなかったため、自社でワークフローエンジンを開発することにしました。

開発時には、Cadence / Temporal のインターフェースの良さを取り入れつつ、メルペイの Payment Service ですでに実績があった「DB への実行状態の永続化 x インメモリキュー x Worker での実行管理」のアーキテクチャを再利用する方針にしました。また、Go 専用で必要な機能のみに絞ることで、数人の兼務メンテナーでも運用できる規模にしています。

ワークフローエンジンの設計

アーキテクチャ

このワークフローエンジンは、アプリケーションサーバーと同じ Pod でデプロイされることを想定しています。

Go runtime で動作し、利用者は SDK として扱います。

アプリケーションは Manager というインターフェースを使ってワークフローエンジンを操作します。主に Register と Execute という2種類のインターフェースを使います。

アプリケーションはワークフローを普通の Go の関数として実装するので、その関数の内容を事前にワークフローエンジンに登録する必要があります。

manager.RegisterWorkflow() が呼び出されると、Manager は Registry というインメモリの領域に関数を格納します。

manager.Workflow().Execute() は、実際にワークフローを実行するインターフェースです。

呼び出されると、Manager は Engine Server という gRPC サーバーに対して Workflow や Activity を作成するリクエストを送ります。

Engine Server は関数名や引数、実行状態を DB に保存し、インメモリキューである Channel に WorkflowStarted イベントを publish します。

その後、Worker という goroutine が WorkflowStarted イベントを subscribe し、Registry から実行する関数を取得して、Go のリフレクションを使って実行します。

実行が完了すると Worker は Engine Server に完了を報告し、Engine Server は結果を保存して WorkflowCompleted イベントを publish します。

その後、Worker が WorkflowCompleted イベントを subscribe し、アプリケーションに関数の実行結果を返却します。

もしその結果がエラーだった場合は、後述する ErrorMarshaler というインターフェースで、その Workflow を完了させるかどうかを判定します。

ここまで説明したコンポーネントの役割を整理します。

  • Manager: SDK のエントリーポイント。アプリケーションは Workflow()Activity()RegisterWorkflows() などを呼び出します。
  • Engine Server: Create、Complete、List などの gRPC API を提供するサーバー。DB に Workflow や Activity の I/O と状態を保存します。
  • Channel: Workflow や Activity の状態遷移イベントのハブとなるインメモリキュー。
  • Workers: Workflow や Activity を実行する goroutine 群。Channel から状態遷移イベントを購読し、イベントの種別に応じた処理を実行します。
  • Registry: Register された関数をインメモリで保持します。
  • Recovery Worker: Engine Server に対して定期的に未完了の Workflow と Activity を List してリトライします。

コードサンプル

アプリケーション側の実装イメージは次のようになります。

func (s *Service) createExchangeWorkflow(ctx context.Context, params *CreateExchangeParams) (*CreateExchangeResult, error) {
    saga := workflow.NewSaga(s.wm)

    if err := s.wm.Activity(s.authorizeBalance, params.Balance).ExecuteWait(ctx); err != nil {
        return nil, err
    }
    saga.AddCompensation(s.cancelBalance, params.Balance)

    if err := s.wm.Activity(s.authorizePoint, params.Point).ExecuteWait(ctx); err != nil {
        if !isCompletableError(err) {
            return nil, err
        }

        if cerr := saga.Execute(ctx, func(e execution.Execution) error {
            return e.Wait(ctx)
        }); cerr != nil {
            return nil, fmt.Errorf("failed to execute compensation activities: %w, orig_err: %v", cerr, err)
        }

        return nil, err
    }

    return &CreateExchangeResult{}, nil
}

まず、createExchangeWorkflow という関数が定義されています。

この関数は、残高を確保する authorizeBalance という Activity と、ポイントを確保する authorizePoint という Activity を順に実行して結果を返す処理です。

特徴的なのは、それぞれの Activity を実行した直後に、この SDK が提供する Saga の AddCompensation インターフェースで補償トランザクションを登録している点です。

これにより、authorizeBalance Activity が成功した後に authorizePoint Activity が失敗した場合は、authorizeBalance を取り消す処理である cancelBalance という関数が補償トランザクションとして実行されます。

エラーハンドリング

このワークフローエンジンでは、3種類のエラーを定義しています。

  • Completable Error: Workflow や Activity を失敗として完了させてよい、想定されたエラーです。例として、残高不足や利用制限があります。
  • Retryable Error: リトライ対象のエラーです。
  • Incompletable Error: Workflow を完了させずに停止し、Recovery Worker が後でリトライするエラーです。

Completable Error は、明示的に完了できるエラーだけを完了扱いにするための仕組みです。

  • クライアント側で ErrorMarshaler というインターフェースを実装したエラーとして定義される
  • 該当しないエラーはすべて未完了として実行を停止し、Recovery Worker によってリトライされる
  • 明示的に Completable Error を返さない限り Workflow は完了しない
  • アプリケーションが意図していない異常な状態で Workflow が完了しない設計になる
type ErrorMarshaler interface {
    MarshalCompletableError(error) ([]byte, error)
    UnmarshalCompletableError(marshaledErr []byte) error
}

具体例として、ドメインのカスタムエラー型に Completable() というメソッドを定義し、それを使って ErrorMarshaler を実装します。

このようなカスタムエラー型を作っておくことで、ビジネスロジックで特定のエラーコードを含むエラーを返すと、ワークフローエンジンで完了可能なエラーとして処理されます。

type Error struct {
    code ErrorCode
    msg  string
}

func (e *Error) Error() string {
    return e.msg
}

func (e *Error) Completable() bool {
    return e.code == ErrCodeCompletable
}

type workflowError struct {
    Code ErrorCode
    Msg  string
}

type workflowErrorMarshaler struct{}

func (workflowErrorMarshaler) MarshalCompletableError(err error) ([]byte, error) {
    var aerr *Error
    if !errors.As(err, &aerr) {
        return nil, err
    }

    if !aerr.Completable() {
        return nil, err
    }

    return json.Marshal(&workflowError{
        Code: aerr.code,
        Msg:  aerr.msg,
    })
}

func (workflowErrorMarshaler) UnmarshalCompletableError(data []byte) error {
    var werr workflowError
    if err := json.Unmarshal(data, &werr); err != nil {
        return err
    }

    return &Error{
        code: werr.Code,
        msg:  werr.Msg,
    }
}

活用事例

ここまで、独自に開発したワークフローエンジンの設計について紹介しました。

ここからは、決済以外のユースケースも含めて、社内での活用事例を3つ紹介します。

1. メルカリモバイル: 同期レスポンスと非同期処理の分離

例えば、メルカリモバイルの回線開通フローでは、Workflow と Child Workflow というサブのワークフローを定義し、同期レスポンスと非同期処理を分離しています。

具体的には、Workflow と Child Workflow で役割を分けています。Workflow は回線開通リクエストを DB に保存し、Child Workflow を fire-and-forget で起動してレスポンスを返します。

Child Workflow は、非同期で Pub/Sub イベントを発行します。失敗した場合は、Recovery Worker が Child Workflow を復旧します。

これにより、クライアントに即座にレスポンスを返しつつ、Pub/Sub 発行のような後続処理がバックグラウンドで実行されることを保証できます。

2. メルカリ グローバル EC 基盤: Saga によるチェックアウト管理

メルカリ グローバル EC 基盤では、Spanner ではなく PostgreSQL を採用しています。

ここでは、購買代行パートナーを経由して海外のお客さまが日本のメルカリの商品を購入する処理を例にします。

例えばチェックアウト確定フローでは、クーポン消費、注文作成、購買代行パートナーへの注文連携、注文確定通知送信の順に処理が進みます。

このチェックアウトの処理で、冒頭で紹介した暗号資産購入のユースケースと同様に Saga パターンを使ってトランザクションを管理しています。

途中で失敗した場合には、Saga による補償トランザクションでキャンセルします。

内製のワークフローエンジンを採用した理由には、社内にすでにある類似実装や運用基盤を活用したかったことがあります。また、メンテナーが社内にいるため直接サポートを受けられることや、必要な機能を柔軟に追加できて最適化しやすいことも大きな理由でした。

3. eKYC: Signal を使った long-running workflow

eKYC によるお客さまの本人確認フローは、Workflow の中で数時間から数日の審査待ちが発生するという、いわゆる long-running workflow になっています。

これを実現するために、Temporal でも提供されている Signal という機能を実装しました。

Signal を使うことで、Workflow を中断し、外部から Workflow に情報を送って再開させるユースケースを実現できます。

長期フローを細切れのジョブとして分割せず、書類検証、審査待ち、承認または拒否という一連の流れを1本の Workflow として表現できます。

Signal を使ったアプリケーション側の実装イメージは次のようになります。

type ApprovalSignalParams struct {
    Approved     bool
    RejectReason string
}

func (s *Service) approvalWorkflow(ctx context.Context, req *ApprovalRequest) (*ApprovalResult, error) {
    var result *ApprovalResult
    if err := s.wm.Activity(s.verifyDocument, req).ExecuteGet(ctx, &result); err != nil {
        return nil, err
    }

    var params ApprovalSignalParams
    if err := s.wm.Signal("approval").Receive(ctx, &params); err != nil {
        return nil, err
    }

    if !params.Approved {
        return nil, newCompletableError(params.RejectReason)
    }

    return result, nil
}

func (s *Service) handleApproval(ctx context.Context, workflowIdempotencyKey string, params ApprovalSignalParams) error {
    return s.wm.Signal("approval").Send(ctx, workflowIdempotencyKey, params)
}

approvalWorkflow という関数は、verifyDocument という Activity を実行した後に、approval という signal を待機します。

審査が終わり、外部から approval signal をこの Workflow に送信すると、approvalWorkflow が途中から再開します。

まとめ

分散システムにおけるデータ整合性の課題に対して、メルカリでは Saga パターンによる結果整合性を採用し、それを支える仕組みとしてワークフローエンジンを内製しています。

この記事では、その背景と設計、社内での活用事例について紹介しました。

なお、この SDK を使った開発を支援するために、専用の静的解析ツールも開発して運用しています。このあたりの運用面についても発表で触れているので、興味のある方は Speaker Deck のスライドもぜひご覧ください。

次の記事は kubomiさんの「Build First, Discuss Later|初回ミーティングに動くプロトタイプを持ち込んだら、意思決定が爆速になった」です。引き続きお楽しみください。

  • X
  • Facebook
  • linkedin
  • このエントリーをはてなブックマークに追加