最近在调研Scala web框架的性能时遇到一些问题, 比如生成巨多的Actor,GC时间过长,CPU使用率太高, 执行Actor的Receive是遇到耗时操作的问题等。怀疑Akka的调度器有些问题,特意整理了一些Akka调度器的背景知识,以及从源代码分析一下Actor是怎么执行地。
Dispatcher
Akka MessageDispatcher驱动Akka actor运行(tick),也可以说是这个机器的引擎。所有的MessageDispatcher都实现了ExecutionContext trait, 这意味着它们可以用来执行任何代码, 例如 Future.
如果对Actor不做额外配置的话,ActorSystem
会使用一个缺省的Dispatcher。缺省的Dispatcher也可以进行参数调整,缺省它使用一个特定的default-executor
。如果ActorSystem
在创建时传入一个ExecutionContext,则此ExecutionContext 将作为此ActorSystem
的所有Dispatcher的缺省executor
。缺省的default-executor
是fork-join-executor
,在大部分情况下它的性能还是不错的。
可以通过下面的代码得到一个配置的Dispatcher:
|
|
为Actor设置Dispatcher
如果你希望为你的 Actor 设置非缺省的派发器,你需要做两件事:
首先要配置dispatcher:
|
|
或者配置使用thread-pool-executor
|
|
这样,你就可以配置Actor使用图个特定的disptacher:
|
|
|
|
或者用另外一种方式,可以在代码中指定dispatcher:
|
|
dispatcher的类型
Dispatcher
- 可共享性: 无限制
- 邮箱: 任何一种类型,为每一个Actor创建一个
- 使用场景: 缺省派发器,Bulkheading
- 底层使用:
java.util.concurrent.ExecutorService
可以指定“executor”使用“fork-join-executor”, “thread-pool-executor” 或者 the FQCN(类名的全称) of an akka.dispatcher.ExecutorServiceConfigurator
PinnedDispatcher
- 可共享性: 无
- 邮箱: 任何一种类型,为每个Actor创建一个
- 使用场景: Bulkheading
- 底层使用: 任何 akka.dispatch.ThreadPoolExecutorConfigurator
缺省为一个 “thread-pool-executor”
BalancingDispatcher
- 可共享性: 仅对同一类型的Actor共享
- 邮箱: 任何,为所有的Actor创建一个
- 使用场景: Work-sharing
- 底层使用: java.util.concurrent.ExecutorService
指定使用 “executor” 使用 “fork-join-executor”, “thread-pool-executor” 或 the FQCN(类名的全称) of an akka.dispatcher.ExecutorServiceConfigurator
CallingThreadDispatcher
- 可共享性: 无限制
- 邮箱: 任何,每Actor每线程创建一个(需要时)
- 使用场景: 仅为测试使用
- 底层使用: 调用的线程 (duh)
邮箱
kka Mailbox 保存发往某 Actor的消息. 通常每个 Actor 拥有自己的邮箱, 但是如果是使用 BalancingDispatcher 使用同一个 BalancingDispatcher 的所有Actor共享同一个邮箱实例.
内置的邮箱的类型:
- UnboundedMailbox - 缺省邮箱
- SingleConsumerOnlyUnboundedMailbox
- BoundedMailbox
- NonBlockingBoundedMailbox
- UnboundedPriorityMailbox
- BoundedPriorityMailbox
- UnboundedStablePriorityMailbox
- BoundedStablePriorityMailbox
- UnboundedControlAwareMailbox
- BoundedControlAwareMailbox
工作原理
我们只看本地(同一个JVM进程)的ActRef: LocalActorRef
,它定义了send (!
)方法:
|
|
actorCell
实现了akka.actor.dungeon.Dispatch
trait。它实现了具体的message的发送:
|
|
可以看到,还是交给dispatcher.dispatch进行消息的分发。
看具体的实现类Dispatcher.dispatch
:
|
|
将消息放在对应的actor的邮箱中后,就会调用registerForExecution
方法。
这个方法最重要的一行就是执行mbox,因为mbox实现了ForkJoinTask
和Runnable
接口。 (如果执行失败,还可能执行一次)
|
|
其实是将mbox放到线程池中执行。
mbox并不是一次全部执行完的,而是有throughput
参数确定。每次只执行throughput
个消息,执行完会加入到线程池等待队列中,除非全部执行完毕。
因此当throughput=1
的时候对actor来说比较“公平”,这样actor能平均的执行。
由此得出几个结论:
- 可以调解线程池的大小进行调优
- 具体的dispatcher实现该如何执行actor。 比如我们可以实现一个优先级队列来执行优先级比较高的Actor。
- 如果一个Actor执行比较耗时的操作,比如IO操作,就会影响线程池的执行,造成整体吞吐率下降。所以为这些耗时的Actor配置专门的线程池
- Akka会中断一个Actor而去执行别的actor吗,然后回来继续执行先前的Actor? 答案是不会。 因为一旦Actor交给线程池,线程就会去执行它。 如果你在Actor中sleep线程,会导致线程池中的此线程sleep。 所以你必须想一些办法,比如一个长的业务逻辑分成几个业务逻辑,每次只执行一个业务逻辑,通过状态变换分成多次执行。
- 如果没有其它情况,线程会执行完actor 邮箱中的一部分消息,如果还有消息,会将此邮箱再放入线程池等待执行,直到没有待处理的消息为止。