为什么Disruptor会那么快?

Disruptor是一个高性能的线程间消息传递的框架。

LMAX 目标是成为当世最快的商务平台。 为了实现这个目标,LMAX需要做一些特殊的工作在Java平台上取得低延迟和高吞吐率的目标。 性能测试表明使用队列(queue)传递数据会带来延迟, 所以LAMX对这一块做了非常好的优化。

Disruptor就是他们的研究成果。 研究发现CPU级别的cache miss和 需要内核仲裁的锁非常的耗费性能, 所以他们创建了一个Disruptor, 这是一个锁无关的实现。

它不是一个为特别任务实现的方案,不仅仅应用于金融领域。Disruptor可以用来解决并发编程中的一个普遍的问题: 消息队列的处理(producer和consumer)。

它使用了一个和传统不一样方式来实现。 所以你可能不能用文本替换的方式使用ring buffer替换你代码中的Queue等。官方网站上提供了一些例子, 本文的参考文档上也列出了一些。 官方的技术白皮书介绍了一些你想知道的细节。 官方文档还提供了非常多的性能测试的代码,也是学习disruptor好材料。

Disruptor究竟有多块, 看官方的和ArrayBlockingQueue测试结果:

注意y轴的刻度是指数级别的, 如果按照均匀递增的刻度,一张图无法显示。
这张图可以这样解读。 x轴越靠近零的比例越多, 性能越好。
Disrutor延迟时间大部分小于1ns, 而ArrayBlockingQueue平均32ns左右了。

我自己也在笔记本上运行了一下测试。 下面列出了其中的两个测试case:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Starting ArrayBlockingQueue tests
Run 0, BlockingQueue=7,012,622 ops/sec
Run 1, BlockingQueue=6,882,312 ops/sec
Run 2, BlockingQueue=7,251,631 ops/sec
Run 3, BlockingQueue=7,352,941 ops/sec
Run 4, BlockingQueue=7,320,644 ops/sec
Run 5, BlockingQueue=7,272,727 ops/sec
Run 6, BlockingQueue=7,183,908 ops/sec
Starting Disruptor tests
Run 0, Disruptor=82,101,806 ops/sec
Run 1, Disruptor=82,781,456 ops/sec
Run 2, Disruptor=145,560,407 ops/sec
Run 3, Disruptor=140,845,070 ops/sec
Run 4, Disruptor=142,450,142 ops/sec
Run 5, Disruptor=151,745,068 ops/sec
Run 6, Disruptor=141,643,059 ops/sec

这是单个producer和单个consumer的情况, 可以看到Disruptor吞吐率是ArrayBlockingQueue的10倍以上。 快的一塌糊涂。

为什么要和ArrayBlockingQueue对比呢?
这是因为两个底层的数据结构类似,都是通过一个环形数组实现。 也都用到CAS控制原子操作。 应用环境也类似, 都是有生产者和消费者。
而且ArrayBlockingQueue也是一个相对高效的队列实现。
与ArrayBlockingQueue不同的是, Disruptor支持:

  • 支持多个消费者,每个消费者都可以获得相同的消息。 而ArrayBlockingQueue元素被一个消费者取走后,其它消费者就无法从Queue中取到
  • 为事件预分配内存
  • 可以使用锁无关模式

ArrayBlockingQueue

作为对比,我们先来看看ArrayBlockingQueue的底层实现,这样我们在研读Disruptor的代码的时候才会理解它的设计精妙之处。
ArrayBlockingQueue通过ReentrantLock以及它的两个condition来控制并发。

1
2
3
4
5
6
7
8
9
10
11
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();

压入元素时:

1
2
3
4
5
6
7
8
9
10
11
12
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

如果数组已满,则等待notFull,如果消费者取出了元素,则会调用notFull.signal();:

1
2
3
4
5
private E dequeue() {
......
notFull.signal();
return x;
}

这时put方法会被唤醒。

取出元素时:

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

如果数组为空,则调用notEmpty.await();等待, enqueue会调用notEmpty.signal();唤醒它:

1
2
3
4
private void enqueue(E x) {
......
notEmpty.signal();
}

这种wait-notify(signal)也就是教科书上标准的处理队列的方式。

Disruptor的底层实现

有很多论文, 比如参考资料的文章中都已经介绍了Disruptor的设计为什么那么快。
我在这里罗列一下。

  • 可选锁无关lock-free, 没有竞争所以非常快
  • 所有访问者都记录自己的序号的实现方式,允许多个生产者与多个消费者共享相同的数据结构
  • 在每个对象中都能跟踪序列号, 没有为伪共享和非预期的竞争
  • 增加缓存行补齐, 提升cache缓存命中率
  • 环形数组中的元素不会被删除。

Disruptor的几个概念:

  • Ring Buffer:负责存储和更新事件的数据
  • Sequence: 每个消费者(EventProcessor)维护一个Sequence, 并发的大多数代码依赖Sequence值得改动。所以Sequence支持AtomicLong的大部分特性。唯一的不同是Sequence包含额外的功能来阻止Sequence和其它值之间的伪共享(false sharing)
  • Sequencer: Disruptor的核心代码。有两个实现:单生产者和多生产者。 它们实现了在生产者和消费者之间的快速传递的并发算法
  • Sequence Barrier:由Sequencer生成,它包含此Sequencer发布的Sequence指针以及依赖的其它消费者的Sequence。 它包含为消费者检查是否有可用的事件的代码逻辑。
  • Wait Strategy: 消费者等待事件的策略, 这些事件由生产者放入。
  • Event:传递的事件,完全有用户定义
  • EventProcessor:处理事件的主要循环,包含一个Sequence。有一个具体的实现类BatchEventProcessor.
  • EventHandler: 用户实现的接口,代表一个消费者
  • Producer:生产者,先获得占位,然后提交事件。

那么我们看看Disruptor的代码,看看它具体是怎么实现的。

RingBuffer

3.0起RingBuffer的功能很简单了,也可以很纯粹了。 就是负责存储和更新数据。
RingBuffer使用了padding方式来提供CPU cache的命中率,它继承了RingBufferFields类,并计算出需要padding的数量。

1
2
3
BUFFER_PAD = 128 / scale;
// Including the buffer pad in the array base offset
REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);

初始化数组时加上padding:

1
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];

填充数组和从数组中取值时:

1
2
3
4
5
6
7
8
9
10
11
12
13
private void fill(EventFactory<E> eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
@SuppressWarnings("unchecked")
protected final E elementAt(long sequence)
{
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}

RingBuffer可以创建单生产者或者多生产者。 如果你确定你的程序就只有一个生产者,可以选择使用单生产者,性能更好。

1
2
3
createSingleProducer(factory, bufferSize, waitStrategy);
or
createMultiProducer(factory, bufferSize, waitStrategy);

它相应的会使用SingleProducerSequencer或者MultiProducerSequencer类。

得到事件很简单,按数组的下标获取事件元素, 性能显然很好:

1
2
3
4
5
6
7
8
public E get(long sequence)
{
return elementAt(sequence);
}
protected final E elementAt(long sequence)
{
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}

next得到下一个占位来发布事件:

1
2
3
4
public long next()
{
return sequencer.next();
}

具体的sequencer.next()会在下面分析,总是这个方法会返回一个不冲突的sequence,用来发布新的事件。

发布事件基本按照下面的步骤:

1
2
3
4
5
6
7
long sequence = ringBuffer.next();
try {
Event e = ringBuffer.get(sequence);
// Do some work with the event.
} finally {
ringBuffer.publish(sequence);
}

它还提供了tryNext可以避免block。

返回当前sequencer的光标:

1
2
3
4
5
@Override
public long getCursor()
{
return sequencer.getCursor();
}

bufferSize是内部数组的大小。

另外它还提供了一些其它的方法比如translateAndPublishresetTo等。

可以看到, 重要的并发控制的部分其实是由sequencer来实现的。
我们重点看一下两种sequencer的实现。

SingleProducerSequencer

SingleProducerSequencer用作单生产者的情况。
初始化时它的两个字段是-1:

1
2
protected long nextValue = Sequence.INITIAL_VALUE;
protected long cachedValue = Sequence.INITIAL_VALUE;

我们先看一下它的next方法。 单生产者不会产生同时写的竞争问题。
这里gatingSequences的目的是给生产者设置一个闸门,阻止RingBuffer写一圈后覆盖未消费的事件。如果你不担心事件会被覆盖的问题,那么你可以不设置gatingSequences,这样性能更快。
一般你会把最后一个BatchEventProcessor.getSequence()设置为gatingSequences,这样等你最后一个消费者消费完后,会更改它的sequence, 空出更多的空间让生产者放置新的事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long nextValue = this.nextValue;
long nextSequence = nextValue + n;
long wrapPoint = nextSequence - bufferSize;
long cachedGatingSequence = this.cachedValue;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
long minSequence;
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
this.cachedValue = minSequence;
}
this.nextValue = nextSequence;
return nextSequence;
}

最后, nextValue记录返回下一个可以用的sequence,cachedValue记录最后待处理的事件。

上面的方法有一个阻塞的操作, 一直要等到有可用的空间生产者才可以继续进行。 而tryNext不会阻塞,如果数组已满,它会抛出一个InsufficientCapacityException异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public long tryNext(int n) throws InsufficientCapacityException
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
if (!hasAvailableCapacity(n))
{
throw InsufficientCapacityException.INSTANCE;
}
long nextSequence = this.nextValue += n;
return nextSequence;
}

空间是否已满由下面的方法取得:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public boolean hasAvailableCapacity(final int requiredCapacity)
{
long nextValue = this.nextValue;
long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
long cachedGatingSequence = this.cachedValue;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
this.cachedValue = minSequence;
if (wrapPoint > minSequence)
{
return false;
}
}
return true;
}

wrapPoint超过minSequence时表明空间已满。

claim强制将指针起始点指向指定的值。
cursor是一个Sequence对象,它用来生成new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack)和计算最小的sequenceUtil.getMinimumSequence(gatingSequences, cursor.get());
ProcessingSequenceBarrier会根据特定的策略取得自上一次以来可用的sequences。 可用的策略包括:

  • BlockingWaitStrategy: 对低延迟和吞吐率不像CPU占用那么重要
  • BusySpinWaitStrategy: CPU使用率高,低延迟
  • LiteBlockingWaitStrategy: BlockingWaitStrategy变种,实验性的
  • PhasedBackoffWaitStrategy: Spins, then yields, then waits,不过还是适合对低延迟和吞吐率不像CPU占用那么重要的情况
  • SleepingWaitStrategy: spin, then yield,然后sleep(LockSupport.parkNanos(1L))在性能和CPU占用率之间做了平衡。
  • TimeoutBlockingWaitStrategy: sleep一段时间。 低延迟。
  • YieldingWaitStrategy: 使用spin, Thread.yield()方式

在没有依赖的情况下(依赖是指几个消费者串联起来, 糖葫芦形或者菱形), 这个等待会非常小。 比如BusySpinWaitStrategy:

1
2
3
4
5
6
7
8
9
10
11
12
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
}
return availableSequence;
}

第一次进入循环就跳出了,因为availableSequence = sequence。
但是如果有依赖,它必须等待依赖dependentSequence也处理到此sequence才能往下进行。

这就是单生产者的基本逻辑。

主要注意两个边界条件。
1) 如果producer生产的快,追上消费者的时候
可以通过gatingSequences让生产者等待消费者消费。
这个时候是通过LockSupport.parkNanos(1L);不停的循环,直到有消费者消费掉一个或者多个事件。

2) 如果消费者消费的快,追上生产者的时候
这个时候由于消费者将自己最后能处理的sequence写回到光标后sequence.set(availableSequence);, 如果生产者还没有写入一个事件, 那么它就会调用waitStrategy.waitFor
等待。 如果生产者publish一个事件,它会更改光标的值:cursor.set(sequence);,然后通知等待的消费者继续处理waitStrategy.signalAllWhenBlocking();
Sequence类似AtomicLong的特性,很多操作是原子的。所以生产者和消费者同时设置的时候没有问题。

在使用BlockingWaitStrategy情况下,其实这和ArrayBlockingQueue类似,因为ArrayBlockingQueue也是通过Lock的方式等待。 性能测试结果显示Disruptor在这种策略下性能比ArrayBlockingQueue要略好一点,但是达不到10倍的显著提升,大概两倍左右。 这大概就是生产者使用不断的LockSupport.parkNanos方式带来的提升吧。

但是如果换为YieldingWaitStrategy, CPU使用率差别不大,但是却带来了10倍的性能提升。 这是因为消费者不需sleep, 通过spin-yield方式降低延迟率,提高了吞吐率。
当然处理这些以外, 还Disruptor还提供了最开始提到的一些性能考虑的提升, 包括多消费者同时处理的支持。 但我感觉最大的提升还是在于刚才提到的这一点。 YieldingWaitStrategy这种锁无关的设计极大的提升了事件处理的性能。

MultiProducerSequencer

MultiProducerSequencer用做多生产者的情况。
多生产者时在请求下一个sequence时有竞争的情况,所以通过cursor.compareAndSet(current, next)的spin来实现,直到成功的设置next才返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do
{
current = cursor.get();
next = current + n;
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence)
{
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}

参考资料

  1. Introduction to the Disruptor
  2. Disruptor presentation @ QCon SF
  3. Disruptor Technical Paper
  4. Mechanical Sympathy
  5. Martin Fowler's Technical Review
  6. 并发编程网的一系列的Disruptor译文
  7. Dissecting the Disruptor: Why it's so fast(Part One)
  8. Dissecting the Disruptor: Why it's so fast(Part Two)
评论