很多時候我們只需要消息中間件這樣的功能,那么直需要RinBuffer就可以了。
入口:
1 import java.util.concurrent.Callable; 2 import java.util.concurrent.ExecutorService; 3 import java.util.concurrent.Executors; 4 import java.util.concurrent.Future; 5 6 import com.lmax.disruptor.BatchEventProcessor; 7 import com.lmax.disruptor.EventFactory; 8 import com.lmax.disruptor.RingBuffer; 9 import com.lmax.disruptor.SequenceBarrier; 10 import com.lmax.disruptor.YieldingWaitStrategy; 11 12 public class Main1 { 13 14 public static void main(String[] args) throws Exception { 15 int BUFFER_SIZE=1024; 16 int THREAD_NUMBERS=4; 17 /* 18 * createSingleProducer創建一個單生產者的RingBuffer, 19 * 第一個參數叫EventFactory,從名字上理解就是"事件工廠",其實它的職責就是產生數據填充RingBuffer的區塊。 20 * 第二個參數是RingBuffer的大小,它必須是2的指數倍 目的是為了將求模運算轉為&運算提高效率 21 * 第三個參數是RingBuffer的生產都在沒有可用區塊的時候(可能是消費者(或者說是事件處理器) 太慢了)的等待策略 22 */ 23 final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() { 24 @Override 25 public Trade newInstance() { 26 return new Trade(); 27 } 28 }, BUFFER_SIZE, new YieldingWaitStrategy()); 29 30 //創建線程池 31 ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); 32 33 //創建SequenceBarrier 34 SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); 35 36 /****************** @beg 消費者消費數據 2017-1-11******************/ 37 //創建消息處理器 38 BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>( 39 ringBuffer, sequenceBarrier, new TradeHandler()); 40 41 //這一步的目的就是把消費者的位置信息引用注入到生產者 如果只有一個消費者的情況可以省略 42 ringBuffer.addGatingSequences(transProcessor.getSequence()); 43 44 //把消息處理器提交到線程池 45 executors.submit(transProcessor); 46 /****************** @end 消費者消費數據 2017-1-11******************/ 47 48 //如果存在多個消費者 那重復執行上面3行代碼 把TradeHandler換成其它消費者類 49 50 /****************** @beg 生產者生產數據 2017-1-11******************/ 51 52 Future<?> future= executors.submit(new Callable<Void>() { 53 @Override 54 public Void call() throws Exception { 55 long seq; 56 for(int i=0;i<10;i++){ 57 seq = ringBuffer.next();//占個坑 --ringBuffer一個可用區塊 58 ringBuffer.get(seq).setPrice(Math.random()*9999);//給這個區塊放入 數據 59 ringBuffer.publish(seq);//發布這個區塊的數據使handler(consumer)可見 60 } 61 return null; 62 } 63 }); 64 65 /****************** @end 生產者生產數據 2017-1-11******************/ 66 67 future.get();//等待生產者結束 68 Thread.sleep(1000);//等上1秒,等消費都處理完成 69 transProcessor.halt();//通知事件(或者說消息)處理器 可以結束了(並不是馬上結束!!!) 70 executors.shutdown();//終止線程 71 } 72 }
消費者:
1 import java.util.UUID; 2 3 import com.lmax.disruptor.EventHandler; 4 import com.lmax.disruptor.WorkHandler; 5 6 public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> { 7 8 @Override 9 public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { 10 this.onEvent(event); 11 } 12 13 @Override 14 public void onEvent(Trade event) throws Exception { 15 //這里做具體的消費邏輯 16 event.setId(UUID.randomUUID().toString());//簡單生成下ID 17 System.out.println(event.getId()); 18 } 19 }
數據對象:
1 import java.util.concurrent.atomic.AtomicInteger; 2 3 public class Trade { 4 5 private String id;//ID 6 private String name; 7 private double price;//金額 8 private AtomicInteger count = new AtomicInteger(0); 9 10 public String getId() { 11 return id; 12 } 13 public void setId(String id) { 14 this.id = id; 15 } 16 public String getName() { 17 return name; 18 } 19 public void setName(String name) { 20 this.name = name; 21 } 22 public double getPrice() { 23 return price; 24 } 25 public void setPrice(double price) { 26 this.price = price; 27 } 28 public AtomicInteger getCount() { 29 return count; 30 } 31 public void setCount(AtomicInteger count) { 32 this.count = count; 33 } 34 35 36 }