kubell Creator's Note

ビジネスチャット「Chatwork」のエンジニアのブログです。

ビジネスチャット「Chatwork」のエンジニアのブログです。

読者になる

モバイルプッシュ通知システムの一部をScalaに移行した

はじめまして、サーバーサイド開発部(Scala)の阿部(rabe1028)です。 業務ではScalaを書いていますが、一番好きな言語はRustです。

Chatwork Advent Calendar 2022 13日目の記事です。

昨年から今年の頭にかけて実施したシステム移行のお話をチームメンバーを代表して書こうと思います。 *1

今回の移行対象となるシステムについて

Chatworkでは、モバイル端末にプッシュ通知を送信しています。 Chatworkユーザーの利便性向上のために、モバイルプッシュ通知にChatworkに投稿したメッセージ本文を載せる機能開発を行うことが決定しました。

blog-ja.chatwork.com

Chatworkは2011年にリリースされ、リリース時からモバイル端末へのプッシュ通知の機能がありました。PHPで実装され、11年間継続的に開発・運用されています。 既存システムを確認し新規機能開発が行えるか検討したところ、当時の開発リソースだとPHPで実装することが困難であったため、まず対象となるシステムをPHPからScalaに移行することになりました。

なぜPHPからScalaに移行しなければならないのか

Chatworkでメッセージを投稿するとき、色々なメッセージの記法を利用することができます。

Chatworkのメッセージ記法*2は、単一のブロックでなる inline token と呼ばれるもの、前後のブロックで囲むことで特殊な表記をするblock token、Chatwork専用の絵文字を表示するための記法 emoticon などから構成されます。 記法変換せずに、そのまま通知に載せてしまうと、ユーザーは画面で見たときのメッセージと違う印象を受けてしまうと思います。

メッセージ記法変換の一例

この問題を防ぐためには、Chatwork記法を解析し、各種Tokenを変換する必要があります。

Chatwork記法の解析は、社内ライブラリとしてScalaで実装されたものが提供されている *3 ので、そちらを利用するために、PHPからScalaへのシステム移行を実施する運びとなりました。

通知全体の流れ

Chatworkでは、グループチャット内での各種操作やコンタクト周りの操作が行われた時にモバイル端末へ通知を送信しています。

通知全体のシステムの流れとしては以下のような流れになっています

移行前の通知システム構成図

  1. 各種イベント発生時にSQSに積む
    • Chatworkの通知イベントは、メッセージの追加・編集、タスクの追加・編集・完了のようなグループチャット内のイベントや、コンタクト追加などアカウントに紐づくイベントなどがあります。
  2. PHP通知を送るアカウントを解析して、展開する
    • Chatworkでは、[TO ALL]のような宛先を指定するメッセージ記法が存在し、メッセージ中から送信すべき対象の特定を行う処理を行っています
  3. 各種イベントを送信すべきデバイスの特定と、モバイル端末に送るためのPayloadに変換します
    • ここが今回の移行対象
  4. iOS / AndroidのOSごとに適したペイロードにデータを変換し、Amazon SNSを介してユーザーの持つスマートフォンに通知を送ります

本文の表示機能に対応させるためには、3の処理を担当するバッチを移行することとなりました。

システムを無停止で移行するための方法

PHPのcronで動作させている既存のバッチアプリケーションを、今回開発するストリームアプリケーション (アプリケーション名:pegasus-iris, 以降 pegasus-irisと記載します)へ無停止で移行するために、以下のようなフローで移行を実施する計画となりました。

  1. 今回開発するアプリケーション用のSQS Queueを用意する
  2. 開発したアプリケーションを動作させる
  3. イベント送信元を改修し、新しいQueueに送られるようにする

3のリリースのタイミングから、新規開発したアプリケーションにイベントが流れ始めます。

移行段階のシステム構成図

これにより、イベント送信元で送るSQSを制御することで、部分的に移行先システムにイベントを流すことができるようになります。

開発パート

pegasus-iris で利用する、Stream Applicationを開発するためのScalaライブラリを選定する必要があります。 Streamを扱うライブラリとして、今回は以下の3つが候補に上がっていました。

  • Akka Streams
  • fs2
  • ZIO(ZStream)

以上の3つについて、いくつかの観点で比較検討しました。検討軸は以下を用いました。

  • ライブラリの継続可能性
    • 開発が活発に行われているか
    • Scala3の対応状況
  • 学習コスト
    • ドキュメントがどの程度存在するか
  • 社内実績
  • PoCで使ってみた感想

以上の項目で比較検討した結果、typelevel projectはScala3のサポートも進んでおり、検討当時はZIOのメジャーアップデート前でタイミングが悪かったこと、ライブラリの依存を局所的にしたかったこと、そして、社内でまだ採用実績のないライブラリを使ってみたい!ということでfs2を採用しました。

開発手順

PHPからScalaに実装するにあたり、全く同じ処理を書き写すのではなく、PHPで各所に散らばったドメイン知識を集約して書き直す方式を取りました。 そのため以下のように複数回ドメイン分析を実施しています。

  1. PHPのコードを読んで、ドメイン分析を行う
  2. モバイルプッシュ通知の関係者(PHPコードを実装していた人たち)を集めてドメインモデルの認識合わせを実施
  3. 分析したドメインに基づいて実装する
  4. 2,3を繰り返す

今回のシステム移行では、ドメインモデル図が5回ほど作り直されています。作り直した理由としては以下のようなものがあります。

  • 実装していって、より適切な分割が思いついた
  • 最初のドメインモデル図では、既存の処理として不足している部分があった
    • PHPの元の実装だと、ドメインに該当しそうな処理が各所に散らばっており、実装時に後から見つかった処理がドメインにいた方が適切であったなど

苦戦したところ

余談ですが、ちょっと開発時に悩んだポイントについて共有しようと思います。

fs2には、Graceful Shutdownの機構がないため、処理中にStreamがterminateされないようにする必要があります。 fs2はpull型のストリームライブラリなので、SIGTERMを受け取った後にSourceのStreamが要素を出力しなくなれば、以降の処理が完了し次第 Streamを安全に停止させることができます。

これをサポートするために以下のような関数を作成して、Sourceからのconsumeを止められるようにしています。

import cats.effect.{IO, Ref}

// Streamは遅延評価されるので、runningがtrueになるまで再帰的に呼び出し続ける
def repeatWhile[A](running: Ref[IO, Boolean], s: fs2.Stream[IO, A]): fs2.Stream[IO, A] = {
  fs2.Stream.eval(running.get).flatMap {
    case true => s ++ repeatWhile(running, s)
    case _ => fs2.Stream.empty
  }
}

def consume: IO[Int] = ???

def consumeStream(running: Ref[IO, Boolean]): fs2.Stream[IO, Int] = {
  repeatWhile(running, fs2.Stream.eval(consume))
}

SIGTERMを受け取ったタイミングでrunningに falseを設定すれば、consumeが停止するようになっています。

fs2-kafkaのGraceful Shutdownの章などを見ると、SIGTERMでStreamが停止させられた時に投げられるExceptionを取り回す方針をとっているようですが、こちらはStreamの途中処理がどこまで進んだか捕捉しにくくなる気がしているので、このような方針はとっていません

検証フェーズ

今回移行したシステムは、データを「読み込む」側のシステムではなく、Amazon SQSにデータを「書き込む」側のシステムです。読み込み側のシステムであれば、移行前後のシステムのレスポンスを比較して機能が問題ないか担保できますが、今回は書き込み側のシステムなので、その手段は利用できません。

特に、ストリームアプリケーションのデータを比較するとなると、Streamに流れてきたイベント、移行前後のシステムから出力されたイベント双方を記録し、同じ入力のイベント同士を比較するというシステムを別途作成しなければなりません。

このようなシステムを設計し、開発するのは大変であったので、今回はモバイルプッシュ通知の網羅テストをクリアすることで、機能要件を満たしているか確認しました。

リリースと後始末

網羅試験が終わったので、あとはリリースのみです。 リリースの際は、移行前のシステムと移行後のシステムの間でSQSに流す割合を段階的に変化させることでカナリアリリースを行います。

流量を1%, 10%, 30%, 50%, 100%と段階的に増やしていき、無事pegasus-irisのリリースが完了しました🎉

移行が完了したら、既存の不要なコードのお掃除です。 PHPのコードの削除・変更対象は141ファイルで、削除行数の合計は 11419 行でした。

移行後の動き

システム移行が無事完了しましたが、今回はこれが完遂すべき目的ではなく、本来やりたい「プッシュ通知本文表示」の前段階です。 この後にプッシュ通知本文表示のPJで機能開発を実施しました。

その話は、また機会があれば書こうかなあと思います。

*1:一緒に移行を担当したチームメンバーには最後までお世話になりました。ありがとうございました。

*2:メッセージ記法の説明はAPIドキュメントに詳しく記載されています

*3:ScalaMatsuriでの発表で使われたスライドで実装について紹介されています