disruptor 高並發編程 簡介demo


原文地址:http://www.cnblogs.com/qiaoyihang/p/6479994.html  

  disruptor適用於大規模低延遲的並發場景。可用於讀寫操作分離、數據緩存,速度匹配(因為其實現了生產者-消費者模型)、或者是基於內存的事件流處理機制的場景。

  disruptor的主要設計思想是無鎖的高並發,特別適用於對時間高度敏感的多線程應用。如果app對時間不敏感完全可以不用disruptor 而只用array blocking queue,在設計上采用內存屏障的機制和CAS操作實現此思想。主流的並發程序 
  都離不開鎖對資源的管控,或者盡量避開鎖的使用。 
  其主要的實現原理總結有如下三點,當然還有很多地方設計得很巧妙,需要細細閱讀源碼和官方文檔。雖然這個 過程對我來說很尷尬,但痛並快樂者,有朝聞道、夕可死也的感覺。 
  1.采用消費者-生產者模型進行讀寫的分離。 
  2.用循環緩存(實際是一個循環隊列)實現了數據的暫存和讀寫速度的匹配。 
  3.用內存屏障加序列號的方式實現了無鎖的並發機制。 

在插一下cas機制

CAS,又稱Compare-and-Swap,代表一種原子操作

一, 為每一個Node在Set的時候分配一個cas值,(本質是版本號,返回的Node和存儲Node的cas值一樣,每次要更新這個Node時要檢查cas的值是否與取出來時一致)

二, 只有在Update一個key的value時才會造成多線程沖突,只是Set/Get是不會的,單線程也不會並發問題。

三, 如何維護每個線程/進程的cas的值:

        增加步進的概念:cas每次自增每個線程都不一樣,這樣的話,每個線程有一個確定的變量,如果是由其它線程修改的一定與本線程的cas不一樣

        1, 每個線程/進程有一個初始化的index,如果有10個進程就是編號為0 ~9

        2, 每次cas值增加都是按進程數來加,step[0] += 10,這樣能保證每個進程的cas都不會一樣

        3,缺點是需要額外的初始化

cas 使用場景:

線程T1對key1、線程T2對Key1並發Get更新了Value值后想Set回去,可能會出現后一個操作覆蓋前一個操作值,而且這個值是涉及到事務性的。正確是應該是T1 Set完后,T2才能取,串行化操作。

CAS就是解決這個問題,如果發現cas值不一樣了,就會Set失敗,需要重取再設置,假定某時刻T1 的cas值為20,T2 的cas值為21。如果沒有步進時,T1處理后cas值為21,T2再處理就認為沒有改變過。

 

disruptor的主要編程部件 
   1.Disruptor:用於控制整個消費者-生產者模型的處理器 
   2.RingBuffer:用於存放數據 
   3.EventHandler:一個用於處理事件的接口(可以當做生產者,也可以當做消費者)。 
   4.EventFactory:事件工廠類。 
   5.WaitStrategy:用於實現事件處理等待RingBuffer游標策略的接口。 
   6.SequeueBarrier:隊列屏障,用於處理訪問RingBuffer的序列。 
   7.用於運行disruptor的線程或者線程池。

 -disruptor編程主要的編程流程 
   1.定義事件 
   2.定義事件工廠 
   3.定義事件處理類 
   4.定義事件處理的線程或者線程池 
   5.指定等待策略 
   6.通過disruptor處理器組裝生產者和消費者 
   7.發布事件 
   8.關閉disruptor業務邏輯處理器 

disruptor實現無鎖高並發,主要采用的消費者-生產者模型。所以編程的實踐場景如下 
   1.一個生產者—一個消費者的場景 
   2.一個生產者—多個消費者的場景 
   3.多個生產者—一個消費者的場景 
   4.多個生產者—多個消費者的場景 

 

記錄一下自己寫的demo,模擬三個消費者消費一個生產者的數據,最后等待所有線程都執行完畢才進行下一步操作:

//首先定義一個事件
public
class MyEvent { private String name; private CountDownLatch countDownLatch; public CountDownLatch getCountDownLatch() { return countDownLatch; } public String getName() { return name; } public void setName(String name) { this.name = name; } public void setCountDownLatch(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } public void setMyEvent(MyEvent myEvent){ name = myEvent.name; countDownLatch = myEvent.countDownLatch; } }

 

//生產數據的工廠
public
class MyEventFactory implements EventFactory<MyEvent> { @Override public MyEvent newInstance() { // TODO Auto-generated method stub return new MyEvent(); } }
//數據構造
public
class MyEventProduce implements Runnable { private final int SIZE = 3; private CountDownLatch countDownLatch; private Disruptor<MyEvent> disruptor; public MyEventProduce() { countDownLatch = new CountDownLatch(SIZE); } public MyEventProduce setDisruptor(Disruptor<MyEvent> disruptor) { this.disruptor = disruptor; return this; } public CountDownLatch getCountDownLatch() { return countDownLatch; } @Override public void run() { for (int i = 1; i <= SIZE; i++) { MyEvent event = new MyEvent(); event.setName("name--" + i); event.setCountDownLatch(countDownLatch); disruptor.publishEvent(new MyEventTranslator(event)); } } }
public class MyEventTranslator implements EventTranslator<MyEvent> {
    private MyEvent myEvent;

    public MyEventTranslator(MyEvent myEvent) {
        this.myEvent = myEvent;
    }

    @Override
    public void translateTo(MyEvent event, long sequence) {
        event.setMyEvent(myEvent);
    }

}
//第一個消費者
public
class Handler1 implements EventHandler<MyEvent>, WorkHandler<MyEvent> { private static final Logger log = LoggerFactory.getLogger(Handler1.class); @Override public void onEvent(MyEvent event) throws Exception { log.debug(event.getName() + "====Handler1 。。。。"); // throw new RuntimeException("測試異常"); } @Override public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println("not go"); onEvent(event); } }
//第二個消費者
public
class Handler11 implements EventHandler<MyEvent>, WorkHandler<MyEvent> { private static final Logger log = LoggerFactory.getLogger(Handler11.class); @Override public void onEvent(MyEvent event) throws Exception { log.debug(event.getName() + "====Handler11 。。。。。"); } @Override public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception { onEvent(event); } }
//第三個消費者
public
class Handler2 implements EventHandler<MyEvent>, WorkHandler<MyEvent> { private static final Logger log = LoggerFactory.getLogger(Handler2.class); @Override public void onEvent(MyEvent event) throws Exception { log.debug(event.getName() + "====Handler2........"); event.getCountDownLatch().countDown(); } @Override public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception { onEvent(event); } }
//異常處理事件類
public
class MyHandlerException implements ExceptionHandler { /* * (non-Javadoc) * * @see * com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable * , long, java.lang.Object) */ @Override public void handleEventException(Throwable ex, long sequence, Object event) { // TODO Auto-generated method stub System.out.println("MyHandlerException handleEventException..."); } /* * (non-Javadoc) * * @see * com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang. * Throwable) */ @Override public void handleOnStartException(Throwable ex) { System.out.println("MyHandlerException handleOnStartException..."); } /* * (non-Javadoc) * * @see * com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang * .Throwable) */ @Override public void handleOnShutdownException(Throwable ex) { System.out.println("MyHandlerException handleOnShutdownException..."); } }

 

//單元測試類
public
class TestDisruptor { private static final Logger log = LoggerFactory .getLogger(TestDisruptor.class); @Test public void myTest() throws Exception { Disruptor<MyEvent> disruptor = new Disruptor<>(new MyEventFactory(), 1024, Exector.newInstance().getExecutorService(), ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleExceptionsWith(new MyHandlerException()); disruptor.handleEventsWithWorkerPool(new Handler1()) .thenHandleEventsWithWorkerPool(new Handler11()) .thenHandleEventsWithWorkerPool(new Handler2()); disruptor.start(); MyEventProduce ep = new MyEventProduce().setDisruptor(disruptor); CountDownLatch countDownLatch = ep.getCountDownLatch(); Exector.newInstance().getExecutorService().submit(ep); countDownLatch.await(); disruptor.shutdown(); log.debug("運行完畢"); } public void testJDBC() throws Exception { Connection connection = DriverManager.getConnection("", "", ""); } }

 

文章借鑒:http://blog.csdn.net/jeffsmish/article/details/53572043

推薦文章:http://blog.163.com/zongyuan1987@126/blog/static/131623156201271021955717/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM