经典并发问题: 理发店的故事

马上就二月二了,理发店就要忙活起来了,托尼、凯文、艾伦等老师的理发剪磨刀霍霍,把椅子擦亮,准备各位顾客的到来。

Sleeping barber problem是一个经典的goroutine交互和并发控制的问题,可以很好的用来演示多写多读的并发问题(multiple writers multiple readers)。

理发店问题最早是由计算机科学先驱 Edsger Dijkstra 在1965指出的,在 Silberschatz 和 Galvin 的 Operating Systems Concepts一书中有此问题的变种。

这个问题是这样子的。
有这么一个理发店,有一位理发师和几个让顾客等待的座位:

  • 如果没有顾客,这位理发师就躺在理发椅上睡觉
  • 顾客必须唤醒理发师让他开始理发
  • 如果一位顾客到来,理发师正在理发
    • 如果还有顾客用来等待的座位,则此顾客坐下
    • 如果座位都满了,则此顾客离开
  • 理发师理完头发后,需要检查是否还有等待的顾客
    • 如果有,则请一位顾客起来开始理发
    • 如果没有,理发师则去睡觉

虽然条件很多,我们可以把它想象成为一个并发的queue。在当前的问题下,有多个并发写(multiple writer, 顾客),一个并发读(single reader),对吧。

使用sync.Cond实现

一般,先前,我们处理并发队列通过使用sync.Cond这个并发原语(在java语言中,我们一般使用wait/notify)。

sync.Cond这个并发原语而言,它并不是一个很容易使用的对象。因为它还需要一个一个sync.Locker配合使用,并且相关的方法要么必须使用Locker、要么可以省略Locker,容易让人迷惑,等待的goroutine被唤醒时还需要检查判断条件,所以用起来总是需要小心翼翼地。

首先,我们定义一个Locker和一个Cond,并且定义顾客等待的座位数。
来了一位顾客,座位数加一,Tony老师叫起一位等待的顾客开始理发时,座位数减一。

1
2
3
4
5
var (
seatsLock sync.Mutex
seats int
cond = sync.NewCond(&seatsLock)
)

理发师的工作就是不断的检查是否有顾客等待,如果有,就交起一位顾客开始理发,理发耗时是随机的,理完再去叫下一位顾客。如果没有顾客,那么理发师就会被阻塞(开始睡觉)。
逐一这里Cond的使用方法,Wait之后需要for循环检查条件是否满足,并且Wait上下会有Locker的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 理发师
func barber() {
for {
// 等待一个用户
log.Println("Tony老师尝试请求一个顾客")
seatsLock.Lock()
for seats == 0 {
cond.Wait()
}
seats--
seatsLock.Unlock()
log.Println("Tony老师找到一位顾客,开始理发")
randomPause(2000)
}
}

customers模拟陆陆续续的顾客的到来:

1
2
3
4
5
6
func customers() {
for {
randomPause(1000)
go customer()
}
}

顾客到来之后,先请求seatsLock, 避免多个顾客同时进来的并发竞争。然后他会检查是否有空闲的座位,如果有则坐下并通知理发师。此时理发师如果睡眠则会被唤醒,如果正在理发会忽略。
如果没有空闲的座位则离开。

1
2
3
4
5
6
7
8
9
10
11
12
13
func customer() {
seatsLock.Lock()
defer seatsLock.Unlock()
if seats == 3 {
log.Println("没有空闲座位了,一位顾客离开了")
return
}
seats++
cond.Broadcast()
log.Println("一位顾客开始坐下排队理发")
}

这里简单的是由整数seats代表空闲的座位,实际处理这样的场景的时候,你可能会使用一个slice作为队列。

本身这个实现还是很简单的,但是Cond+Locker的使用方式还是感觉让人有那么一点不放心,事实上,很多这样的场景,我们可以使用channel来实现。

使用Channel实现Semaphore

我们可以使用Channel实现一个Semaphore来解决理发店问题。Semaphore实现如下,和我们水分子生成的问题中的实现是类似的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type Semaphore chan struct{}
func (s Semaphore) Acquire() {
s <- struct{}{}
}
func (s Semaphore) TryAcquire() bool {
select {
case s <- struct{}{}: // 还有空位子
return true
default: // 没有空位子了,离开
return false
}
}
func (s Semaphore) Release() {
<-s
}

有了这个并发原语,我们就容易解决理发店问题了。注意这里我们实现了TryAcquire,就是为了顾客到来的是否检查有没有空闲的座位。
这里为什么不使用Go官方扩展的semaphore.Weighted并发原语呢,是因为semaphore.Weighted有个问题,在Accquire之前调用Release方法的话会panic,所以我们自己实现了一个。

我们定义了有三个等待座位的信号量。Tony老师先调用Release方法,也就是想从座位上请一位顾客过来理发,以便空出一个等待座位。如果没有顾客,Tony就会无奈的等待和睡觉了。

1
2
3
4
5
6
7
8
9
10
11
12
13
var seats = make(Semaphore, 3)
// 理发师
func barber() {
for {
// 等待一个用户
log.Println("Tony老师尝试请求一个顾客")
seats.Release()
log.Println("Tony老师找到一位顾客,开始理发")
randomPause(2000)
}
}

顾客的检查也更简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 模拟顾客陆陆续续的过来
func customers() {
for {
randomPause(1000)
go customer()
}
}
// 顾客
func customer() {
if ok := seats.TryAcquire(); ok {
log.Println("一位顾客开始坐下排队理发")
} else {
log.Println("没有空闲座位了,一位顾客离开了")
}
}

可以看到,如果我们使用自定义的Semaphore话,代码会变得更加的简单。

那么这种channel实现的Semaphore有什么缺陷吗?那就是如果队列的长度太大的话,channel的容量就会很大。不过如果类型设置为strcut{}类型的话,就会节省很多的内存,所以一般也不会有什么问题,虽然比官方扩展的计数器方式的semaphore.Weighted多占用一些空间,但是占用的空间还是有限的。

更进一步,尽然我们要实现一个队列,其实可以通过这个channel来实现,把顾客类型替换struct{}类型,这样就没有额外多余的占用了。

多理发师的情况

更进一步,我们考虑多个理发师的情况。

多个理发师的问题其实就演变成了多写(multiple writer)多读(multiple reader)的场景了。

托尼、凯文和艾伦是理发界的三大巨头,我们演示三位理发师并发理发的场景,同时理发店的规模也扩大了,有10个可以等待的座位。

基于channel实现的Semaphore的解决方案,多个理发师的场景和单个理发师的场景是一样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package main
import (
"log"
"math/rand"
"os"
"os/signal"
"syscall"
"time"
)
type Semaphore chan struct{}
func (s Semaphore) Acquire() {
s <- struct{}{}
}
func (s Semaphore) TryAcquire() bool {
select {
case s <- struct{}{}: // 还有空位子
return true
default: // 没有空位子了,离开
return false
}
}
func (s Semaphore) Release() {
<-s
}
var seats = make(Semaphore, 10)
func main() {
// 托尼、凯文、艾伦理发师三巨头
go barber("Tony")
go barber("Kevin")
go barber("Allen")
go customers()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
}
func randomPause(max int) {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(max)))
}
// 理发师
func barber(name string) {
for {
// 等待一个用户
log.Println(name + "老师尝试请求一个顾客")
seats.Release()
log.Println(name + "老师找到一位顾客,开始理发")
randomPause(2000)
}
}
// 模拟顾客陆陆续续的过来
func customers() {
for {
randomPause(1000)
go customer()
}
}
// 顾客
func customer() {
if ok := seats.TryAcquire(); ok {
log.Println("一位顾客开始坐下排队理发")
} else {
log.Println("没有空闲座位了,一位顾客离开了")
}
}

全部经典并发问题的代码可以参考smallnest/dive-to-gosync-workshop。下一次我们再分析一个更加复杂的理发店问题。