這篇博客將主要通過幾個示例,簡單講述 Disruptor 的使用方法;
一、disruptor 簡介
Disruptor 是英國外匯交易公司 LMAX 開發的一個無鎖高性能的線程間消息傳遞的框架。目前包括 Apache Storm、Camel、Log4j2 等知名項目都是用了 Disruptor;
因為 Disruptor 中的一個很重要的結構 RingBuffer
和 JDK 中的 ArrayBlockingQueue
很相似,其內部都是一個環形數組,所以經常將他們放在一起比較,以下是官網公布測試結果

從圖中可以明顯看到他們之間性能的巨大差異;
此外在使用 Disruptor 的項目中也能看到其性能的差異,例如 Log4j

其中 Loggers all async
采用的是 Disruptor,Async Appender
采用的是 ArrayBlockingQueue, Sync
是同步模式;從圖中可以看到,線程越多競爭越激烈的時候 Disruptor 的性能優勢越明顯,其原因很很容易想到,因為 ArrayBlockingQueue 的進出由同一把鎖控制,所以競爭對其性能有巨大的影響;
此外我的筆記本配置為 “i7-8550U 8G”,使用的版本為:
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
二、ArrayBlockingQueue 性能對比
以下通過一個單線程的 demo,演示Disruptor 的基本用法,並個 ArrayBlockingQueue 做簡單對比;
public class Contrast {
public static final int count = 50000000;
public static final int size = 1024;
private static CountDownLatch latch = new CountDownLatch(1);
public void testDisruptor() throws InterruptedException {
long start = System.currentTimeMillis();
final Disruptor<Event> disruptor = new Disruptor<>(
() -> new Event(), // 綁定事件工廠,主要用於初始化 RingBuffer
size, // RingBuffer 大小
DaemonThreadFactory.INSTANCE, // 指定生產者線程工廠,也可以直接傳入線程池
ProducerType.SINGLE, // 指定生產者為單線程,也支持多線程模式
new YieldingWaitStrategy() // 等待策略
// new BlockingWaitStrategy()
);
Handler handler = new Handler();
disruptor.handleEventsWith(handler); // 綁定事件處理程序
disruptor.start();
RingBuffer<Event> ringBuffer = disruptor.getRingBuffer(); // 開始之后 RingBuffer 的所有位置就已經初始化完成
for (int i = 0; i < count; i++) {
long seq = ringBuffer.next(); // 獲取下一個放置位置
Event event = ringBuffer.get(seq); // 等到指定位置的槽
event.seId(i); // 更新事件,注意這里是更新,不是放入新的,所以不會有 GC 產生
ringBuffer.publish(seq); // 發布事件
}
latch.await();
System.out.println("time: " + (System.currentTimeMillis() - start));
}
private void testQueue() throws InterruptedException {
long start = System.currentTimeMillis();
final BlockingQueue<Event> queue = new ArrayBlockingQueue<>(size);
new Thread(() -> {
for (int i = 0; i < count; i++) {
try {
queue.put(new Event(i));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(() -> {
for (int i = 0; i < count; i++) {
try {
Event event = queue.take();
if (i == count - 1) {
System.out.println("last: " + event.getLogId());
latch.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
latch.await();
System.out.println("time: " + (System.currentTimeMillis() - start));
}
class Event {
private long id;
Event() {}
Event(long id) { this.id = id; }
public long getLogId() { return id; }
public void seId(int id) { this.id = id; }
}
class Handler implements EventHandler<Event> {
private int i = 0;
@Override
public void onEvent(Event event, long seq, boolean bool) {
if (++i == count) {
System.out.println("last: " + event.getLogId());
latch.countDown();
}
}
}
public static void main(String[] args) throws InterruptedException {
Contrast contrast = new Contrast();
contrast.testDisruptor();
// contrast.testQueue();
}
}
Disruptor-YieldingWaitStrategy: 919
Disruptor-BlockingWaitStrategy: 3142
ArrayBlockingQueue : 4307其中 BlockingWaitStrategy 等待策略和 ArrayBlockingQueue 大致相識
三、多消費者
上面的例子在使用多個消費這時,會出現重復消費的情況,如果想要一條消息只消費一次,可以參照下面的代碼:
public class MoreConsumer {
public static final int count = 5000;
public static final int size = 16;
public void testDisruptor() {
long start = System.currentTimeMillis();
final Disruptor<Event> disruptor = new Disruptor<>(
() -> new Event(),
size, DaemonThreadFactory.INSTANCE,
ProducerType.SINGLE,
new BlockingWaitStrategy()
);
disruptor.handleEventsWithWorkerPool(new Handler("h1"), new Handler("h2"), new Handler("h3"));
disruptor.start();
RingBuffer<Event> ringBuffer = disruptor.getRingBuffer();
for (int i = 0; i < count; i++) {
long seq = ringBuffer.next();
Event event = ringBuffer.get(seq);
event.id = i;
ringBuffer.publish(seq);
}
System.out.println("time: " + (System.currentTimeMillis() - start));
}
class Event { public long id; }
class Handler implements WorkHandler<Event> {
private String name;
Handler(String name) { this.name = name; }
@Override
public void onEvent(Event event) { System.out.println(name + ": " + event.id); }
}
public static void main(String[] args) {
MoreConsumer moreConsumer = new MoreConsumer();
moreConsumer.testDisruptor();
}
}
如上面的代碼所示使用 WorkHandler
即可,同時還需要注意選擇等待策略,策略不同也可能導致重復消費的問題,同時官網也只出需要在代碼里面保證重復消費問題;
四、復雜業務邏輯
很多也業務邏輯會出現以下的類似情況,第三個消費者,需要等待前面的任務完成后才能繼續執行的情況;通常我們會使用鎖、同步工具以及一些其他的方式,但都顯得比較麻煩,而且效率比較低,這里如果我們使用 Disruptor 就能很方便的解決;

disruptor.handleEventsWith(c1Handler, c2Handler);
disruptor.after(c1Handler, c2Handler).handleEventsWith(c3Handler);
如此僅需兩行代碼,就能將上面的關系表述清楚,對於更復雜的情況同樣;
對於更多的使用技巧就需要你根據實際情況分析了,下一篇博客將主要分析 Disruptor 為什么會那么快;