前段時間閑得蛋疼就嘗試翻譯了一下有關Disruptor的一些文章,第一次做這事,爛得自己也不忍回頭去看了。。。
今天寫日志看到那幾篇文章,於是想寫一個簡單的例子,好讓一些對Disruptor有興趣但不是很明白的人更快的了解Disruptor的基本用法。
Disruptor使用起來非常簡單,初始化->啟動消費者線程,然后每當生產者產生資源就往disruptor里放。
下面是一個接收短信狀態報告的例子。
首先自定義一個Event類:
1 public class DeliveryReportEvent { 2 private DeliveryReport deliveryReport; 3 4 public DeliveryReport getDeliveryReport() { 5 return deliveryReport; 6 } 7 8 public void setDeliveryReport(DeliveryReport deliveryReport) { 9 this.deliveryReport = deliveryReport; 10 } 11 12 13 public final static EventFactory<DeliveryReportEvent> EVENT_FACTORY = new EventFactory<DeliveryReportEvent>() { 14 public DeliveryReportEvent newInstance() { 15 return new DeliveryReportEvent(); 16 } 17 }; 18 }
代碼很簡單,就是用來存放短信狀態報告DeliveryReport類的,外加一個EventFactory,用來生成Event,也很簡單。
然后寫一個EventHandler類,用來處理Event,
1 public class DeliveryReportEventHandler implements EventHandler<DeliveryReportEvent> { 2 // private static DeliveryReportRepository repository = new DeliveryReportRepository(); 3 public void onEvent(DeliveryReportEvent event, long sequence, 4 boolean endOfBatch) throws Exception { 5 System.out.println(event.getDeliveryReport().getMessageId()); 6 // repository.updateDeliveryReport(event.getDeliveryReport()); 7 } 8 }
三個參數,event就是生產者向Disruptor發布資源生產的事件,sequence是這個事件在ringbuffer中的序列號,endOfBatch指明該事件是不是ringbuffer中的最后一個事件。
在onEvent方法里處理消費者要做的事。本例直接打印狀態報告ID或是更新數據庫中的狀態報告。
在這里我寫了一個輔助類:
1 public class DisruptorHelper { 2 /** 3 * ringbuffer容量,最好是2的N次方 4 */ 5 private static final int BUFFER_SIZE = 1024 * 8; 6 private RingBuffer<DeliveryReportEvent> ringBuffer; 7 private SequenceBarrier sequenceBarrier; 8 private DeliveryReportEventHandler handler; 9 private BatchEventProcessor<DeliveryReportEvent> batchEventProcessor; 10 private static DisruptorHelper instance; 11 private static boolean inited = false; 12 private DisruptorHelper(){ 13 ringBuffer = new RingBuffer<DeliveryReportEvent>( 14 DeliveryReportEvent.EVENT_FACTORY, new SingleThreadedClaimStrategy( 15 BUFFER_SIZE), new YieldingWaitStrategy()); 16 sequenceBarrier = ringBuffer.newBarrier(); 17 handler = new DeliveryReportEventHandler(); 18 batchEventProcessor = new BatchEventProcessor<DeliveryReportEvent>( 19 ringBuffer, sequenceBarrier, handler); 20 ringBuffer.setGatingSequences(batchEventProcessor 21 .getSequence()); 22 } 23 24 public static void initAndStart(){ 25 instance = new DisruptorHelper(); 26 new Thread(instance.batchEventProcessor).start(); 27 inited = true; 28 } 29 30 public static void shutdown(){ 31 if(!inited){ 32 throw new RuntimeException("Disruptor還沒有初始化!"); 33 } 34 instance.shutdown0(); 35 } 36 37 private void shutdown0(){ 38 batchEventProcessor.halt(); 39 } 40 private void produce0(DeliveryReport deliveryReport) { 41 //獲取下一個序列號 42 long sequence = ringBuffer.next(); 43 //將狀態報告存入ringBuffer的該序列號中 44 ringBuffer.get(sequence).setDeliveryReport(deliveryReport); 45 //通知消費者該資源可以消費 46 ringBuffer.publish(sequence); 47 } 48 49 /** 50 * 將狀態報告放入資源隊列,等待處理 51 * @param deliveryReport 52 */ 53 public static void produce(DeliveryReport deliveryReport) { 54 if(!inited){ 55 throw new RuntimeException("Disruptor還沒有初始化!"); 56 } 57 instance.produce0(deliveryReport); 58 } 59 }
哈哈,因為本例的initAndStart/shutdown方法是在一個loadonstartup的servlet中的init/destory方法里調用,所以就不加synchronized了,執行前也不判斷inited是否為true了。
最后是生產者調用的方法,在接收短信狀態報告的webservice里:
1 public void NotifySmsDeliveryReport(DeliveryReport deliveryReport) { 2 ... 3 ... 4 //向disruptor發布資源 5 DisruptorHelper.produce(deliveryReport); 6 }
關於disruptor的性能,可以去官網查看測試數據。也可以從官方svn檢出代碼,自己運行一下src/perf里的性能測試方法,
本例用到的包只有一個disruptor-2.7.1.jar,
<dependency>
<groupId>com.googlecode.disruptor</groupId>
<artifactId>disruptor</artifactId>
<version>2.7.1</version>
</dependency>