队列(queue
)是非常常用的一个数据结构,它只允许在表的前端(head
)进行出队(dequeue
)操作,而在表的后端(tail
)进行入队(enqueue
)操作。和栈数据结构一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾(tail
),进行删除操作的端称为队头(header
)。
在并发环境中使用队列,就必须考虑到多线程(多纤程)并发读写的问题,可能存在多个写(入队)操作线程,同时也可能存在多个线程读操作线程,在这种情况下,我们要保证数据的不丢失,不重复,而且也要保证队列的功能不变,也就是先入先出的逻辑,只要存在数据,就可以出列。
诚然,通过一个排外锁可以实现队列的并发访问。一般实现队列的时候通过指针,而且只在队头队尾操作,所以这种排外锁保护的临界区并没有很复杂的执行逻辑,临界区的处理很快,所以一般情况下通过排外锁实现队列的效率已经很高了。但是在一些情况下,通过实现 lock-free 算法,我们可以进一步提升并发队列的性能。
本文介绍 lock-free queue 算法的一些背景知识,并实现了三种并发队列,并提供了性能测试的结果。
代码库可以在github上找到: smallnest/queue 。
lock-free queue 算法
说起 lock-free queue 算法,不得不提到 Maged M. Michael 和 Michael L. Scott 1996年发表的论文 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms ,这篇文章回顾了并发队列的一些实现以及局限性,提出了一种非常简洁的lock-free queue的实现,并且还提供了一个在特定机器比如不存在CAS指令的机器上的two-lock queue算法。这篇文章的被引用次数将近1000次。
只得一提的是, Java中的ConcurrentLinkedQueue就是基于这个算法实现的:
This implementation employs an efficient non-blocking algorithm based on one described in Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Maged M. Michael and Michael L. Scott.
大部分lock-free的算法都是通过CAS
操作实现的。
这篇文章提供了一个lock-free queue算法的伪代码,代码量也非常少,所以很容易通过各种编程语言实现。在这里我把伪代码列在这里:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
structure pointer_t {ptr: pointer to node_t, count: unsigned integer }
structure node_t {value: data type , next: pointer_t}
structure queue_t {Head: pointer_t, Tail: pointer_t}
initialize(Q: pointer to queue_t)
node = new_node()
node-> next. ptr = NULL
Q-> Head. ptr = Q-> Tail. ptr = node
enqueue(Q: pointer to queue_t, value: data type )
E1: node = new_node()
E2: node-> value = value
E3: node-> next. ptr = NULL
E4: loop
E5: tail = Q-> Tail
E6: next = tail. ptr-> next
E7: if tail == Q-> Tail
E8: if next. ptr == NULL
E9: if CAS(& tail. ptr-> next, next, < node, next. count+ 1 > )
E10: break
E11: endif
E12: else
E13: CAS(& Q-> Tail, tail, < next. ptr, tail. count+ 1 > )
E14: endif
E15: endif
E16: endloop
E17: CAS(& Q-> Tail, tail, < node, tail. count+ 1 > )
dequeue(Q: pointer to queue_t, pvalue: pointer to data type ): boolean
D1: loop
D2: head = Q-> Head
D3: tail = Q-> Tail
D4: next = head. ptr-> next
D5: if head == Q-> Head
D6: if head. ptr == tail. ptr
D7: if next. ptr == NULL
D8: return FALSE
D9: endif
D10: CAS(& Q-> Tail, tail, < next. ptr, tail. count+ 1 > )
D11: else
D12: * pvalue = next. ptr-> value
D13: if CAS(& Q-> Head, head, < next. ptr, head. count+ 1 > )
D14: break
D15: endif
D16: endif
D17: endif
D18: endloop
D19: free(head. ptr)
D20: return TRUE
initialize
初始化一个队列,并使用一个辅助的空的节点做header
,方便入队和出队的处理。
在入对的时候, E1~E3
先创建一个新的节点,并把入队的数据保存在这个节点上,下一步就要插入到队尾。
E4~E16
是一个循环,不断尝试将数据插入到队列中,在并发的情况下CAS
可能不成功,所以胡不断尝试,并发的线程中总会有一个是成功的,所以它是一个lock-free的算法。
E5~E6
是得到尾指针和尾指针指向的下一个节点。如果没有并发,没有并发的情况下,这里尾指针指向的下一个节点为空。但是如果在并发的情况下,在E7
行的时候可能有别的线程已经加入了新的节点,或者先前的尾节点已经出对,所以在E7
的实现先做一个判断,如果不满足的话重新获取。
在E8
条件满足的情况下,说明当前获取的尾指针还是尾指针,那么在E9
行通过CAS
把这个节点加入到队列中,跳出循环,但是这个时候尾指针还没有改变。 否则可能在这个过程中已经有新的节点加入到队列中,那么在E12
行,尝试把尾指针往后移动,指向新的节点。
在循环结束后,肯定已经入队,尝试把尾指针指向新插入的节点。当然这个时候可能又有新的节点加入了,导致CAS
不成功,不过没有关系,因为这个节点已经加入了队列,只不过它已经不是尾节点了而已。更新加入的节点的逻辑会移动尾节点到最后的新加入的节点上。
在出队的时候,D2~D4
获得头指针和尾指针,D5
在头指针未变的情况下记一步处理,说明这个时候还没有其他出队操作。
D6~D10
是尾指针和头指针指向的节点相同。有两种情况:1是空队列,则直接返回false,因为无数据可出列,2是新入列一个数据,还没来得及调整尾指针,那么这个时候移动一下尾指针。再重新尝试。
否则的话,D12
先获取第一个数据,先把数据保存起来,再尝试把头指针移动到这个节点上。返回这个数据并将当前的头指针的节点数据置空,因为头指针是一个辅助节点,不需要保存数据。
此处宜补一张或几张图
实现
lock-free queue
根据论文中的伪代码,我们可以使用Go语言实现一个lock-free的queue。这里指针我们使用unsafe.Pointer
来实现,这样方便进行CAS
操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package queue
import (
"sync/atomic"
"unsafe"
)
type LKQueue struct {
head unsafe.Pointer
tail unsafe.Pointer
}
type node struct {
value interface {}
next unsafe.Pointer
}
func NewLKQueue() *LKQueue {
n := unsafe.Pointer(&node{})
return &LKQueue{head: n, tail: n}
}
func (q *LKQueue) Enqueue(v interface {}) {
n := &node{value: v}
for {
tail := load(&q.tail)
next := load(&tail.next)
if tail == load(&q.tail) {
if next == nil {
if cas(&tail.next, next, n) {
cas(&q.tail, tail, n)
return
}
} else {
cas(&q.tail, tail, next)
}
}
}
}
func (q *LKQueue) Dequeue() interface {} {
for {
head := load(&q.head)
tail := load(&q.tail)
next := load(&head.next)
if head == load(&q.head) {
if head == tail {
if next == nil {
return nil
}
cas(&q.tail, tail, next)
} else {
v := next.value
if cas(&q.head, head, next) {
return v
}
}
}
}
}
func load(p *unsafe.Pointer) (n *node) {
return (*node)(atomic.LoadPointer(p))
}
func cas(p *unsafe.Pointer, old, new *node) (ok bool ) {
return atomic.CompareAndSwapPointer(
p, unsafe.Pointer(old), unsafe.Pointer(new ))
}
two-lock queue
上面的lock-free queue通过CAS
实现了高效的并发队列,同时,这篇论文还实现了一种two-lock算法,可以应用在没有原子操作的多处理器上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package queue
import (
"sync"
)
type CQueue struct {
head *cnode
tail *cnode
hlock sync.Mutex
tlock sync.Mutex
}
type cnode struct {
value interface {}
next *cnode
}
func NewCQueue() *CQueue {
n := &cnode{}
return &CQueue{head: n, tail: n}
}
func (q *CQueue) Enqueue(v interface {}) {
n := &cnode{value: v}
q.tlock.Lock()
q.tail.next = n
q.tail = n
q.tlock.Unlock()
}
func (q *CQueue) Dequeue() interface {} {
q.hlock.Lock()
n := q.head
newHead := n.next
if newHead == nil {
q.hlock.Unlock()
return nil
}
v := newHead.value
newHead.value = nil
q.head = newHead
q.hlock.Unlock()
return v
}
mutex-based queue
传统的,我们可以实现一个mutex
+ slice组成的queue, 在不过分追求性能(时间+空间)的情况下实现一个简单的queue。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package queue
import "sync"
type SliceQueue struct {
data []interface {}
mu sync.Mutex
}
func NewSliceQueue(n int ) (q *SliceQueue) {
return &SliceQueue{data: make ([]interface {}, 0 ,n)}
}
func (q *SliceQueue) Enqueue(v interface {}) {
q.mu.Lock()
q.data = append (q.data, v)
q.mu.Unlock()
}
func (q *SliceQueue) Dequeue() interface {} {
q.mu.Lock()
if len (q.data) == 0 {
q.mu.Unlock()
return nil
}
v := q.data[0 ]
q.data = q.data[1 :]
q.mu.Unlock()
return v
}
性能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
goos : darwin
goarch : amd64
pkg : github.com/smallnest/queue
BenchmarkQueue/lock -free_queue#4-4 8399941 177 ns/op
BenchmarkQueue/two-lock_queue#4-4 7544263 155 ns/op
BenchmarkQueue/slice-based_queue#4-4 6436875 194 ns/op
BenchmarkQueue/lock -free_queue#32-4 8399769 140 ns/op
BenchmarkQueue/two-lock_queue#32-4 7486357 155 ns/op
BenchmarkQueue/slice-based_queue#32-4 4572828 235 ns/op
BenchmarkQueue/lock -free_queue#1024-4 8418556 140 ns/op
BenchmarkQueue/two-lock_queue#1024-4 7888488 155 ns/op
BenchmarkQueue/slice-based_queue#1024-4 8902573 218 ns/op