こんにちは。メルペイData Platformチームの@siyuan.liuです。
この記事は、Merpay & Mercoin Advent Calendar 2024 の記事です。
Merpay Data Platformチームは、社内共通のデータ処理基盤の開発と運用を担当しており、バッチ処理やリアルタイム処理など、さまざまなPipelineを提供しています。その中でも、リアルタイムで大量のログを収集するStream Pipelineがあります。このStream処理は、Kubernetes上でFlink Operatorを使用し、Flinkを自動的に管理することで実現されています。
しかし、このStream Pipelineでは、トラフィックピーク時にCPUやメモリ不足によるコンシューマーラグが発生するという課題がありました。これを防ぐため、通常は余剰のCPUやメモリを割り当てますが、その結果、コスト増加を招く新たな課題も生じます。
これらの課題を解決するため、HPA(Horizontal Pod Autoscaler)を導入し、外部のDatadogメトリクスにより、トラフィックに応じてサーバー数を自動でスケールアウトさせることで、ピーク時のリソース不足を防ぎました。また、Tortoise VPAを利用してリソースの使用率に基づいて動的なリソース割り当てを行い、無駄なリソース配置を抑制しコスト削減を実現しました。
本記事では、HPAとTortoise VPA(Vertical Pod Autoscaler)を活用したこれらの解決策とその効果について、参考コードと具体的なデータとともに詳しく解説します。
背景と課題
本節では、Stream Pipelineの構成、およびStream Pipelineにおけるトラフィックピーク時のリソース不足と過剰なリソース割り当ての課題について解説します。
Stream Pipelineの構成
Data Platformチームは、主に社内のPaaS Data Platformの開発と管理を行います。このプラットフォームは、Batch、Stream、CDCなどのデータパイプラインを提供し、各マイクロサービスに分散されたデータへのアクセスと活用を効率化します。その中でStream Pipelineは、各Microserviceからアクセスログとイベントログを収集し、ニアリアルタイムでデータをGoogle Cloud Storageに保存する役割を担っています。また、ログをBigQueryに直接保存することや、他のシステムに転送することも可能です。
本記事で取り上げる課題は、上図の赤枠で示したFlink処理の部分と関わります。この部分は主に、FlinkOperatorが管理するFlinkを使用し、Pub/SubやKafka Topicのデータを消費します。
課題1:トラフィックピーク時にCPUやメモリ不足によるコンシューマーラグ
イベント、キャンペーンなどにより、一時的にアクセスログやイベントログのトラフィック量が急増することがあります。このような状況下で、Flink処理に十分なリソースを確保できない場合、Pub/Subのデータを消費しきれず、コンシューマーラグが発生します。これにより、データ同期の遅延や、データの損失につながる可能性があります。
このトラフィックピークによるコンシューマーラグをどのように解決するかは、Data Platformチームにとって重要な課題となっています。
課題2:過剰なリソース割り当てによるコスト増加
Flink処理の安定性を確保するため、Data Platformチームでは通常、Flinkに余裕なリソースを割り当てています。たとえば、通常時においてはCPU 0.5 Core、メモリ2048MでFlinkが正常に稼働しますが、トラフィックピークに考慮してCPU 1 Core、メモリ4096Mを割り当てることが一般的です。1つのFlinkに対してこのような対応を行う場合、大きなリソースの無駄にはなりませんが、Data Platformチームが管理するFlinkは200個以上あり、すべてのFlinkにこの方針を適用した結果、リソースの無駄が生じています。
そのため、Flinkのリソースを動的に最適化することが、重要な課題となっています。
HPAによるトラフィックピーク時の自動スケールアウト
本節では、HPAを使用したFlinkの自動スケールアウト方法と、FlinkのReactive ModeによるTask Managerリソースを最大限に活用する方法について、参考コードとともに解説します。
HPAによるFlink TaskManagerの自動スケールアウト
トラフィックピークによるリソース不足のコンシューマーラグを防ぐため、HPAを導入しました。KubernetesのHorizontal Pod Autoscaler(HPA)は、リソース使用量やメトリクスに基づいてPodのレプリカ数を自動的に調整する仕組みです。トラフィックのピーク時にはスケールアウトし、需要が低いときにはスケールダウンすることが可能です。また、Flink OperatorはHPAをサポートしており、HPAの設定でscaleTargetRefのkindをFlinkDeploymentに指定することで、TaskManagerのスケールアウト・スケールダウンを自動的に制御できます。
HPAで使用する外部のDatadogメトリクス oldest_unacked_message_ageは、サブスクライバーによって確認応答(ACK)されていない、サブスクリプションのバックログ内の最も古いメッセージの経過時間(秒単位)の指標です。トラフィックピーク時にFlinkがPub/Subのデータを消費しきれない場合、このメトリクスの値が増加します。このメトリクスをHPAの閾値として設定すれば、トラフィックに応じてFlinkのTaskManagerの数を自動的に調整できます。
参考コードは以下になります。
metadata:
name: event-log-or-access-log-hpa
spec:
scaleTargetRef:
name: event-log-or-access-log
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
maxReplicas: 12
minReplicas: 6
behavior:
scaleUp:
stabilizationWindowSeconds: 0
policies:
- periodSeconds: 15
type: Pods
value: 4
- periodSeconds: 15
type: Percent
value: 100
selectPolicy: Max
scaleDown:
policies:
- periodSeconds: 90
type: Percent
value: 2
selectPolicy: Max
metrics:
- external:
metric:
name: oldest_unacked_message_age
target:
type: Value
value: "420"
type: External
kind: HorizontalPodAutoscaler
apiVersion: autoscaling/v2
この設定により、トラフィックピーク時にPub/Subの消費が7分(420秒)以上の遅延となった場合、FlinkのTaskManagerの数が自動的に増加され、リソース不足を防止します。また、トラフィックピークが収まった際には、TaskManagerの数が元の状態まで自動的にスケールダウンされ、リソースの無駄遣いを防ぎます。
Flink Reactive ModeによるTaskManagerのリソース利用率の最適化
トラフィックピーク時には、TaskManagerのリソースが自動的に増加しますが、それに伴ってJobの並列度(parallelism)を調整しないと、スケールアウトしたTaskManagerのリソースが十分に利用されません。
HPAによって増加したTask Managerのリソースを効率的に利用するため、Flink の Reactive Mode を導入しました。Reactive Modeでは、Jobの並列度が可能な最大値に設定され、Cluster内のTaskManagerのリソースを可能な限り利用します。これにより、トラフィックピーク時に TaskManager のリソースを最大限に活用し、効率的なリソース管理が可能となります。
ただし、Reactive Mode は Standalone Mode でのみ使用可能なため、利用時にはご注意ください。
FlinkのReactive Modeを有効化する参考コードは以下になります。
metadata:
name: flink
spec:
image: flink
imagePullPolicy: IfNotPresent
mode: standalone
flinkVersion: v1_18
flinkConfiguration:
scheduler-mode: reactive
Tortoise VPAによる過剰なリソース割り当ての最適化
本節では、過剰なリソース割り当ての課題を解決するため、Tortoiseの概念、Tortoise VPAのみを導入する理由、およびFlink Operatorが管理するFlinkへのTortoise VPA導入時の注意点を解説します。最後に、これらの理由と注意点を踏まえ、Tortoise VPA導入の参考コードを示します。
Tortoiseとは
Tortoiseは、過去のリソースの使用量や過去のレプリカの数を記録しており、それを元にHPAやVPAを最適化し続ける仕組みです。この中で、Tortoise Vertical Pod Autoscaler (VPA) はCPUやメモリのResource Request/Limitを最適化する役割を担います。これにより、負荷が低下した際に、不要なリソース割り当てを削減することが可能になります。Tortoise の詳細については、以下の公開記事をご参照ください。
「人間によるKubernetesリソース最適化の”諦め”とそこに見るリクガメの可能性」
Tortoise VPAのみを導入する理由
TortoiseはHPAとVPAの両方をサポートしていますが、DataplatformチームではTortoise VPAのみを使用しています。その理由は二つあります。
一つ目は、DataplatformチームではすでにKubernetes HPAを利用しているためです。既存のKubernetes HPAは外部のDatadogメトリクスに基づいてリソースを調整していますが、Tortoise HPAは外部のメトリクスによるリソース調整をサポートしていません。
2つ目は、FlinkOperatorを利用しているためです。FlinkOperatorはカスタムリソースを用いてFlinkを管理していますが、Tortoise HPAはカスタムリソースをサポートしていません。一方、FlinkOperatorはKubernetes HPAをサポートしているため、既存のKubernetes HPAは問題なく利用できます。
以上の理由から、Data PlatformチームはTortoise HPAを導入せず、Tortoise VPAのみを使用しています。
FlinkへのTortoise VPA導入時の注意点
Flink Operatorが管理するFlinkにTortoise VPAを導入する際には、FlinkDeploymentをTortoiseに管理させるのではなく、JobManagerとTaskManagerのDeploymentを分け、それぞれのTortoiseに管理させる必要があります。JobManagerとTaskManagerは異なる役割を持ち、それに応じて必要なリソースも異なります。しかし、Tortoiseはカスタムリソースをサポートしていないため、Flink OperatorのFlinkDeploymentをTortoiseに管理させると、JobManagerとTaskManagerが同一のリソースとして扱われてしまい、適切な推奨値が算出されなくなります。
また、JobManagerのリソースがTortoiseによって過度にスケールダウンされると、リソース不足によるダウンタイムが発生する可能性があります。そのため、minAllocatedResource(リソース割り当ての最低値)を設定することが重要です
参考コード
上記の理由と注意点を踏まえ、Tortoise VPAによってリソースを調整する設定は以下になります。
metadata:
name: event-log-or-access-log-jobmanager-tortoise
spec:
targetRefs:
scaleTargetRef:
kind: Deployment
name: event-log-or-access-log-jobmanager
updateMode: Auto
resourcePolicy:
- containerName: flink-main-container
minAllocatedResources:
memory: 4294967296
autoscalingPolicy:
- containerName: flink-main-container
policy:
cpu: Vertical
memory: Vertical
kind: Tortoise
apiVersion: autoscaling.mercari.com/v1beta3
---
metadata:
name: event-log-or-access-log-taskmgr-tortoise
namespace: merpay-dataplatform-jp-prod
spec:
targetRefs:
scaleTargetRef:
kind: Deployment
name: event-log-or-access-log-taskmanager
updateMode: Auto
autoscalingPolicy:
- containerName: flink-main-container
policy:
cpu: Vertical
memory: Vertical
kind: Tortoise
apiVersion: autoscaling.mercari.com/v1beta3
結果と効果
本節では、具体的なデータをもとに、改善の結果とその効果について説明します。
1. HPAによるトラフィックピーク時の安定性の向上
HPA を導入した結果、トラフィックピークの際に Flink の Pod 数が自動的にスケールアウトされることで、リソース不足によるダウンタイムが発生しなくなりました。
上記の図は、Mercariの検索プロジェクトにおけるoldest_unacked_message_age(最も古い未確認応答メッセージの経過時間)とdeployment_replica(レプリカ数)のメトリクスを対照したものです。10:35 ごろ、 oldest_unacked_message_age メトリクスが 増加する直前に、Flink の Pod 数も増加していることが確認されました。また、同日の 16:55 ごろにピークが収束するとともに、Flink の Pod 数も自動的にスケールダウンされ、安定的なリソース利用が実現されました。
この動的スケーリングの結果、従来はリソース不足で発生していたコンシューマーラグはほぼ 0% にまで削減され、システム全体の安定性が大幅に向上しました。
2. VPAによるコストの削減
2024年5月にTortoise VPA を導入した結果、通常時のリソース使用量に基づいた最適な resource request と limit が自動的に設定され、不要なリソースの割り当てが大幅に削減されました。
その結果、月平均で 100万円以上、年間で 1200万円以上のコスト削減を実現しました。
まとめ
Data Platformチームが管理するStream Pipelineにおいて、トラフィックピーク時のリソース不足によるコンシューマーラグと、過剰なリソース割り当てによるコスト増加という2つの課題とその解決策を解説しました。HPAを活用した自動スケーリングにより、Flinkのスケールアウト・スケールダウンを実現し、Stream Pipeline全体の安定性が向上しました。また、Tortoise VPAを用いることで、リソースの効率的な割り当てを行い、無駄なリソース配置を抑制し、コストの削減を実現しました。
今後もData Platformチームでは、Stream Pipelineのリソース最適化をさらに進め、同様の対策をCDC PipelineのKafka Connectにも取り込みたいと考えています。
次の記事は @togamiさん です。引き続きお楽しみください。