Chatwork Creator's Note

ビジネスチャット「Chatwork」のエンジニアとデザイナーのブログです。

ビジネスチャット「Chatwork」のエンジニアとデザイナーのブログです。

読者になる

Biryani プロジェクト(メッセージ検索機能のCloudSearchからElasticsearchへのリプレイス)について vol.4 - 差分マイグレーション編 -

こんにちは、梶原(cw-kajiwara)です。 Biryani PJシリーズでの投稿です。今回の記事ではメッセージ検索機能におけるメッセージの差分マイグレーションを行うKafka Consumerアプリケーション、通称Indexerについて紹介いたします。

vol.1ではプロジェクト発足の背景・概要を、vol.2ではPJ前後のシステム構成、Elasticsearchの負荷試験考察やCWでの運用方法、vol.3ではデータマイグレーションについて紹介していますのでご興味あればぜひこちらもご覧ください。(ご興味あれば、と言いつつも読んでもらっていることを前提にしてしまっているところは所々あります)

Biryani プロジェクト(メッセージ検索機能のCloudsearchからElastcsearchへのリプレイス)について vol.1 - Chatwork Creator's Note

Biryani プロジェクト(メッセージ検索機能のCloudSearchからElasticsearchへのリプレイス)について vol.2 - Chatwork Creator's Note

Biryani プロジェクト(メッセージ検索機能のCloudSearchからElasticsearchへのリプレイス)について vol.3 - データマイグレーション編 - - Chatwork Creator's Note

目次

TL; DR

Indexerは

  • 高い処理性能を、バッチ処理で実現しました
    • 整合性を保つために同一ドキュメントIDのドキュメントに対しては注意が必要
  • 冪等性を、既にElasticsearch上にあるドキュメントの状態を考慮したリクエストを送ることで実現しました
    • rolloverを使っていると注意すべき点がわりとあります
  • 耐障害性を、AkkaのSupervisioningで実現しました

結果として、リリース時の1週間分たまったKafka上のメッセージマイグレーションではシステム全体で最大スループット分間50万リクエストを記録し、Chatworkに投稿されたメッセージの検索結果への反映を1分以内に行うことができています。

構成

これまでのBiryani PJの記事で紹介されていたように、データマイグレーションによってHBaseに格納されている全メッセージデータがElasticsearch上にインデックスされます。

その後、Kafkaに積まれているメッセージ投稿・編集・削除のイベントを基にElasticsearchへ順次リクエストを送ることで、メッセージデータの差分マイグレーションを行うConsumerアプリケーションをIndexerと呼んでいます。

f:id:cw-kajiwara:20200729104942p:plain
構成図(再掲)

Kafkaからメッセージデータをconsumeして、Elasticsearchにインデックスするという流れはストリーミング処理として実現していて、パイプライン実装にはAkka Stream Kafkaを使っています。

技術ポイント

高い処理性能

Chatworkに送られるメッセージ投稿・編集・削除リクエストはそれぞれに対応したイベントとしてKafkaに永続化されていますが、これらを効率よくインデックスする必要があります。 Indexerは、バッチ処理(bulk API)によって指定したバッチサイズ分のメッセージ投稿・編集・削除リクエストをまとめて一気にElasticsearchに投げることで、スループットを向上させています。

ちなみに夜間などトラフィックが少ないときのために、指定したバッチサイズ分のメッセージが入るまで無理に待つことはせずタイムアウトを設定しています。

バッチ処理とメッセージの編集・削除

Indexerでは、Kafkaに永続化されているメッセージの投稿・編集・削除イベントに応じてIndex, Update, Deleteリクエストをバッチに詰め込んでいきます。 このとき、バッチ内に同じドキュメントIDに対する複数のリクエストが含まれる場合に整合性を保つためにバッチ内でのリクエストのマージ処理を入れています。 例えば、同じドキュメントIDのIndexリクエストとUpdateリクエストが同じバッチに入った場合、Elasticsearchには更新後のメッセージの内容を盛り込んだIndexリクエストを発行する、といった具合です。

具体的には下図のように、バッチ内で同一ドキュメントID宛のリクエストをよしなにマージしています。

f:id:cw-kajiwara:20200731190928j:plain
バッチ内リクエストのマージロジック

組み合わせの数がそれなりに多いため、ユニットテストとプロパティベーステストを通して手厚く行いました。

これにより、リクエスト間の整合性を保ちつつバッチ処理によるメッセージの高速な処理が可能になりました。

Biryani PJ以前のシステム構成はvol.1で紹介されていたように重厚なものだったので、それをKafkaから直接イベントをconsumeするようにしたところによる処理速度への影響が大きく、新旧検索システムの単純な処理性能の比較はできませんが、現在のChatworkでは、メッセージ投稿からメッセージ検索結果への反映までの遅延を1分以内にはおさめることができています。

Akka Stream Kafkaによるスケーラビリティの確保

上記の取り組みでアプリケーション単体での性能を向上させましたが、Akka Stream Kafkaを使うことによりスケーラビリティの向上も実現しています。

IndexerはKubernetes上にデプロイされていますが、podを増やすことで同時に処理するpartitionを増やすことができます。 PJメンバーでわいわいしながら行った本番リリース時(クリスマスの夜でした)では、Kafkaに1週間分まるまる積まれたメッセージを、podを増やしつつ 順次indexしていきました。(だいたい2時間ほどですべてのparititonのラグが解消しました)

1podだけで分間10万リクエスト以上は捌けていましたが、podを増やすことで分間50万リクエストを捌くことができました。 今後のトラフィック増加にも余裕で耐えられそう、ということで安心です。

f:id:cw-kajiwara:20200731175550p:plain
メキメキ捌いてくれました

冪等性

前回までの記事であった通り、ChatworkではElasticsearchをメッセージ用index alias + indexのサイズに応じてrolloverさせる、という仕組みで動かしています。 また、メッセージの編集・削除が可能であることから、Rolloverされた後に既存のindexをreadonlyにはしていません。 この先の説明では、index aliasで指定されたindex(messages-001, messages-002など)がよく登場するので、それらを物理indexとよぶことにします。

rolloverとメッセージ編集・削除

メッセージが投稿された際はそのメッセージのIDをドキュメントIDとしてindex alias宛(Chatworkではmessagesという名前のindex)にindexリクエストを投げます。 このとき、最新の物理indexがindex aliasの書き込み対象になります。

しかし、メッセージが更新・削除された際はそのメッセージに対応するドキュメントが格納されている物理indexを指定する必要があります。 メッセージ編集・削除はすでにindexされたメッセージが対象になるため、最新の物理indexと該当メッセージがインデックスされている物理indexが異なる可能性があります。そのため、機械的に最新の物理index名を指定するわけにはいきません。 (更新したいドキュメントがどの物理indexに格納されるかわからない)

rolloverとメッセージ投稿

また、rolloverを意識しなければならないのはメッセージ更新・削除時だけではありません。 メッセージ投稿によって新しくメッセージをインデックスする場合においても、リトライ処理が行われることを考慮すると、rolloverを意識した実装にする必要があります。 例えば、タイムアウトのエラーが投げられることによってリトライをした場合、実際にはElasticsearchではインデックスが完了しているかもしれません。 rolloverを使っていない場合は、このケースに関しては2度目の試行で前回インデックスが成功していたことを検知できるため問題ありません。 しかし、rolloverを使っている場合、そのリトライの間にrolloverが発生して最新の物理indexが変わってしまう可能性があります。 この場合はElasticsearch上に重複したメッセージがインデックスされることになってしまいます。(vol.3で紹介されていた重複インデックスと同様の経緯です)

f:id:cw-kajiwara:20200731190920j:plain
重複インデックスが起きてしまう場合

上記のように、rolloverを使う場合はアプリケーション側でIndex, Update, Deleteのすべてのリクエストの発行時にrolloverによって整合性が崩れないかに注意する必要があります。

これらの問題を解決するために、Indexerではリクエスト対象のドキュメントIDを基に都度Elasticsearchに問い合わせをしてメッセージがインデックスされているか否か、されている場合はどの物理indexにインデックスされているかを取得するようにしています。

取得した情報によって、そのドキュメントに対してリクエストを発行するか否か、やリクエスト内におけるindex指定を変更したりしています。 この一手間でRolloverに起因するメッセージデータの重複インデックスやメッセージ更新の失敗を防いでいます。

ということでリクエストを投げる前にElaticsearchにそのドキュメントがどこの物理indexに保存されているか(いないか)を問い合わせることですべて解決しそうですが、さらに注意点があります。

get/mGet と search の違い、ご存知ですか

Elasticsearchでは新しくindexされたドキュメントはrefreshが実行されるまでsearchリクエストをElasticsearchに送ってもそのドキュメントを取得できません。 (refreshが実行されることでメモリ上のバッファに保存されているドキュメントが同じくメモリ上のセグメントに書き込まれます) また、get/mGetはsearchとは異なりrefreshされていないドキュメントも取得できますが、物理indexを指定しなければなりません。

Elasticsearch内部でindexがどのように管理されているかは、vol.3でも紹介されていた以下のリンクをご覧いただければと思います。 qbox.io

Indexerでは、ドキュメントがrefreshされているか否かに関わらず処理をするために、

  1. getで最新の2つの物理index経由で取得する
    • refreshされていないとしたら、最近のindexリクエストなはずで、indexリクエスト処理直後のrolloverを考慮して直近2つの物理indexまで見れば十分
  2. (取得できていないドキュメントがあれば)searchでindex alias経由で取得する

という流れで既存のドキュメントを取得しています。

一見Elasticsearchへけっこうな負荷がかかるんじゃないの、と思われるこの方法ですが、現在Chatworkで運用しているElasticsearchはびくともしていません。 さすがElasticsearchです。ドキュメントIDによる検索ではリクエストの頻度が高くても問題にはならない、という知見を得ました。

Kafkaのオフセット巻き戻しによる再インデックス

また、Indexerは、Kafkaのオフセットを巻き戻すことによる再インデックスが可能です。 Elasticsearchにドキュメントが残っている状態で、Elasticsearchのドキュメントの状態を考慮せずにKafkaのオフセットを巻き戻して再インデックスを行った場合、Elasticsearchが処理できないリクエストを送ってしまう可能性があります。

例えば、あるメッセージに関して作成・編集・削除を経てElasticsearchのインデックス上では削除されているドキュメントがあるとします。 このときKafkaのオフセットを巻き戻して再インデックスをかけたとき、巻き戻し位置によっては削除されたドキュメントに対しての更新リクエストを投げてしまう場合があります。

f:id:cw-kajiwara:20200731190653j:plain
Kafkaのオフセット巻き戻し時

Indexerは該当ドキュメントがElasticsearchにどの状態で存在している(or していない)か確認してから、不整合のないリクエストを送ります。

上記のように rolloverを使っている or Kafkaからconsumeしてリクエストを作っている 場合に冪等性を実現するには、リクエスト発行時にElasticsearchに現在の ドキュメントの状態を問い合わせる、ということが都度必要になりそうだな、という知見はBiryani PJをやらなければ手に入らなかったものだと思います。

耐障害性

最後に耐障害性について紹介します。 Indexerは差分マイグレーションをリアルタイムで行うことを要件としています。 そのため、障害によってアプリケーションが落ちてしまったとしても速やかに処理を再開させる必要があります。

Indexerでは、この要件を差分マイグレーション用のアクターとそれを監督するアクターで分けることによって実現しています。 差分マイグレーション用のアクターが障害で落ちた場合、それを監督するアクターが検知し、該当アクターを再起動します。(再起動タイミングはエクスポーネンシャルバックオフで再起動の度に間隔を広げていきます)

これにより、KafkaやElasticsearchと何らかの理由で通信ができない場合などでエラーが発生しストリームが落ちたときも、その原因が取り除かれればストリームが落ちる直前のメッセージから差分マイグレーションを自動的に再開します。   また、graceful shutdownを取り入れることで、consumeしたメッセージに関してはcommitまでするようにしてあります。(冪等性は担保されているので、ここに関してはconsumeしたメッセージを無駄に再取得しない、という精神です)

まとめ

今回はBiryani PJのIndexerの紹介を通して、Elasticsearchのindex alias + rolloverを使っている際のアプリケーション実装における注意点を取り上げました。 今後、KafkaとElasticsearch間をつなぐアプリケーションを実装する方がいらっしゃればご参考にしていただければ、とおもいます。

恒例のビリヤニ写真ですが、心斎橋のアリーズキッチン (Ali's Kitchen)を掲載いたします。 手元にビリヤニ写真がなく途方にくれていたところ、cw-adachiさんが恵んでくださったものです。

チキンマライティッカが柔らかい、とのことでした。ちなみに私はチキンマライティッカがいかなるものかわかっていません。

f:id:cw-kajiwara:20200731184547p:plain
おいしいビリヤニ

これまた恒例

Chatworkでは、ユーザーにとってより価値のあるサービスを追求し、大規模な基盤システムの開発運用保守を担うScalaエンジニアを募集しています。

Biryani PJにおけるElasticsearchやKafkaのような様々なミドルウェアを使ったアプリケーションを実装したい方もぜひ!ビリヤニが大好きなエンジニアの方もぜひ!

www.wantedly.com