disruptor 可以理解為一個生產消費的框架. 具體翻譯教程: http://ifeve.com/disruptor-getting-started/
這個框架從數據上看, 是很強大的. 號稱1s處理600萬數據(不是消費掉600萬). 這里學習一下.
一. Hello World
數據容器:
//數據的載體, 封裝要傳遞的數據 public class LongEvent { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
這里的數據封裝類, 叫 Event, 我們知道 Event 翻譯過來是 事件. 但是這里表示數據. 稍微有點別扭
數據生產工廠:
//數據包裝類工廠, 用來生產空容器的, 用來裝數據 public class LongEventFactory implements EventFactory { @Override public Object newInstance() { return new LongEvent(); } }
這里主要是用來生產數據的空容器的. 給后面用的時候, 進行賦值用的.
消費者:
//數據的處理器, 對數據進行處理, 此處只是簡單的打印 public class LongEventHandlerA implements EventHandler<LongEvent>, WorkHandler<LongEvent> { @Override public void onEvent(LongEvent event, long l, boolean b) throws Exception { System.out.println(Thread.currentThread().getName() + "消費數據(A) : " + event.getValue()); } @Override public void onEvent(LongEvent event) throws Exception { System.out.println(Thread.currentThread().getName() + "消費數據(A) : " + event.getValue()); } }
在Hello World例子中, 只要實現 EventHandler 接口就行了, 后面那個 WorkHandler 是后面例子用的.
在來一個消費者B, 代碼和上面一樣:
//數據的處理器, 對數據進行處理, 此處只是簡單的打印 public class LongEventHandlerB implements EventHandler<LongEvent>, WorkHandler<LongEvent> { @Override public void onEvent(LongEvent event, long l, boolean b) throws Exception { System.out.println(Thread.currentThread().getName() + "消費數據(B) : " + event.getValue()); } @Override public void onEvent(LongEvent event) throws Exception { System.out.println(Thread.currentThread().getName() + "消費數據(B) : " + event.getValue()); } }
生產者:
//數據生產者 public class LongEventProducer { //生產的數據可以往 ringBuffer 里面丟 private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } /** * onData用來發布事件,每調用一次就發布一次事件事件 * 它的參數會通過事件傳遞給消費者 * * @param bb */ public void onData(ByteBuffer bb) { //可以把ringBuffer看做一個事件隊列,那么next就是得到下面一個事件槽 long sequence = ringBuffer.next(); try { //用上面的索引取出一個空的事件用於填充 LongEvent event = ringBuffer.get(sequence); //設置值 event.setValue(bb.getLong(0)); } finally { //發布事件, 或者說發布數據, 通知消費者可以消費了 ringBuffer.publish(sequence); } } }
生產者的另一種寫法, 要稍微簡單點:
public class LongEventProducerWithTranslator { private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() { @Override public void translateTo(LongEvent longEvent, long sequence, ByteBuffer bb) { longEvent.setValue(bb.getLong(0)); } }; private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer bb){ ringBuffer.publishEvent(TRANSLATOR, bb); } }
測試方法:
public static void main(String[] args) throws InterruptedException { //線程池 //Executor executor = Executors.newCachedThreadPool(); //裝數據的容器工廠 LongEventFactory factory = new LongEventFactory(); //容器size int bufferSize = 1024; //創建 disruptor 實例 //這種方式已經不推薦使用 //Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor); //推薦使用這種, 自定義線程工廠的方式 ThreadFactory threadFactory = new ThreadFactory() { private final AtomicInteger index = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(null, r, "disruptor-thread-" + index.incrementAndGet()); } }; Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, threadFactory , ProducerType.SINGLE , new YieldingWaitStrategy()); //引入數據處理器 //這種方式是消費相同的數據 disruptor.handleEventsWith(new LongEventHandlerA()) .then(new LongEventHandlerB()); //這種方式是消費不同的數據 //disruptor.handleEventsWithWorkerPool(new LongEventHandlerA(), new LongEventHandlerB()); //啟動 disruptor 容器 disruptor.start(); //從 disruptor 中拿取裝數據的容器 RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); //將這個容器給生產者, 生產者產生的數據, 可以直接丟進去 //LongEventProducer producer = new LongEventProducer(ringBuffer); LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long num = 0L; num <= 100L; num++) { bb.putLong(0, num); //發布數據 producer.onData(bb); Thread.sleep(100); } }
消費者消費的時候, 大的方向上, 有兩種模式:
1. 多個消費者, 消費的數據都是一樣的: handleEventsWith
2. 多個消費者, 消費的數據是不一樣的: handleEventsWithWorkerPool
disruptor 有個比較有意思的功能, 就是拼接消費模型.
如此例中, 我修改一句代碼:
disruptor.handleEventsWith(new LongEventHandlerA()) .then(new LongEventHandlerB()).handleEventsWithWorkerPool(new LongEventHandlerC(), new LongEventHandlerD());
看結果:
仔細觀察, 就能發現, A永遠在B前面, 因為 B 是 then() 在A后面的.
C和D永遠不會消費同一條消息. 比如 C 消費了96, 那么D就不能再消費96了, 繼而只能在下一輪中消費97.
二. 等待模式
既然是生產消費, 就肯定有個速度問題. 可能是生產快了, 也可能是消費快了. 那么這種情況, 在 disruptor 也是有策略處理的. 這里直接引用譯文.
Disruptor默認的等待策略是BlockingWaitStrategy。這個策略的內部適用一個鎖和條件變量來控制線程的執行和等待(Java基本的同步方法)。BlockingWaitStrategy是最慢的等待策略,但也是CPU使用率最低和最穩定的選項。然而,可以根據不同的部署環境調整選項以提高性能。
SleepingWaitStrategy
和BlockingWaitStrategy一樣,SpleepingWaitStrategy的CPU使用率也比較低。它的方式是循環等待並且在循環中間調用LockSupport.parkNanos(1)來睡眠,(在Linux系統上面睡眠時間60µs).然而,它的優點在於生產線程只需要計數,而不執行任何指令。並且沒有條件變量的消耗。但是,事件對象從生產者到消費者傳遞的延遲變大了。SleepingWaitStrategy最好用在不需要低延遲,而且事件發布對於生產者的影響比較小的情況下。比如異步日志功能。
YieldingWaitStrategy
YieldingWaitStrategy是可以被用在低延遲系統中的兩個策略之一,這種策略在減低系統延遲的同時也會增加CPU運算量。YieldingWaitStrategy策略會循環等待sequence增加到合適的值。循環中調用Thread.yield()允許其他准備好的線程執行。如果需要高性能而且事件消費者線程比邏輯內核少的時候,推薦使用YieldingWaitStrategy策略。例如:在開啟超線程的時候。
BusySpinWaitStrategy
BusySpinWaitStrategy是性能最高的等待策略,同時也是對部署環境要求最高的策略。這個性能最好用在事件處理線程比物理內核數目還要小的時候。例如:在禁用超線程技術的時候。
在 new Disruptor() 的時候, 可以指定引用哪一種等待策略.
三. RingBuffer
RingBuffer 具體是啥, 這里我也不解析了, 可以把它理解為一個 環形結構的 數據存儲器.
這里需要注意, 在給 RingBuffer 分配數據槽 的時候, 數量最好是 2的冪次倍. 這種的性能比隨便寫的要好很多.
這個 RingBuffer 也可以拿出來單獨用, 不和 disruptor 合着用
測試方法:
public static void main2(String[] args) throws InterruptedException { ExecutorService pool = Executors.newCachedThreadPool(); LongEventHandlerA handlerA = new LongEventHandlerA(); LongEventHandlerB handlerB = new LongEventHandlerB(); RingBuffer ringBuffer = RingBuffer.createSingleProducer(new LongEventFactory(), 1024); SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); //創建消息處理器, 相當於消費者A BatchEventProcessor<LongEvent> eventProcessorA = new BatchEventProcessor<LongEvent>(ringBuffer, sequenceBarrier, handlerA); //這一步的目的就是把消費者的位置信息引用注入到生產者 如果只有一個消費者的情況可以省略 ringBuffer.addGatingSequences(eventProcessorA.getSequence()); //把消息處理器提交到線程池 pool.execute(eventProcessorA); //創建消息處理器, 相當於消費者B BatchEventProcessor<LongEvent> eventProcessorB = new BatchEventProcessor<LongEvent>(ringBuffer, sequenceBarrier, handlerB); ringBuffer.addGatingSequences(eventProcessorB.getSequence()); pool.execute(eventProcessorB); for (int i = 0; i < 100; i++) {
//拿取空槽位置 long seq = ringBuffer.next();
//對空槽進行數據填充 LongEvent event = (LongEvent) ringBuffer.get(seq); event.setValue(i);
//發布數據, 通知消費者進行數據消費 ringBuffer.publish(seq); } Thread.sleep(1000); //通知事件(或者說消息)處理器 可以結束了(並不是馬上結束!!!) eventProcessorA.halt(); eventProcessorB.halt(); //關閉線程 pool.shutdown(); }
結果:
我這里只貼了一部分, 其實是都消費完了.
從圖中可以看出, 消費沒有順序, 並不是A消費了B才消費, 也不是交替消費. 他們消費的數據是相同的.
除了以上這種寫法, 他還有一種寫法, 使用 WorkPool:
public static void main3(String[] args) throws InterruptedException { ExecutorService pool = Executors.newCachedThreadPool(); LongEventHandlerA handlerA = new LongEventHandlerA(); LongEventHandlerB handlerB = new LongEventHandlerB(); LongEventFactory eventFactory = new LongEventFactory(); RingBuffer ringBuffer = RingBuffer.createSingleProducer(eventFactory, 1024); SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); WorkerPool<LongEvent> workerPoolA = new WorkerPool<LongEvent>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handlerA); workerPoolA.start(pool); WorkerPool<LongEvent> workerPoolB = new WorkerPool<LongEvent>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handlerB); workerPoolB.start(pool); for (int i = 0; i < 100; i++) { long seq = ringBuffer.next();//占個坑 --ringBuffer一個可用區塊 LongEvent longEvent = (LongEvent) ringBuffer.get(seq);//給這個區塊放入 數據 longEvent.setValue(i); ringBuffer.publish(seq);//發布這個區塊的數據使handler(consumer)可見 } Thread.sleep(1000);//等上1秒,等消費都處理完成 //通知事件(或者說消息)處理器 可以結束了(並不是馬上結束!!!) workerPoolA.halt(); //通知事件(或者說消息)處理器 可以結束了(並不是馬上結束!!!) workerPoolB.halt(); //終止線程 pool.shutdown(); }
結果:
數據少的時候, 你可能會看到 A 和 B 交替出現, 但事實上, 這里並沒有順序.