default image

この記事は、 Merpay Tech Openness Month 2020 の 2 日目の記事です。 こんにちは。メルペイ DataPlatform チーム、ソフトウェアエンジニアの @syucream です。

主にメルペイのデータ基盤の開発・運用の業務に携わっております。今回はみんな大好きお金とビッグデータとコスト削減のお話をします。

はじめに

様々なデータを収集して横断的に分析可能にする、いわゆるデータ基盤には少なくないコストが掛かります。「ビッグデータ」という言葉が示すような大量のデータを保存して処理する際には、永続化のためのストレージコストがかかります。この課題は我々メルペイ DataPlatform チームも無関係ではありません。以前の記事で紹介させていただきましたが、我々のデータ基盤はさまざまなマイクロサービスからのデータベースや出力されたログを横断管理できるようにしています。

https://engineering.mercari.com/blog/entry/2018-12-14-125639/

加えて今日ではメルペイのみならずメルカリにもデータ基盤としての機能を提供し始めています。このようなビジネスと組織の拡大と基盤としてのあり方の変化、加えて時間の経過によってストレージコストは当初と比べて膨れ上がっており、コスト削減を検討する必要が生まれるに至りました。

データレイクがどういう構成だったのか

メルペイ DataPlatform では以下の記事の通り GCS をデータレイクとして、 BigQuery をデータウェアハウスとして活用しています。

GCS には Apache AvroObject Container Files 形式で格納しています。合わせて圧縮コーデックに Snappy を利用しています。また Avro ファイルスキーマが自己記述的で BigQuery に読み込むことが容易 です。さらに DEFLATE か Snappy であれば無加工で読み込ませることができます。加えてスキーマを各レコードで保持しなくても良くなり、かつ Snappy でも無圧縮よりはデータサイズが削減でき、効率的にデータを保持できます。これがもし JSON 形式でログを永続化するなら、ログのフィールド名やスキーマは各レコードでそれぞれ保持されて冗長になります。 とはいえ年月が経過して大量のデータが GCS に積み上がるとこの構成でもストレージのコストが無視できなくなってきます。 BigQuery であれば 自動的にストレージコストの最適化が行われる のですが、 GCS ではこれに相当する操作は自前で行う必要があります。 また可用性を重視して GCS のストレージクラスをマルチリージョンに設定していたのですが、長期にわたるデータの保持において遠い過去のデータをマルチリージョンで持ち続けるのは無駄が多いとも考えられます。

デカ過ぎて固定資産税がかかりそうなデータレイクの古いログをどうにかする

というわけで本題です。今月この課題に大して以下の2つの施策を行ったのでご紹介します。

  • 古いログの粒度を圧縮しなおす
  • ストレージクラスを見直す

古いログの粒度を圧縮しなおす

GCS に格納していた Avro 形式のファイルなのですが、長期に渡る保存となると別のオプションも出てきます。 今回はファイルサイズと将来の参照コストを少しでも減らすための対策として具体的に以下の作業を行いました。

  • ファイルフォーマットを Avro から Parquet に変更
  • 細かい粒度で保持していたファイルを荒い粒度に揃え直す
  • 圧縮コーデックを gzip に変更

まず最初のファイルフォーマットの変更ですが、 GCS に格納していたデータには一部頻繁に繰り返される特性のフィールドが存在し、行指向なフォーマットである Avro では圧縮効率を上げられない最適化の余地がありました。また将来 GCS からデータを読み直すことがあった際により低コストでクエリ可能になると良いと考えました。 これらを考慮して、カラムナフォーマットである Apache Parquet に変換し直すことにしました。

次にファイルの粒度です。ストリーミングジョブから GCS にデータを書き込むワークロードが存在しており、レイテンシの増加と処理の詰まりを回避するために頻繁にフラッシュして細かい粒度でファイルを書き込んでいたのですが、読み込み時にはこれがネックになりますし圧縮効率も下がる懸念があります。今回検討した、圧縮し直すようなバッチ処理ではこれらの制約を取り払ってファイルサイズの粒度を考え直すことができます。 これを加味して今回、長期に保持するデータはより荒い粒度に揃えることにしました。

加えて圧縮コーデックを Snappy から gzip に変更することにしました。今回は既存のファイルを圧縮し直すので書き込み時の圧縮コスト低減よりファイルサイズの削減を狙いたかったたです。

これらを実現するために、今回は Apache Spark のジョブを PySpark を利用して実装して、 Cloud Dataproc のクラスタ上で実行することにしました。これらの技術スタックは実行環境を用意するのが簡単で、かつ既に Avro ファイルを持っており Parquert で出力したい際に実装コストもそれほどかからないためです。 実際に実装した PySpark のジョブは以下のように簡素なものとなりました。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
from typing import Dict, List

from pyspark.sql import SparkSession

APP_NAME = 'distconv'

def run_job(
        in_prefixes: List[str], out_prefix: str, in_format: str, out_format: str,
        in_opts: Dict[str, str], partitions: int, drop_columns: List[str]):
    spark = SparkSession.builder.appName(APP_NAME).getOrCreate()

    df = spark.read.format(in_format).options(**in_opts).load(*in_prefixes)
    out = df.repartition(partitions).drop(*drop_columns)
    out.write.format(out_format).save(out_prefix)

def main():
    parser = argparse.ArgumentParser()

    parser.add_argument('-i', '--in_prefix', action='append', help='input path prefix(es)')
    parser.add_argument('-o', '--out_prefix', help='output path prefix')
    parser.add_argument('--in_format', help='input format', default='avro')
    parser.add_argument('--out_format', help='output format', default='parquet')
    parser.add_argument('--in_basepath', help='input basePath option', default=None)
    parser.add_argument('--partitions', help='number of partitions', type=int, default=100)
    parser.add_argument(
        '-d', '--drop_columns', action='append', help='dropped column(s)', default=[])

    args = parser.parse_args()

    in_opts = {}
    if args.in_basepath:
        in_opts = {'basePath': args.in_basepath}

    run_job(
        in_prefixes=args.in_prefix,
        out_prefix=args.out_prefix,
        in_format=args.in_format,
        out_format=args.out_format,
        in_opts=in_opts,
        partitions=args.partitions,
        drop_columns=args.drop_columns,
    )

#
# e.g.
# $ gcloud --project merpay-dataplatform \
#     dataproc jobs submit pyspark path/to/distconv_spark.py \
#     --cluster cluster-name \
#     --region asia-northeast1 \
#     --properties spark.jars.packages='org.apache.spark:spark-avro_2.12:3.0.0'\
#     ,spark.sql.parquet.compression.codec='gzip' \
#     -- --in_prefix gs://bucket/path/to/ --out_prefix gs://bucket/path/to/ -d hour -d minute
#
if __name__ == "__main__":
    main()

ちなみにこれらの方針を検討するにあたって、 AWS の (GCP のものではなく!) 以下のドキュメントを参考にさせていただきました。サービスは違えどこれらは S3 に蓄積されたデータを S3DistCp もしくは PySpark で移動させる話題であり参考になりました。

ストレージクラスを見直す

前述の通り長期保存するデータを全てマルチリージョンのような高可用性なストレージクラスで保持するのはコストになります。 他の選択肢として、以下の公式ドキュメントの通りいくつかのクラスがあります。可用性とストレージ、読み出しのコストがバランスする選択肢を柔軟に選べるようになっています。

https://cloud.google.com/storage/docs/storage-classes?hl=ja#available_storage_classes

今回はここから Coldline Storage を選択することにしました。直近 90 日以内に参照する予定もなくコスト削減とのバランスも良いと考えたためです。より強気に Archive Storage を選ぶのも考えましたが、最小保存期間がいきなり 365 日に伸びるため今回は見送りました。

コスト削減効果はどうだったのか

成功事例

今月末に PySpark のジョブによる約 2 年分のログのアーカイブ処理とストレージクラス見直しを行うことで、 GCS の課金を一日あたり 1/3 ほどまでカットすることができました! 実のところデータレイクとして利用している GCS バケットは幾つかあり主要なバケットにしか今回の作業を反映していないのですが、これは良い結果と言えると思います。

失敗事例いくつか

PySpark のジョブを動かす上で幾つか失敗をやらかしたので合わせてご紹介します。

今回ストレージクラスを変更する都合、構成をシンプルにするため デフォルトのストレージクラスColdline Storage に設定していたのですが、これは避けるべきでした。 Spark が作成した、短命の一時ファイルも Coldline Storage として保存されてしまい、早期削除した場合でも 90 日間の保存分の課金が発生するのでアーカイブ作業のコストが増加してしまいます。 これを回避するため、 オブジェクトのライフサイクル を利用して保存されてから数日経過したらストレージクラスを変更するようにしました。

また Dataproc のクラスタの構成方法にも苦労しました。作業の最初期はオートスケールをほぼデフォルト値で設定し、複数の Spark ジョブを同時に投入していたのですが、インスタンス数縮小に伴いジョブ内で再計算が行われたりメモリ不足を起こしたりといった問題が見られました。 最終的に、特に大量のデータを扱う場合は 1 クラスタ 1 ジョブでオートスケールをせず手動で心温まるインスタンス数チューニングをすることで回避をしました。 ともかく Spark と Dataproc の運用に関してはもう少し事前調査をすべきだったと反省しております。

おわりに

というわけで、デカ過ぎて固定資産税がかかりそうな(?) GCS 上の古いデータをアーカイブしてコストを減らす記事でした。コスト削減の効果は既存のワークロードに依存しますが、今回紹介させて頂いたようなストリーミングでログを細かい粒度で保存しているような場合には、再圧縮する作業は効果が出やすいかと考えます。 余談ですが、インフラコストの低減は現場のエンジニアとしても意識すべきであり、特に今後のコロナウイルス蔓延に伴い不透明な先行きの中で各メンバーができる努力のひとつかと考えています。この記事にもしなにか参考になる点があれば幸いです。