架構師養成記--16.disruptor並發框架中RingBuffer的使用


很多時候我們只需要消息中間件這樣的功能,那么直需要RinBuffer就可以了。

入口:

 1 import java.util.concurrent.Callable;
 2 import java.util.concurrent.ExecutorService;
 3 import java.util.concurrent.Executors;
 4 import java.util.concurrent.Future;
 5 
 6 import com.lmax.disruptor.BatchEventProcessor;
 7 import com.lmax.disruptor.EventFactory;
 8 import com.lmax.disruptor.RingBuffer;
 9 import com.lmax.disruptor.SequenceBarrier;
10 import com.lmax.disruptor.YieldingWaitStrategy;
11 
12 public class Main1 {  
13    
14     public static void main(String[] args) throws Exception {  
15         int BUFFER_SIZE=1024;  
16         int THREAD_NUMBERS=4;  
17         /* 
18          * createSingleProducer創建一個單生產者的RingBuffer, 
19          * 第一個參數叫EventFactory,從名字上理解就是"事件工廠",其實它的職責就是產生數據填充RingBuffer的區塊。 
20          * 第二個參數是RingBuffer的大小,它必須是2的指數倍 目的是為了將求模運算轉為&運算提高效率 
21          * 第三個參數是RingBuffer的生產都在沒有可用區塊的時候(可能是消費者(或者說是事件處理器) 太慢了)的等待策略 
22          */  
23         final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {  
24             @Override  
25             public Trade newInstance() {  
26                 return new Trade();  
27             }  
28         }, BUFFER_SIZE, new YieldingWaitStrategy());  
29         
30         //創建線程池  
31         ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
32         
33         //創建SequenceBarrier  
34         SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
35           
36         /****************** @beg 消費者消費數據 2017-1-11******************/
37         //創建消息處理器  
38         BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(  
39                 ringBuffer, sequenceBarrier, new TradeHandler());  
40           
41         //這一步的目的就是把消費者的位置信息引用注入到生產者    如果只有一個消費者的情況可以省略 
42         ringBuffer.addGatingSequences(transProcessor.getSequence());  
43           
44         //把消息處理器提交到線程池  
45         executors.submit(transProcessor);  
46         /****************** @end 消費者消費數據 2017-1-11******************/
47         
48         //如果存在多個消費者 那重復執行上面3行代碼 把TradeHandler換成其它消費者類  
49           
50         /****************** @beg 生產者生產數據  2017-1-11******************/
51 
52         Future<?> future= executors.submit(new Callable<Void>() {  
53             @Override  
54             public Void call() throws Exception {  
55                 long seq;  
56                 for(int i=0;i<10;i++){  
57                     seq = ringBuffer.next();//占個坑 --ringBuffer一個可用區塊  
58                     ringBuffer.get(seq).setPrice(Math.random()*9999);//給這個區塊放入 數據 
59                     ringBuffer.publish(seq);//發布這個區塊的數據使handler(consumer)可見  
60                 }  
61                 return null;  
62             }  
63         }); 
64 
65         /****************** @end 生產者生產數據 2017-1-11******************/
66         
67         future.get();//等待生產者結束  
68         Thread.sleep(1000);//等上1秒,等消費都處理完成  
69         transProcessor.halt();//通知事件(或者說消息)處理器 可以結束了(並不是馬上結束!!!)  
70         executors.shutdown();//終止線程  
71     }  
72 }  

 

消費者:

 1 import java.util.UUID;
 2 
 3 import com.lmax.disruptor.EventHandler;
 4 import com.lmax.disruptor.WorkHandler;
 5 
 6 public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> {  
 7       
 8     @Override  
 9     public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
10         this.onEvent(event);  
11     }  
12   
13     @Override  
14     public void onEvent(Trade event) throws Exception {  
15         //這里做具體的消費邏輯  
16         event.setId(UUID.randomUUID().toString());//簡單生成下ID  
17         System.out.println(event.getId());  
18     }  
19 }  

 

 

數據對象:

 1 import java.util.concurrent.atomic.AtomicInteger;
 2 
 3 public class Trade {  
 4     
 5     private String id;//ID  
 6     private String name;
 7     private double price;//金額  
 8     private AtomicInteger count = new AtomicInteger(0);
 9     
10     public String getId() {
11         return id;
12     }
13     public void setId(String id) {
14         this.id = id;
15     }
16     public String getName() {
17         return name;
18     }
19     public void setName(String name) {
20         this.name = name;
21     }
22     public double getPrice() {
23         return price;
24     }
25     public void setPrice(double price) {
26         this.price = price;
27     }
28     public AtomicInteger getCount() {
29         return count;
30     }
31     public void setCount(AtomicInteger count) {
32         this.count = count;
33     } 
34       
35       
36 }  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM