並發框架Disruptor場景應用


今天用一個停車場問題來加深對Disruptor的理解。一個有關汽車進入停車場的問題。當汽車進入停車場時,系統首先會記錄汽車信息。同時也會發送消息到其他系統處理相關業務,最后發送短信通知車主收費開始。看了很多文章,里面的代碼都是大同小異的,可能代碼真的是很經典。以下代碼也是來源網絡,只是自己手動敲的,加了一些注釋。
 
代碼包含以下內容:
1) 事件對象Event
2)三個消費者Handler
3)一個生產者Processer
4)執行Main方法
Event類:汽車信息
  1.  
    public class MyInParkingDataEvent {
  2.  
     
  3.  
    private String carLicense; // 車牌號
  4.  
     
  5.  
    public String getCarLicense() {
  6.  
    return carLicense;
  7.  
    }
  8.  
     
  9.  
    public void setCarLicense(String carLicense) {
  10.  
    this.carLicense = carLicense;
  11.  
    }
  12.  
     
  13.  
    }
Handler類:一個負責存儲汽車數據,一個負責發送kafka信息到其他系統中,最后一個負責給車主發短信通知
  1.  
    import com.lmax.disruptor.EventHandler;
  2.  
    import com.lmax.disruptor.WorkHandler;
  3.  
     
  4.  
    /**
  5.  
    * Handler 第一個消費者,負責保存進場汽車的信息
  6.  
    *
  7.  
    */
  8.  
    public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent> , WorkHandler<MyInParkingDataEvent>{
  9.  
     
  10.  
    @Override
  11.  
    public void onEvent(MyInParkingDataEvent myInParkingDataEvent) throws Exception {
  12.  
    long threadId = Thread.currentThread().getId(); // 獲取當前線程id
  13.  
    String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
  14.  
    System.out.println(String.format( "Thread Id %s 保存 %s 到數據庫中 ....", threadId, carLicense));
  15.  
    }
  16.  
     
  17.  
    @Override
  18.  
    public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
  19.  
    throws Exception {
  20.  
    this.onEvent(myInParkingDataEvent);
  21.  
    }
  22.  
     
  23.  
    }
  1.  
    import com.lmax.disruptor.EventHandler;
  2.  
     
  3.  
    /**
  4.  
    * 第二個消費者,負責發送通知告知工作人員(Kafka是一種高吞吐量的分布式發布訂閱消息系統)
  5.  
    */
  6.  
    public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent>{
  7.  
     
  8.  
    @Override
  9.  
    public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
  10.  
    throws Exception {
  11.  
    long threadId = Thread.currentThread().getId(); // 獲取當前線程id
  12.  
    String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
  13.  
    System.out.println(String.format( "Thread Id %s 發送 %s 進入停車場信息給 kafka系統...", threadId, carLicense));
  14.  
    }
  15.  
     
  16.  
    }
  1.  
    import com.lmax.disruptor.EventHandler;
  2.  
     
  3.  
    /**
  4.  
    * 第三個消費者,sms短信服務,告知司機你已經進入停車場,計費開始。
  5.  
    */
  6.  
    public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent>{
  7.  
     
  8.  
    @Override
  9.  
    public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
  10.  
    throws Exception {
  11.  
    long threadId = Thread.currentThread().getId(); // 獲取當前線程id
  12.  
    String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
  13.  
    System.out.println(String.format( "Thread Id %s 給 %s 的車主發送一條短信,並告知他計費開始了 ....", threadId, carLicense));
  14.  
    }
  15.  
     
  16.  
    }
Producer類:負責上報停車數據
  1.  
    import java.util.concurrent.CountDownLatch;
  2.  
    import com.lmax.disruptor.EventTranslator;
  3.  
    import com.lmax.disruptor.dsl.Disruptor;
  4.  
     
  5.  
    /**
  6.  
    * 生產者,進入停車場的車輛
  7.  
    */
  8.  
    public class MyInParkingDataEventPublisher implements Runnable{
  9.  
     
  10.  
    private CountDownLatch countDownLatch; // 用於監聽初始化操作,等初始化執行完畢后,通知主線程繼續工作
  11.  
    private Disruptor<MyInParkingDataEvent> disruptor;
  12.  
    private static final Integer NUM = 1; // 1,10,100,1000
  13.  
     
  14.  
    public MyInParkingDataEventPublisher(CountDownLatch countDownLatch,
  15.  
    Disruptor<MyInParkingDataEvent> disruptor) {
  16.  
    this.countDownLatch = countDownLatch;
  17.  
    this.disruptor = disruptor;
  18.  
    }
  19.  
     
  20.  
    @Override
  21.  
    public void run() {
  22.  
    MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator();
  23.  
    try {
  24.  
    for(int i = 0; i < NUM; i ++) {
  25.  
    disruptor.publishEvent(eventTranslator);
  26.  
    Thread.sleep( 1000); // 假設一秒鍾進一輛車
  27.  
    }
  28.  
    } catch (InterruptedException e) {
  29.  
    e.printStackTrace();
  30.  
    } finally {
  31.  
    countDownLatch.countDown(); // 執行完畢后通知 await()方法
  32.  
    System.out.println(NUM + "輛車已經全部進入進入停車場!");
  33.  
    }
  34.  
    }
  35.  
     
  36.  
    }
  37.  
     
  38.  
    class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> {
  39.  
     
  40.  
    @Override
  41.  
    public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) {
  42.  
    this.generateData(myInParkingDataEvent);
  43.  
    }
  44.  
     
  45.  
    private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) {
  46.  
    myInParkingDataEvent.setCarLicense( "車牌號: 鄂A-" + (int)(Math.random() * 100000)); // 隨機生成一個車牌號
  47.  
    System.out.println( "Thread Id " + Thread.currentThread().getId() + " 寫完一個event");
  48.  
    return myInParkingDataEvent;
  49.  
    }
  50.  
     
  51.  
    }
執行的Main方法:
  1.  
    import com.lmax.disruptor.EventFactory;
  2.  
    import com.lmax.disruptor.YieldingWaitStrategy;
  3.  
    import com.lmax.disruptor.dsl.Disruptor;
  4.  
    import com.lmax.disruptor.dsl.EventHandlerGroup;
  5.  
    import com.lmax.disruptor.dsl.ProducerType;
  6.  
     
  7.  
    /**
  8.  
    * 執行的Main方法 ,
  9.  
    * 一個生產者(汽車進入停車場);
  10.  
    * 三個消費者(一個記錄汽車信息,一個發送消息給系統,一個發送消息告知司機)
  11.  
    * 前兩個消費者同步執行,都有結果了再執行第三個消費者
  12.  
    */
  13.  
    public class MyInParkingDataEventMain {
  14.  
     
  15.  
    public static void main(String[] args) {
  16.  
    long beginTime=System.currentTimeMillis();
  17.  
    int bufferSize = 2048; // 2的N次方
  18.  
    try {
  19.  
    // 創建線程池,負責處理Disruptor的四個消費者
  20.  
    ExecutorService executor = Executors.newFixedThreadPool( 4);
  21.  
     
  22.  
    // 初始化一個 Disruptor
  23.  
    Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() {
  24.  
    @Override
  25.  
    public MyInParkingDataEvent newInstance() {
  26.  
    return new MyInParkingDataEvent(); // Event 初始化工廠
  27.  
    }
  28.  
    }, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
  29.  
     
  30.  
    // 使用disruptor創建消費者組 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler
  31.  
    EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith(
  32.  
    new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler());
  33.  
     
  34.  
    // 當上面兩個消費者處理結束后在消耗 smsHandler
  35.  
    MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler();
  36.  
    handlerGroup.then(myParkingDataSmsHandler);
  37.  
     
  38.  
    // 啟動Disruptor
  39.  
    disruptor.start();
  40.  
     
  41.  
    CountDownLatch countDownLatch = new CountDownLatch(1); // 一個生產者線程准備好了就可以通知主線程繼續工作了
  42.  
    // 生產者生成數據
  43.  
    executor.submit( new MyInParkingDataEventPublisher(countDownLatch, disruptor));
  44.  
    countDownLatch.await(); // 等待生產者結束
  45.  
     
  46.  
    disruptor.shutdown();
  47.  
    executor.shutdown();
  48.  
    } catch (Exception e) {
  49.  
    e.printStackTrace();
  50.  
    }
  51.  
     
  52.  
    System.out.println( "總耗時:"+(System.currentTimeMillis()-beginTime));
  53.  
    }
  54.  
     
  55.  
    }

--------------------- 本文來自 ITDragon龍 的CSDN 博客 ,全文地址請點擊:https://blog.csdn.net/qq_19558705/article/details/77247912?utm_source=copy 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM