Chatwork Creator's Note

ビジネスチャット「Chatwork」のエンジニアとデザイナーのブログです。

ビジネスチャット「Chatwork」のエンジニアとデザイナーのブログです。

読者になる

Biryani プロジェクト(メッセージ検索機能のCloudSearchからElasticsearchへのリプレイス)について vol.3 - データマイグレーション編 -

こんにちは、安達(cw-adachi)です。

前回までの記事では、vol.1でBiryani(ビリヤニ)プロジェクトが発足した背景や概要を、vol.2ではシステム構成、負荷試験や運用周りを紹介してきました。今回はこれまでサラッとしか触れてこなかったHBaseからElasticsearchへのデータマイグレーションについて紹介します。大量(2020年7月時点で60億以上)のメッセージデータを確実に、なおかつ出来るだけ速くマイグレーションするための戦略や細かな工夫をお伝えできればと思います。

Biryani プロジェクト(メッセージ検索機能のCloudsearchからElastcsearchへのリプレイス)について vol.1 - Chatwork Creator's Note

Biryani プロジェクト(メッセージ検索機能のCloudSearchからElasticsearchへのリプレイス)について vol.2 - Chatwork Creator's Note

TL;DR

  • Apache Sparkを使って初期データ投入とデータの過不足の検証を行った
  • 初期データ投入
    • 設定の異なる2つのindexを駆使してマイグレーション時間の短縮を狙ったがほんとど縮まらなかった
    • refresh_intervalに長すぎず短すぎない値をセットしたりBulk APIのバッチサイズを調整して、安定した速度でデータ投入できた
  • データの過不足の検証
    • 検証用ツールは一般的なApache Sparkのチューニングテクニックだけで十分な速度が出た

目次

豆知識

本題に入る前に、同僚のVikas(インド出身)から料理のビリヤニに関する慣用句をもらったので紹介しておきます。ビリヤニはとても複雑な作り方なので、インドでは"複雑なものを作るな"という意味で「ビリヤニを作るな」と言ったりするそうです。プロジェクト名を決めるときに教えて欲しかったなと思いました。Vikasの言葉を引用しておきます。(原文パパ)

Biryani is a very complex delicacy to cook there are so many ingredients and so many things you have to take care to make a perfect biryani, as well as it takes a lot of time and efforts so in some situation, people in India use some phrases say to measure the complexity of the situation " Please don't make biryani, simplify it "

データマイグレーションの概要

BiryaniプロジェクトではHBaseから全メッセージデータを吸い出してElasticsearch上にインデックスすることをデータマイグレーションと呼んでいました。このデータマイグレーションは大きく分けて3つのステップから構成されています。

  1. 初期マイグレーション
    • ある時点のスナップショットから起動したHBaseからメッセージデータをごそっと取り出して、どかっとElasticsearchに流し込む
  2. データ検証
    • HBaseとElasticsearch間でデータの過不足がないことをずばっと検証
  3. 差分マイグレーション
    • 初期マイグレーション以降に送信されたチャットのメッセージをががっとインデックス

差分マイグレーションはvol.1で書かれているKafkaのConsumerが担っています。このconsumerだけでも1つの記事が書けてしまうくらいの工夫が詰まっているので、今回は上記のステップの中でも1の初期マイグレーションと2の検証の部分についてのみ紹介したいと思います。

要件

Biryaniのデータマイグレーションの要件としては次のようなものがありました。2つ目の要件についてはKafkaの設定を変えて延ばすことも出来るので、メッセージデータをすべて確実にElasticsearchにインデックスすることが必須の要件だったと言えます。

  • 過不足なくメッセージデータをElasticsearchにインデックスする
  • 1週間以内にデータ検証まで完了させる必要がある
    • Kafkaにメッセージが残っているのは約1週間

初期データマイグレーション

まずは最初のステップである初期マイグレーションの詳細について説明していきます。初期マイグレーションはさらに4つの細かなステップに分けられます。

  1. HBaseから全メッセージデータ取得
  2. 言語判定などの前処理を実施
  3. Elasticsearchに初期indexを作成
  4. Elasticsearchに検索用indexを作成

図に描くとこんな感じです。

f:id:cw-adachi:20200721234720j:plain
初期データマイグレーションの流れ

1から3までのステップは、Apache Spark使ってマイグレーション用のアプリケーションを実装しました。(以下「migrator」と呼びます) ステップ3までは特に引っかかるものもないと思いますが、ステップ3でindexを作っているのに「どうしてステップ4でもう一回index作るの?」と思われた方は少なくなくないのではないでしょうか。

初期indexと検索用indexをどうして分けちゃったの?

indexを分けた理由は2つあります。というか、最初は1つ目の理由しかなかったのですが、プロジェクトを進めていく上で2つ目の隠れた(?)動機も出てきました。

  • 初期投入の時間を短縮するため
  • 同じメッセージを(rolloverされた)複数のindexに重複してインデックスしないように

ここからそれぞれを掘ってみたいと思います。

ちなみにこの戦略自体はvol.1で紹介したこちらの動画をインスパイアされたもので、Shrink APIを使って見事に時間短縮されている事例です。

www.youtube.com

Shrink APIはprimary shard数を変更するAPIなのですが、細かな制約などもあります。その詳細や挙動はAPIドキュメントに詳しく書かれていますので、気になる方はこちらを参照ください。 Shrink index API | Elasticsearch Reference [7.8] | Elastic

その1. 初期投入の時間を短縮するため (失敗)

検索用indexは3 shards/indexで設計しているため、ElasticsearchのデータノードのCPUを同時に3コアしか使えず、初期投入に時間がかかってしまいます。そこでまずはデータノードを大量に並べ、データノードの総CPUコア数と同じだけのshard数でindexを1つ用意し、migratorでメッセージを放り込んで初期indexを作成し、この初期indexをベースにShrink API or Reindex APIで検索用indexを作ればデータ投入の時間を短縮できるのでは?と考えたからです。

クラスターサイズの変更

初期データ投入の速度を上げるためにデータノードを増やし、データ検証が終わったらデータノードを減らす計画でした。しかし想定外のパフォーマンス問題に備えてリソースに余裕を持たせておきたかったので、CloudSearchからElasticsearchに完全に切り替えるまでデータノードは減らしませんでした。

初期index 検索用index
データノード 12台(i3.2xlarge) 6台(i3.2xlarge)
ロールオーバー なし あり
primary shardの数 96 3
Shrink API or Reindex API

vol.1で紹介したように1 indexのサイズを均一にするためにrolloverさせる必要があったため、今回はShrink APIではなくReindex APIを使うことになりました。Reindexはソースindexのドキュメントを新しいindexに再インデックスさせますが、Shrink APIはメッセージをインデックスし直すわけではなく、既存のsegmentsをハードリンク(もしくはコピー)して新しいindexを作るAPIなので、新しいindexを作りながらrolloverさせることは出来ません。

Reindex APIはShrink APIと違ってインデックスし直すので実は大して短縮できないんじゃない?むしろ遅くなる可能性も?という懸念はありましたが、実際その通りでトータルの所要時間は直接検索用のindexを作るパターンとほぼ変わらず、この目論見は失敗に終わりました。(ズコーッ)

その2. 同じメッセージを(rolloverされた)複数のindexに重複してインデックスしないように

migratorを使って検索indexを直接用意していた頃にverificatorを実行してみると、HBaseとElasticsearch間でdocument数が異なる自体が発覚しました!(がーん)

詳しく調べてみたところ、どうやら同じメッセージが重複してインデックスされている模様。そしてその原因は、SparkのTask(Jobの実行単位)が失敗したとき、そのTaskは他のTaskの1回目の実行が終わってから再実行されるため、再実行される前にロールオーバーが発生すると新しいindexにインデックスされてしまう、というオチでした。

ざっくり図で描くとこんな感じです。

f:id:cw-adachi:20200722015114j:plain
メッセージの重複が発生する流れ

Sparkのtask内でも書き込みのリトライを試みていますが、リトライ回数を増やし過ぎたりリトライ間隔を開け過ぎるとその間にrolloverが発生してしまう可能性があります。なのでElasticsearch側の気持ちをあまり汲み取ってくれないmigratorを動かしながらrolloverさせるより、requests_per_secondの設定やslice数でthrottleを効かせれるReindex APIを使っている間にrolloverさせた方が良さそう!という結論に達しました。この方式に切り替えてから重複しなくなりました。

Reindex API | Elasticsearch Reference [7.8] | Elastic

この問題に関しては他にも打ち方はいくつか思いついているので、次回マイグレーションを実施するタイミングで試してみたいと思っています。

  • Bulk APIのqueueのサイズを調整する
  • queueの状態を利用してバックプレッシャー的な処理をmigratorに組み込む (もはやSpark Streamingにすべき?)

データ検証

次にマイグレーション後のデータ検証について紹介していきます。データ検証は、

  • メッセージがすべてインデックスされたか
  • 重複してインデックスされていないか

を確認することが目的です。データ検証はReindex後の検索用indexに対して行います。ここでもApache Sparkで検証用のアプリケーションを実装し、データの完全性を検証しました。(以下「verificator」と呼びます)

f:id:cw-adachi:20200728155923j:plain
データ検証の流れ

このverificatorに関しては特別な作戦があったわけではなく、真正面からデータ量と戦い勝利しました。このあとのチューニングパートの後半でデータ量と戦うための小道具を紹介します。

パフォーマンスチューニング

ここからはmigratorやverificatorのパフォーマンスチューニングを、

  • Elasticsearchの設定
  • Sparkアプリケーションの実装

の2つの切り口からお届けします。Sparkアプリケーションのチューニングに関しては、過去にメッセージデータのストレージをAuroraからHBaseにメッセージデータをマイグレーションした際もApache Sparkを採用していたので、このときに蓄積した莫大な資産やこんもりとした知見がとても役に立ちました。

初期データマイグレーション

まずは初期データマイグレーションのパフォーマンスを上げるために行ったElasticsearchの設定やSparkの実装方法をつらつら書いていきます。

refresh_intervalの調整

初期データ投入に関するプラクティスとして、「refresh_interval-1にすると良いよ!」という情報を見聞きし、何も考えずに-1を設定して実行していました。しかしbufferが溢れるまでデータを溜め込んでからsegmentに書き出すと一度のrefreshのインパクトが大きくなりCPU使用率が跳ね、indexing rateが瞬間的にガクッと落ちることがありました。

f:id:cw-adachi:20200722165512p:plain
CPU使用率が跳ねる様子

速度が安定しないとSparkがtask単位で失敗 → リトライすることになり、結果としてマイグレーション全体の所要時間が伸びてしまいます。refresh_intervalが短すぎてもindexing rateが上がらないのでトライ&エラーを繰り返す必要があると思いますが、私達のケースでは60秒(1m)にすることでindexing rateをそこそこ高い値で安定させることが出来ました。(CPU使用率のスパイクには定期性があったため、Luceneのflushの可能性もあるかな?と思ったのですが、refresh_intervalを変更するだけで安定したのでrefresh起因かな?と考えております)

Elasticsearchのrefresh, Luceneのflushが内部的にどんな処理を行っているかをざっくり把握しておくとパフォーマンスチューニングでとても役立つので、参考までに分かりやすく説明されている記事をいくつか共有します。

Guide to Refresh and Flush Operations in Elasticsearch

What exactly does -1 refresh_interval in Elasticsearch mean? - Stack Overflow

初期マイグレーション中はnumber_of_replicasを0に

これはよく言われる話ですが、初期マイグレーション中はreplicaにインデックスしないようにするとindexing rateが上がります。primary shardの数(number_of_shards)はindex作成後に変更できませんが、replicaの数(number_of_replicas)は後から変更可能です。

余談ですが、たまたま初期マイグレーション中にデータノードの入れ替えが発生し、クラスターステータスがredになったことがあります。一度経験しておくと本番のclusterがredになっても慌てずに対処できるかもしれませんし、できないかもしれません。

Bulk APIで一度に書き込む量を調整

migratorでElasticsearchにメッセージデータを流し込むときも、verificatorでElasticsearchからデータを吸い出すときもElasticsearch for Apache Hadoopを使用しました。このライブラリは書き込みのバッチサイズやリトライ回数から、ingestのpipelineなどの指定まで対応していてとても使い勝手が良いです。 www.elastic.co

デフォルトではバッチ書き込みのサイズが 1MB となっていますが、私達は5MB まで増やしてパフォーマンスを向上させました。これはindexのmappingやらデータの特性次第で最適値が変わってくると思いますし、私達はここを触る前にもう十分なパフォーマンスが出ていたのでこの値の調整にあまり時間をかけませんでした。

ちなみにAmazon Elasticsearch Serviceの場合はElasticsearchの前段にELBがある(かもしれない)せいか、どれだけ大きなインスタンスタイプを指定してもHTTP Payloadの上限は100MBとなっています。

Amazon Elasticsearch Service の制限 - Amazon Elasticsearch Service

データ検証

次にverificatorに対して行ったチューニングを紹介していきます。こちらはデータ量を削減する観点でのチューニングばかりです。

Elasticsearchから吸い出すデータ量の削減

データの過不足を検証する時には使わないfieldもElasticsearch上には存在するため、読み取るfieldを減らすことでmigratorのメモリの消費量を抑えたり、Sparkのshuffle時にネットワークを流れるデータ量を抑えました。

Configuration | Elasticsearch for Apache Hadoop [7.1] | Elastic

(Spark) カスタムPartitionerを用いてshuffleを抑制する (失敗)

さらにSparkのshuffleのコストを下げるために、カスタムPartitionerを用いてpartitioningすることも検討しました。というのも、HBaseから取得したデータとElasticsearchから取得したデータを同じPartitionerでpartitioningしておけば、joinするときにshuffleの回数が減ってパフォーマンスがあがるかも?と思ったからです。

試しにカスタムPartitionerを利用した版を用意して動かし、カスタムPartitionerを利用しない版とDAGを比較してみました。下のような差が出てきたので、「これはshuffleの抑制に成功したかな?」と盛り上がりました。

f:id:cw-adachi:20200721200219p:plain
カスタムPartitionerなし

f:id:cw-adachi:20200721200257p:plain
カスタムPartitionerあり

しかし結局は次の2つの理由からカスタムPartitionerの採用を見送りました。

  • Elasticsearch for Apache HadoopにはカスタムPartitionerを組み込めなかったため、JavaのClientを使う必要があり、ライブラリを切り替えることによるパフォーマンス劣化を懸念
  • 小さなデータセットでは問題なかったが、本番相当のデータ量ではpartitionの初期化に時間がかかるようになってしまった

後者はpartitionerの実装を改良すれば改善しそうな気がしましたがSpark力が足りず、また元々このチューニングを行わなくても現実的な時間で処理できていたので撤退してしまいました。

(Spark) SerializerをJava標準のシリアライズからKryoに切り替える

これはとても一般的なプラクティスですが、上述のように大規模なshuffleを真正面から立ち向かう場合は特に有効なチューニング方法です。Java標準のシリアライズと比較して高速にシリアライズでき、圧縮率も高いです。 Tuning - Spark 2.4.0 Documentation

参考情報

初期データマイグレーションのパフォーマンスチューニングのときに役立った(役立ちそうな)リンクをいくつか共有したいと思います。

Elasticsearch側のチューニングはこちらのドキュメントが大変参考になりました。 www.elastic.co

AWSさんの方でも一般的なチューニングの知識を共有されていて、こちらもとても参考になると思います。 aws.amazon.com

まとめ

今回は初期マイグレーションの概要からパフォーマンスチューニングについて紹介しました。パフォーマンスチューニングに関しては打ち手のBefore/Afterを記録していなかったものが大半で、比較結果を紹介できないことが残念ですが、本番では約65時間で初期マイグレーションとデータ検証の工程が完了し、余裕を持った状態で差分マイグレーションをスタートすることが出来ました。まだ差分マイグレーションで使用したKafka Consumer (Scalaで実装したindexer)については紹介できていませんが、これはこれで技術的おもしろトピック満載なので別の機会に書く or お話させていただきたいです。

では最後は大阪の都島にあるインド料理レストラン「ナンタラ」のビリヤニで締めたいと思います。

f:id:cw-adachi:20200722131558j:plain
ナンタラのビリヤニ、おいしいよ

nantala.com