go-disruptor: A High-Performance Messaging Framework

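As Java programmers know, Disruptor is a high-performance inter-thread communication framework developed by LMAX: it passes messages between threads inside a single JVM process.

Disruptor is fast enough that LMAX uses it to process 6 million orders per second, reaching a throughput of 100K+ at a latency of 1 microsecond. Is there a comparable library in the Go ecosystem?

go-disruptor is a port of the Java Disruptor. It offers an API similar to the Java one and is not hard to use.

As for its performance, that is covered below and is the focus of this article.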

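Because of its performance, Disruptor has drawn a lot of attention, and a whole series of articles and resources introduce it.

There are also Chinese translations and introductions, such as the Disruptor series on 并发编程网 (the Concurrent Programming Network) and "How to Use Disruptor Elegantly" by Feng Zhongyan of Alibaba.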

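Disruptor was developed by LMAX, whose goal is to be the fastest trading platform in the world. To reach low latency and high throughput, they had to build their own high-performance producer-consumer messaging framework, because the queues that ship with Java add too much latency. The figure below compares Disruptor with the JDK's ArrayBlockingQueue.

Figure: latency comparison of Disruptor and ArrayBlockingQueue (image from the official Disruptor site).

The X axis shows latency and the Y axis shows the number of operations. Disruptor has lower latency and higher throughput.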
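To make the comparison less abstract, here is a minimal sketch of the idea behind a Disruptor-style ring: a fixed array plus two sequence counters, with no locks. It is not the LMAX or go-disruptor implementation. The type `spscRing` and its methods are hypothetical names, and the sketch assumes exactly one producer and one consumer.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

const (
	size = 8 // must be a power of two so that seq&mask wraps correctly
	mask = size - 1
)

// spscRing is a hypothetical single-producer/single-consumer ring.
type spscRing struct {
	published int64 // last sequence the producer made visible
	consumed  int64 // last sequence the consumer finished reading
	slots     [size]int64
}

func newRing() *spscRing {
	return &spscRing{published: -1, consumed: -1}
}

// put writes v at the next sequence, spinning while the ring is full.
func (r *spscRing) put(v int64) {
	next := atomic.LoadInt64(&r.published) + 1
	// The slot for next was last used by sequence next-size; wait until
	// the consumer has moved past it.
	for next-atomic.LoadInt64(&r.consumed) > size {
		runtime.Gosched()
	}
	r.slots[next&mask] = v
	atomic.StoreInt64(&r.published, next) // publish after the write
}

// sum consumes n values in sequence order and returns their total.
func (r *spscRing) sum(n int64) int64 {
	var total int64
	for seq := int64(0); seq < n; seq++ {
		for atomic.LoadInt64(&r.published) < seq {
			runtime.Gosched() // nothing published yet at this sequence
		}
		total += r.slots[seq&mask]
		atomic.StoreInt64(&r.consumed, seq) // release the slot
	}
	return total
}

// run pushes 0..n-1 through the ring and returns the consumer's total.
func run(n int64) int64 {
	r := newRing()
	go func() {
		for i := int64(0); i < n; i++ {
			r.put(i)
		}
	}()
	return r.sum(n)
}

func main() {
	fmt.Println(run(1000)) // 0+1+...+999 = 499500
}
```

The producer and consumer never take a lock; each side only reads the other's counter, which is the property the latency comparison above depends on.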

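Disruptor supports several usage models and configurations, and the official site links to test results for some of them.

What I want to do is compare the performance of go-disruptor with the official Java Disruptor. Disruptor can be configured in several ways (single or multiple producers, single or multiple consumers), and performance differs considerably between configurations. To be fair, the two should be compared under the same configuration, even though they are written in different languages.

The scenario I chose is 3 producers and 1 consumer. With a single producer, Java Disruptor's throughput would be several times higher.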

Java Disruptor

The main Java test class is as follows:

// File: Main.java (imports assume the Disruptor 3.x package layout)
import static com.lmax.disruptor.RingBuffer.createMultiProducer;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    private static final int NUM_PUBLISHERS = 3; // Runtime.getRuntime().availableProcessors();
    private static final int BUFFER_SIZE = 1024 * 64;
    private static final long ITERATIONS = 1000L * 1000L * 20L;

    // One thread per publisher plus one for the consumer.
    private final ExecutorService executor =
        Executors.newFixedThreadPool(NUM_PUBLISHERS + 1, DaemonThreadFactory.INSTANCE);
    // The publishers and the main thread all start from this barrier.
    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM_PUBLISHERS + 1);
    private final RingBuffer<ValueEvent> ringBuffer =
        createMultiProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new BusySpinWaitStrategy());
    private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
    private final BatchEventProcessor<ValueEvent> batchEventProcessor =
        new BatchEventProcessor<>(ringBuffer, sequenceBarrier, handler);
    private final ValueBatchPublisher[] valuePublishers = new ValueBatchPublisher[NUM_PUBLISHERS];

    {
        // Each publisher writes its share of the iterations in batches of 16.
        for (int i = 0; i < NUM_PUBLISHERS; i++)
        {
            valuePublishers[i] = new ValueBatchPublisher(cyclicBarrier, ringBuffer, ITERATIONS / NUM_PUBLISHERS, 16);
        }
        // Producers must not overwrite slots the consumer has not read yet.
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
    }

    public long runDisruptorPass() throws Exception
    {
        final CountDownLatch latch = new CountDownLatch(1);
        // The latch fires when the consumer reaches this sequence number.
        handler.reset(latch, batchEventProcessor.getSequence().get() + ((ITERATIONS / NUM_PUBLISHERS) * NUM_PUBLISHERS));

        Future<?>[] futures = new Future[NUM_PUBLISHERS];
        for (int i = 0; i < NUM_PUBLISHERS; i++)
        {
            futures[i] = executor.submit(valuePublishers[i]);
        }
        executor.submit(batchEventProcessor);

        long start = System.currentTimeMillis();
        cyclicBarrier.await(); // start test
        for (int i = 0; i < NUM_PUBLISHERS; i++)
        {
            futures[i].get();
        } // all published
        latch.await(); // all handled

        long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
        batchEventProcessor.halt();
        return opsPerSecond;
    }

    public static void main(String[] args) throws Exception
    {
        Main m = new Main();
        System.out.println("opsPerSecond:" + m.runDisruptorPass());
    }
}

The producer and consumer classes are as follows:

// File: ValueAdditionEventHandler.java
import com.lmax.disruptor.EventHandler;
import java.util.concurrent.CountDownLatch;

public final class ValueAdditionEventHandler implements EventHandler<ValueEvent>
{
    private long value = 0;
    private long count; // sequence number at which the test is complete
    private CountDownLatch latch;

    public long getValue()
    {
        return value;
    }

    public void reset(final CountDownLatch latch, final long expectedCount)
    {
        value = 0;
        this.latch = latch;
        count = expectedCount;
    }

    @Override
    public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
    {
        value = event.getValue();
        // Release the main thread once the expected final sequence is handled.
        if (count == sequence)
        {
            latch.countDown();
        }
    }
}
// File: ValueBatchPublisher.java
import com.lmax.disruptor.RingBuffer;
import java.util.concurrent.CyclicBarrier;

public final class ValueBatchPublisher implements Runnable
{
    private final CyclicBarrier cyclicBarrier;
    private final RingBuffer<ValueEvent> ringBuffer;
    private final long iterations;
    private final int batchSize;

    public ValueBatchPublisher(
        final CyclicBarrier cyclicBarrier,
        final RingBuffer<ValueEvent> ringBuffer,
        final long iterations,
        final int batchSize)
    {
        this.cyclicBarrier = cyclicBarrier;
        this.ringBuffer = ringBuffer;
        this.iterations = iterations;
        this.batchSize = batchSize;
    }

    @Override
    public void run()
    {
        try
        {
            // Wait until all publishers and the main thread are ready.
            cyclicBarrier.await();
            for (long i = 0; i < iterations; i += batchSize)
            {
                // next(n) claims n slots and returns the highest sequence.
                long hi = ringBuffer.next(batchSize);
                long lo = hi - (batchSize - 1);
                for (long l = lo; l <= hi; l++)
                {
                    ValueEvent event = ringBuffer.get(l);
                    event.setValue(l);
                }
                // Make the whole batch visible to the consumer.
                ringBuffer.publish(lo, hi);
            }
        }
        catch (Exception ex)
        {
            throw new RuntimeException(ex);
        }
    }
}
// File: ValueEvent.java
import com.lmax.disruptor.EventFactory;

public final class ValueEvent
{
    private long value;

    public long getValue()
    {
        return value;
    }

    public void setValue(final long value)
    {
        this.value = value;
    }

    // Used by the ring buffer to pre-allocate one event per slot.
    public static final EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>()
    {
        public ValueEvent newInstance()
        {
            return new ValueEvent();
        }
    };
}

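The producers write data from three threads, in batches of 16 events per write, and a single consumer processes them.

In my test run this reached a throughput of 183486238 ops/s, about 180 million operations per second.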
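The batch arithmetic in `ValueBatchPublisher.run` is easy to misread, so here is a small worked sketch of it in Go. The helper names `claimRange` and `eventsPublished` are hypothetical; the numbers come from the constants in the Java test (20 million iterations, 3 publishers, batch size 16).

```go
package main

import "fmt"

// claimRange mirrors `lo = hi - (batchSize - 1)`: given the highest
// sequence of a claimed batch, it returns the lowest one.
func claimRange(hi, batchSize int64) (lo int64) {
	return hi - (batchSize - 1)
}

// eventsPublished returns how many events one publisher writes when it
// loops `for i = 0; i < iterations; i += batchSize` and always claims a
// full batch.
func eventsPublished(iterations, batchSize int64) int64 {
	batches := (iterations + batchSize - 1) / batchSize // round up
	return batches * batchSize
}

func main() {
	const total, publishers, batch = 20_000_000, 3, 16
	perPublisher := int64(total / publishers)

	fmt.Println("iterations per publisher:", perPublisher)                         // 6666666
	fmt.Println("events written per publisher:", eventsPublished(perPublisher, batch)) // 6666672
	fmt.Println("first batch spans", claimRange(15, batch), "to", 15)              // 0 to 15
}
```

Because 6666666 is not a multiple of 16, each publisher writes a few more events than its nominal share. The handler's latch is tied to a sequence number rather than to the number of events written, so the extra events do not delay the end of the measurement.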

go-disruptor

Now let's see what throughput go-disruptor can reach.

As we know, goroutines in Go pass messages through the built-in channels. The go-disruptor project site compares go-disruptor with channels, and go-disruptor is clearly faster:

Scenario                                                         Per Operation Time
Channels: Buffered, Blocking, GOMAXPROCS=1                       58.6 ns
Channels: Buffered, Blocking, GOMAXPROCS=2                       86.6 ns
Channels: Buffered, Blocking, GOMAXPROCS=3, Contended Write      194 ns
Channels: Buffered, Non-blocking, GOMAXPROCS=1                   26.4 ns
Channels: Buffered, Non-blocking, GOMAXPROCS=2                   29.2 ns
Channels: Buffered, Non-blocking, GOMAXPROCS=3, Contended Write  110 ns
Disruptor: Writer, Reserve One                                   4.3 ns
Disruptor: Writer, Reserve Many                                  1.0 ns
Disruptor: Writer, Reserve One, Multiple Readers                 4.5 ns
Disruptor: Writer, Reserve Many, Multiple Readers                0.9 ns
Disruptor: Writer, Await One                                     3.0 ns
Disruptor: Writer, Await Many                                    0.7 ns
Disruptor: SharedWriter, Reserve One                             13.6 ns
Disruptor: SharedWriter, Reserve Many                            2.5 ns
Disruptor: SharedWriter, Reserve One, Contended Write            56.9 ns
Disruptor: SharedWriter, Reserve Many, Contended Write           3.1 ns
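A per-operation figure for a buffered, blocking channel can be measured on your own machine with the standard `testing` package. This is only a sketch of one such measurement, not the benchmark the go-disruptor project used; the function name `benchBufferedChannel` and the buffer size are assumptions, and the result will vary with hardware and Go version.

```go
package main

import (
	"fmt"
	"testing"
)

// benchBufferedChannel sends b.N values through a buffered channel while
// a second goroutine drains it.
func benchBufferedChannel(b *testing.B) {
	ch := make(chan int64, 1024*64)
	done := make(chan struct{})
	go func() {
		for range ch { // drain until the channel is closed
		}
		close(done)
	}()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		ch <- int64(i) // blocking send
	}
	close(ch)
	<-done // include the time needed to drain the buffer
}

func main() {
	// testing.Benchmark runs the function outside of `go test`.
	r := testing.Benchmark(benchBufferedChannel)
	fmt.Printf("%d ops, %d ns/op\n", r.N, r.NsPerOp())
}
```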

How does go-disruptor perform under the same test conditions as the Java Disruptor?

Here is the test code:

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"

	disruptor "github.com/smartystreets/go-disruptor"
)

const (
	RingBufferSize   = 1024 * 64
	RingBufferMask   = RingBufferSize - 1
	ReserveOne       = 1
	ReserveMany      = 16
	ReserveManyDelta = ReserveMany - 1
	DisruptorCleanup = time.Millisecond * 10
)

// ringBuffer holds the messages; go-disruptor only coordinates sequences.
var ringBuffer = [RingBufferSize]int64{}

func main() {
	NumPublishers := 3 //runtime.NumCPU()
	totalIterations := int64(1000 * 1000 * 20)
	iterations := totalIterations / int64(NumPublishers)
	totalIterations = iterations * int64(NumPublishers)
	fmt.Printf("Total: %d, Iterations: %d, Publisher: %d, Consumer: 1\n", totalIterations, iterations, NumPublishers)
	runtime.GOMAXPROCS(NumPublishers)

	var consumer = &countConsumer{TotalIterations: totalIterations, Count: 0}
	consumer.WG.Add(1)

	// BuildShared creates a disruptor that supports multiple writers.
	controller := disruptor.Configure(RingBufferSize).WithConsumerGroup(consumer).BuildShared()
	controller.Start()
	defer controller.Stop()

	// wg acts as a start barrier for the publishers and the main goroutine.
	var wg sync.WaitGroup
	wg.Add(NumPublishers + 1)
	var sendWG sync.WaitGroup
	sendWG.Add(NumPublishers)

	for i := 0; i < NumPublishers; i++ {
		go func() {
			writer := controller.Writer()
			wg.Done()
			wg.Wait()
			current := disruptor.InitialSequenceValue
			for current < totalIterations {
				// Reserve returns the highest sequence of the claimed batch.
				current = writer.Reserve(ReserveMany)
				for j := current - ReserveMany + 1; j <= current; j++ {
					ringBuffer[j&RingBufferMask] = j
				}
				// Commit exactly the range written above.
				writer.Commit(current-ReserveManyDelta, current)
			}
			sendWG.Done()
		}()
	}

	wg.Done()
	t := time.Now().UnixNano()
	wg.Wait() //waiting for ready as a barrier
	fmt.Println("start to publish")
	sendWG.Wait()
	fmt.Println("Finished to publish")
	consumer.WG.Wait()
	fmt.Println("Finished to consume") //waiting for consumer
	t = (time.Now().UnixNano() - t) / 1000000 //ms
	fmt.Printf("opsPerSecond: %d\n", totalIterations*1000/t)
}

type countConsumer struct {
	Count           int64
	TotalIterations int64
	WG              sync.WaitGroup
}

// Consume is called by go-disruptor with a range of committed sequences.
func (cc *countConsumer) Consume(lower, upper int64) {
	for lower <= upper {
		message := ringBuffer[lower&RingBufferMask]
		if message != lower {
			warning := fmt.Sprintf("\nRace condition--Sequence: %d, Message: %d\n", lower, message)
			fmt.Print(warning)
			panic(warning)
		}
		lower++
		cc.Count++
		//fmt.Printf("count: %d, message: %d\n", cc.Count-1, message)
		if cc.Count == cc.TotalIterations {
			cc.WG.Done()
			return
		}
	}
}

In my test run, go-disruptor reached a throughput of 137931020 ops/s.

So we now have two results for the same test case. I also ran the same case with Go channels, which gives three results in total:
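Both the writers and the consumer in this test locate a slot with `sequence & RingBufferMask` rather than a modulo. The sketch below shows why that works when the buffer size is a power of two. The names `ringSize`, `ringMask` and `slot` are illustrative and are not part of go-disruptor.

```go
package main

import "fmt"

const (
	ringSize = 1024 * 64    // a power of two, as in the test above
	ringMask = ringSize - 1 // all low bits set
)

// slot maps an ever-increasing sequence number onto a buffer index.
func slot(seq int64) int64 {
	return seq & ringMask
}

func main() {
	for _, seq := range []int64{0, 65535, 65536, 65553} {
		// For non-negative sequences and a power-of-two size, the mask
		// and the modulo give the same index.
		fmt.Println(seq, "->", slot(seq), seq%ringSize == slot(seq))
	}
}
```

The sequence keeps growing while the index wraps around, so sequence 65536 reuses slot 0. This is why a writer must not run more than one buffer length ahead of the slowest consumer.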

  • Java Disruptor : 183486238 ops/s
  • go-disruptor : 137931020 ops/s
  • go channel : 6995452 ops/s

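In this test go-disruptor comes in about 25% below Java Disruptor, but at roughly 140 million ops/s it is still fast enough to deserve attention. Go channels are far slower than either, about 20 times slower than go-disruptor.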
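To put the three figures on the same footing as the per-operation table earlier, they can be converted to nanoseconds per operation and to a share of the Java result. This is a small arithmetic sketch over the numbers reported above; the helper `nsPerOp` is a hypothetical name.

```go
package main

import "fmt"

// nsPerOp converts a throughput in operations per second into the
// average time per operation in nanoseconds.
func nsPerOp(opsPerSecond float64) float64 {
	return 1e9 / opsPerSecond
}

func main() {
	results := []struct {
		name string
		ops  float64
	}{
		{"Java Disruptor", 183486238},
		{"go-disruptor", 137931020},
		{"go channel", 6995452},
	}

	base := results[0].ops
	for _, r := range results {
		fmt.Printf("%-15s %7.2f ns/op  %5.1f%% of Java Disruptor\n",
			r.name, nsPerOp(r.ops), 100*r.ops/base)
	}
}
```

This works out to about 5.45, 7.25 and 142.95 ns per operation. At roughly 20 million operations, each disruptor run therefore lasted only on the order of a hundred milliseconds, so single-run figures like these are best read as rough indications.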

Go Channel

Implemented with a Go channel, the throughput is 6995452 ops/s.

The code is as follows:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	NumPublishers := 3 //runtime.NumCPU()
	totalIterations := int64(1000 * 1000 * 20)
	iterations := totalIterations / int64(NumPublishers)
	totalIterations = iterations * int64(NumPublishers)
	channel := make(chan int64, 1024*64)

	// wg acts as a start barrier for the publishers and the main goroutine.
	var wg sync.WaitGroup
	wg.Add(NumPublishers + 1)
	var readerWG sync.WaitGroup
	readerWG.Add(1)

	for i := 0; i < NumPublishers; i++ {
		go func() {
			wg.Done()
			wg.Wait()
			// Non-blocking send: spin until the channel accepts the value.
			for i := int64(0); i < iterations; {
				select {
				case channel <- i:
					i++
				default:
					continue
				}
			}
		}()
	}

	go func() {
		// Non-blocking receive. Count only successful receives; an empty
		// poll must not advance i.
		for i := int64(0); i < totalIterations; {
			select {
			case msg := <-channel:
				if NumPublishers == 1 && msg != i {
					//panic("Out of sequence")
				}
				i++
			default:
				continue
			}
		}
		readerWG.Done()
	}()

	wg.Done()
	t := time.Now().UnixNano()
	wg.Wait()
	readerWG.Wait()
	t = (time.Now().UnixNano() - t) / 1000000 //ms
	fmt.Printf("opsPerSecond: %d\n", totalIterations*1000/t)
}
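The channel test above polls with `select` and `default`, which keeps every goroutine spinning. For comparison, here is a sketch of the same 3-producer, 1-consumer shape using ordinary blocking sends and receives. It is an illustrative variant and was not used to produce the figure in this article; `runBlocking` is a hypothetical helper and the message count in `main` is reduced to keep the run short.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runBlocking starts the given number of publishers, each sending
// perPublisher values with blocking sends, and counts what one consumer
// receives.
func runBlocking(publishers int, perPublisher int64) (received int64, elapsed time.Duration) {
	ch := make(chan int64, 1024*64)
	var senders sync.WaitGroup
	senders.Add(publishers)

	start := time.Now()
	for p := 0; p < publishers; p++ {
		go func() {
			defer senders.Done()
			for i := int64(0); i < perPublisher; i++ {
				ch <- i // blocks only while the buffer is full
			}
		}()
	}

	// Close the channel once every publisher is done so the range ends.
	go func() {
		senders.Wait()
		close(ch)
	}()

	for range ch {
		received++
	}
	return received, time.Since(start)
}

func main() {
	n, d := runBlocking(3, 1_000_000)
	fmt.Printf("received %d messages in %v\n", n, d)
}
```

Closing the channel after the last publisher finishes lets the consumer stop on its own, so no message count has to be shared between the two sides.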