This implementation employs an efficient non-blocking algorithm based on one described in Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Maged M. Michael and Michael L. Scott.
structure pointer_t {ptr: pointer to node_t, count: unsigned integer}
structure node_t {value: data type, next: pointer_t}
structure queue_t {Head: pointer_t, Tail: pointer_t}

initialize(Q: pointer to queue_t)
      node = new_node()                  // Allocate a free node
      node->next.ptr = NULL              // Make it the only node in the linked list
      Q->Head.ptr = Q->Tail.ptr = node   // Both Head and Tail point to it

enqueue(Q: pointer to queue_t, value: data type)
 E1:  node = new_node()                  // Allocate a new node from the free list
 E2:  node->value = value                // Copy enqueued value into node
 E3:  node->next.ptr = NULL              // Set next pointer of node to NULL
 E4:  loop                               // Keep trying until Enqueue is done
 E5:     tail = Q->Tail                  // Read Tail.ptr and Tail.count together
 E6:     next = tail.ptr->next           // Read next ptr and count fields together
 E7:     if tail == Q->Tail              // Are tail and next consistent?
            // Was Tail pointing to the last node?
 E8:        if next.ptr == NULL
               // Try to link node at the end of the linked list
 E9:           if CAS(&tail.ptr->next, next, <node, next.count+1>)
E10:              break                  // Enqueue is done. Exit loop
E11:           endif
E12:        else                         // Tail was not pointing to the last node
               // Try to swing Tail to the next node
E13:           CAS(&Q->Tail, tail, <next.ptr, tail.count+1>)
E14:        endif
E15:     endif
E16:  endloop
      // Enqueue is done. Try to swing Tail to the inserted node
E17:  CAS(&Q->Tail, tail, <node, tail.count+1>)

dequeue(Q: pointer to queue_t, pvalue: pointer to data type): boolean
 D1:  loop                               // Keep trying until Dequeue is done
 D2:     head = Q->Head                  // Read Head
 D3:     tail = Q->Tail                  // Read Tail
 D4:     next = head.ptr->next           // Read Head.ptr->next
 D5:     if head == Q->Head              // Are head, tail, and next consistent?
 D6:        if head.ptr == tail.ptr      // Is queue empty or Tail falling behind?
 D7:           if next.ptr == NULL       // Is queue empty?
 D8:              return FALSE           // Queue is empty, couldn't dequeue
 D9:           endif
               // Tail is falling behind. Try to advance it
D10:           CAS(&Q->Tail, tail, <next.ptr, tail.count+1>)
D11:        else                         // No need to deal with Tail
               // Read value before CAS
               // Otherwise, another dequeue might free the next node
D12:           *pvalue = next.ptr->value
               // Try to swing Head to the next node
D13:           if CAS(&Q->Head, head, <next.ptr, head.count+1>)
D14:              break                  // Dequeue is done. Exit loop
D15:           endif
D16:        endif
D17:     endif
D18:  endloop
D19:  free(head.ptr)                     // It is safe now to free the old node
D20:  return TRUE                        // Queue was not empty, dequeue succeeded
// LKQueue is a lock-free unbounded FIFO queue.
//
// It is a singly linked list that always contains one dummy node at the
// front. Both pointers are only read and written with atomic operations.
type LKQueue struct {
	// head holds a *node: the current dummy node. The first real element,
	// if any, is head.next.
	head unsafe.Pointer
	// tail holds a *node: the last node of the list, or the node just
	// before it while a concurrent Enqueue has linked a node but not yet
	// advanced tail.
	tail unsafe.Pointer
}

// node is one cell of the linked list backing LKQueue.
type node struct {
	value interface{}
	// next holds a *node and is accessed atomically; nil marks the end of
	// the list.
	next unsafe.Pointer
}

// NewLKQueue returns an empty queue.
//
// The queue starts with a single dummy node that both head and tail
// point to.
func NewLKQueue() *LKQueue {
	n := unsafe.Pointer(&node{})
	return &LKQueue{head: n, tail: n}
}

// Enqueue puts the given value v at the tail of the queue.
//
// It loops until it succeeds in linking the new node after the current
// last node.
func (q *LKQueue) Enqueue(v interface{}) {
	n := &node{value: v}
	for {
		tail := (*node)(atomic.LoadPointer(&q.tail))
		next := (*node)(atomic.LoadPointer(&tail.next))

		// Are tail and next consistent? If tail moved in between the two
		// reads, start over with fresh values.
		if tail != (*node)(atomic.LoadPointer(&q.tail)) {
			continue
		}

		// tail was not pointing to the last node: try to swing it to the
		// next node, then retry.
		if next != nil {
			atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), unsafe.Pointer(next))
			continue
		}

		// tail is the last node: try to link n after it.
		if atomic.CompareAndSwapPointer(&tail.next, nil, unsafe.Pointer(n)) {
			// Enqueue is done. Try to swing tail to the inserted node; if
			// this CAS fails, tail has already been moved by someone else.
			atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), unsafe.Pointer(n))
			return
		}
	}
}

// Dequeue removes and returns the value at the head of the queue.
// It returns nil if the queue is empty.
//
// Because nil signals "empty", a stored nil value cannot be told apart
// from an empty queue by the caller.
func (q *LKQueue) Dequeue() interface{} {
	for {
		head := (*node)(atomic.LoadPointer(&q.head))
		tail := (*node)(atomic.LoadPointer(&q.tail))
		next := (*node)(atomic.LoadPointer(&head.next))

		// Are head, tail, and next consistent?
		if head != (*node)(atomic.LoadPointer(&q.head)) {
			continue
		}

		// Is the queue empty, or is tail falling behind?
		if head == tail {
			if next == nil {
				return nil // queue is empty
			}
			// tail is falling behind: try to advance it, then retry.
			atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), unsafe.Pointer(next))
			continue
		}

		// Read the value before the CAS, as the reference algorithm does.
		v := next.value
		// Try to swing head to the next node, which becomes the new dummy.
		if atomic.CompareAndSwapPointer(&q.head, unsafe.Pointer(head), unsafe.Pointer(next)) {
			return v
		}
	}
}
// CQueue is a concurrent unbounded queue which uses the two-lock
// concurrent queue algorithm.
//
// Enqueuers serialize on tlock and dequeuers on hlock, so one Enqueue and
// one Dequeue can proceed at the same time. The list always contains a
// dummy node at the front.
type CQueue struct {
	// head is the dummy node; the first real element is head.next.
	// It is only accessed while holding hlock.
	head *cnode
	// tail is the last node of the list.
	// It is only accessed while holding tlock.
	tail *cnode
	// hlock serializes Dequeue calls.
	hlock sync.Mutex
	// tlock serializes Enqueue calls.
	tlock sync.Mutex
}

// cnode is one cell of the linked list backing CQueue.
type cnode struct {
	value interface{}
	// next holds a *cnode. When the queue is empty, head and tail are the
	// same node, so Enqueue (under tlock) and Dequeue (under hlock) touch
	// this field without a common lock; it is therefore accessed
	// atomically.
	next unsafe.Pointer
}

// NewCQueue returns an empty CQueue.
func NewCQueue() *CQueue {
	n := &cnode{}
	return &CQueue{head: n, tail: n}
}

// Enqueue puts the given value v at the tail of the queue.
func (q *CQueue) Enqueue(v interface{}) {
	n := &cnode{value: v}
	q.tlock.Lock()
	// Link node at the end of the linked list. The atomic store publishes
	// n (including n.value) to a Dequeue that atomically loads this field.
	atomic.StorePointer(&q.tail.next, unsafe.Pointer(n))
	q.tail = n // swing tail to node
	q.tlock.Unlock()
}

// Dequeue removes and returns the value at the head of the queue.
// It returns nil if the queue is empty.
func (q *CQueue) Dequeue() interface{} {
	q.hlock.Lock()
	defer q.hlock.Unlock()

	newHead := (*cnode)(atomic.LoadPointer(&q.head.next))
	if newHead == nil {
		return nil
	}
	v := newHead.value
	// newHead becomes the dummy node; drop its value so the queue does not
	// keep the dequeued value reachable.
	newHead.value = nil
	q.head = newHead
	return v
}
// SliceQueue is an unbounded queue which uses a slice as its underlying
// storage. All operations are serialized by a single mutex.
type SliceQueue struct {
	// data holds the queued values, oldest first. Guarded by mu.
	data []interface{}
	mu   sync.Mutex
}

// NewSliceQueue returns an empty queue.
// n is the initial capacity of the underlying slice.
func NewSliceQueue(n int) (q *SliceQueue) {
	return &SliceQueue{data: make([]interface{}, 0, n)}
}

// Enqueue puts the given value v at the tail of the queue.
func (q *SliceQueue) Enqueue(v interface{}) {
	q.mu.Lock()
	q.data = append(q.data, v)
	q.mu.Unlock()
}

// Dequeue removes and returns the value at the head of the queue.
// It returns nil if the queue is empty.
func (q *SliceQueue) Dequeue() interface{} {
	q.mu.Lock()
	defer q.mu.Unlock()

	if len(q.data) == 0 {
		return nil
	}
	v := q.data[0]
	// Clear the slot before reslicing: the backing array outlives the
	// reslice, and a leftover reference would keep v reachable.
	q.data[0] = nil
	q.data = q.data[1:]
	return v
}
性能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
goos: darwin goarch: amd64 pkg: github.com/smallnest/queue