在JDK的多線程與並發庫一文中, 提到了BlockingQueue實現了生產者-消費者模型
BlockingQueue是基於鎖實現的, 而鎖的效率通常較低. 有沒有使用CAS機制實現的生產者-消費者?
Disruptor就是這樣.
disruptor使用觀察者模式, 主動將消息發送給消費者, 而不是等消費者從隊列中取; 在無鎖的情況下, 實現queue(環形, RingBuffer)的並發操作, 性能遠高於BlockingQueue
1.生產者-消費者
1.1使用Disruptor類
RingBuffer通過Disruptor實例獲得
public class Client { public static void main(String[] args) throws Exception { //1.配置並獲得Disruptor ExecutorService executor = Executors.newCachedThreadPool(); LongEventFactory factory = new LongEventFactory(); // 設置RingBuffer大小, 需為2的N次方(能將求模運算轉為位運算提高效率 ), 否則影響性能 int ringBufferSize = 1024 * 1024; //創建disruptor, 泛型參數:傳遞的事件的類型 // 第一個參數: 產生Event的工廠類, Event封裝生成-消費的數據 // 第二個參數: RingBuffer的緩沖區大小 // 第三個參數: 線程池 // 第四個參數: SINGLE單個生產者, MULTI多個生產者 // 第五個參數: WaitStrategy 當消費者阻塞在SequenceBarrier上, 消費者如何等待的策略. //BlockingWaitStrategy 使用鎖和條件變量, 效率較低, 但CPU的消耗最小, 在不同部署環境下性能表現比較一致 //SleepingWaitStrategy 多次循環嘗試不成功后, 讓出CPU, 等待下次調度; 多次調度后仍不成功, 睡眠納秒級別的時間再嘗試. 平衡了延遲和CPU資源占用, 但延遲不均勻. //YieldingWaitStrategy 多次循環嘗試不成功后, 讓出CPU, 等待下次調度. 平衡了延遲和CPU資源占用, 延遲也比較均勻. //BusySpinWaitStrategy 自旋等待,類似自旋鎖. 低延遲但同時對CPU資源的占用也多. Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); // 注冊事件消費處理器, 也即消費者. 可傳入多個EventHandler ... disruptor.handleEventsWith(new LongEventHandler()); // 啟動 disruptor.start(); //2.將數據裝入RingBuffer RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); // 創建生產者, 以下方式一使用原始api, 方式二使用新API //LongEventProducer producer = new LongEventProducer(ringBuffer); LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer); ByteBuffer byteBuffer = ByteBuffer.allocate(8); // 這里只是筆者實驗, 不是必須要用ByteBuffer保存long數據 for(int i = 0; i < 100; ++i){ byteBuffer.putLong(0, i); producer.produceData(byteBuffer); } disruptor.shutdown(); //關閉 disruptor 阻塞直至所有事件都得到處理 executor.shutdown(); // 需關閉 disruptor使用的線程池, 上一步disruptor關閉時不會連帶關閉線程池 } }
// Event封裝要傳遞的數據 public class LongEvent { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
// 產生Event的工廠 public class LongEventFactory implements EventFactory { @Override public Object newInstance() { return new LongEvent(); } }
public class LongEventHandler implements EventHandler<LongEvent> { // 消費邏輯 @Override public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception { System.out.println(longEvent.getValue()); } }
//生產者實現一 public class LongEventProducer { // 生產者持有RingBuffer的引用 private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer){ this.ringBuffer = ringBuffer; } public void produceData(ByteBuffer bb){ // 獲得下一個Event槽的下標 long sequence = ringBuffer.next(); try { // 給Event填充數據 LongEvent event = ringBuffer.get(sequence); event.setValue(bb.getLong(0)); } finally { // 發布Event, 激活觀察者去消費, 將sequence傳遞給該消費者 //publish應該放在 finally塊中以確保一定會被調用->如果某個事件槽被獲取但未提交, 將會堵塞后續的publish動作。 ringBuffer.publish(sequence); } } }
//生產者實現二 public class LongEventProducerWithTranslator { // 使用EventTranslator, 封裝 獲取Event的過程 private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() { @Override public void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) { event.setValue(buffer.getLong(0)); } }; private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void produceData(ByteBuffer buffer){ // 發布 ringBuffer.publishEvent(TRANSLATOR, buffer); } }
1.2 直接使用RingBuffer
給出了兩種方式:EventProcessor與WorkPool(可處理多消費者)
public class ClientForEventProcessor { public static void main(String[] args) throws Exception { int BUFFER_SIZE = 1024; int THREAD_NUMBERS = 4; // 這里直接獲得RingBuffer. createSingleProducer創建一個單生產者的RingBuffer // 第一個參數為EventFactory,產生數據Trade的工廠類 // 第二個參數是RingBuffer的大小,需為2的N次方 // 第三個參數是WaitStrategy, 消費者阻塞時如何等待生產者放入Event final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() { @Override public Trade newInstance() { return new Trade(UUID.randomUUID().toString()); } }, BUFFER_SIZE, new YieldingWaitStrategy()); //SequenceBarrier, 協調消費者與生產者, 消費者鏈的先后順序. 阻塞后面的消費者(沒有Event可消費時) SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); //創建事件處理器 (消費者): 處理ringBuffer, 用TradeHandler的方法處理(實現EventHandler), 用sequenceBarrier協調生成-消費 //如果存在多個消費者(老api, 可用workpool解決) 那重復執行 創建事件處理器-注冊進度-提交消費者的過程, 把其中TradeHandler換成其它消費者類 BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(ringBuffer, sequenceBarrier, new TradeHandler()); //把消費者的消費進度情況注冊給RingBuffer結構(生產者) !如果只有一個消費者的情況可以省略 ringBuffer.addGatingSequences(transProcessor.getSequence()); //創建線程池 ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); //把消費者提交到線程池, !說明EventProcessor實現了callable接口 executors.submit(transProcessor); // 生產者, 這里新建線程不是必要的 Future<?> future= executors.submit(new Callable<Void>() { @Override public Void call() throws Exception { long seq; for (int i = 0; i < 10; i++) { seq = ringBuffer.next(); ringBuffer.get(seq).setPrice(Math.random() * 9999); ringBuffer.publish(seq); } return null; } }); Thread.sleep(1000); //等上1秒,等待消費完成 transProcessor.halt(); //通知事件處理器 可以結束了(並不是馬上結束!) executors.shutdown(); } }
public class ClientForWorkPool { public static void main(String[] args) throws InterruptedException { int BUFFER_SIZE = 1024; int THREAD_NUMBERS = 4; RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() { public Trade newInstance() { return new Trade(UUID.randomUUID().toString()); } }, BUFFER_SIZE); SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); // 第三個參數: 異常處理器, 這里用ExceptionHandler; 第四個參數WorkHandler的實現類, 可為數組(即傳入多個消費者) WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), new TradeHandler()); ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); workerPool.start(executors); // 生產10個數據 for (int i = 0; i < 8; i++) { long seq = ringBuffer.next(); ringBuffer.get(seq).setPrice(Math.random() * 9999); ringBuffer.publish(seq); } Thread.sleep(1000); workerPool.halt(); executors.shutdown(); } }
// 封裝交易數據 public class Trade { private String id; // 訂單ID private String name; private double price; // 金額 public Trade(String id) { this.id = id; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } }
// 消費者, 這里實現一個接口就行, 寫兩個是為了同時測試EventProcessor和WorkPool public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } @Override public void onEvent(Trade event) throws Exception { //具體的消費邏輯 System.out.println(event.getId()); } }
1.3 多生產者-多消費者
一個Event只能被某一個消費者處理
public static void main(String[] args) throws Exception { //創建RingBuffer RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI, new EventFactory<Order>() { @Override public Order newInstance() { return new Order(); } }, 1024 * 1024, new YieldingWaitStrategy()); SequenceBarrier barriers = ringBuffer.newBarrier(); Consumer[] consumers = new Consumer[3]; for(int i = 0; i < consumers.length; i++){ consumers[i] = new Consumer("ct" + i); } // 3個消費者 WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer, barriers, new MyExceptionHandler(), consumers); ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); ExecutorService executors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); workerPool.start(executors); // 10個生產者, 每個生成者生產20個數據 for (int i = 0; i < 10; i++) { final Producer p = new Producer(ringBuffer); new Thread(new Runnable() { @Override public void run() { for(int j = 0; j < 2; j++){ p.produceData(UUID.randomUUID().toString()); } } }).start(); } System.out.println("----開始生產----"); Thread.sleep(1000); // 等待消費完成 System.out.println("總共消費數量:" + consumers[0].getCount() ); workerPool.halt(); executors.shutdown(); } static class MyExceptionHandler implements ExceptionHandler { public void handleEventException(Throwable ex, long sequence, Object event) {} public void handleOnStartException(Throwable ex) {} public void handleOnShutdownException(Throwable ex) {} } }
public class Consumer implements WorkHandler<Order>{ private String consumerId; // 消費計數器 private static AtomicInteger count = new AtomicInteger(0); public Consumer(String consumerId){ this.consumerId = consumerId; } @Override public void onEvent(Order order) throws Exception { System.out.println("當前消費者: " + this.consumerId + ", 消費信息: " + order.getId()); count.incrementAndGet(); } public int getCount(){ return count.get(); } }
public class Producer { private final RingBuffer<Order> ringBuffer; public Producer(RingBuffer<Order> ringBuffer){ this.ringBuffer = ringBuffer; } public void produceData(String data){ long sequence = ringBuffer.next(); try { Order order = ringBuffer.get(sequence); order.setId(data); } finally { ringBuffer.publish(sequence); } } }
public class Order { private String id; private String name; private double price; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } }
2. 並行處理
除了實現生產者-消費者模型, Disruptor還可以進行多路並行處理(一個Event可以進入多個路徑同時進行處理, 因為不同路徑操作的是同一個Event, 路徑可以匯合)
public class Client { public static void main(String[] args) throws InterruptedException { long beginTime=System.currentTimeMillis(); int bufferSize=1024; ExecutorService executor=Executors.newFixedThreadPool(7); // 注意: 線程數>=handler數+1 Disruptor<Trade> disruptor = new Disruptor<Trade>( new EventFactory<Trade>() { @Override public Trade newInstance() { return new Trade(UUID.randomUUID().toString()); } }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); // 菱形操作 /* // 創建消費者組(含H1,H2) H1,H2並行執行無先后順序 EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2()); // C1,C2都完成后執行C3, 像JMS傳遞消息 handlerGroup.then(new Handler3()); */ // 順序操作 /* disruptor.handleEventsWith(new Handler1()).handleEventsWith(new Handler2()).handleEventsWith(new Handler3()); */ // 六邊形操作. H1, H4串行執行; H2, H5串行執行; 而H1,H4 與 H2,H5 並行執行 Handler1 h1 = new Handler1(); Handler2 h2 = new Handler2(); Handler3 h3 = new Handler3(); Handler4 h4 = new Handler4(); Handler5 h5 = new Handler5(); disruptor.handleEventsWith(h1, h2); disruptor.after(h1).handleEventsWith(h4); disruptor.after(h2).handleEventsWith(h5); disruptor.after(h4, h5).handleEventsWith(h3); disruptor.start(); // 啟動生產線程 executor.submit(new TradePublisher(disruptor)); Thread.sleep(1000); // 等待消費完成 disruptor.shutdown(); executor.shutdown(); System.out.println("總耗時:"+(System.currentTimeMillis()-beginTime)); } }
public class TradePublisher implements Runnable { private Disruptor<Trade> disruptor;
private static final int LOOP = 100;// 模擬百次交易的發生 public TradePublisher(Disruptor<Trade> disruptor) { this.disruptor = disruptor; } @Override public void run() { TradeEventTranslator tradeTransloator = new TradeEventTranslator(); for (int i = 0; i < LOOP; i++) { disruptor.publishEvent(tradeTransloator); } } } class TradeEventTranslator implements EventTranslator<Trade> { private Random random = new Random(); @Override public void translateTo(Trade event, long sequence) { this.generateTrade(event); } private Trade generateTrade(Trade trade) { trade.setPrice(random.nextDouble() * 9999); return trade; } }
public class Handler1 implements EventHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { System.out.println("handler1: set name"); event.setName("h1"); } }
public class Handler2 implements EventHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { System.out.println("handler2: set price"); event.setPrice(17.0); } }
public class Handler3 implements EventHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { System.out.println("handler3: name: " + event.getName() + " , price: " + event.getPrice() + "; instance: " + event.getId()); } }
public class Handler4 implements EventHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { System.out.println("handler4: append name"); event.setName(event.getName() + "h4"); } }
public class Handler5 implements EventHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { System.out.println("handler5: add price"); event.setPrice(event.getPrice() + 3.0); } }