kubell Creator's Note

ビジネスチャット「Chatwork」のエンジニアのブログです。

ビジネスチャット「Chatwork」のエンジニアのブログです。

読者になる

Akka Actors を便利につかう

Chatwork アドベントカレンダー および Scala アドベントカレンダー の 9 日目の記事です。(盛大に遅刻しました 🙇‍♂️ )

こんにちは。hayasshi です。 サーバーサイド開発部で Scala プロダクトの開発運用保守をしています。

Akka は分散並列処理のためのツールキットです。 クラスタリング、シャーディングの機能をつかい CQRS + ES なシステムを構築したり、ストリーム処理を構築するための機能と、それ向けの様々なミドルウェアとのコネクタをつかい、容易にストリームアプリケーションを構築できたりします。

それらの機能の土台として、Akka Actors という アクターモデル を実装しているモジュールがあります。 Akka Actors (アクターモデル) には、その考え方からくるいくつかの特徴があります。

Akka Actors をつかえば、クラスタリング処理やストリーム処理だけでなく、その特徴をつかって便利に処理を書くことが可能です。

Akka Actors の特徴

  • 他のコンポーネントとのやりとりはメッセージのみの Shared-Nothing モデル
  • 基本的に非同期で、メールボックスを介したメッセージの直列処理
  • ActorRef を介した位置透過性、インターフェースからの独立性

もちろん他にもありますが、今回は特にこれらの特徴を利用します。

他の特徴や Akka 自体が気になる方は、Akka 実践バイブルがおすすめです。 サンプルコードが Classic Actors と呼ばれる、メッセージングに型がない旧来の書き方ですが、Akka やアクターモデルの設計思想を学ぶのには最適な本になっています。

今回のコードと確認バージョン

今回の全てのサンプルコードはこちらにあります。

私も Classic Actors のほうが扱い慣れているため、今回はそちらで記述しています。 ですが、アクター自体の考え方、特徴はかわらないので、Akka Actors (Typed Actors) でも同様に取り扱えると思われます。(いまは Typed が主流となっています)

記事を書いた時点での確認バージョンは下記になります。

scalaVersion := "2.13.7",
libraryDependencies ++= Seq(
  "org.scalatest"     %% "scalatest"    % "3.2.10" % Test,
  "com.typesafe.akka" %% "akka-actor"   % "2.6.17",
  "com.typesafe.akka" %% "akka-testkit" % "2.6.17" % Test,
)

スレッドアンセーフなモジュールを取り扱う

上述した特徴の通り、個々のアクターはメールボックスにあるメッセージが一つずつ割り当てられて処理を行います。これにより、アクター内部のフィールドが複数のスレッドから操作されることがありません。

この特徴をつかい、スレッドセーフではない(スレッドアンセーフな)コンポーネントを、並列処理から安全に取り扱うことができます。

object CacheActor {
  case class Set(key: String, value: Int)
  sealed trait SetResponse
  case object Succeeded extends SetResponse

  case class Get(key: String)
  sealed trait GetResponse
  case class Found(value: Int) extends GetResponse
  case object NotFound extends GetResponse

  def props: Props = Props(new CacheActor)
}

class CacheActor extends Actor {
  import CacheActor._
  import scala.collection.mutable

  val cache: mutable.Map[String, Int] = mutable.Map.empty[String, Int]

  override def receive: Receive = {
    case Set(key, value) =>
      cache.update(key, value)
      sender() ! Succeeded
    case Get(key) =>
      val result = cache.get(key).map(Found).getOrElse(NotFound)
      sender() ! result
  }
}
val system = ActorSystem()
val cacheActor = system.actorOf(CacheActor.props)

cacheActor ! Set("one", 1)

import akka.pattern.ask
implicit val askTimeout = Timeout(3.seconds)
(cacheActor ? Get("one")).mapTo[GetResponse].foreach(println) // Found(1)

(単純な例とするため、可変 Map をつかったキャッシュの例にしていますが、キャッシュはライブラリ含め、様々なアルゴリズムのものがスレッドセーフに提供されているので、いまいち例としては適切ではない気もします。)

また、ActorRef を介したインターフェースになっているため、容易に処理を並列化できます。

class CacheBroker extends Actor {
  import CacheActor._

  val numOfCache = 10

  val cacheActors = (1 to numOfCache).map(_ -> context.actorOf(Props(new CacheActor))).toMap

  override def receive: Receive = {
    case msg @ Set(key, _) =>
      cacheActors(key.hashCode() % numOfCache) forward msg
    case msg @ Get(key) =>
      cacheActors(key.hashCode() % numOfCache) forward msg
  }
}

object CacheActor {
  ...
  // 提供してきた I/F は変えずに内部のアクターのみを差し替える
  def props: Props = Props(new ChacheBroker)
}
...

今回の例のように、キーに対して同じアクターにルーティングが必要になる等、機能によってルーティング要件があったりするため、そこは注意が必要です。

そうでない場合は、ランダムに割り当てたり、ルーティングアルゴリズムを持った Router も用意されているので、必要に応じてつかい分けることが可能です。

Typed Routers
Classic Routing

定期実行タスクを取り扱う

定期的に処理を実行しつつ、機能を他に提供するようなコンポーネントを書くことがあります。 このコンポーネントが状態を持っている場合、ロックを用い、定期処理と他のコンポーネントからの処理を排他的に取り扱わないと、競合状態に陥ることがあります。

アクターでは Scheduler の機能で定期実行を簡単に実装でき、さらにメッセージを一つずつ処理する特徴からロックフリーにこれらを取り扱うことができます。

AWS SQS からメッセージを Receive し、削除要求がくるまで定期的に VisibilityTimeout を更新するアクターの例です。 (エラーハンドリング等は省略しています)

case class SqsReceiverSettings(
    sqsClient: SqsAsyncClient,
    queueUrl: String,
    visibilityTimeout: FiniteDuration
)

object SqsReceiverActor {
  case object Receive
  sealed trait ReceiveResponse
  case class Received(message: Message) extends ReceiveResponse
  case object EmptyReceive              extends ReceiveResponse

  private case class ReplyResult(replyTo: ActorRef, received: Option[Message])

  private case class ChangeVisibilityTimeout(receiptHandle: String)

  case class Delete(message: Message)
  
  def props(settings: SqsReceiverSettings): Props = Props(new SqsReceiverActor(settings))
}

class SqsReceiverActor(settings: SqsReceiverSettings) extends Actor {
  import SqsReceiverActor._
  import context.dispatcher

  val scheduler: Scheduler = context.system.scheduler

  import scala.collection.mutable
  val inProcessMessages: mutable.Set[String] = mutable.Set.empty[String]

  // 半分過ぎたら VisibilityTimeout を更新する
  val changeVisibilityTimeoutInterval: FiniteDuration = settings.visibilityTimeout / 2

  private def schedule(msg: ChangeVisibilityTimeout): Unit =
    scheduler.scheduleOnce(changeVisibilityTimeoutInterval, context.self, msg)

  override def receive: Receive = {
    case Receive =>
      val replyTo = sender()
      val request =
        ReceiveMessageRequest
          .builder()
          .queueUrl(settings.queueUrl)
          .maxNumberOfMessages(1)
          .visibilityTimeout(settings.visibilityTimeout.toSeconds.toInt)
          .build()
      settings.sqsClient
        .receiveMessage(request)
        .thenApply(_.messages().asScala.headOption)
        .thenApply(ReplyResult(replyTo, _))
        .pipeTo(context.self)

    case ReplyResult(replyTo, None) =>
      replyTo ! EmptyReceive

    case ReplyResult(replyTo, Some(m)) =>
      inProcessMessages += m.receiptHandle()
      replyTo ! Received(m)
      schedule(ChangeVisibilityTimeout(m.receiptHandle()))

    case msg @ ChangeVisibilityTimeout(receiptHandle) if inProcessMessages.contains(receiptHandle) =>
      val request =
        ChangeMessageVisibilityRequest
          .builder()
          .queueUrl(settings.queueUrl)
          .receiptHandle(receiptHandle)
          .visibilityTimeout(settings.visibilityTimeout.toSeconds.toInt)
          .build()
      settings.sqsClient.changeMessageVisibility(request).thenAccept(_ => schedule(msg))

    case _: ChangeVisibilityTimeout => // 管理対象から外れたものは操作しない

    case Delete(m) =>
      // 先に管理対象から外す. delete message に失敗しても, SQS の枠組みの中で再実行される
      inProcessMessages -= m.receiptHandle()
      val request =
        DeleteMessageRequest.builder().queueUrl(settings.queueUrl).receiptHandle(m.receiptHandle()).build()
      settings.sqsClient.deleteMessage(request)
  }
}

サンプルのため責務が一つのアクターに集中していますが、適切に別のアクターに委譲することも可能です。

先の例もそうでしたが、アクターへのアクセスは ActorRef を介したメッセージでしかおこなえず、かつメッセージが一つずつの処理になるので、可変な状態を扱いやすいです。

ただ注意点として、メッセージを一つずつ処理するがゆえに、処理が追いつかず、メッセージがたまっていくと、スケジュールが破綻する可能性が出てきます。 この例では、ChangeVisibilityTimeout メッセージの実際の処理が遅延し、実行前に VisibilityTimeout を超えてしまうようなケースが考えられます。

そのため、メールボックスのモニタリングや、必要に応じてアクターをスケールアウトできるようにしておくことが重要です。

まとめ

Akka には、非同期なアプリケーションを構築する上で便利なツールがたくさん含まれており、またテストもしやすくなるツールが揃っているので、便利につかい堅牢なアプリケーションを構築していきましょう。

ただし、利用するその特徴に応じて注意すべき点ももちろん出てくるので、アクターの挙動を把握し、達成したいこと(要件)と照らし合わせて良い設計実装をおこなっていきましょう。