Go并发设计模式之 Active Object

设计模式曾经很火,尤其是1995年的时候Erich Gamma, Richard Helm, Ralph Johnson 和 John Vlissides (GoF)推出的《设计模式》一书,可谓经典。这本书总结了面向对象设计中最有价值的经验,并且用简洁可复用的形式表达出来。书中分类描述了23种经典实用的设计模式,这些实际模式依然在现在的实际开发中被广泛实用。

当然,这23种设计模式并不能涵盖所有的模式场景,同时,书中也没有对其它领域的一些设计模式进行归纳总结和介绍,比如并发场景,数据库设计、前端设计、架构模式等等,这个问题GoF中也已经进行了说明。二十几年过去了,期间也陆陆续续的出了一些介绍模式的书,有些是对GoF的23中设计模式的具体语言的介绍、阐述等等,也有一些设计模式的书,介绍了企业开发中的其它领域的设计模式,有一些书还是非常值得一读的。

我会写一系列介绍并发设计模式的文章,主要介绍实用Go语言去实现这些并发设计模式,但是我不想遵循介绍设计模式的模版,而是结合很多流行的Go的项目和库,从实践的角度去介绍这些并发设计模式。这种介绍方式一是可以让读者更容易的去理解设计模式,而不是拿一些老掉牙、根本不会使用的例子来介绍,二来可以坚定读者的信心,因为这些并发设计模式已经在流行的项目中使用了,得到了实际的检验。

作为开篇一章,我介绍的是 Active Object设计模式,为什么拿它作为第一篇呢,因为它的首字母是A,最大。

模式介绍

Active Object设计模式解耦了方法的调用和方法的执行,方法的调用和方法的执行运行在不同的线程之中(或者纤程、goroutine, 后面不再一一注释)。它引入了异步方法调用,允许应用程序可以并发的处理多个客户端的请求,通过调度器进行调用并发的方法执行,提供了并发执行方法的能力。

这个模式有时候也会叫做Concurrency ObjectActor设计模式。

很多程序会使用并发对象来提高它们的性能,例如并发地的处理客户端的请求,方法的调用和执行都在每个客户端的线程之中,并发对象也就存在于各个客户端线程之中,因为并发对象需要在各个线程之间共享,免不了要使用锁等同步方式控制并发对象的访问,这就要求我们为了保证服务的质量,需要设计程序满足:

  • 对并发对象的方法调用不应该阻塞完整的处理流程
  • 同步访问并发对象应该设计简单
  • 应用程序应该透明的使用软硬件的并发能力

而Active Object这个并发设计模式解耦了方法的调用和执行,但是客户端线程还像调用普通方法一样,方法调用自动转换成一个method request,交给另外一个处理线程,然后这个method request会在这个线程中被调用。

这种模式包含6个组件:

  • proxy: 定义了客户端要调用的Active Object接口。当客户端调用它的方法是,方法调用被转换成method request放入到scheduler的activation queue之中。
  • method request: 用来封装方法调用的上下文
  • activation queue:待处理的 method request队列
  • scheduler:一个独立的线程,管理activation queue,调度方法的执行
  • servant:active object的方法执行的具体实现,
  • future:当客户端调用方法时,一个future对象会立即返回,允许客户端可以获取返回结果。

一些正式的实现,比如一些Java程序的实现,可以严格的按照这些组件实现对应的类,而对于Go语言来讲,可能实现形式上略微不同,因为Go并不是严格意义上的面向对象的编程,而且Go的语言设计目标时简单,所以实现这个并发模式的时候,有时候你不必使用面向对象的设计来实现,使用函数、方法的形式更简洁。而且这种并发设计模式也有一些变种,比如使用callback代替future,或者在不需要返回值的情况下省略future。

"Sometimes, the elegant implementation is just a function. Not a method. Not a class. Not a framework. Just a function." - John Carmack

简单例子

首先我们看一个hello world一样一个简单的例子,再详细分析一个标准库中使用Active Object的例子。

1
2
3
4
5
6
7
8
9
10
11
type Service struct {
v int
}
func (s *Service) Incr() {
s.v++
}
func (s *Service) Decr() {
s.v--
}

上面这个例子Service对象并不是线程安全的,当多个goroutine并发调用的时候会有data race问题。当然你可以通过增加一个sync.Mutex的方式保证同步,对于这个例子来说,使用Mutex去保护比较简单,但是如果对于复杂的业务来说,并发控制将变得很难,并且性能上影响也会非常大。我们可以使用Active Object方式去实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
type MethodRequest int
const (
Incr MethodRequest = iota
Decr
)
type Service struct {
queue chan MethodRequest
v int
}
func New(buffer int) *Service {
s := &Service{
queue: make(chan MethodRequest, buffer),
}
go s.schedule()
return s
}
func (s *Service) schedule() {
for r := range s.queue {
if r == Incr {
s.v++
} else {
s.v--
}
}
}
func (s *Service) Incr() {
s.queue <- Incr
}
func (s *Service) Decr() {
s.queue <- Decr
}

从上面这个简单的例子,你可以大致找到Active Object对应的组件。MethodRequest对应method request, Service对应proxy,schedule对应scheduler,Service.queue对应activation queue,因为不需要返回值,我们没有实现future。这里Service也对应servant,不像某些语言,为了保证面向对象的设计,以及接口和实现的分离,会定义很多的接口和对象,Go不一样,以简单为主,一个Service类型实现了多种角色,这也简化了Active Object设计模式的实现。

实际案例

在标准库中,有一个非常好的Active Object设计模式的例子,就是标准库net/rpc的Client的实现。

对于一个rpc服务Arith来说,

1
2
3
4
5
6
7
8
9
10
type Args struct {
A, B int
}
type Arith int
func (t *Arith) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}

它的客户端是线程安全的,可以在多个goroutine中并发的调用,通过一个tcp connection和服务器端调用:

1
2
3
4
5
6
7
8
args := &server.Args{7,8}
var reply int
call := client.Go("Arith.Multiply", args, &reply, nil)
<- call.Done
if call.Error != nil {
log.Fatal("arith error:", call.Error)
}
fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)

那么它又是如何实现的Active Object模式的呢?

方法调用

Client提供了Go方法实现异步的方法调用。

它将上下文(请求参数和返回)封装成一个Call对象, call对象的done字段提供了future的功能。你可以利用它获取方法是否已经执行完毕。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
// If caller passes done != nil, it must arrange that
// done has enough buffer for the number of simultaneous
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}

然后调用client.send将这个call对象放入到待处理处理队列中(实际实现需要处理并发放入的问题,以及异常情况error的处理):

1
2
3
4
5
6
7
func (client *Client) send(call *Call) {
......
seq := client.seq
client.seq++
client.pending[seq] = call
......
}

实际这个send要更复杂一点,它还会把请求发送给服务端,所以严格意义上来讲,它做了一些方法执行的逻辑。如果网络有问题,就可以快速地返回。

调度

Client初始化的时候,就会启动一个goroutine去处理Client.input

Client.input是独立于调用goroutine的一个单独的goroutine,它不断的从服务器读取消息,处理异常和正常的返回,并找到对应的Call对象。

它会调用Call.done方法提供给调用者一个调用完成的信号,客户端可以监控这个channel感知到方法调用是否完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (client *Client) input() {
var err error
var response Response
for err == nil {
response = Response{}
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
}
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
switch {
case call == nil:
......
case response.Error != "":
......
call.done()
default:
......
call.done()
}
}
......
}
func (call *Call) done() {
select {
case call.Done <- call:
// ok
default:
......
}
}

可以看到,net/rpc相对于标准的Active Object有所简化,其中相当于Active ObjectClient具有多个角色,他是这个设计模式的核心。通过将方法调用的上下文封装成一个Call对象,客户端可以像传统方法调用一样异步去处理,客户端并不需要理会内部的复杂的处理逻辑。针对这种网络访问的场景,Client又将部分方法执行的逻辑放在了方法调用send里面,作为guard condition可以快速返回。

我也将这种模式应用在rpcx的客户端调用上。

当然这种模式也仅限于网络调用上,比如服务端的单一进程的程序中也可以应用。由于Go语言的先天的并发处理的优势,很多情况下我们都request-per-goroutine + mutex/shared object 的方式去处理。

同步调用也很简单,可以很容易的基于异步调用Go实现同步调用Call:

1
2
3
4
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}

参考文档

  1. https://en.wikipedia.org/wiki/Active_object
  2. http://www.laputan.org/pub/sag/act-obj.pdf
  3. http://software-pattern.org/Book/29
  4. https://rosettacode.org/wiki/Active_object#Go
  5. http://wiki.c2.com/?ActiveObject
  6. http://www.orizondo.org/2011-12-3/refs/uml/activeobjects.pdf