ChatWork Creator's Note

ビジネスチャット「チャットワーク」のエンジニアとデザイナーのブログです。

チャットワークのID生成器

2018/3/18 追記

TISさんのreactive system meetupで発表する機会をいただきました。ありがとうございます。発表資料をリンクしておきます。

18桁になったメッセージID

2017年5月28日をもってチャットワークのメッセージIDは10桁から18桁になりました。みなさんお気づきでしたか?

これはメッセージリンクを取得してみることで確認できます。

f:id:cw-yasuda:20180105163109p:plain

それまでは32bitでかつ連番のIDでした。今は64bitになり、また単調増加性だけを保証し、連番ではなくなっています。

この記事では新メッセージIDとそれを発行するシステムについて記述します。

枯渇しそうだったメッセージID

チャットワークのメッセージ数は2017年3月時点で18億を突破しました。

lp.chatwork.com

実は今までメッセージIDの型にsigned intを使用しており、メッセージ数の上限は231=約21億でした。21億を超えるのは時間の問題でしたが、すでにテーブルには大量のメッセージが登録されており、現実的なメンテナンス時間でALTER TABLEを行いこの制約を解除することは不可能でした。

2017年1月にリリースした新メッセージシステムであるFalconのミッションの1つはこのメッセージ数上限に対処することでした。

Falconのサブシステムである分散ID発行器は、メッセージIDを64bitにして上限を広げたほか、単一障害点の排除、メッセージ投稿時のレイテンシの改善という恩恵ももたらしています。

分散IDとは

以前このブログではかとじゅんさんが分散IDについて紹介しました。分散IDについてはここを参照してください。

http://c-note.chatwork.com/post/105946591675/scalaを用いて分散idワーカを実装する
c-note.chatwork.com

今回リリースした分散IDの形式も、このブログにあるようにTwitter snowflakeの形式のIDになります。

このブログではZooKeeperの運用コストについて言及しています。しかし今では私たちはFalconでKafkaとHBaseを採用したのですでにZooKeeperを運用しています。 なので分散ID発行器のワーカーIDの管理にZooKeeperを採用しました。

分散ID発行器の構成

分散IDを発行するシステムの構成図を以下に示します。

分散ID発行器の構成

分散ID発行器はIDを発行するワーカーと、IDを要求するクライアントから構成されます。ワーカーは一意のIDをもっており、そのワーカーIDをもとにメッセージ用IDが発行されるので、ワーカー同士が独立にメッセージ用IDを発行しても重複しないようになっています。クライアントは動的に変わるワーカーの接続情報を自動的に発見し、ワーカーに直接接続してIDの発行を要求します。このようにワーカー間のワーカーIDの一意性の分散合意と、クライアントによるワーカーの発見というサービスディスカバリーの2つの目的を、ZooKeeperを用いて実現しています。

以下がIDを発行可能になるまでのフローです。

  1. クライアントがワーカー一覧を取得します。その際に親ノードを監視します。
  2. ワーカーが起動すると、ワーカーIDを取得します。ワーカーIDの取得はZooKeeperのノードの作成によって行います。他のワーカーが使用しているIDは使用できないため、すでに作成されていた場合は再取得します。またワーカーIDは有限 (64bitのメッセージIDうち一部がワーカーIDに割り当てられています) なため、すべてのIDが使用されている場合は待機します。
  3. ワーカーが追加されたというイベントの通知がクライアントにいきます。
  4. クライアントは変更通知を受け取ると、自分のリストにないワーカーの接続情報を取得します。
  5. クライアントはワーカーに接続します。

クライアントがワーカーに接続すると、IDを発行するリクエストを投げられるようになります。

分散ID発行器で使用しているZooKeeperの機能

分散ID発行器の多くの機能はZooKeeperの機能を活用して実現しています。そのうちのいくつかを紹介します。

エフェメラルノード

ZooKeeperはファイルシステムのようなツリー構造のデータであるノードを操作できるAPIを公開しています。ノードの種類は2種類あり、1つは永続ノード、もう1つはエフェメラルノードです。

その名が示すように、エフェメラルノードはセッション内のみで有効な一時的なノードです。分散ID発行器ではワーカーIDのノードがエフェメラルノードになっています。エフェメラルノードを使うことによって、IDワーカーが停止したときに自動でノードが消滅します。ノードの消滅はクライアントが自動で検知してそのワーカーへのリクエストをやめます。エフェメラルノードを使うことでデータの不整合なくワーカーを気軽に停止させることができるようになっています。

親ノードの監視

ZooKeeperのAPIの1つ、getChildrenはノード一階層下の子ノード一覧を取得します。getChildrenは監視オプションも提供しており、子ノードの変更を監視できます。IDクライアントがIDワーカーを検知するしくみはgetChildrenの監視機能で実現しています。これによって個々のノードを監視する必要はなくなり、監視を集約することでZooKeeperへの負荷を減らすことができます。

分散合意アプリケーションとアクタープログラミング

分散合意を含む分散アルゴリズムは非同期メッセージの送受信とそれに対応する振る舞いで記述できます。ZooKeeperを使ったアプリケーションは基本的にZooKeeperのイベントに対して受動的であり、イベントの発火によって状態が変わる状態機械です。

このようなアプリケーションで問題になる以下の2点、

  1. 複雑な状態遷移
  2. 例外的な状況での適切な内部状態の破棄

は、アクタープログラミングを用いることでうまく対処できます。

複雑な状態遷移

例えば分散ID発行器の場合、以下の状態遷移があります。

IDワーカーの状態遷移

見ての通り複雑な状態遷移です。これを旧来のマルチスレッドプログラミングで行うと難解で危険なコードになりがちです。このようなイベント駆動の状態機械はアクタープログラミングを用いることで簡易に書くことができます。

例えばノードを作成してその存在を監視するコードは以下のように書くことができます。

class SampleActor extends Actor {

  val zookeeperSession = system.actorOf(ZooKeeperSessionActor.props("zookeeper:2181", sessionTimeout = 5000)

  def receive: Receive = state1

  self ! Start

  def state1: Receive = {
    case Start => 
      val data = "data".getBytes()
      self ! CreateExample("/example", data)
      context.become(state2)
  }

  def state2: Receive = {
    case CreateExample(path: String, data: Array[Byte]) => {
      zookeeperSession ! Create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, data)
    }
    case Created(path, name, ctx: Array[Byte]) => 
      self ! ExistsExample
      context.become(state3(path, ctx))
    case CreateFailure(e, path, ctx: Array[Byte]) if e.code() == Code.NODEEXISTS =>
      // do nothing
      self ! ExistsExample
      context.become(state3(path, ctx))
    case CreateFailure(e, path, ctx: Array[Byte]) if e.code() == Code.CONNECTIONLOSS =>
      self ! CreateExample(path, data)
    case CreateFailure(e, _, _) => throw e
  }

  def state3(examplePath: String, exampleData: Array[Byte]): Receive = {
    case ExistsExample => zookeeperSession ! Exists(examplePath, watch = true)
    case DoesExist(path, stat, _) =>
      // wait until the watching node is deleted
    case ExistsFailure(e, _, _) if e.code() == Code.CONNECTIONLOSS => self ! ExistsExample
    case ExistsFailure(e, _, _) if e.code() == Code.NONODE =>
      self ! CreateExample(examplePath, exampleData)
      context.become(state2(examplePath, exampleData))
    case ZooKeeperWatchEvent(e) if e.getType == EventType.NodeDeleted =>
      self ! CreateExample(examplePath, exampleData)
      context.become(state2(examplePath, exampleData))
  }
}

ここではわたしが作成したAkkaアクターを用いたZooKeeperクライアントを用いました。

github.com

またZooKeeperによる分散システム管理サンプルコードをアクターを使って書き直したサンプルも公開しているので、参考になれば幸いです。

reactive-zookeeper/example/src/main/scala/tanukkii/reactivezk/example/zookeeperbook at master · TanUkkii007/reactive-zookeeper · GitHub

セッションの期限切れと内部状態の破棄

ZooKeeperを使ったアプリケーションで陥るともっとも恐ろしい罠がセッションの期限切れに対する不適切な対処です。ZooKeeperのクライアントセッションの期限が切れた場合、もはやZooKeeperに頼った合意が成り立たなくなります。

ここで注意しておきたいのが、セッションの期限切れを判断するのはZooKeeperアンサンブルで、クライアントではないということです。クライアントがセッションの期限切れを確認できるタイミングは、ZooKeeperアンサンブルに再接続できたときです。

分散ID発行器を例にすると、IDワーカーとZooKeeperの間でネットワークが分断していて、セッションの期限が切れたとします。 このときZooKeeperから期限の切れたワーカーIDが破棄されて、他のワーカーがそのワーカーIDを取得することが可能になります。 ところがIDワーカーとIDクライアントは依然として接続している可能性があるため、期限の切れたワーカーIDを保持しているワーカーにリクエストを投げることができてしまいます。結果としてIDが重複する危険性を作ってしまいます。

セッションの期限が切れたIDワーカーのするべきことは、内部状態であるワーカーIDを破棄して再取得することです。ZooKeeperクライアントの設定でsessionTimeoutという項目があるので、この期間以内に状態を破棄し、ワーカーIDの再取得からやり直せば問題はおきません。クライアントは期限の切れたワーカーをZooKeeper経由で感知し、そのワーカーへのリクエストをやめて、新しく参加してきたワーカーを登録してID発行の要求を続けます。

ZooKeeperのセッションの期限切れの際に内部状態の破棄はどのようにやれば簡潔にできるでしょうか。ここでもアクタープログラミングを用います。Akkaではアクター間にエラーハンドリングのためのヒエラルキーという関係があります。セッションの期限切れが起きた際に、"Let it crash!"の考えに基づきアクターのヒエラルキーの最上位層から再起動を行います。これによってヒエラルキーの下層すべての状態破棄が自動的に行われます。これによって下層ではセッションの期限切れに対処する必要がなくなり、アプリケーションのあらゆる部分にセッションの期限切れ対処のコードを散りばめる必要はなくなります。

おわり

ScalaMatsuri 2018に同じような内容で応募したのですが、落選したのでこの記事として書き残しました。