akka actor的运行原理

最近在调研Scala web框架的性能时遇到一些问题, 比如生成巨多的Actor,GC时间过长,CPU使用率太高, 执行Actor的Receive是遇到耗时操作的问题等。怀疑Akka的调度器有些问题,特意整理了一些Akka调度器的背景知识,以及从源代码分析一下Actor是怎么执行地。

Dispatcher

Akka MessageDispatcher驱动Akka actor运行(tick),也可以说是这个机器的引擎。所有的MessageDispatcher都实现了ExecutionContext trait, 这意味着它们可以用来执行任何代码, 例如 Future.

如果对Actor不做额外配置的话,ActorSystem会使用一个缺省的Dispatcher。缺省的Dispatcher也可以进行参数调整,缺省它使用一个特定的default-executor。如果ActorSystem在创建时传入一个ExecutionContext,则此ExecutionContext 将作为此ActorSystem的所有Dispatcher的缺省executor。缺省的default-executorfork-join-executor,在大部分情况下它的性能还是不错的。

可以通过下面的代码得到一个配置的Dispatcher:

1
2
// for use with Futures, Scheduler, etc.
implicit val executionContext = system.dispatchers.lookup("my-dispatcher")

为Actor设置Dispatcher

如果你希望为你的 Actor 设置非缺省的派发器,你需要做两件事:
首先要配置dispatcher:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
my-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 10
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}

或者配置使用thread-pool-executor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
my-thread-pool-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "thread-pool-executor"
# Configuration for the thread pool
thread-pool-executor {
# minimum number of threads to cap factor-based core number to
core-pool-size-min = 2
# No of core threads ... ceil(available processors * factor)
core-pool-size-factor = 2.0
# maximum number of threads to cap factor-based number to
core-pool-size-max = 10
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}

这样,你就可以配置Actor使用图个特定的disptacher:

1
2
import akka.actor.Props
val myActor = context.actorOf(Props[MyActor], "myactor")
1
2
3
4
5
akka.actor.deployment {
/myactor {
dispatcher = my-dispatcher
}
}

或者用另外一种方式,可以在代码中指定dispatcher:

1
2
3
import akka.actor.Props
val myActor =
context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1")

dispatcher的类型

  • Dispatcher

    • 可共享性: 无限制
    • 邮箱: 任何一种类型,为每一个Actor创建一个
    • 使用场景: 缺省派发器,Bulkheading
    • 底层使用: java.util.concurrent.ExecutorService
      可以指定“executor”使用“fork-join-executor”, “thread-pool-executor” 或者 the FQCN(类名的全称) of an akka.dispatcher.ExecutorServiceConfigurator
  • PinnedDispatcher

    • 可共享性: 无
    • 邮箱: 任何一种类型,为每个Actor创建一个
    • 使用场景: Bulkheading
    • 底层使用: 任何 akka.dispatch.ThreadPoolExecutorConfigurator
      缺省为一个 “thread-pool-executor”
  • BalancingDispatcher

    • 可共享性: 仅对同一类型的Actor共享
    • 邮箱: 任何,为所有的Actor创建一个
    • 使用场景: Work-sharing
    • 底层使用: java.util.concurrent.ExecutorService
      指定使用 “executor” 使用 “fork-join-executor”, “thread-pool-executor” 或 the FQCN(类名的全称) of an akka.dispatcher.ExecutorServiceConfigurator
  • CallingThreadDispatcher

    • 可共享性: 无限制
    • 邮箱: 任何,每Actor每线程创建一个(需要时)
    • 使用场景: 仅为测试使用
    • 底层使用: 调用的线程 (duh)

邮箱

kka Mailbox 保存发往某 Actor的消息. 通常每个 Actor 拥有自己的邮箱, 但是如果是使用 BalancingDispatcher 使用同一个 BalancingDispatcher 的所有Actor共享同一个邮箱实例.
内置的邮箱的类型:

  • UnboundedMailbox - 缺省邮箱
  • SingleConsumerOnlyUnboundedMailbox
  • BoundedMailbox
  • NonBlockingBoundedMailbox
  • UnboundedPriorityMailbox
  • BoundedPriorityMailbox
  • UnboundedStablePriorityMailbox
  • BoundedStablePriorityMailbox
  • UnboundedControlAwareMailbox
  • BoundedControlAwareMailbox

工作原理

我们只看本地(同一个JVM进程)的ActRef: LocalActorRef,它定义了send (!)方法:

1
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = actorCell.sendMessage(message, sender)

actorCell实现了akka.actor.dungeon.Dispatch trait。它实现了具体的message的发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def sendMessage(msg: Envelope): Unit =
try {
if (system.settings.SerializeAllMessages) {
val unwrapped = (msg.message match {
case DeadLetter(wrapped, _, _) ⇒ wrapped
case other ⇒ other
}).asInstanceOf[AnyRef]
if (!unwrapped.isInstanceOf[NoSerializationVerificationNeeded]) {
val s = SerializationExtension(system)
s.deserialize(s.serialize(unwrapped).get, unwrapped.getClass).get
}
}
dispatcher.dispatch(this, msg)
} catch handleException

可以看到,还是交给dispatcher.dispatch进行消息的分发。
看具体的实现类Dispatcher.dispatch:

1
2
3
4
5
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
val mbox = receiver.mailbox
mbox.enqueue(receiver.self, invocation)
registerForExecution(mbox, true, false)
}

将消息放在对应的actor的邮箱中后,就会调用registerForExecution方法。
这个方法最重要的一行就是执行mbox,因为mbox实现了ForkJoinTaskRunnable接口。 (如果执行失败,还可能执行一次)

1
executorService execute mbox

其实是将mbox放到线程池中执行。
mbox并不是一次全部执行完的,而是有throughput参数确定。每次只执行throughput个消息,执行完会加入到线程池等待队列中,除非全部执行完毕。
因此当throughput=1的时候对actor来说比较“公平”,这样actor能平均的执行。

由此得出几个结论:

  • 可以调解线程池的大小进行调优
  • 具体的dispatcher实现该如何执行actor。 比如我们可以实现一个优先级队列来执行优先级比较高的Actor。
  • 如果一个Actor执行比较耗时的操作,比如IO操作,就会影响线程池的执行,造成整体吞吐率下降。所以为这些耗时的Actor配置专门的线程池
  • Akka会中断一个Actor而去执行别的actor吗,然后回来继续执行先前的Actor? 答案是不会。 因为一旦Actor交给线程池,线程就会去执行它。 如果你在Actor中sleep线程,会导致线程池中的此线程sleep。 所以你必须想一些办法,比如一个长的业务逻辑分成几个业务逻辑,每次只执行一个业务逻辑,通过状态变换分成多次执行。
  • 如果没有其它情况,线程会执行完actor 邮箱中的一部分消息,如果还有消息,会将此邮箱再放入线程池等待执行,直到没有待处理的消息为止。

参考资料

  1. Dispatchers
  2. tuning dispatchers in akka applications
  3. https://www.zybuluo.com/MiloXia/note/80283
  4. https://github.com/akka/akka/tree/master/akka-actor/src/main/scala/akka/dispatch
  5. http://www.gtan.com/akka_doc/scala/dispatchers.html
评论