Disruptor是一个高性能的线程间消息传递的框架。
LMAX 目标是成为当世最快的商务平台。 为了实现这个目标,LMAX需要做一些特殊的工作在Java平台上取得低延迟和高吞吐率的目标。 性能测试表明使用队列(queue)传递数据会带来延迟, 所以LAMX对这一块做了非常好的优化。
Disruptor就是他们的研究成果。 研究发现CPU级别的cache miss和 需要内核仲裁的锁非常的耗费性能, 所以他们创建了一个Disruptor, 这是一个锁无关的实现。
它不是一个为特别任务实现的方案,不仅仅应用于金融领域。Disruptor可以用来解决并发编程中的一个普遍的问题: 消息队列的处理(producer和consumer)。
它使用了一个和传统不一样方式来实现。 所以你可能不能用文本替换的方式使用ring buffer替换你代码中的Queue等。官方网站上提供了一些例子, 本文的参考文档上也列出了一些。 官方的技术白皮书介绍了一些你想知道的细节。 官方文档还提供了非常多的性能测试的代码,也是学习disruptor好材料。
Disruptor究竟有多块, 看官方的和ArrayBlockingQueue测试结果:
注意y轴的刻度是指数级别的, 如果按照均匀递增的刻度,一张图无法显示。
这张图可以这样解读。 x轴越靠近零的比例越多, 性能越好。
Disrutor延迟时间大部分小于1ns, 而ArrayBlockingQueue平均32ns左右了。
我自己也在笔记本上运行了一下测试。 下面列出了其中的两个测试case:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Starting ArrayBlockingQueue tests Run 0, BlockingQueue=7,012,622 ops/sec Run 1, BlockingQueue=6,882,312 ops/sec Run 2, BlockingQueue=7,251,631 ops/sec Run 3, BlockingQueue=7,352,941 ops/sec Run 4, BlockingQueue=7,320,644 ops/sec Run 5, BlockingQueue=7,272,727 ops/sec Run 6, BlockingQueue=7,183,908 ops/sec Starting Disruptor tests Run 0, Disruptor=82,101,806 ops/sec Run 1, Disruptor=82,781,456 ops/sec Run 2, Disruptor=145,560,407 ops/sec Run 3, Disruptor=140,845,070 ops/sec Run 4, Disruptor=142,450,142 ops/sec Run 5, Disruptor=151,745,068 ops/sec Run 6, Disruptor=141,643,059 ops/sec
|
这是单个producer和单个consumer的情况, 可以看到Disruptor吞吐率是ArrayBlockingQueue的10倍以上。 快的一塌糊涂。
为什么要和ArrayBlockingQueue对比呢?
这是因为两个底层的数据结构类似,都是通过一个环形数组实现。 也都用到CAS控制原子操作。 应用环境也类似, 都是有生产者和消费者。
而且ArrayBlockingQueue也是一个相对高效的队列实现。
与ArrayBlockingQueue不同的是, Disruptor支持:
- 支持多个消费者,每个消费者都可以获得相同的消息。 而ArrayBlockingQueue元素被一个消费者取走后,其它消费者就无法从Queue中取到
- 为事件预分配内存
- 可以使用锁无关模式
ArrayBlockingQueue
作为对比,我们先来看看ArrayBlockingQueue的底层实现,这样我们在研读Disruptor的代码的时候才会理解它的设计精妙之处。
ArrayBlockingQueue通过ReentrantLock以及它的两个condition来控制并发。
1 2 3 4 5 6 7 8 9 10 11
| final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition();
|
压入元素时:
1 2 3 4 5 6 7 8 9 10 11 12
| public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
|
如果数组已满,则等待notFull,如果消费者取出了元素,则会调用notFull.signal();
:
1 2 3 4 5
| private E dequeue() { ...... notFull.signal(); return x; }
|
这时put方法会被唤醒。
取出元素时:
1 2 3 4 5 6 7 8 9 10 11
| public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
|
如果数组为空,则调用notEmpty.await();
等待, enqueue会调用notEmpty.signal();
唤醒它:
1 2 3 4
| private void enqueue(E x) { ...... notEmpty.signal(); }
|
这种wait-notify(signal)也就是教科书上标准的处理队列的方式。
Disruptor的底层实现
有很多论文, 比如参考资料的文章中都已经介绍了Disruptor的设计为什么那么快。
我在这里罗列一下。
- 可选锁无关lock-free, 没有竞争所以非常快
- 所有访问者都记录自己的序号的实现方式,允许多个生产者与多个消费者共享相同的数据结构
- 在每个对象中都能跟踪序列号, 没有为伪共享和非预期的竞争
- 增加缓存行补齐, 提升cache缓存命中率
- 环形数组中的元素不会被删除。
Disruptor的几个概念:
- Ring Buffer:负责存储和更新事件的数据
- Sequence: 每个消费者(EventProcessor)维护一个Sequence, 并发的大多数代码依赖Sequence值得改动。所以Sequence支持AtomicLong的大部分特性。唯一的不同是Sequence包含额外的功能来阻止Sequence和其它值之间的伪共享(false sharing)
- Sequencer: Disruptor的核心代码。有两个实现:单生产者和多生产者。 它们实现了在生产者和消费者之间的快速传递的并发算法
- Sequence Barrier:由Sequencer生成,它包含此Sequencer发布的Sequence指针以及依赖的其它消费者的Sequence。 它包含为消费者检查是否有可用的事件的代码逻辑。
- Wait Strategy: 消费者等待事件的策略, 这些事件由生产者放入。
- Event:传递的事件,完全有用户定义
- EventProcessor:处理事件的主要循环,包含一个Sequence。有一个具体的实现类BatchEventProcessor.
- EventHandler: 用户实现的接口,代表一个消费者
- Producer:生产者,先获得占位,然后提交事件。
那么我们看看Disruptor的代码,看看它具体是怎么实现的。
RingBuffer
3.0起RingBuffer的功能很简单了,也可以很纯粹了。 就是负责存储和更新数据。
RingBuffer使用了padding方式来提供CPU cache的命中率,它继承了RingBufferFields类,并计算出需要padding的数量。
1 2 3
| BUFFER_PAD = 128 / scale; REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
|
初始化数组时加上padding:
1
| this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
|
填充数组和从数组中取值时:
1 2 3 4 5 6 7 8 9 10 11 12 13
| private void fill(EventFactory<E> eventFactory) { for (int i = 0; i < bufferSize; i++) { entries[BUFFER_PAD + i] = eventFactory.newInstance(); } } @SuppressWarnings("unchecked") protected final E elementAt(long sequence) { return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); }
|
RingBuffer可以创建单生产者或者多生产者。 如果你确定你的程序就只有一个生产者,可以选择使用单生产者,性能更好。
1 2 3
| createSingleProducer(factory, bufferSize, waitStrategy); or createMultiProducer(factory, bufferSize, waitStrategy);
|
它相应的会使用SingleProducerSequencer或者MultiProducerSequencer类。
得到事件很简单,按数组的下标获取事件元素, 性能显然很好:
1 2 3 4 5 6 7 8
| public E get(long sequence) { return elementAt(sequence); } protected final E elementAt(long sequence) { return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); }
|
next
得到下一个占位来发布事件:
1 2 3 4
| public long next() { return sequencer.next(); }
|
具体的sequencer.next()会在下面分析,总是这个方法会返回一个不冲突的sequence,用来发布新的事件。
发布事件基本按照下面的步骤:
1 2 3 4 5 6 7
| long sequence = ringBuffer.next(); try { Event e = ringBuffer.get(sequence); } finally { ringBuffer.publish(sequence); }
|
它还提供了tryNext
可以避免block。
返回当前sequencer的光标:
1 2 3 4 5
| @Override public long getCursor() { return sequencer.getCursor(); }
|
bufferSize
是内部数组的大小。
另外它还提供了一些其它的方法比如translateAndPublish
和resetTo
等。
可以看到, 重要的并发控制的部分其实是由sequencer来实现的。
我们重点看一下两种sequencer
的实现。
SingleProducerSequencer
SingleProducerSequencer用作单生产者的情况。
初始化时它的两个字段是-1:
1 2
| protected long nextValue = Sequence.INITIAL_VALUE; protected long cachedValue = Sequence.INITIAL_VALUE;
|
我们先看一下它的next
方法。 单生产者不会产生同时写的竞争问题。
这里gatingSequences的目的是给生产者设置一个闸门,阻止RingBuffer写一圈后覆盖未消费的事件。如果你不担心事件会被覆盖的问题,那么你可以不设置gatingSequences,这样性能更快。
一般你会把最后一个BatchEventProcessor.getSequence()设置为gatingSequences,这样等你最后一个消费者消费完后,会更改它的sequence, 空出更多的空间让生产者放置新的事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long nextValue = this.nextValue; long nextSequence = nextValue + n; long wrapPoint = nextSequence - bufferSize; long cachedGatingSequence = this.cachedValue; if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { long minSequence; while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { LockSupport.parkNanos(1L); } this.cachedValue = minSequence; } this.nextValue = nextSequence; return nextSequence; }
|
最后, nextValue记录返回下一个可以用的sequence,cachedValue记录最后待处理的事件。
上面的方法有一个阻塞的操作, 一直要等到有可用的空间生产者才可以继续进行。 而tryNext
不会阻塞,如果数组已满,它会抛出一个InsufficientCapacityException异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public long tryNext(int n) throws InsufficientCapacityException { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } if (!hasAvailableCapacity(n)) { throw InsufficientCapacityException.INSTANCE; } long nextSequence = this.nextValue += n; return nextSequence; }
|
空间是否已满由下面的方法取得:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public boolean hasAvailableCapacity(final int requiredCapacity) { long nextValue = this.nextValue; long wrapPoint = (nextValue + requiredCapacity) - bufferSize; long cachedGatingSequence = this.cachedValue; if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { long minSequence = Util.getMinimumSequence(gatingSequences, nextValue); this.cachedValue = minSequence; if (wrapPoint > minSequence) { return false; } } return true; }
|
当wrapPoint
超过minSequence
时表明空间已满。
claim
强制将指针起始点指向指定的值。
cursor
是一个Sequence对象,它用来生成new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack)
和计算最小的sequenceUtil.getMinimumSequence(gatingSequences, cursor.get());
。
ProcessingSequenceBarrier
会根据特定的策略取得自上一次以来可用的sequences。 可用的策略包括:
- BlockingWaitStrategy: 对低延迟和吞吐率不像CPU占用那么重要
- BusySpinWaitStrategy: CPU使用率高,低延迟
- LiteBlockingWaitStrategy: BlockingWaitStrategy变种,实验性的
- PhasedBackoffWaitStrategy: Spins, then yields, then waits,不过还是适合对低延迟和吞吐率不像CPU占用那么重要的情况
- SleepingWaitStrategy: spin, then yield,然后sleep(LockSupport.parkNanos(1L))在性能和CPU占用率之间做了平衡。
- TimeoutBlockingWaitStrategy: sleep一段时间。 低延迟。
- YieldingWaitStrategy: 使用spin, Thread.yield()方式
在没有依赖的情况下(依赖是指几个消费者串联起来, 糖葫芦形或者菱形), 这个等待会非常小。 比如BusySpinWaitStrategy
:
1 2 3 4 5 6 7 8 9 10 11 12
| public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException { long availableSequence; while ((availableSequence = dependentSequence.get()) < sequence) { barrier.checkAlert(); } return availableSequence; }
|
第一次进入循环就跳出了,因为availableSequence = sequence。
但是如果有依赖,它必须等待依赖dependentSequence也处理到此sequence才能往下进行。
这就是单生产者的基本逻辑。
主要注意两个边界条件。
1) 如果producer生产的快,追上消费者的时候
可以通过gatingSequences让生产者等待消费者消费。
这个时候是通过LockSupport.parkNanos(1L);
不停的循环,直到有消费者消费掉一个或者多个事件。
2) 如果消费者消费的快,追上生产者的时候
这个时候由于消费者将自己最后能处理的sequence写回到光标后sequence.set(availableSequence);
, 如果生产者还没有写入一个事件, 那么它就会调用waitStrategy.waitFor
等待。 如果生产者publish一个事件,它会更改光标的值:cursor.set(sequence);
,然后通知等待的消费者继续处理waitStrategy.signalAllWhenBlocking();
。
Sequence类似AtomicLong的特性,很多操作是原子的。所以生产者和消费者同时设置的时候没有问题。
在使用BlockingWaitStrategy情况下,其实这和ArrayBlockingQueue类似,因为ArrayBlockingQueue也是通过Lock的方式等待。 性能测试结果显示Disruptor在这种策略下性能比ArrayBlockingQueue要略好一点,但是达不到10倍的显著提升,大概两倍左右。 这大概就是生产者使用不断的LockSupport.parkNanos方式带来的提升吧。
但是如果换为YieldingWaitStrategy, CPU使用率差别不大,但是却带来了10倍的性能提升。 这是因为消费者不需sleep, 通过spin-yield方式降低延迟率,提高了吞吐率。
当然处理这些以外, 还Disruptor还提供了最开始提到的一些性能考虑的提升, 包括多消费者同时处理的支持。 但我感觉最大的提升还是在于刚才提到的这一点。 YieldingWaitStrategy这种锁无关的设计极大的提升了事件处理的性能。
MultiProducerSequencer
MultiProducerSequencer用做多生产者的情况。
多生产者时在请求下一个sequence时有竞争的情况,所以通过cursor.compareAndSet(current, next)
的spin来实现,直到成功的设置next才返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long current; long next; do { current = cursor.get(); next = current + n; long wrapPoint = next - bufferSize; long cachedGatingSequence = gatingSequenceCache.get(); if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { long gatingSequence = Util.getMinimumSequence(gatingSequences, current); if (wrapPoint > gatingSequence) { LockSupport.parkNanos(1); continue; } gatingSequenceCache.set(gatingSequence); } else if (cursor.compareAndSet(current, next)) { break; } } while (true); return next; }
|
参考资料
- Introduction to the Disruptor
- Disruptor presentation @ QCon SF
- Disruptor Technical Paper
- Mechanical Sympathy
- Martin Fowler's Technical Review
- 并发编程网的一系列的Disruptor译文
- Dissecting the Disruptor: Why it's so fast(Part One)
- Dissecting the Disruptor: Why it's so fast(Part Two)