Chatwork Creator's Note

ビジネスチャット「Chatwork」のエンジニアとデザイナーのブログです。

ビジネスチャット「Chatwork」のエンジニアとデザイナーのブログです。

読者になる

Biryani プロジェクト(メッセージ検索機能のCloudSearchからElasticsearchへのリプレイス)について vol.2

こんにちわ、cw-tomitaです。

前回の記事に続いて、Biryaniプロジェクトに関して、あれこれと書いていきたいと思います。なお、この記事は、先日公開した以下の記事の続きとなりますので、こちらを未読の方は、是非vol.1からお読みいただければと!

creators-note.chatwork.com

先の記事では、PJが始まった経緯、Elasticsearchを第一候補として選んだ理由等について書きました。今回は、POC/実装を進めていく中で、苦戦したポイントと解決方法に関して、主だったポイントを挙げていきたいと思います。

最初にどのようなものを作ったかをざっと紹介し、その上で、 今回の記事の中では、Elasticsearch 自体にフォーカスをあてて、色々と紹介していきたいと思います。 Sparkでのデータマイグレーションや、Scalaのindexerの実装等に関しては今回は取り上げないですが、別の機会で取り上げ予定なので、お待ちいただければと!

目次

開発内容の概要

Biryani PJで開発したパーツは大きく2つになります。

1つがHBaseに蓄積されているこれまでの全メッセージデータをElasticsearchに突っ込む、Sparkを使った初期データマイグレーションプログラム

f:id:cw-tomita:20200722182639p:plain


もう1つが、vol.1でも紹介した、Kafkaに積まれたメッセージ投稿イベントの内容をElasticsearchに反映するindexer。

f:id:cw-tomita:20200722182726p:plain


プラスで、AWS Elasticsearch serviceというmanagedなモノを使ってはいるけど、ミドルウェアとして、新しいチャレンジとなったのがElasticsearchです。 繰り返しになりますが、この記事では、Sparkやindexerの部分には触れませんので、その辺りの内容は、別の機会をお待ちいただければと思います!

Chatworkのメッセージ検索clusterの特徴

完成のイメージを掴んでもらったところで、Chatworkのメッセージ検索クラスタを構築するにあたって、特徴的だなと感じる部分を以下に挙げていきたいと思います。

  • 書き込みスループットの方が、読み込みスループットよりも圧倒的に多い
    • Chatwork含め、自分がビジネスチャットツールを使っている状態を想像していただければイメージつくかと思いますが、メッセージが書き込まれる頻度の方が、メッセージが検索される頻度よりも圧倒的に多いです。
  • メッセージ数は単調増加していく
    • 現在のChatworkでは、期間を設けずに全メッセージを検索対象としているため、メッセージ数は単調増加していきます。vol.1で紹介した通り、現在は約60億件のメッセージが保存されています。
  • 過去のメッセージに対しての編集・削除が可能
    • ログのような単調増加するデータに対しては、index alias + 時間軸やサイズでindexをrolloverさせていく、というのが一般的に取られる手法で、この時、rolloverされたindexは書き込み不可にしてしまうのが一般的ですが、その手法が取れません。

このようなユースケースに対して、どのようなElasticseach clusterを構築するべきか、検討・調査・試行錯誤を重ね、本番リリースに至りましたので、その辺りをあれこれと紹介していきたいと思います。

Elasticsearch cluster 負荷試験

まずは、Elasticsearchの負荷試験に関してのtipsです。
vol.1の中で、事前調査でいけそうな気がした旨の記載はしましたが、当然それだけで見切り発車することはなくて、負荷試験をしっかりと行いました。

書き込み負荷試験

先に書いた通り、Chatworkの検索クラスタはwrite-heavyだし、投稿される多くのメッセージは日本語で、その形態素解析などもあるので、writeの負荷試験が1つの山になるかと思っていました。しかし、実際に負荷をかけてみたところ、少ない台数でも相当に余裕を持って捌けました。Elasticsearch凄い。
このため、試行錯誤して、あれこれチューニングすることはなかったのですが、もしも、書き込みパフォーマンスが要求に届かないという場合は、indexのshard数を増やす & ノード数をshard数より大きく保つというのが基本戦略となります。

やや余談になりますが、Elasticsearchの書き込みの速さはLSM(Log Structured Merge) Treeと呼ばれる方式によってもたらされており、その辺りの仕組みは、"データ指向アプリケーションデザイン" という書籍に詳しくのっています。 (Elasticsearchの入門本とかだと、その辺りに触れられておらず。)
さらに余談ですが、この本、私的にはめちゃめちゃ勉強になった一冊で、この記事に興味を持たれるような、各種データストレージなミドルウェアとの関わりの深い方には、オススメできる一冊なので、ご興味持たれた方は読んでみるといいかもしれません。

www.oreilly.co.jp

ちなみに、Chatworkのメッセージデータ本体が格納されているHBaseでもLSM Treeが使われており (そういう話も↑の本でカバーされている)、LSM Treeなくして、Chatworkというサービスは存在しないといっても過言ではないかもしれないです。ありがとう、LSM Tree。

読み込み負荷試験

書き込みの負荷試験がだいぶ余裕があるという結果、加えて読み込みスループットは書き込みに比べてだいぶ少ないという所で、すっかり油断して(ダメ)、取り組み始めた読み込みの負荷試験の方で、思いの外、大苦戦。。。PJ全体を振り返って、1番大きく当初の見積もりを外したポイントで、ここの問題に取り組んでる時が1番心労が溜まった時期でした。。
フェーズ毎に様々な施策を試したのですが、その中で、効果があったものをいくつか紹介したいと思います。

初期データ投入後にforce_mergeをかけるべし

これはデータの作成の仕方にもよりますが、我々の場合は、Sparkのデータマイグレーションプログラムでデータ投入 → 負荷試験という流れでやっていて、Sparkで一気にデータを投入していく関係で、segmentがとっ散らかったindexが出来て、Elasticsearchで検索したらめっちゃ遅いやん、、、て最初になってしまい、ここに気づくまでにしばらく時間を溶かしてしまったのでした。 なので、初期データをガッと入れた場合は、force_mergeをやってsegment数がどうなるか、検索時のレイテンシがどうなるかをチェックしましょう。
Force merge API | Elasticsearch Reference [7.8] | Elastic

こちらは、当時の負荷試験時の結果の一部のキャプチャです。元のデータセットが同じでも、segmentが適切な状態になっていないことで、大きくパフォーマンスが劣化している状態(10倍〜20倍程度のレスポンスの悪化)が見て取れるかと思います。 f:id:cw-tomita:20200722183704p:plain


数値フィールドでも、完全一致のみの時はlongではなく、keywordを使うべし

Chatworkでメッセージ検索を行う場合、自分が所属している部屋を対象に検索が行われます。この部屋のID(room_id)が、RDBMS上ではintになっていて、Elasticsearchのindex mappingでは、将来的なデータ増も見越して最初はlongにしていたのですが、ここをkeyword型にしたことで、パフォーマンスが一気にめちゃくちゃ良くなりました。(さらっと書いているけれど、これに気づくのに数多くの施行錯誤とそこそこの時間の消費があったのでした。。)
かなり気になる挙動だったので、あれこれと調べたのですが、根本原因は上手く分からず。。indexのデータ構造として、longはB-tree構造、keywordはInverted indexで格納されるということで、room_idは数千万あってB-treeがそれなりの深さになる & room_idの数だけ走査が走るので、それが積もり積もって、、って感じなのかな? 的な推測はしたのですが、そこだけにフォーカスした試験方法が思いつかなかったので、結果オーライという形に。ここをしっかりと解明しきれなかったのはやや心残り。(この記事を読んでる方で、心あたりがある方は是非教えて欲しいです!)

なお、旧CloudSearchでは、room_idはintになっていて、フィールドの型を変えたら検索が速くなったのかもしれませんが、そこは未検証です。


2020/07/22 追記

この部分に関して、記事をご覧になった @johtani さんが参考となるリンクを教えてくれました。ありがとうございます!

このページで、先に記載したForce-mergeの話についても触れられていますね💦
mappingの設計時、負荷テスト実施時には、しっかりとこのドキュメントは読み込んだ方が良さそうです。

www.elastic.co

2020/07/22 追記 ここまで



元データが入ったoriginal index + re-indexの組み合わせで、色んなmappingを気軽に試せるようにすべし

先のlong vs keyword問題を試すにあたっては、indexのmappingの修正 + データマイグレーションの再実行が必要です。また、試行錯誤の中で、他のmappingを試したり違うanalyzerを試したり等々、index自体の再作成が必要となる負荷試験を何パターンも行いました。この時に都度プログラム修正 + indexの再作成とやっていると、そこのイテレーションコストが大きくなるので、その対応として、以下のようなことを行いました。

  • 元データをできるだけ元の形に近い状態で格納したindex (original index)を作っておく。

    • 今回の私たちのケースだと、最終的なindexではメッセージ本文は持たせていないですが、Elasticsearch上にはメッセージ本文も持たせたindex (original index)を作っておきます。
  • データマイグレーションのプログラムの修正 & 再実行するのではなく、re-indexのapiを活用して、mapping定義やanalyzerの違うindexを作る

Reindex API | Elasticsearch Reference [7.8] | Elastic

Changing Mapping with Zero Downtime | Elastic Blog

また、re-index方式によって修正コストが低くなったとはいえ、re-index自体もそれなりに時間がかかる処理ではあります。なので、original indexの方を、全量、1/2、1/5、1/10みたいな感じにデータボリューム違いのものも準備するようにすると、少なめデータで迅速にtry-errorを行いつつ、しっかり試したいものに対しては、全量データで試験する、、というようなことができるようになるので、特に全体のデータ量が多い場合、この方式を検討してもいいかもしれません。



Elasticsearch cluster 設定/運用

ここからは、Elasticsearchの設定や運用についてのtipsです。
(初期データマイグレーションを最速で終了させるためのindex定義/設定もあれこれあるのですが、それはまた別の機会に、、!)

メッセージを格納するindexはaliasを利用 + データ量でのrollover

Elasticsearch の制限として、indexのshard数は最初に指定した値から後で変更できないというものがあります。一方で、先に書いた通り、Chatworkのメッセージ検索clusterの特徴として、データは単調増加していくという特徴があります。この2つの条件を満たすために、ワイルドカード指定のalias("messages" indexを検索したら、"messages-*" に当てはまるindexが検索対象になる )を作って、個別のindexは一定サイズになったらrolloverするようにしました。

Add index alias API | Elasticsearch Reference [7.8] | Elastic

Rollover index API | Elasticsearch Reference [master] | Elastic

また、これによって将来的にメッセージの書き込みスループットがどんどん増えてきて、(負荷試験ではめっちゃ余裕だった)Elasticsearchのwriteパフォーマンスが問題になってきた、、という時に、rolloverさせる時のindex定義を変更して、次から作られるindexに関してはshard数を増やす (& 必要だったらノード数も増やす) という設定をすることで、writeのパフォーマンス向上が図れるというメリットもあります。

rollover部分の自動化機能は、Biryaniリリース後、しばらくして発表されるという惜しさ。。リリース当時はこの機能はなかったので、現状、Lambdaで定期的にrolloverをトリガーをさせる、、ということをやっています。(早く置き換えたいけど、次のブロックで紹介する通り、rolloverしても古いindexへの編集・削除を許可しなければならないという制限があるので、この機能がそのまま適用できるのかは不明ですが。)
Amazon Elasticsearch Service でインデックス管理を自動化する

サイズや時間軸でindexをrolloverしていくというのは、ログ基盤としてElasticsearchを使うような場合には、一般的に使われている手法で、cluster設定としてはシンプルで運用も楽ではあります。が、先のChatworkのメッセージ検索clusterの特徴として書いた通り、Chatworkのメッセージはログと違って、メッセージの更新・削除が可能なので、indexerの実装、clusterの運用で一工夫必要でした。また、データの作成時に関しても、最初はaliasの指定でドキュメントを投入すればOKと考えていましたが、何らかの理由でKafkaのoffsetを巻き戻して再実行することになるというようなケースを考えた場合、その間にrolloverが発生していたら、同じメッセージが違うindexに複数存在してしまう可能性が出てしまう、というパティーンを考えると、データ投入を単純にaliasに対して行うのはNGとなってしまい、あれこれと工夫が必要となりました (細か過ぎてだいぶ伝わりづらですが詳細は割愛。。しつこいですが、indexerの詳細は又の機会に、、!)

なお、CloudSearch時代は、1つの巨大indexに全メッセージを格納してましたが、恐らくpartitionが増える (= instanceが自動追加される)時に、裏側でre-indexが行われ、blue-greenでclusterが作り直されていたっぽい、、?(細かな仕組みは公表されていないので推測です & instanceが増える時に接続エラーが出がちだったので、何か大きな処理が動いてそうではある。) そうだとすると、後からindexのshard/partition数をいじれないという、検索クラスタの最初の悩みポイントを見事な力技で解消していて、浪漫を感じる仕組みだなと思ったり、思わなかったり。

利用ストレージ容量が最小になるようにする

vol.1で紹介した"Get the Most out of Your Amazon Elasticsearch Service Domain" でも少し言及があるのですが、mappingの設定をちょっとあれするだけで、利用ストレージ容量が大きく変わってくるので、気をつけましょう。
特に今回のユースケースでは、データ量は単調増加していくので、その伸び幅を抑えることは、時間が経てば経つほど、サーバコストに大きく効いてきますし、あ、コレ要らなかったわ、、ってなっても後から変更するのは大変です(re-indexが必要になる)。
そして、使ってないデータを格納するから単純にストレージのコストのみが増える、、というわけではなく、indexのrolloverをindexのsizeベースで行なっているので、無駄なデータがあることは、segment数、shard数の増加に繋がり、検索のパフォーマンスにも影響し、scale outのタイミングを速めることになることに繋がる可能性もあります。

なお、私たちが行なった設定は大きく以下の2つです。

  • _sourceが不要なfieldをexclude
  • sortに使わないfieldのdoc_valuesを生成しないようにした

特に後者の方は、初めてmapping設計する時には気づきづらいかと思いますし、defaultで有効になっていて、意識しないと勝手に領域が作られてしまうので、気をつけてください。

doc_values | Elasticsearch Reference [7.8] | Elastic

(今回のケースでは、もう何年も動いてるシステムのリプレイスなので、要件はかっちり固まっていましたが、そうでない場合、最初の要件だけでビシバシとdoc_values = falseにしていると、後から、あ〜、このパターンで検索かけたかったけど、re-indexが必要やん、、、みたいなことになるかもしれないので、その辺りの絶妙な按配が求められる所でもあるので、そっちの観点からも気をつけてください。)

force_mergeを定期実行するスクリプト

Elasticsearchで、ログ基盤のようにindexをrolloverさせていくような利用ユースケースでは、rolloverされたindexはforce_mergeされ、以後、read-onlyになるというのが一般的です。
ただ、Chatworkの場合、繰り返しになりますが、メッセージの編集削除が可能なので、rolloverされたindexも書き込み可能な状態を維持する必要があります。また、編集削除が可能なので、放っておくと、古いインデックスでもどんどんとsegmentの数が誇大化していき、検索パフォーマンスの劣化をもたらす要因となりえます。(負荷テストの所でも書いた通り、segmentがとっ散らかってると、同じドキュメント数でもパフォーマンスは全然変わってきます。)
このため、Biryani PJでは、オフピーク時間に定期的にrolloverされたindexに対して(最新のindexは書き込みが激しく行われているので、force merge対象外)、force mergeを実行するスクリプトをlambda経由で実行しています。
かなり泥臭い運用と思われるかもしれませんが、単調増加していくデータセット + 古いデータも編集・削除可能、という条件においてのindex運用の情報が全然見当たらず、どういう風にindexを構成するといいのか色々と悩んだので、Biryani PJではこのような手法を採用しました & 今の所、問題なく動いています、という紹介でした。

モニタリングはCloudWatchとdatadogの二刀流

モニタリングに関して、Elasticsearch serviceでは、CloudWatchで(CloudSearchと比較にならない位に)豊富な種類のメトリクスが提供されているのですが、
Monitoring Cluster Metrics with Amazon CloudWatch - Amazon Elasticsearch Service

CloudWatchでも収集できない、activeなthreadの数、shard数、segmentの総数もモニタリングできた方が良いなということで、datadog-agentも利用して、メトリクスを収集しています。特に、segment数に関しては、先に書いた通り、定期的にforce_mergeを実行するというcluster運用になっていて、何らかの要因でそこのlambdaが動かなくなった時にしっかり検知したいというのがありました。
(ちなみに、datadogが色んなミドルウェアのモニタリングの勘所をdatadog integration絡めて説明しているこのシリーズは、かなりクオリティが高いので、初めて触る、モニタリングするミドルウェアがある時に、最初に一読するのオススメです。)

www.datadoghq.com


datadog-agentをいれたことで、こんな風に、夜中にforce_mergeが実行され、

f:id:cw-tomita:20200709184022p:plain

それに合わせて、segment countがきゅ〜っと減っていることが確認できます。

f:id:cw-tomita:20200709184319p:plain


通常のmergeは常時(ほぼ最新のindexのみに対して)動いてはいるので、ノード単位で見た時の全体としてのsegment数は普段から増減を繰り返し、古いindexに対してのforce_mergeがoff-peakの時間に実行され、こういうグラフの形状になっています。 Merge | Elasticsearch Reference [7.8] | Elastic

Force merge API | Elasticsearch Reference [7.8] | Elastic

最後(?)に

以上、2回に渡ってBiryaniプロジェクトの紹介してきましたが、いかがだったでしょうか?
途中で何度か書いたように、この記事では、Elasticsearch 本体にフォーカスをあてていて、Sparkを使ったデータマイグレーションやscala indexerについての話は全然触れられておらず、まだまだ書ききれていないポイントがたくさんあります。もしかしたら、vol.3 が書かれるかもしれませんし、何かイベント等あったら、そういった機会に積極的に露出していきたいと考えていますので、もし、何か企画されている方がいて、このBiryani PJに興味を持たれた方がいたら、是非、お声かけいただければと!

では、Biryani PJの記事の最後を締めるにぴったりな、このPJのメンバーでもあった弊社のインド人エンジニア(cw-vikas)作のBiryaniの写真で締めたいと思います。
そして、お約束になりますが、Chatworkでは美味しいBiryaniを食べたいエンジニアを (ry

www.wantedly.com

www.wantedly.com

f:id:cw-tomita:20200707145223j:plain