Disruptor簡單應用


前段時間閑得蛋疼就嘗試翻譯了一下有關Disruptor的一些文章,第一次做這事,爛得自己也不忍回頭去看了。。。

今天寫日志看到那幾篇文章,於是想寫一個簡單的例子,好讓一些對Disruptor有興趣但不是很明白的人更快的了解Disruptor的基本用法。

Disruptor使用起來非常簡單,初始化->啟動消費者線程,然后每當生產者產生資源就往disruptor里放。

下面是一個接收短信狀態報告的例子。

首先自定義一個Event類:

 1 public class DeliveryReportEvent {
 2     private DeliveryReport deliveryReport;
 3 
 4     public DeliveryReport getDeliveryReport() {
 5         return deliveryReport;
 6     }
 7 
 8     public void setDeliveryReport(DeliveryReport deliveryReport) {
 9         this.deliveryReport = deliveryReport;
10     }
11 
12 
13     public final static EventFactory<DeliveryReportEvent> EVENT_FACTORY = new EventFactory<DeliveryReportEvent>() {
14         public DeliveryReportEvent newInstance() {
15             return new DeliveryReportEvent();
16         }
17     };
18 }

代碼很簡單,就是用來存放短信狀態報告DeliveryReport類的,外加一個EventFactory,用來生成Event,也很簡單。

然后寫一個EventHandler類,用來處理Event,

1 public class DeliveryReportEventHandler implements EventHandler<DeliveryReportEvent> {
2 //    private static DeliveryReportRepository repository = new DeliveryReportRepository();
3     public void onEvent(DeliveryReportEvent event, long sequence,
4             boolean endOfBatch) throws Exception {
5         System.out.println(event.getDeliveryReport().getMessageId());
6 //        repository.updateDeliveryReport(event.getDeliveryReport());
7     }
8 }

三個參數,event就是生產者向Disruptor發布資源生產的事件,sequence是這個事件在ringbuffer中的序列號,endOfBatch指明該事件是不是ringbuffer中的最后一個事件。

在onEvent方法里處理消費者要做的事。本例直接打印狀態報告ID或是更新數據庫中的狀態報告。

在這里我寫了一個輔助類:

 1 public class DisruptorHelper {
 2     /**
 3      * ringbuffer容量,最好是2的N次方
 4      */
 5     private static final int BUFFER_SIZE = 1024 * 8;
 6     private RingBuffer<DeliveryReportEvent> ringBuffer;
 7     private SequenceBarrier sequenceBarrier;
 8     private DeliveryReportEventHandler handler;
 9     private BatchEventProcessor<DeliveryReportEvent> batchEventProcessor;
10     private  static DisruptorHelper instance;
11     private static boolean inited = false;
12     private DisruptorHelper(){
13         ringBuffer = new RingBuffer<DeliveryReportEvent>(
14                 DeliveryReportEvent.EVENT_FACTORY, new SingleThreadedClaimStrategy(
15                         BUFFER_SIZE), new YieldingWaitStrategy());
16         sequenceBarrier = ringBuffer.newBarrier();
17         handler = new DeliveryReportEventHandler();
18         batchEventProcessor = new BatchEventProcessor<DeliveryReportEvent>(
19                     ringBuffer, sequenceBarrier, handler);
20         ringBuffer.setGatingSequences(batchEventProcessor
21                 .getSequence());
22     }
23 
24     public static void initAndStart(){
25         instance = new DisruptorHelper();
26         new Thread(instance.batchEventProcessor).start();
27         inited = true;
28     }
29 
30     public static void shutdown(){
31         if(!inited){
32             throw new RuntimeException("Disruptor還沒有初始化!");
33         }
34         instance.shutdown0();
35     }
36 
37     private void shutdown0(){
38         batchEventProcessor.halt();
39     }
40     private void produce0(DeliveryReport deliveryReport) {
41         //獲取下一個序列號
42         long sequence = ringBuffer.next();
43         //將狀態報告存入ringBuffer的該序列號中
44         ringBuffer.get(sequence).setDeliveryReport(deliveryReport);
45         //通知消費者該資源可以消費
46         ringBuffer.publish(sequence);
47     }
48 
49     /**
50      * 將狀態報告放入資源隊列,等待處理
51      * @param deliveryReport
52      */
53     public static void produce(DeliveryReport deliveryReport) {
54         if(!inited){
55             throw new RuntimeException("Disruptor還沒有初始化!");
56         }
57         instance.produce0(deliveryReport);
58     }
59 }

哈哈,因為本例的initAndStart/shutdown方法是在一個loadonstartup的servlet中的init/destory方法里調用,所以就不加synchronized了,執行前也不判斷inited是否為true了。

最后是生產者調用的方法,在接收短信狀態報告的webservice里:

1 public void NotifySmsDeliveryReport(DeliveryReport deliveryReport) {
2 ...
3 ...
4 //向disruptor發布資源
5 DisruptorHelper.produce(deliveryReport);
6 }

 

關於disruptor的性能,可以去官網查看測試數據。也可以從官方svn檢出代碼,自己運行一下src/perf里的性能測試方法,

 

本例用到的包只有一個disruptor-2.7.1.jar,

<dependency>
  <groupId>com.googlecode.disruptor</groupId>
  <artifactId>disruptor</artifactId>

  <version>2.7.1</version>
</dependency>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM