版權聲明:原創作品,謝絕轉載!否則將追究法律責任。
Disruptor是一個優秀的並發框架,可以實現單個或多個生產者生產消息,單個或多個消費者消息,且消費者之間可以存在消費消息的依賴關系。網上其他博客往往僅針對框架的一部分使用示例進行了介紹,對於某些場景下介紹並不完全:如多生產者間復雜的依賴關系的使用編碼。
本文盡可能對Disruptor的所有使用場景進行總結,如有不全之處歡迎指出,請諒解。
具體關於Disruptor的原理,參見:http://ifeve.com/disruptor/,本文不在贅述。
Disruptor類的handleEventsWith,handleEventsWithWorkerPool方法的聯系及區別
在disruptor框架調用start方法之前,往往需要將消息的消費者指定給disruptor框架。
常用的方法是:disruptor.handleEventsWith(EventHandler ... handlers),將多個EventHandler的實現類傳入方法,封裝成一個EventHandlerGroup,實現多消費者消費。
disruptor的另一個方法是:disruptor.handleEventsWithWorkerPool(WorkHandler ... handlers),將多個WorkHandler的實現類傳入方法,封裝成一個EventHandlerGroup實現多消費者消費。
兩者共同點都是,將多個消費者封裝到一起,供框架消費消息。
不同點在於,
1. 對於某一條消息m,handleEventsWith方法返回的EventHandlerGroup,Group中的每個消費者都會對m進行消費,各個消費者之間不存在競爭。handleEventsWithWorkerPool方法返回的EventHandlerGroup,Group的消費者對於同一條消息m不重復消費;也就是,如果c0消費了消息m,則c1不再消費消息m。
2. 傳入的形參不同。對於獨立消費的消費者,應當實現EventHandler接口。對於不重復消費的消費者,應當實現WorkHandler接口。
因此,根據消費者集合是否獨立消費消息,可以對不同的接口進行實現。也可以對兩種接口同時實現,具體消費流程由disruptor的方法調用決定。
在進行場景分析之前,首先定義公共的生產者Producer,消費者OrderHandler1,消息Order,消息工廠OrderFactory。定義分別如下:
package liuqiang.complex.common; public class Order { private String id; public String getId() { return id; } public void setId(String id) { this.id = id; } }
package liuqiang.complex.common; import com.lmax.disruptor.EventFactory; public class OrderFactory implements EventFactory<Order> { @Override public Order newInstance() { return new Order(); } }
package liuqiang.complex.common; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; //EventHandler用於EventHandlerGroup,WorkHandler用於WorkPool。同時實現兩接口,該類對象可同時用於EventHandlerGroup和WorkPool public class OrderHandler1 implements EventHandler<Order>, WorkHandler<Order> { private String consumerId; public OrderHandler1(String consumerId){ this.consumerId = consumerId; } //EventHandler的方法 @Override public void onEvent(Order order, long sequence, boolean endOfBatch) throws Exception { System.out.println("OrderHandler1 " + this.consumerId + ",消費信息:" + order.getId()); } //WorkHandler的方法 @Override public void onEvent(Order order) throws Exception { System.out.println("OrderHandler1 " + this.consumerId + ",消費信息:" + order.getId()); } }
package liuqiang.complex.common; import com.lmax.disruptor.RingBuffer; public class Producer { private final RingBuffer<Order> ringBuffer; public Producer(RingBuffer<Order> ringBuffer){ this.ringBuffer = ringBuffer; } public void onData(String data){ long sequence = ringBuffer.next(); try { Order order = ringBuffer.get(sequence); order.setId(data); } finally { ringBuffer.publish(sequence); } } }
下面定義兩種不同的消費者集合關系:
場景一:單生產者單消費者
package liuqiang.complex.single; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.Order; import liuqiang.complex.common.OrderFactory; import liuqiang.complex.common.OrderHandler1; import liuqiang.complex.common.Producer; import java.util.concurrent.Executors; public class Main1 { //單生產者模式,單消費者模式 public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); //設置一個消費者 disruptor.handleEventsWith(new OrderHandler1("1")); disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); //單生產者,生產3條數據 for (int l = 0; l < 3; l++) { producer.onData(l + ""); } //為了保證消費者線程已經啟動,留足足夠的時間。具體原因詳見另一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
這種情況最為簡單,單生產者,僅需在Disruptor初始化時,傳入ProducerType.SINGLE即可。使用disruptor.handleEventsWith傳入單消費者。Thread.sleep方法調用是為了保證,在調用disruptor.shutdown方法前,所有的消費者線程都已經啟動,防止shutdown失效的問題。具體問題詳見本人另一篇博客:Disruptor中shutdown方法失效,及產生的不確定性源碼分析。
輸出結果如下:
OrderHandler1 1,消費信息:0
OrderHandler1 1,消費信息:1
OrderHandler1 1,消費信息:2
場景二:單生產者多消費者,多消費者間形成依賴關系,每個依賴節點只有一個消費者。
package liuqiang.complex.single; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.EventHandlerGroup; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.*; import java.util.concurrent.Executors; public class Main2 { //單生產者,多消費者,但多消費者間形成依賴關系,每個依賴節點單線程。 public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); //多個消費者間形成依賴關系,每個依賴節點的消費者為單線程。 disruptor.handleEventsWith(new OrderHandler1("1")).then(new OrderHandler1("2"), new OrderHandler1("3")).then(new OrderHandler1("4")); disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); //單生產者,生產3條數據 for (int l = 0; l < 3; l++) { producer.onData(l + ""); } //為了保證消費者線程已經啟動,留足足夠的時間。具體原因詳見另一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
四個消費者之間的依賴圖如下:
消費者C2、C3只有在C1消費完消息m后,才能消費m。消費者C4只有在C2、C3消費完m后,才能消費該消息。
可能的輸出結果如下(可能因為線程執行先后順序不同略有區別,但輸出一定滿足相關依賴約束):
OrderHandler1 1,消費信息:0
OrderHandler1 1,消費信息:1
OrderHandler1 2,消費信息:0
OrderHandler1 3,消費信息:0
OrderHandler1 1,消費信息:2
OrderHandler1 2,消費信息:1
OrderHandler1 2,消費信息:2
OrderHandler1 3,消費信息:1
OrderHandler1 3,消費信息:2
OrderHandler1 4,消費信息:0
OrderHandler1 4,消費信息:1
OrderHandler1 4,消費信息:2
場景三:單生產者,多消費者模式。多消費者對於消息不重復消費。
package liuqiang.complex.multi; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.EventHandlerGroup; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.*; import java.util.concurrent.Executors; public class Main3 { //單生產者,多消費者模式。多消費者對於消息不重復消費。例如:1線程消費了消息0,則2線程只能從0后面的消息消費,不能對消息0進行消費。 public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); /* * 該方法傳入的消費者需要實現WorkHandler接口,方法的內部實現是:先創建WorkPool,然后封裝WorkPool為EventHandlerPool返回。 * 消費者1、2對於消息的消費有時有競爭,保證同一消息只能有一個消費者消費 */ disruptor.handleEventsWithWorkerPool(new OrderHandler1("1"), new OrderHandler1("2")); disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); //單生產者,生產3條數據 for (int l = 0; l < 3; l++) { producer.onData(l + ""); } //為了保證消費者線程已經啟動,留足足夠的時間。具體原因詳見另一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
調用handleEventsWithWorkerPool形成WorkerPool,並進一步封裝成EventHandlerGroup。對於同一條消息,兩消費者不重復消費。
可能輸出結果如下:
OrderHandler1 1,消費信息:0
OrderHandler1 2,消費信息:1
OrderHandler1 1,消費信息:2
場景四:單生產者多消費者,多消費者對於消息m獨立消費。
package liuqiang.complex.multi; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.Order; import liuqiang.complex.common.OrderFactory; import liuqiang.complex.common.OrderHandler1; import liuqiang.complex.common.Producer; import java.util.concurrent.Executors; public class Main4 { //單生產者,多消費者模式。多消費者對於消息獨立消費。例如:對於消息m,兩個消費者都要對其進行消費。 public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); /* * 兩個消費者創建EventHandlerGroup。該消費者需要實現EventHandler類。兩個消費者對於RingBuffer中的每個消息,都獨立消費一次。 * 兩個消費者在消費消息的過程中,各自獨立,不產生競爭。 */ disruptor.handleEventsWith(new OrderHandler1("1"), new OrderHandler1("2")); disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); //單生產者,生產3條數據 for (int l = 0; l < 3; l++) { producer.onData(l + ""); } //為了保證消費者線程已經啟動,留足足夠的時間。具體原因詳見另一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
可能輸出結果如下:
OrderHandler1 1,消費信息:0
OrderHandler1 2,消費信息:0
OrderHandler1 2,消費信息:1
OrderHandler1 2,消費信息:2
OrderHandler1 1,消費信息:1
OrderHandler1 1,消費信息:2
場景五:單生產者,多消費者間存在依賴關系的模式。消費者1、2消息獨立消費。消費者3、4僅能消費1、2均消費過的消息,消費者5僅能消費3、4均消費過的消息
package liuqiang.complex.multi; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.Order; import liuqiang.complex.common.OrderFactory; import liuqiang.complex.common.OrderHandler1; import liuqiang.complex.common.Producer; import java.util.concurrent.Executors; public class Main5 { //單生產者,多消費者間存在依賴關系的模式。消費者1、2組成EventHandlerGroup,消息獨立消費。消費者3、4僅能消費1、2均消費過的消息,且獨立消費。消費者5僅能消費3、4均消費過的消息 public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); //相當於在各個EventHandlerGroup之間進行級聯,形成依賴關系。 disruptor.handleEventsWith(new OrderHandler1("1"), new OrderHandler1("2")).then(new OrderHandler1("3"), new OrderHandler1("4")).then(new OrderHandler1("5")); disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); //單生產者,生產3條數據 for (int l = 0; l < 3; l++) { producer.onData(l + ""); } //為了保證消費者線程已經啟動,留足足夠的時間。具體原因詳見另一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
消費者之間的依賴關系如下:
可能的輸出結果如下:
OrderHandler1 2,消費信息:0
OrderHandler1 1,消費信息:0
OrderHandler1 1,消費信息:1
OrderHandler1 1,消費信息:2
OrderHandler1 2,消費信息:1
OrderHandler1 2,消費信息:2
OrderHandler1 3,消費信息:0
OrderHandler1 3,消費信息:1
OrderHandler1 3,消費信息:2
OrderHandler1 4,消費信息:0
OrderHandler1 4,消費信息:1
OrderHandler1 4,消費信息:2
OrderHandler1 5,消費信息:0
OrderHandler1 5,消費信息:1
OrderHandler1 5,消費信息:2
場景六:單生產者,多消費者。多消費者之間不重復消費,且不同的消費者WorkPool之間存在依賴關系。
package liuqiang.complex.multi; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.Order; import liuqiang.complex.common.OrderFactory; import liuqiang.complex.common.OrderHandler1; import liuqiang.complex.common.Producer; import java.util.concurrent.Executors; public class Main6 { /* * 單生產者,多消費者。多消費者之間不重復消費,且不同的消費者WorkPool之間存在依賴關系。 * 消費者1、2不重復消費消息,消費者3、4不重復消費1或者2消費過的消息,消費者5消費消費者3或4消費過的消息。 */ public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleEventsWithWorkerPool(new OrderHandler1("1"), new OrderHandler1("2")).thenHandleEventsWithWorkerPool(new OrderHandler1("3"), new OrderHandler1("4")).thenHandleEventsWithWorkerPool(new OrderHandler1("5")); disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); //單生產者,生產3條數據 for (int l = 0; l < 3; l++) { producer.onData(l + ""); } //為了保證消費者線程已經啟動,留足足夠的時間。具體原因詳見另一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
消費者之間的依賴圖如下所示:
可能的輸出結果如下:
OrderHandler1 2,消費信息:0
OrderHandler1 1,消費信息:1
OrderHandler1 2,消費信息:2
OrderHandler1 3,消費信息:0
OrderHandler1 3,消費信息:2
OrderHandler1 4,消費信息:1
OrderHandler1 5,消費信息:0
OrderHandler1 5,消費信息:1
OrderHandler1 5,消費信息:2
場景七:單生產者,多消費者模式。消費者1、2不重復消費消息,消費者3、4消費消費者1或2消費過的消息,且獨立重復消費。消費者5消費消費者3、4均消費過的消息。
package liuqiang.complex.multi; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.Order; import liuqiang.complex.common.OrderFactory; import liuqiang.complex.common.OrderHandler1; import liuqiang.complex.common.Producer; import java.util.concurrent.Executors; public class Main7 { //單生產者,多消費者模式。消費者1、2不重復消費消息,消費者3、4消費消費者1或2消費過的消息,且獨立重復消費。消費者5消費消費者3、4均消費過的消息。 public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleEventsWithWorkerPool(new OrderHandler1("1"), new OrderHandler1("2")).then(new OrderHandler1("3"), new OrderHandler1("4")).then(new OrderHandler1("5")); disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); //單生產者,生產3條數據 for (long l = 0; l < 3; l++) { producer.onData(l + ""); } //為了保證消費者線程已經啟動,留足足夠的時間。具體原因詳見另一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
消費者之間的依賴圖如下:
可能的輸出結果如下:
OrderHandler1 1,消費信息:1
OrderHandler1 2,消費信息:0
OrderHandler1 1,消費信息:2
OrderHandler1 4,消費信息:0
OrderHandler1 4,消費信息:1
OrderHandler1 3,消費信息:0
OrderHandler1 4,消費信息:2
OrderHandler1 3,消費信息:1
OrderHandler1 3,消費信息:2
OrderHandler1 5,消費信息:0
OrderHandler1 5,消費信息:1
OrderHandler1 5,消費信息:2
場景八:單生產者,多消費者模式。消費者1、2獨立消費每一條消息,消費者3、4不重復消費消費者1、2均處理過的消息,消費者5消費消費者3或4消費過的消息
package liuqiang.complex.multi; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.Order; import liuqiang.complex.common.OrderFactory; import liuqiang.complex.common.OrderHandler1; import liuqiang.complex.common.Producer; import java.util.concurrent.Executors; public class Main8 { //單生產者,多消費者模式。消費者1、2獨立消費每一條消息,消費者3、4不重復消費消費者1、2均處理過的消息,消費者5消費消費者3或4消費過的消息 public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleEventsWith(new OrderHandler1("1"), new OrderHandler1("2")).thenHandleEventsWithWorkerPool(new OrderHandler1("3"), new OrderHandler1("4")).then(new OrderHandler1("5")); disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); //單生產者,生產3條數據 for (int l = 0; l < 3; l++) { producer.onData(l + ""); } //為了保證消費者線程已經啟動,留足足夠的時間。具體原因詳見另一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
消費者間的依賴圖如下:
可能的輸出結果如下:
OrderHandler1 2,消費信息:0
OrderHandler1 2,消費信息:1
OrderHandler1 2,消費信息:2
OrderHandler1 1,消費信息:0
OrderHandler1 1,消費信息:1
OrderHandler1 1,消費信息:2
OrderHandler1 3,消費信息:0
OrderHandler1 3,消費信息:1
OrderHandler1 3,消費信息:2
OrderHandler1 5,消費信息:0
OrderHandler1 5,消費信息:1
OrderHandler1 5,消費信息:2
場景九:多生產者,單消費者模式
該場景較為簡單,只需將ProducerType.SINGLE改為ProducerType.MULTI,並且編寫多線程生產者的相關代碼即可。
package liuqiang.complex.multi; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.Order; import liuqiang.complex.common.OrderFactory; import liuqiang.complex.common.OrderHandler1; import liuqiang.complex.common.Producer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; public class Main9 { //多生產者,單消費者版本。三個生產者獨立生產消息。 public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; //ProducerType要設置為MULTI,后面才可以使用多生產者模式 Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy()); //簡化問題,設置為單消費者模式,也可以設置為多消費者及消費者間多重依賴。 disruptor.handleEventsWith(new OrderHandler1("1")); disruptor.start(); final RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); //判斷生產者是否已經生產完畢 final CountDownLatch countDownLatch = new CountDownLatch(3); //單生產者,生產3條數據 for (int l = 0; l < 3; l++) { Thread thread = new Thread() { @Override public void run() { for(int i = 0; i < 3; i++) { new Producer(ringBuffer).onData(Thread.currentThread().getName() + "'s " + i + "th message"); } countDownLatch.countDown(); } }; thread.setName("producer thread " + l); thread.start(); } countDownLatch.await(); //為了保證消費者線程已經啟動,留足足夠的時間。具體原因詳見另一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
以上是,對disruptor的各個使用場景的簡單介紹。
后面會寫博客針對Disruptor的各部分源碼做一分析,詳細介紹其消費者之間依賴關系的實現機制、單生產者、多生產者之間的不同實現方式等。