Chatwork アドベントカレンダー および Scala アドベントカレンダー の 9 日目の記事です。(盛大に遅刻しました 🙇♂️ )
こんにちは。hayasshi です。 サーバーサイド開発部で Scala プロダクトの開発運用保守をしています。
Akka は分散並列処理のためのツールキットです。 クラスタリング、シャーディングの機能をつかい CQRS + ES なシステムを構築したり、ストリーム処理を構築するための機能と、それ向けの様々なミドルウェアとのコネクタをつかい、容易にストリームアプリケーションを構築できたりします。
それらの機能の土台として、Akka Actors という アクターモデル を実装しているモジュールがあります。 Akka Actors (アクターモデル) には、その考え方からくるいくつかの特徴があります。
Akka Actors をつかえば、クラスタリング処理やストリーム処理だけでなく、その特徴をつかって便利に処理を書くことが可能です。
Akka Actors の特徴
- 他のコンポーネントとのやりとりはメッセージのみの Shared-Nothing モデル
- 基本的に非同期で、メールボックスを介したメッセージの直列処理
- ActorRef を介した位置透過性、インターフェースからの独立性
もちろん他にもありますが、今回は特にこれらの特徴を利用します。
他の特徴や Akka 自体が気になる方は、Akka 実践バイブルがおすすめです。 サンプルコードが Classic Actors と呼ばれる、メッセージングに型がない旧来の書き方ですが、Akka やアクターモデルの設計思想を学ぶのには最適な本になっています。
今回のコードと確認バージョン
今回の全てのサンプルコードはこちらにあります。
私も Classic Actors のほうが扱い慣れているため、今回はそちらで記述しています。 ですが、アクター自体の考え方、特徴はかわらないので、Akka Actors (Typed Actors) でも同様に取り扱えると思われます。(いまは Typed が主流となっています)
記事を書いた時点での確認バージョンは下記になります。
scalaVersion := "2.13.7", libraryDependencies ++= Seq( "org.scalatest" %% "scalatest" % "3.2.10" % Test, "com.typesafe.akka" %% "akka-actor" % "2.6.17", "com.typesafe.akka" %% "akka-testkit" % "2.6.17" % Test, )
スレッドアンセーフなモジュールを取り扱う
上述した特徴の通り、個々のアクターはメールボックスにあるメッセージが一つずつ割り当てられて処理を行います。これにより、アクター内部のフィールドが複数のスレッドから操作されることがありません。
この特徴をつかい、スレッドセーフではない(スレッドアンセーフな)コンポーネントを、並列処理から安全に取り扱うことができます。
object CacheActor { case class Set(key: String, value: Int) sealed trait SetResponse case object Succeeded extends SetResponse case class Get(key: String) sealed trait GetResponse case class Found(value: Int) extends GetResponse case object NotFound extends GetResponse def props: Props = Props(new CacheActor) } class CacheActor extends Actor { import CacheActor._ import scala.collection.mutable val cache: mutable.Map[String, Int] = mutable.Map.empty[String, Int] override def receive: Receive = { case Set(key, value) => cache.update(key, value) sender() ! Succeeded case Get(key) => val result = cache.get(key).map(Found).getOrElse(NotFound) sender() ! result } }
val system = ActorSystem() val cacheActor = system.actorOf(CacheActor.props) cacheActor ! Set("one", 1) import akka.pattern.ask implicit val askTimeout = Timeout(3.seconds) (cacheActor ? Get("one")).mapTo[GetResponse].foreach(println) // Found(1)
(単純な例とするため、可変 Map をつかったキャッシュの例にしていますが、キャッシュはライブラリ含め、様々なアルゴリズムのものがスレッドセーフに提供されているので、いまいち例としては適切ではない気もします。)
また、ActorRef を介したインターフェースになっているため、容易に処理を並列化できます。
class CacheBroker extends Actor { import CacheActor._ val numOfCache = 10 val cacheActors = (1 to numOfCache).map(_ -> context.actorOf(Props(new CacheActor))).toMap override def receive: Receive = { case msg @ Set(key, _) => cacheActors(key.hashCode() % numOfCache) forward msg case msg @ Get(key) => cacheActors(key.hashCode() % numOfCache) forward msg } } object CacheActor { ... // 提供してきた I/F は変えずに内部のアクターのみを差し替える def props: Props = Props(new ChacheBroker) } ...
今回の例のように、キーに対して同じアクターにルーティングが必要になる等、機能によってルーティング要件があったりするため、そこは注意が必要です。
そうでない場合は、ランダムに割り当てたり、ルーティングアルゴリズムを持った Router も用意されているので、必要に応じてつかい分けることが可能です。
定期実行タスクを取り扱う
定期的に処理を実行しつつ、機能を他に提供するようなコンポーネントを書くことがあります。 このコンポーネントが状態を持っている場合、ロックを用い、定期処理と他のコンポーネントからの処理を排他的に取り扱わないと、競合状態に陥ることがあります。
アクターでは Scheduler の機能で定期実行を簡単に実装でき、さらにメッセージを一つずつ処理する特徴からロックフリーにこれらを取り扱うことができます。
AWS SQS からメッセージを Receive し、削除要求がくるまで定期的に VisibilityTimeout を更新するアクターの例です。 (エラーハンドリング等は省略しています)
case class SqsReceiverSettings( sqsClient: SqsAsyncClient, queueUrl: String, visibilityTimeout: FiniteDuration ) object SqsReceiverActor { case object Receive sealed trait ReceiveResponse case class Received(message: Message) extends ReceiveResponse case object EmptyReceive extends ReceiveResponse private case class ReplyResult(replyTo: ActorRef, received: Option[Message]) private case class ChangeVisibilityTimeout(receiptHandle: String) case class Delete(message: Message) def props(settings: SqsReceiverSettings): Props = Props(new SqsReceiverActor(settings)) } class SqsReceiverActor(settings: SqsReceiverSettings) extends Actor { import SqsReceiverActor._ import context.dispatcher val scheduler: Scheduler = context.system.scheduler import scala.collection.mutable val inProcessMessages: mutable.Set[String] = mutable.Set.empty[String] // 半分過ぎたら VisibilityTimeout を更新する val changeVisibilityTimeoutInterval: FiniteDuration = settings.visibilityTimeout / 2 private def schedule(msg: ChangeVisibilityTimeout): Unit = scheduler.scheduleOnce(changeVisibilityTimeoutInterval, context.self, msg) override def receive: Receive = { case Receive => val replyTo = sender() val request = ReceiveMessageRequest .builder() .queueUrl(settings.queueUrl) .maxNumberOfMessages(1) .visibilityTimeout(settings.visibilityTimeout.toSeconds.toInt) .build() settings.sqsClient .receiveMessage(request) .thenApply(_.messages().asScala.headOption) .thenApply(ReplyResult(replyTo, _)) .pipeTo(context.self) case ReplyResult(replyTo, None) => replyTo ! EmptyReceive case ReplyResult(replyTo, Some(m)) => inProcessMessages += m.receiptHandle() replyTo ! Received(m) schedule(ChangeVisibilityTimeout(m.receiptHandle())) case msg @ ChangeVisibilityTimeout(receiptHandle) if inProcessMessages.contains(receiptHandle) => val request = ChangeMessageVisibilityRequest .builder() .queueUrl(settings.queueUrl) .receiptHandle(receiptHandle) .visibilityTimeout(settings.visibilityTimeout.toSeconds.toInt) .build() settings.sqsClient.changeMessageVisibility(request).thenAccept(_ => schedule(msg)) case _: ChangeVisibilityTimeout => // 管理対象から外れたものは操作しない case Delete(m) => // 先に管理対象から外す. delete message に失敗しても, SQS の枠組みの中で再実行される inProcessMessages -= m.receiptHandle() val request = DeleteMessageRequest.builder().queueUrl(settings.queueUrl).receiptHandle(m.receiptHandle()).build() settings.sqsClient.deleteMessage(request) } }
サンプルのため責務が一つのアクターに集中していますが、適切に別のアクターに委譲することも可能です。
先の例もそうでしたが、アクターへのアクセスは ActorRef を介したメッセージでしかおこなえず、かつメッセージが一つずつの処理になるので、可変な状態を扱いやすいです。
ただ注意点として、メッセージを一つずつ処理するがゆえに、処理が追いつかず、メッセージがたまっていくと、スケジュールが破綻する可能性が出てきます。
この例では、ChangeVisibilityTimeout
メッセージの実際の処理が遅延し、実行前に VisibilityTimeout を超えてしまうようなケースが考えられます。
そのため、メールボックスのモニタリングや、必要に応じてアクターをスケールアウトできるようにしておくことが重要です。
まとめ
Akka には、非同期なアプリケーションを構築する上で便利なツールがたくさん含まれており、またテストもしやすくなるツールが揃っているので、便利につかい堅牢なアプリケーションを構築していきましょう。
ただし、利用するその特徴に応じて注意すべき点ももちろん出てくるので、アクターの挙動を把握し、達成したいこと(要件)と照らし合わせて良い設計実装をおこなっていきましょう。