Cloud Run上で動く商品画像一括アップロード機能を作った話

はじめに

こんにちは! ソウゾウのSoftware engineerの@gotomoです。「メルカリShops [フライング] アドベントカレンダー2022」の16日目を担当します。本記事ではメルカリShopsの商品画像一括アップロード機能を作ったときの話をします。

商品画像一括アップロード機能とは?

商品画像一括アップロード機能とは、CSV商品一括登録のために作られた機能で、複数の商品画像を一括でアップロードすることができます。CSV商品一括登録機能ではCSVファイルを用いて商品を複数個一括で登録することができます。その際、CSVファイルと商品画像は別々にアップロードする仕様になっており、事前にアップロードした商品画像の名前をCSVファイル中で指定するようになっています。CSV商品一括登録機能に関する詳しい説明はこちらの機能紹介ページにあります。

CSVファイルよりも画像の方がファイルサイズが大きく、アップロードに時間がかかるので、アップロードを分けることによってCSVファイルに不備があった際の再度アップロード時間を短縮しています。また、過去にアップロードした商品画像を他の商品でも使い回すことができるようになっています。

現在の仕様として、一度に1000枚かつ合計1GB以下の画像をアップロードすることができます。また、1枚あたりの上限は8MBになっています。

現在の構成

商品画像一括アップロード機能は主に次の要素で構成されています。

  • Google Cloud Storage(GCS)
  • Cloud Pub/Sub(Pub/Sub)
  • Cloud Tasks
  • frontend
    • メルカリShopsのフロントエンド
  • BFF
    • メルカリShopsのGraphQLサーバー
  • assetadapter
    • メルカリShopsのマイクロサービス
    • 画像などの管理を司るassetサービスへの中継地点として動くサービス
    • ZIPファイルの解凍処理を行い、画像をassetサービスとproductサービスへ登録する
    • Cloud Run上で動作
    • Cloud Tasksからも呼び出される
  • asset
    • メルカリShopsのマイクロサービス
    • 商品の画像などの管理を司るサービス
  • product
    • メルカリShopsのマイクロサービス
    • 商品の情報を司るサービス

これらの構成図を下に載せます。

本機能では出店者(Seller)の画像アップロードを起点に処理が動きます。基本的な処理の流れとしては以下のようになっています。

  1. Sellerが画像をアップロード
  2. frontendで画像をZIPファイルにまとめる
  3. Signed URL経由でGCSへとZIPファイルをアップロードする
  4. assetadapterがGCSからアップロード完了通知を受け取り、GCSからZIPファイルをダウンロード
  5. ZIPファイルを解凍し、商品画像情報を登録する

これをシーケンス図で詳細に表すと次のようになります。

  1. 出店者はまず画像アップロード画面に行き、登録したい画像を選択します(シーケンス図1)
  2. フロントエンドで選択された画像をZIPファイルにまとめます(シーケンス図2)
  3. フロントエンドでGCSにアップロードするための署名付きURL(Signed URL)を取得します(シーケンス図3, 4, 5, 6, 7, 8)
  4. 取得したSigned URLを用いて、作成したZIPファイルのアップロードを行います(シーケンス図9)
  5. ZIPファイルのアップロードが終わったタイミングでGCSからPub/SubにFINALIZEイベントを発行します(シーケンス図10)
  6. イベントをassetadapterで受信したらイベント情報と共にCloud Tasksにasset importのタスクを積み、実行します(シーケンス図11, 12, 13)
  7. イベント情報からアップロードしたZIPファイルの情報を取得し、GCSからのZIPファイルのダウンロードを行います(シーケンス図13, 14, 15)
  8. ダウンロードしたZIPファイルを解凍し、各ファイルをassetサービスとproductサービスへと登録します(シーケンス図16, 17, 18)

どうして今の構成になったのか?

今の構成にするにあたって以下の2通りの案を考えました。

  1. 画像を一つ一つアップロードする
  2. ZIPファイルなどにまとめてアップロードする(最終的にこちらを採択)

1番目の案の画像を一つずつアップロードする手法は単純で分かりやすく、1回あたりに処理するファイルサイズも少なくて済みます。しかし、一括商品画像アップロード機能では大量の画像を同時にアップロードすることが予想されます。その際、途中でブラウザを閉じてしまったり、クラッシュしてしまったときにどこまでアップロードが完了していたかが分かりにくくなり、再度全ての画像のアップロードを行う必要がでてきます。また、リクエストを大量に送ることになるので、1回あたりのリクエストのオーバーヘッドも気になります。

2番目の案では最終的にアップロードするのは1つのZIPファイルになるため、結果がその1ファイルのアップロード成功/失敗のみになり、どこまでアップロード完了したかが分かりやすくなります。また、アップロードリクエストも1つになるため、オーバーヘッドもほとんどありません。画像を1つのZIPファイルにまとめるため、1回でアップロードするファイルサイズは増えてしまいますが、全ての画像ファイルをアップロードし終えるまでの時間は1つずつ個別にアップロードするよりも早くなります。

今回は大量に画像をアップロードする出店者のことを考え、アップロードの成否が分かりやすく、かつアップロードを終えるのが早くなる「ZIPファイルにまとめてアップロードする」方式を採用しました。

考慮しないといけない点と対処法

ZIPファイルにまとめてアップロードする方式を採用しましたが、その際にいくつか考慮しなければならない点がでてきます。ここではそれらについて、対処法とともに紹介します。なお、本機能はGo言語で実装しています。

巨大なファイルのアップロード

メルカリShopsではgRPCで通信していますが、1GBに届くような巨大なデータをやり取りすることは想定されていません。Protocol Buffersのドキュメントでもメガバイトを越えるようなデータは他のやり方を用いることが推奨されています。
そこで、ZIPファイルをアップロードする際にGCSのSigned URLを経由することでGCSへ直接アップロードするようにしました。また、アップロードが完了したことをGCSからPub/Subへと通知できるので、それをトリガーとして後続の処理を起動することができます。

処理負荷

最大1000枚の画像が1度に送られるため、ZIPファイルを解凍して画像を各種サービスに登録するところでは大量のリクエストが送られることになります。いつどのタイミングで大量のリクエストが来るかを予測することは難しいです。そこで、最大流量をコントロールするためにCloud Tasksを用いています。Cloud Tasksでは最大同時実行数を設定することができます。なお、Cloud Tasksは後述するリトライ目的でも採用しています。GCS経由でPub/Subから通知を受けたあとはCloud Tasksにタスクを追加して処理を実行しています。こうすることによって処理が集中しても負荷を一定に保つことができています。

メモリの使用量

ZIPファイルはGCS上に置かれています。これをCloud Run上で処理するためには次の二通りが考えられます。

  1. ZIPファイル全体をダウンロードしてから読み込む
  2. ZIPファイルを部分的にダウンロードして読み込む

なお、詳しくは後述しますが、ZIPファイルでは部分的に解凍してファイルを読み込めるようになっています。ZIPファイルのどこからどこまでをGCSからダウンロードするかを指定することで部分的にファイルを読み込むことができます。

1の「ZIPファイル全体をダウンロードしてから読み込む」場合、ダウンロードしたファイルをzip.Readerで読み込むだけなので実装は非常にシンプルになるのですが、ZIPファイルをダウンロードした際に一時的に大量のディスク容量を消費するのが問題になります。Cloud Runの場合、公式ドキュメントにもあるように、インメモリファイルシステムのため、ディスクを使用した分だけメモリが消費されるようになっています。そのため、巨大なZIPファイルをGCSからダウンロードしてくるとZIPファイルを解凍しなくてもその時点で大量のメモリが消費されることになります。Cloud Runのスペックを上げることで対処はできますが、最大メモリ消費量を想定すると少量のリクエストしか1つのインスタンスで扱うことができず、コスト面で割高になります。

一方で2の「ZIPファイルを部分的にダウンロードして読み込む」場合、ダウンロードするサイズも小さくなるため、メモリの使用量が抑えられます。Cloud Runのインメモリファイルシステムと相性が良いため、本機能ではこちらを採用しました。しかしながら、いくつか実装面で考慮する点がでてきます。

1と同様に、ZIPファイルを読み込む際にはzip.Readerを用いることができますが、zip.Readerはio.ReaderAtを要求しており、GCSを操作するstorageパッケージにはio.ReaderAtを実装しているものはありません。幸いなことにランダムリードを行うためのfuncとしてNewRangeReaderは用意されているので、それをio.ReaderAtのインタフェースに沿うようにラップすればzip.ReaderのReaderとして用いることができます(参考記事)。こうすることでGCSから部分的にファイルをダウンロードを行えるようになります。

本機能では以下のようにobjectReaderAt structReadAtを実装し、それを用いる形でzip.ReaderをNewしました。

type objectReaderAt struct {
    ctx context.Context
    obj *storage.ObjectHandle
}

func newObjectReaderAt(ctx context.Context, obj *storage.ObjectHandle) *objectReaderAt {
    return &objectReaderAt{
        ctx: ctx,
        obj: obj,
    }
}

func (r *objectReaderAt) ReadAt(b []byte, off int64) (int, error) {
    rc, err := r.obj.NewRangeReader(r.ctx, off, int64(len(b)))
    if err != nil {
        return 0, err
    }
    defer rc.Close()

    ret, err := io.ReadAll(rc)
    if err != nil {
        return 0, err
    }

    cnt := copy(b, ret)
    return cnt, nil
}
cli, err := storage.NewClient(context.Background())
if err != nil {
    return err
}

obj := cli.Bucket(“bucket_name”).Object(“zip_file_name.zip”)
or := newObjectReaderAt(ctx, obj)
attrs, err := obj.Attrs(ctx)
if err != nil {
    return err
}

r, err := zip.NewReader(or, attrs.Size)
If err != nil {
    return err
}

r.RegisterDecompressor(zip.Store, newBufferedNopCloser(1024 * 1024))
r.RegisterDecompressor(zip.Deflate, newBufferedFlateReader(1024 * 1024))

for _, f := range.r.File {
    // 各ファイル毎の処理
}

先程の記事でも触れられていますが、非常に小さい単位で何度もデータを読み込んでいるため、それがネットワーク経由で行われた結果、GCSからzip.Readerでデータを読み取るときにタイムアウトを起こします。そこで、bufio.Readerを用いることでGCSからのレスポンスをバッファリングするようにします。zip.Readerがデータを読み込むときはまずバッファに必要なサイズのデータがあるかを確認します。無ければGCPへとリクエストを飛ばし、バッファに貯めます。その後、必要なサイズのデータが溜まるとバッファからデータを読み込みます。バッファから読む際はネットワークアクセスがないため、高速に読み込むことができます。

io.ReaderAtのバッファリング機構を本機能では次のように実装し、RegisterDecompressorして使っています。渡されたio.Readerbufio.Readerでラップすることでバッファリングを実現しています。この実装では各ファイルのデータ読み込み時にバッファリングが行われます。なお、このプログラムはこちらのコードをベースにバッファリング機能を追加したものになっています。
Cloud Storageのパフォーマンスを最適化する」によると、1MB(1024 * 1024)程度のリクエストの時に最も効率が良くなるとあるので、その値をバッファサイズとして設定しています。

var flateReaderPool sync.Pool

// buffered reader for flate reader.
// https://github.com/golang/go/blob/master/src/archive/zip/register.go#L66-L102
func newBufferedFlateReader(size int) zip.Decompressor {
    return func(r io.Reader) io.ReadCloser {
        fr, ok := flateReaderPool.Get().(io.ReadCloser)
        if ok {
            //nolint:errcheck
            fr.(flate.Resetter).Reset(bufio.NewReaderSize(r, size), nil)
        } else {
            fr = flate.NewReader(bufio.NewReaderSize(r, size))
        }
        return &pooledFlateReader{fr: fr}
    }
}

type pooledFlateReader struct {
    mu sync.Mutex // guards Close and Read
    fr io.ReadCloser
}

func (r *pooledFlateReader) Read(p []byte) (n int, err error) {
    r.mu.Lock()
    defer r.mu.Unlock()
    if r.fr == nil {
        return 0, errors.New("Read after Close")
    }
    return r.fr.Read(p)
}

func (r *pooledFlateReader) Close() error {
    r.mu.Lock()
    defer r.mu.Unlock()
    var err error
    if r.fr != nil {
        err = r.fr.Close()
        flateReaderPool.Put(r.fr)
        r.fr = nil
    }
    return err
}

// buffered reader for nop closer.
// https://github.com/golang/go/blob/master/src/io/io.go#L623-L648
func newBufferedNopCloser(size int) zip.Decompressor {
    return func(r io.Reader) io.ReadCloser {
        if _, ok := r.(io.WriterTo); ok {
            return nopCloserWriterTo{bufio.NewReaderSize(r, size)}
        }
        return nopCloser{bufio.NewReaderSize(r, size)}
    }
}

type nopCloser struct {
    io.Reader
}

func (nopCloser) Close() error { return nil }

type nopCloserWriterTo struct {
    io.Reader
}

func (nopCloserWriterTo) Close() error { return nil }

func (c nopCloserWriterTo) WriteTo(w io.Writer) (n int64, err error) {
    return c.Reader.(io.WriterTo).WriteTo(w)
}

また、先程も少し触れましたが、ここで部分的にファイルをダウンロードすることができたのはZIPファイルの特性も関係しています。

ZIPファイルには最後尾にセントラルディレクトリというものが存在しており、ZIPファイルに格納されている各ファイルへのオフセット情報などのメタデータが格納されています。そこを読み取ることで全てを一度に読み込むことなくランダムアクセスして部分的にファイルを解凍していくことができます(参考:Wikipedia)。

このような特性がない場合、部分的に解凍していくことができず、ファイル全体をダウンロードする必要があります。

リトライ

メルカリShopsではマイクロサービスアーキテクチャを採用しており、画像一括アップロード機能においても複数のサービス呼び出しを行っています。そのため、常にリトライのことは頭に入れておかなければなりません。リトライするたびに再びZIPを全てダウンロードし直していると処理時間が伸びてしまい、最悪のケースではZIPダウンロードだけで毎回タイムアウトし、リトライしても処理が何も進まないということがありえます。

ZIPファイルではセントラルディレクトリを読み込むことで含まれているファイル名の一覧を取得することができるので、実際にファイルを解凍する前に既に画像アップロード済みかを判定し、リトライ時の不要なファイルの再ダウンロードは行わないようにしています。画像をアップロード済みかどうかはDBに保存しています。こうすることによってリトライ時に再び全ての画像の処理を行う必要がなく、リトライを重ねる毎に処理が完了した画像が着実に増えるようになっています。

まとめ

本記事ではCloud Run上で大量の画像データをアップロードする方法について述べました。
1GBという大きめのサイズを扱う機能はメルカリShopsにとって初めてのものでしたが、本実装によって安定稼働しています。また、ZIPファイルの特性によりCloud Runのメモリにやさしく、かつリトライ可能な機能を作ることができました。

GCSには様々な機能が実装されていっており、今後本アプローチよりも効率的なやり方が出てくるかもしれません。Cloud StorageをCloud Runにマウントする方法も気になってはいますが、現時点ではGAになっていないようです。

大規模な事業者さまの出店も増えてきて大量の商品データを扱うニーズも高まってきているので引き続きGCSの動向をウォッチして対応していきたいと思っています。


株式会社ソウゾウではメンバーを大募集中です。メルカリShopsの開発やソウゾウに興味を持った方がいればぜひご応募お待ちしています。詳しくは以下のページをご覧ください。
Software Engineer
Software Engineer, Site Reliability
Software Engineer (Internship) – Mercari Group (※新卒採用に応募するにはまずインターンへの参加をお願いしています。)
またカジュアルに話だけ聞いてみたい、といった方も大歓迎です。こちら の申し込みフォームよりぜひご連絡ください!

  • X
  • Facebook
  • linkedin
  • このエントリーをはてなブックマークに追加