こんにちは、安達(cw-adachi)です。
前回までの記事では、vol.1でBiryani(ビリヤニ)プロジェクトが発足した背景や概要を、vol.2ではシステム構成、負荷試験や運用周りを紹介してきました。今回はこれまでサラッとしか触れてこなかったHBaseからElasticsearchへのデータマイグレーションについて紹介します。大量(2020年7月時点で60億以上)のメッセージデータを確実に、なおかつ出来るだけ速くマイグレーションするための戦略や細かな工夫をお伝えできればと思います。
Biryani プロジェクト(メッセージ検索機能のCloudsearchからElastcsearchへのリプレイス)について vol.1 - Chatwork Creator's Note
Biryani プロジェクト(メッセージ検索機能のCloudSearchからElasticsearchへのリプレイス)について vol.2 - Chatwork Creator's Note
TL;DR
- Apache Sparkを使って初期データ投入とデータの過不足の検証を行った
- 初期データ投入
- 設定の異なる2つのindexを駆使してマイグレーション時間の短縮を狙ったがほんとど縮まらなかった
refresh_interval
に長すぎず短すぎない値をセットしたりBulk APIのバッチサイズを調整して、安定した速度でデータ投入できた
- データの過不足の検証
- 検証用ツールは一般的なApache Sparkのチューニングテクニックだけで十分な速度が出た
目次
豆知識
本題に入る前に、同僚のVikas(インド出身)から料理のビリヤニに関する慣用句をもらったので紹介しておきます。ビリヤニはとても複雑な作り方なので、インドでは"複雑なものを作るな"という意味で「ビリヤニを作るな」と言ったりするそうです。プロジェクト名を決めるときに教えて欲しかったなと思いました。Vikasの言葉を引用しておきます。(原文パパ)
Biryani is a very complex delicacy to cook there are so many ingredients and so many things you have to take care to make a perfect biryani, as well as it takes a lot of time and efforts so in some situation, people in India use some phrases say to measure the complexity of the situation " Please don't make biryani, simplify it "
データマイグレーションの概要
BiryaniプロジェクトではHBaseから全メッセージデータを吸い出してElasticsearch上にインデックスすることをデータマイグレーションと呼んでいました。このデータマイグレーションは大きく分けて3つのステップから構成されています。
- 初期マイグレーション
- ある時点のスナップショットから起動したHBaseからメッセージデータをごそっと取り出して、どかっとElasticsearchに流し込む
- データ検証
- HBaseとElasticsearch間でデータの過不足がないことをずばっと検証
- 差分マイグレーション
- 初期マイグレーション以降に送信されたチャットのメッセージをががっとインデックス
差分マイグレーションはvol.1で書かれているKafkaのConsumerが担っています。このconsumerだけでも1つの記事が書けてしまうくらいの工夫が詰まっているので、今回は上記のステップの中でも1の初期マイグレーションと2の検証の部分についてのみ紹介したいと思います。
要件
Biryaniのデータマイグレーションの要件としては次のようなものがありました。2つ目の要件についてはKafkaの設定を変えて延ばすことも出来るので、メッセージデータをすべて確実にElasticsearchにインデックスすることが必須の要件だったと言えます。
- 過不足なくメッセージデータをElasticsearchにインデックスする
- 1週間以内にデータ検証まで完了させる必要がある
- Kafkaにメッセージが残っているのは約1週間
初期データマイグレーション
まずは最初のステップである初期マイグレーションの詳細について説明していきます。初期マイグレーションはさらに4つの細かなステップに分けられます。
- HBaseから全メッセージデータ取得
- 言語判定などの前処理を実施
- Elasticsearchに初期indexを作成
- Elasticsearchに検索用indexを作成
図に描くとこんな感じです。
1から3までのステップは、Apache Spark使ってマイグレーション用のアプリケーションを実装しました。(以下「migrator」と呼びます) ステップ3までは特に引っかかるものもないと思いますが、ステップ3でindexを作っているのに「どうしてステップ4でもう一回index作るの?」と思われた方は少なくなくないのではないでしょうか。
初期indexと検索用indexをどうして分けちゃったの?
indexを分けた理由は2つあります。というか、最初は1つ目の理由しかなかったのですが、プロジェクトを進めていく上で2つ目の隠れた(?)動機も出てきました。
- 初期投入の時間を短縮するため
- 同じメッセージを(rolloverされた)複数のindexに重複してインデックスしないように
ここからそれぞれを掘ってみたいと思います。
ちなみにこの戦略自体はvol.1で紹介したこちらの動画をインスパイアされたもので、Shrink APIを使って見事に時間短縮されている事例です。
Shrink APIはprimary shard数を変更するAPIなのですが、細かな制約などもあります。その詳細や挙動はAPIドキュメントに詳しく書かれていますので、気になる方はこちらを参照ください。 Shrink index API | Elasticsearch Reference [7.8] | Elastic
その1. 初期投入の時間を短縮するため (失敗)
検索用indexは3 shards/indexで設計しているため、ElasticsearchのデータノードのCPUを同時に3コアしか使えず、初期投入に時間がかかってしまいます。そこでまずはデータノードを大量に並べ、データノードの総CPUコア数と同じだけのshard数でindexを1つ用意し、migratorでメッセージを放り込んで初期indexを作成し、この初期indexをベースにShrink API or Reindex APIで検索用indexを作ればデータ投入の時間を短縮できるのでは?と考えたからです。
クラスターサイズの変更
初期データ投入の速度を上げるためにデータノードを増やし、データ検証が終わったらデータノードを減らす計画でした。しかし想定外のパフォーマンス問題に備えてリソースに余裕を持たせておきたかったので、CloudSearchからElasticsearchに完全に切り替えるまでデータノードは減らしませんでした。
初期index | 検索用index | |
---|---|---|
データノード | 12台(i3.2xlarge) | 6台(i3.2xlarge) |
ロールオーバー | なし | あり |
primary shardの数 | 96 | 3 |
Shrink API or Reindex API
vol.1で紹介したように1 indexのサイズを均一にするためにrolloverさせる必要があったため、今回はShrink APIではなくReindex APIを使うことになりました。Reindexはソースindexのドキュメントを新しいindexに再インデックスさせますが、Shrink APIはメッセージをインデックスし直すわけではなく、既存のsegmentsをハードリンク(もしくはコピー)して新しいindexを作るAPIなので、新しいindexを作りながらrolloverさせることは出来ません。
Reindex APIはShrink APIと違ってインデックスし直すので実は大して短縮できないんじゃない?むしろ遅くなる可能性も?という懸念はありましたが、実際その通りでトータルの所要時間は直接検索用のindexを作るパターンとほぼ変わらず、この目論見は失敗に終わりました。(ズコーッ)
その2. 同じメッセージを(rolloverされた)複数のindexに重複してインデックスしないように
migratorを使って検索indexを直接用意していた頃にverificatorを実行してみると、HBaseとElasticsearch間でdocument数が異なる自体が発覚しました!(がーん)
詳しく調べてみたところ、どうやら同じメッセージが重複してインデックスされている模様。そしてその原因は、SparkのTask(Jobの実行単位)が失敗したとき、そのTaskは他のTaskの1回目の実行が終わってから再実行されるため、再実行される前にロールオーバーが発生すると新しいindexにインデックスされてしまう、というオチでした。
ざっくり図で描くとこんな感じです。
Sparkのtask内でも書き込みのリトライを試みていますが、リトライ回数を増やし過ぎたりリトライ間隔を開け過ぎるとその間にrolloverが発生してしまう可能性があります。なのでElasticsearch側の気持ちをあまり汲み取ってくれないmigratorを動かしながらrolloverさせるより、requests_per_second
の設定やslice数でthrottleを効かせれるReindex APIを使っている間にrolloverさせた方が良さそう!という結論に達しました。この方式に切り替えてから重複しなくなりました。
Reindex API | Elasticsearch Reference [7.8] | Elastic
この問題に関しては他にも打ち方はいくつか思いついているので、次回マイグレーションを実施するタイミングで試してみたいと思っています。
- Bulk APIのqueueのサイズを調整する
- queueの状態を利用してバックプレッシャー的な処理をmigratorに組み込む (もはやSpark Streamingにすべき?)
データ検証
次にマイグレーション後のデータ検証について紹介していきます。データ検証は、
- メッセージがすべてインデックスされたか
- 重複してインデックスされていないか
を確認することが目的です。データ検証はReindex後の検索用indexに対して行います。ここでもApache Sparkで検証用のアプリケーションを実装し、データの完全性を検証しました。(以下「verificator」と呼びます)
このverificatorに関しては特別な作戦があったわけではなく、真正面からデータ量と戦い勝利しました。このあとのチューニングパートの後半でデータ量と戦うための小道具を紹介します。
パフォーマンスチューニング
ここからはmigratorやverificatorのパフォーマンスチューニングを、
- Elasticsearchの設定
- Sparkアプリケーションの実装
の2つの切り口からお届けします。Sparkアプリケーションのチューニングに関しては、過去にメッセージデータのストレージをAuroraからHBaseにメッセージデータをマイグレーションした際もApache Sparkを採用していたので、このときに蓄積した莫大な資産やこんもりとした知見がとても役に立ちました。
初期データマイグレーション
まずは初期データマイグレーションのパフォーマンスを上げるために行ったElasticsearchの設定やSparkの実装方法をつらつら書いていきます。
refresh_intervalの調整
初期データ投入に関するプラクティスとして、「refresh_interval
を-1
にすると良いよ!」という情報を見聞きし、何も考えずに-1
を設定して実行していました。しかしbufferが溢れるまでデータを溜め込んでからsegmentに書き出すと一度のrefreshのインパクトが大きくなりCPU使用率が跳ね、indexing rateが瞬間的にガクッと落ちることがありました。
速度が安定しないとSparkがtask単位で失敗 → リトライすることになり、結果としてマイグレーション全体の所要時間が伸びてしまいます。refresh_interval
が短すぎてもindexing rateが上がらないのでトライ&エラーを繰り返す必要があると思いますが、私達のケースでは60秒(1m
)にすることでindexing rateをそこそこ高い値で安定させることが出来ました。(CPU使用率のスパイクには定期性があったため、Luceneのflushの可能性もあるかな?と思ったのですが、refresh_intervalを変更するだけで安定したのでrefresh起因かな?と考えております)
Elasticsearchのrefresh, Luceneのflushが内部的にどんな処理を行っているかをざっくり把握しておくとパフォーマンスチューニングでとても役立つので、参考までに分かりやすく説明されている記事をいくつか共有します。
Guide to Refresh and Flush Operations in Elasticsearch
What exactly does -1 refresh_interval in Elasticsearch mean? - Stack Overflow
初期マイグレーション中はnumber_of_replicasを0に
これはよく言われる話ですが、初期マイグレーション中はreplicaにインデックスしないようにするとindexing rateが上がります。primary shardの数(number_of_shards
)はindex作成後に変更できませんが、replicaの数(number_of_replicas
)は後から変更可能です。
余談ですが、たまたま初期マイグレーション中にデータノードの入れ替えが発生し、クラスターステータスがred
になったことがあります。一度経験しておくと本番のclusterがredになっても慌てずに対処できるかもしれませんし、できないかもしれません。
Bulk APIで一度に書き込む量を調整
migratorでElasticsearchにメッセージデータを流し込むときも、verificatorでElasticsearchからデータを吸い出すときもElasticsearch for Apache Hadoop
を使用しました。このライブラリは書き込みのバッチサイズやリトライ回数から、ingestのpipelineなどの指定まで対応していてとても使い勝手が良いです。
www.elastic.co
デフォルトではバッチ書き込みのサイズが 1MB となっていますが、私達は5MB まで増やしてパフォーマンスを向上させました。これはindexのmappingやらデータの特性次第で最適値が変わってくると思いますし、私達はここを触る前にもう十分なパフォーマンスが出ていたのでこの値の調整にあまり時間をかけませんでした。
ちなみにAmazon Elasticsearch Serviceの場合はElasticsearchの前段にELBがある(かもしれない)せいか、どれだけ大きなインスタンスタイプを指定してもHTTP Payloadの上限は100MBとなっています。
Amazon Elasticsearch Service の制限 - Amazon Elasticsearch Service
データ検証
次にverificatorに対して行ったチューニングを紹介していきます。こちらはデータ量を削減する観点でのチューニングばかりです。
Elasticsearchから吸い出すデータ量の削減
データの過不足を検証する時には使わないfieldもElasticsearch上には存在するため、読み取るfieldを減らすことでmigratorのメモリの消費量を抑えたり、Sparkのshuffle時にネットワークを流れるデータ量を抑えました。
Configuration | Elasticsearch for Apache Hadoop [7.1] | Elastic
(Spark) カスタムPartitionerを用いてshuffleを抑制する (失敗)
さらにSparkのshuffleのコストを下げるために、カスタムPartitionerを用いてpartitioningすることも検討しました。というのも、HBaseから取得したデータとElasticsearchから取得したデータを同じPartitionerでpartitioningしておけば、joinするときにshuffleの回数が減ってパフォーマンスがあがるかも?と思ったからです。
試しにカスタムPartitionerを利用した版を用意して動かし、カスタムPartitionerを利用しない版とDAGを比較してみました。下のような差が出てきたので、「これはshuffleの抑制に成功したかな?」と盛り上がりました。
しかし結局は次の2つの理由からカスタムPartitionerの採用を見送りました。
Elasticsearch for Apache Hadoop
にはカスタムPartitionerを組み込めなかったため、JavaのClientを使う必要があり、ライブラリを切り替えることによるパフォーマンス劣化を懸念- 小さなデータセットでは問題なかったが、本番相当のデータ量ではpartitionの初期化に時間がかかるようになってしまった
後者はpartitionerの実装を改良すれば改善しそうな気がしましたがSpark力が足りず、また元々このチューニングを行わなくても現実的な時間で処理できていたので撤退してしまいました。
(Spark) SerializerをJava標準のシリアライズからKryoに切り替える
これはとても一般的なプラクティスですが、上述のように大規模なshuffleを真正面から立ち向かう場合は特に有効なチューニング方法です。Java標準のシリアライズと比較して高速にシリアライズでき、圧縮率も高いです。 Tuning - Spark 2.4.0 Documentation
参考情報
初期データマイグレーションのパフォーマンスチューニングのときに役立った(役立ちそうな)リンクをいくつか共有したいと思います。
Elasticsearch側のチューニングはこちらのドキュメントが大変参考になりました。 www.elastic.co
AWSさんの方でも一般的なチューニングの知識を共有されていて、こちらもとても参考になると思います。 aws.amazon.com
まとめ
今回は初期マイグレーションの概要からパフォーマンスチューニングについて紹介しました。パフォーマンスチューニングに関しては打ち手のBefore/Afterを記録していなかったものが大半で、比較結果を紹介できないことが残念ですが、本番では約65時間で初期マイグレーションとデータ検証の工程が完了し、余裕を持った状態で差分マイグレーションをスタートすることが出来ました。まだ差分マイグレーションで使用したKafka Consumer (Scalaで実装したindexer)については紹介できていませんが、これはこれで技術的おもしろトピック満載なので別の機会に書く or お話させていただきたいです。
では最後は大阪の都島にあるインド料理レストラン「ナンタラ」のビリヤニで締めたいと思います。