【書き起こし】KafkaとFlink SQL on K8sで作るストリーム処理基盤 – 田中克典/中村智行【Merpay Tech Fest 2022】

Merpay Tech Fest 2022 は、事業との関わりから技術への興味を深め、プロダクトやサービスを支えるエンジニアリングを知ることができるお祭りで、2022年8月23日(火)からの3日間、開催しました。セッションでは、事業を支える組織・技術・課題などへの試行錯誤やアプローチを紹介していきました。

この記事は、「KafkaとFlink SQL on K8sで作るストリーム処理基盤」の書き起こしです。

それでは、「KafkaとFlink SQL on K8sで作るストリーム処理基盤」というタイトルで、メルペイDataPlatformチームの田中と中村が発表します。

自己紹介

田中:私はメルペイDataPlatformチームでソフトウェアエンジニアとして働いています、田中克典と申します。主な業務は、後半で発表する中村とともに携わっている、メルカリ/メルペイのサービスで使われているDataPlatformの開発・運用です。主に使っている技術はScalarやPython、Kafka、Flinkなどです。

中村:メルペイDataPlatformチームの中村と申します。メルペイでPythonとScalaを使って、いい感じにデータを処理しているエンジニアです。

本セッションのテーマ

田中:本セッションは、大きく2つのパートからなります。前半は田中から、メルカリ/メルペイで使われているCDC基盤について、全体像から詳細まで発表します。後半は、CDC基盤で集めたデータを使ってリアルタイムにデータを処理するストリームデータ処理基盤について、中村から発表します。

メルカリ/メルペイのCDC基盤

田中:それでは、メルカリ/メルペイのCDC基盤について説明します。

CDCとは

CDCとはChange Data Captureの略称で、データベース内のCREATE、UPDATE、DELETEのようなデータの変更をリアルタイムに取得し、データレイクやデータウェアハウスに保存する仕組みのことです。バッチ処理よりも早くデータを活用できるというメリットがあります。具体的な手法としてはLog-Based CDC、Trigger-Based CDC、Audit Columnsなどがあり、対象となるデータベースによって変わってきます。

引用:Change Data Capture (CDC): What it is and How it Works
https://www.striim.com/blog/change-data-capture-cdc-what-it-is-and-how-it-works/

メルカリ/メルペイにおけるCDCの活用

メルカリ/メルペイにおいてCDCがどのような場所で必要かといいますと、たとえばアンチマネーロンダリング対策のように、ほぼリアルタイムに近い鮮度のデータが必要な分析用途などがあります。このユースケースについては、本セッション後半で詳しく説明します。

データ変遷の履歴が必要な場合もCDCが活用できます。たとえばユーザーのステータスなど、データベースには最新の情報しか残っていないけれど、どのような遷移があったのかを追いたい場合などです。

メルカリ/メルペイのCDC基盤

ここでユースケースの話から離れますが、メルペイDataPlatformチームで開発・運用しているCDCの概要図を説明します。

一番左に実際のサービスを構成しているMicroserviceとそのデータベースがあります。そこからデータをKubernetesクラスター内に構築したKafka ConnectやCDCのOSSであるDebeziumクラスターで吸い出し、Confluent Cloud内に設置されているKafkaクラスターに送信します。

さらにFlinkとKafka ConnectでそKafkaからデータを取得し、加工して分析のためのBigQueryやGCSに送ったり、サービスで活用するためにCloud PubSubや他のデータベースなどに送ったり、3rd Partyのサービスにデータを送ったりしています。

まとめると、Kafkaをハブにしたオーソドックスな構成になっています。

Kafka ConnectとDebeziumによるCDC環境

MicroserviceのデータベースからCDCでデータを取得する部分について、詳しく説明します。

データの取得には、Kafka ConnectやDebeziumを使用しています。Kafka Connectとは、Kafkaと外部システムとの間でデータ入出力を扱うフレームワークです。

Connectorという形でさまざまなデータソースからデータを取得し、Kafkaに送ったり、逆にKafkaからデータを他のデータベースに送ったりできます。メルペイでは主にSpannerからデータを取得するためにJDBC Connectorを、BigQueryにデータを出力するためにBigQuery Connectorを使っています。

DebeziumはLog-Based CDCのOSSで、Kafka ConnectのConnectorとして動作します。メルペイでは、MySQL、PostgreSQLからデータを取り出すために使っています。DebeziumとKafka Connectについては、エンジニアリングブログ(メルペイDataPlatformのCDC DataPipeline | メルカリエンジニアリング)にも記載しましたので、参考にしてみてください。

Confluent Cloud

次にKafkaクラスターについて説明します。メルペイでは、Confluent Cloudに構築したKafkaクラスターを使っています。Confluent Cloudとは、Kafkaのマネージドサービスで、数クリックでKafkaのクラスタを構築・利用できます。

マネージドサービスなので、オンプレミスや自分で構築した場合と比べて、さまざまなタスクから解放されます。またKafka以外にも、その周辺のKafka ConnectやSchema Registryなどもマネージドサービスとして提供されています。

Kafka管理の自動化

便利なConfluent Cloudですが、メルペイDataPlatformではさらなる自動化・簡略化のためにツールを自作しています。

構築したCDC基盤では、異なるMicroserviceのデータを同じKafkaに送信するため、Microserviceごとにアクセス制御が必要になってきます。Microserviceの数に対して管理業務をスケールできるようにするには、Kafkaクラスターだけではなく、その中のTopicやConsumer Groupの管理などもなるべく属人性を排除して省力化できるとよいです。そのためのツールを開発して使用しています。

なおこのツールの開発中に、Confluent PlatformをTerraformから操作するためのTerraform Providerの開発が始まり、先日バージョン1.0がリリースされました。今使っている自作のツールもこちらに乗り換えられないか、検討しているところです。

引用:Confluent CLI
https://docs.confluent.io/confluent-cli/current/overview.html
Terraform Confluent Provider
https://registry.terraform.io/providers/confluentinc/confluent/latest/docs

Kafka Connect on GKE(1)


Kafka ConnectやDebeziumについて説明します。Kafka ConnectやDebeziumはGKEのKubernetesクラスター上で動かしています。Kafka Connectクラスタは、Kubernetes
のDeploymentを利用してデプロイしています。

データベースごとにクラスタをわけて、各ConnectorはKubernetesのジョブとしてデプロイしています。こちらも属人性の排除と省力化のために、共通の設定はカスタマイズを使用して1か所にまとめて小さい差分だけ書くことで、新規のCDCを簡単に構築できるようにしています。

監視については、オーソドックスにjmxのメトリックをDatadogに送信し、Datadog上で監視しています。またConnectorの死活監視を行うツールも作成して使っています。

Kafka Connect on GKE(2)

続いて、セキュリティ周りについても簡単に説明します。

KafkaやDBにアクセスするためのクレデンシャルについては、GCPのSecret Managerに保存して利用しています。また、GKEからはWorkload Identityを利用して、KubernetesのサービスアカウントにGCPのサービスアカウントを紐付けることで、Secret ManagerにアクセスするためのGCPのクレデンシャルを不要にしています。

データベースから取得したデータは、Kafkaに送信する前に暗号化を施しています。具体的には、Kafka ConnectのTransformationという機能を使って実現しています。こちらはOSSとして公開しているので参考にしてみてください。

kafka-connect-transform-kryptonite-gcp
https://github.com/mercari/kafka-connect-transform-kryptonite-gcp

前半まとめ

前半では、メルペイDataPlatformで構築したKafkaを中心とした、CDC基盤について説明しました。Confluent CloudでマネージドなKafkaを構築し、Kafka ConnectやDebeziumを使ってデータをDBから取得し、Kafkaへ送るシステムを構築しました。

Kafka ConnectやDebeziumはKubernetes上で動かして、運用の省力化や脱属人化のためのツールを独自で開発して使っています。それでは、後半の中村にバトンタッチします。

中村:後半は、Flink SQL on Kubernetesによるストリーム処理基盤と題しまして、AMLのCDCデータを利用したパイプラインを例に、メルペイでどのようなストリーム処理基盤を構築しているかを紹介します。

AMLのデータパイプライン

はじめに、AMLのデータパイプラインについてです。メルカリでは、オンプレミス環境のMySQLサーバーが稼働しており、GKE上の特定のノードプールから接続できるようになっています。このノードプールを使って、GKEからDebeziumでMySQLサーバーに接続して、KafkaにCDCのデータを送信しています。

KafkaはConfluent Cloudを利用していますが、GCP上に専用のクラスターを構築し、GKEクラスターとVPCピアリングして、パブリックなインターネットに出ずにデータを送信できるようにしています。Kafkaに送信したデータはGKE上のFlinkアプリケーションでSQLを使ってデータの整形やエンリッチ化を行い、PubSubに送信します。

エンリッチ化データの取得先は、CloudSQLです。PubSubに送信したデータはさらにGKE上のFlinkアプリケーションからSplunkに送信されます。SplunkはHTTPイベントコレクタを利用しています。

FlinkにはSplunkのConnector実装がないので、独自にConnectorを実装しています。一旦PubSubにデータを集約しているのですが、スループットの調整がしやすいという理由や、Splunk以外のBigQueryにもデータを配送していて、複数の配送先にデータを出し分けしやすいようにこのような構成にしています。

FlinkはBigQueryのConnectorの実装もないので、独自にリアルタイムに書き込みができるConnectorを実装しています。

Flink on K8s

次に、Kubernetes上でどのようにFlinkアプリケーションを構築しているかについてです。FlinkをKubernetes上で動かすにはいくつか方法があるのですが、メルペイではStandaloneクラスタのApplicationModeを利用しています。

ジョブマネージャのコマンドは、オフィシャルのイメージに含まれているentrypointシェルを利用して、スライドのようなコマンドでジョブマネージャ上でアプリケーションを起動する形で動かしています。ジョブマネージャーとタスクマネージャーのYAMLの管理にはKustomizeを利用して、dev/prod環境の設定の違いを差分で管理しています。

デプロイはSpinnakerを利用し、モニタリングにはDatadogを利用しています。DatadogはHttpReporterを使ってアプリケーション毎にメトリックを送信しています。ログ収集にはDatadogログを利用しています。

FlinkSQL

次に、FlinkSQLについてです。

こちらはFlinkオフィシャルドキュメントに記載されている図で、非常にわかりやすく表現されていると思います。FlinkSQLでは、CREATE TABLEでストリームデータをDynamicTableと呼ばれるものに変換して、DynamicTableに対してSQLを実行します。

CREATE TABLEでは、WITHの中に接続先の情報やデータフォーマットを記述します。クエリの結果はDynamicTableにINSERTすることで、再度ストリームデータに変換されます。

KafkaのデータをFlinkSQLで処理

続いて、実際にKafkaのデータをFlinkSQLで処理する例です。

Kafkaからデータを取得するSourceテーブルを定義します。左側のCREATE文です。AMLのパイプラインでは、PubSubにデータを送信するので、データ送信先のSinkテーブルを定義します。右側のテーブルです。真ん中のSELECT・INSERT文で、SourceテーブルからSinkテーブルにINSERTすることでデータを処理します。SELECT文の中では、別のテーブルとJOINすることなどができます。

PubSubのデータをFlinkSQLで処理

AMLのパイプラインではSinkテーブルとしてPubSubを利用しています。FlinkにはPubSubのコネクタ実装があるのですが、SQLには対応していません。FLINK-23501でissueがあげられており、SQLが使える実装がされています。これと同じようなものを自前で実装し、SQLで使えるようにしています。

CREATE文で、「’connector’ = ‘PubSub’」を指定するだけです。Kafkaと同じようにSource/Sink先のテーブルを作成してSELECT INSERTする形で使います。PubSubにAckを返すタイミングがチェックポイントのタイミングで、チェックポイント単位で安全にデータを配送できます。

データフォーマット

次にデータフォーマットについてです。KafkaのデータはAvroを利用しています。スキーマレジストリはConfluentのマネージドスキーマレジストリを使っています。

CREATE文で「key.format/value.format = avro-confluent」を指定するだけで簡単に使えます。PubSubのデータはJSONまたはProtoBufを利用しています。FLINK-18202でissueがあげられており、こちらの実装を利用しております。おそらく次のバージョンである1.16から正式にサポートされると思います。CREATEで「’format’ = ‘protobuf’」を指定し、クラス名を書くだけで簡単に利用できます。

DBデータとのJOIN

次に、DBデータとのJOINについてです。CDCのデータは正規化されていることが多く、JOINして足りない情報を担う必要があります。Flink SQLでは、JOIN句でストリームデータとDBデータを簡単にLookup JOINできます。

左側のテーブルがKafkaのストリームデータです。ストリングデータにproctimeというフィールドを追加します。右側のテーブルがDBデータのテーブルです。JDBCconnectorでPostgreSQLのテーブルを参照しています。SELECT文のJOIN句で、JOIN先のテーブル(この場合usersテーブル)を指定し、FOR SYSTEM_TIME ASに、ストリームデータに追加したprocktimeフィールドを追加する形です。

実際には、このようにWHERE句にIDを指定したSELECT文が実行され、データがJOINできます。JOINの方法はここで紹介したLookup JOIN以外にもいくつかパターンがあります。GitHubにFlink SQL Cookbookというリポジトリがあるので参照していただけると良いと思います。6パターンくらいのJOINが紹介されています。

UDF

次にUDFについてです。UDFは主に日付フォーマッタと、JOINと同じようにDBデータをLookupできるファンクションを独自に実装して使っています。

日付はソースによってフォーマットが違う場合が多く、ビルトインのファンクションで対応しようとするとSQLが複雑になりやすいので、簡単に整形できるファンクションを定義しています。

DBデータをLookupするファンクションは、単純にDBにクエリを投げて結果を取得するファンクションです。1対多で紐づくデータを配列として取得したい場合やSQLだと複雑になりやすい集計処理を行ったり、JOINするテーブルが多くなったりするとSQLが複雑になりやすいのでそれらを避けるために使っています。

このUDFは、H2 Databaseを使ったテストを書けるようにしており、実際にDBに実行するクエリのテストが簡単に書けるようにしています。

HA Mode

次にHA Modeについてです。

FlinkはKubernetes上でZookeeperを使わなくてもHA構成にできます。Flinkの設定ファイルに、このように3つぐらいの設定を書くだけで簡単に設定できます。メルペイでは、リカバリ用のデータの管理にGCSを利用しています。HAの構成情報はConfigmapで自動的に管理されます。名前の末尾に-leaderとつくようなConfigmapが四つほど自動的に作られ、管理されます。HA構成にすることで、単一障害点のジョブマネージャを複数起動できるようになります。ジョブマネージャーが落ちてもスタンバイのマネージャーがリーダーとなり、最新のチェックポイントから自動的に復旧されます。

Kubernetesの場合はステートが重要でないアプリで多少のダウンタイムが許容できるのであれば、podが落ちてもKubernetesが自動的に上げ直してくれますので、Kubernetesに管理を任せるのもありだと思います。

Auto Scaling

次に、Auto Scalingについてです。Flink 1.13からReactiveModeという機能が追加されています。Flinkの設定ファイルで「scheduler-mode: reactive」を追加することで簡単に使えます。ReactiveModeはStandaloneクラスタのApplicationModeのみ対応しています。

このModeでは、タスクマネージャーの数を変更すると、自動的にジョブマネージャの並列実行数がタスクマネージャーの数に合わせられます。この機能を使ってKubernetesの場合は、HPAと組み合わせることでオートスケーリングできるようになります。

DatadogのメトリックはHPAの外部メトリックとして使えます。外部メトリックを使って、KafkaやPubSubの未処理のメッセージ数や遅延時間に応じてスケールアウトできます。Kafkaの推定遅延時間は、OSSで公開されているKafka Lag Exporterによって取得できます。メルペイでは、Datadogの外部メトリックを利用して、ピーク時とオフピーク時でトラフィック差があるようなアプリケーションにオートスケーリングを導入しはじめています。

まとめ

メルペイでは、Kubernetes上にFlinkを使ってストリーム処理基盤を構築しています。

ストリーミングデータに対して、SQLで整形処理やデータのエンリッチ化が簡単に行えるようにしています。Kubernetes上で動かしているので、他のアプリケーション(主にGo)とCI/CDの仕組みやモニタリング、スケーリングの仕組みを共通化できます。

Kubernetes上では簡単にHA構成にできますので、チェックポイント/セーブポイントの仕組みと組み合わせて、より安全にデータを配送できる環境を構築しています。

発表は以上です。ご清聴いただきありがとうございました。

  • X
  • Facebook
  • linkedin
  • このエントリーをはてなブックマークに追加