Go中秘而不宣的数据结构 spmc, 10倍性能于 channel

Go 标准库和运行中中,有一些专门针对特定场景优化的数据结构,这些数据结构并没有暴露出来,这个系列就是逐一介绍这些数据结构。

这一次给大家介绍的就是一个 lock-free、高性能的单生产者多消费者的队列:PoolDequeuePoolChain
到底是一个还是两个呢?
主要是 PoolDequeue, 它是一个固定尺寸,使用 ringbuffer (环形队列) 方式实现的队列。
PoolChain 是在它的基础上上,实现的一个动态尺寸的队列。

生产者消费者模式是常见的一种并发模式,根据生产者的数量和消费者的数量,可以分为四种情况:

  • 单生产者-单消费者模式: spsc
  • 单生产者-多消费者模式: spmc
  • 多生产者-单消费者模式: mpsc
  • 多生产者-多消费者模式: mpmc

Channel 基本上可以看做是一种多生产者多消费者模式的队列。可以同时允许多个生产者发送数据,有可以允许多个消费者消费数据,它也可以应用在其他模式的场景,比如 rpc 包中的 oneshot 模式、通知情况下的的单生产者多消费者模式、rpc 和服务端单连接通讯时的消息处理,就是多生产者单消费者模式。

但是 Go 标准库的 sync 包下,有一个针对单生产者多消费者的数据结构,它是一个 lock-free 的数据结构,针对这个场景做了优化,被使用在 sync.Pool 中。

sync.Pool 采用了一种类似 Go 运行时调度的机制,针对每个 p 有一个 private 的数据,同时还有一个 shared 的数据,如果在本地 privateshared 中没有数据,就去其他 P 对应的 shared 去偷取。难么同时可能有多个 P 偷取同一个 shared, 这是多消费者。

同时对 shared 的写只有它隶属的 p 执行 Put 的时候才会发生:

1
2
3
4
5
6
7
l, _ := p.pin()
if l.private == nil {
l.private = x
} else {
l.shared.pushHead(x)
}
runtime_procUnpin()

这有属于单生产者模式。sync.Pool 使用了 PoolDequeuePoolChain 来做优化。

首先我们先来了解 poolDequeue

poolDequeue

poolDequeue 是一个 lock-free 的数据结构,必然会使用 atomic, 同时它要求必须使用单生产者,否则会有并发问题。消费者可以是并发多个,当然你用一个也没问题。

其中,生产者可以使用下面的方法:

  • pushHead: 在队列头部新增加一个数据。如果队列满了,增加失败
  • popHead: 在队列头部弹出一个数据。生产者总是弹出新增加的数据,除非队列为空

消费者可以使用下面的一个方法:

  • popTail: 从队尾处弹出一个数据,除非队列为空。所以消费者总是消费最老的数据,这也正好符合大部分的场景

接下来就是分析代码了,有点枯燥,你可以跳过。

代码分析

首先我们看这个struct的定义:

1
2
3
4
type poolDequeue struct {
headTail atomic.Uint64
vals []eface
}

这里有两个重要的字段:

  • headTail: 一个 atomic.Uint64 类型的字段,它的高 32 位是 head,低 32 位是 tailhead 是下一个要填充的位置,tail 是最老的数据的位置。
  • vals: 一个 eface 类型的切片,它是一个环形队列,大小必须是 2 的幂次方。

生产者增加数据的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (d *poolDequeue) pushHead(val any) bool {
ptrs := d.headTail.Load()
head, tail := d.unpack(ptrs)
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// 队列满
return false
}
slot := &d.vals[head&uint32(len(d.vals)-1)]
// 检查 head slot 是否被 popTail 释放
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
// 另一个 goroutine 正在清理 tail,所以队列还是满的
return false
}
// 如果值为空,那么设置一个特殊值
if val == nil {
val = dequeueNil(nil)
}
// 队列头是空的,将数据写入 slot
*(*any)(unsafe.Pointer(slot)) = val // ①
// 增加 head,这样 popTail 就可以消费这个 slot 了
// 同时也是一个 store barrier,保证了 slot 的写入
d.headTail.Add(1 << dequeueBits)
return true
}

① 处会有并发问题吗?万一有两个 goroutine 同时执行到这里,会不会有问题?这里没有问题,因为要求只有一个生产者,不会有另外一个goroutine同时写这个槽位。

注意它还实现了packunpack方法,用于将 headtail 打包到一个 uint64 中,或者从 uint64 中解包出 headtail

消费者消费数据的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (d *poolDequeue) popTail() (any, bool) {
var slot *eface
for { // ②
ptrs := d.headTail.Load()
head, tail := d.unpack(ptrs)
if tail == head {
// 队列为空
return nil, false
}
// 确认头部和尾部(用于我们之前的推测性检查),并递增尾部。如果成功,那么我们就拥有了尾部的插槽。
ptrs2 := d.pack(head, tail+1)
if d.headTail.CompareAndSwap(ptrs, ptrs2) {
// 成功读取了一个 slot
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}
// 剩下来就是读取槽位的值
val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) { // 如果本身就存储的nil
val = nil
}
// 释放 slot,这样 pushHead 就可以继续写入这个 slot 了
slot.val = nil // ③
atomic.StorePointer(&slot.typ, nil) // ④
return val, true
}

② 处是一个 for 循环,这是一个自旋的过程,直到成功读取到一个 slot 为止。在有大量的goroutine的时候,这里可能会是一个瓶颈点,但是少量的消费者应该还不算大问题。

③ 和 ④ 处是释放 slot 的过程,这样生产者就可以继续写入这个 slot 了。

生产者还可以调用popHead方法,用来弹出刚刚压入还没有消费的数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (d *poolDequeue) popHead() (any, bool) {
var slot *eface
for {
ptrs := d.headTail.Load()
head, tail := d.unpack(ptrs)
if tail == head {
// 队列为空
return nil, false
}
// 确认头部和尾部(用于我们之前的推测性检查),并递减头部。如果成功,那么我们就拥有了头部的插槽。
head--
ptrs2 := d.pack(head, tail)
if d.headTail.CompareAndSwap(ptrs, ptrs2) {
// 成功取回了一个 slot
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// 释放 slot,这样 pushHead 就可以继续写入这个 slot 了
*slot = eface{}
return val, true
}

这是一个固定大小的队列,如果队列满了,生产者就会失败。这个队列的大小是 2 的幂次方,这样可以用 & 来取模,而不用 %,这样可以提高性能。

PoolChain

PoolChain 是在 PoolDequeue 的基础上实现的一个动态尺寸的队列,它的实现和 PoolDequeue 类似,只是增加了一个 headTail 的链表,用于存储多个 PoolDequeue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type poolChain struct {
// head 是生产者用来push的 poolDequeue。只有生产者访问,所以不需要同步
head *poolChainElt
// tail 是消费者用来pop的 poolDequeue。消费者访问,所以需要原子操作
tail atomic.Pointer[poolChainElt]
}
type poolChainElt struct {
poolDequeue
// next由生产者原子写入,消费者原子读取。它只能从nil转换为非nil。
// prev由消费者原子写入,生产者原子读取。它只能从非nil转换为nil。
next, prev atomic.Pointer[poolChainElt]
}

考虑到文章中代码过多,大家就会感觉很枯燥了,我就不具体展示代码了,你可以在 https://github.com/golang/go/blob/master/src/sync/poolqueue.go#L220-L302 查看具体的实现。
整体的思想就是将多个poolDequeue串联起来,生产者在head处增加数据,消费者在tail处消费数据,当tailpoolDequeue为空时,就从head处获取一个poolDequeue
head满了的时候,就增加一个新的poolDequeue
这样就实现了动态尺寸的队列。

sync.Pool中就是使用的PoolChain来实现的,它是一个单生产者多消费者的队列,可以同时有多个消费者消费数据,但是只有一个生产者生产数据。

为了能将这个数据结构暴露出来使用,我把相关的代码复制到 https://github.com/smallnest/exp/blob/master/gods/poolqueue.go , 增加了单元测试和性能测试的代码。

你可以学到这个方法,使用类似的技术,创建一个 look-free 无线长度的 byte buffer。在一些 Go 的网络优化库中就使用这种方法,避免频繁的 grow 和 copy 既有数据。

与channel的性能比较

我们来看一下poolDequeuePoolChainchannel的性能对比。
我们使用一个goroutine进行写入,10个goroutine进行读取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package gods
import (
"sync"
"testing"
)
func BenchmarkPoolDequeue(b *testing.B) {
const size = 1024
pd := NewPoolDequeue(size)
var wg sync.WaitGroup
// Producer
go func() {
for i := 0; i < b.N; i++ {
pd.PushHead(i)
}
wg.Done()
}()
// Consumers
numConsumers := 10
wg.Add(numConsumers + 1)
for i := 0; i < numConsumers; i++ {
go func() {
for {
if _, ok := pd.PopTail(); !ok {
break
}
}
wg.Done()
}()
}
wg.Wait()
}
func BenchmarkPoolChain(b *testing.B) {
pc := NewPoolChain()
var wg sync.WaitGroup
// Producer
go func() {
for i := 0; i < b.N; i++ {
pc.PushHead(i)
}
wg.Done()
}()
// Consumers
numConsumers := 10
wg.Add(numConsumers + 1)
for i := 0; i < numConsumers; i++ {
go func() {
for {
if _, ok := pc.PopTail(); !ok {
break
}
}
wg.Done()
}()
}
wg.Wait()
}
func BenchmarkChannel(b *testing.B) {
ch := make(chan interface{}, 1024)
var wg sync.WaitGroup
// Producer
go func() {
for i := 0; i < b.N; i++ {
ch <- i
}
close(ch)
wg.Done()
}()
// Consumers
numConsumers := 10
wg.Add(numConsumers + 1)
for i := 0; i < numConsumers; i++ {
go func() {
for range ch {
}
wg.Done()
}()
}
wg.Wait()
}

运行这个benchmark,我们可以看到poolDequeuePoolChain的性能要比channel高很多,大约是channel的10倍。
poolDequeuePoolChain 要好一些,性能是后者的两倍。