springboot~disruptor異步隊列


Disruptor

Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題(在性能測試中發現竟然與I/O操作處於同樣的數量級)。

Java內置隊列的問題

介紹Disruptor之前,我們先來看一看常用的線程安全的內置隊列有什么問題。Java的內置隊列如下表所示。

隊列的底層一般分成三種:數組、鏈表和堆。其中,堆一般情況下是為了實現帶有優先級特性的隊列,暫且不考慮。

從數組和鏈表兩種數據結構來看,基於數組線程安全的隊列,比較典型的是ArrayBlockingQueue,它主要通過加鎖的方式來保證線程安全;基於鏈表的線程安全隊列分成LinkedBlockingQueue和ConcurrentLinkedQueue兩大類,前者也通過鎖的方式來實現線程安全,而后者以及上面表格中的LinkedTransferQueue都是通過原子變量compare and swap(以下簡稱“CAS”)這種不加鎖的方式來實現的。

但是對 volatile類型的變量進行 CAS 操作,存在偽共享問題,下面介紹一下

偽共享

CPU的緩存系統是以緩存行(cache line)為單位存儲的,一般的大小為64bytes。在多線程程序的執行過程中,存在着一種情況,多個需要頻繁修改的變量存在同一個緩存行當中。

假設:有兩個線程分別訪問並修改X和Y這兩個變量,X和Y恰好在同一個緩存行上,這兩個線程分別在不同的CPU上執行。那么每個CPU分別更新好X和Y時將緩存行刷入內存時,發現有別的修改了各自緩存行內的數據,這時緩存行會失效,從L3中重新獲取。這樣的話,程序執行效率明顯下降。為了減少這種情況的發生,其實就是避免X和Y在同一個緩存行中,可以主動添加一些無關變量將緩存行填充滿,比如在X對象中添加一些變量,讓它有64 Byte那么大,正好占滿一個緩存行。

偽共享問題 的解決方案

簡單的說,就是 以空間換時間: 使用占位字節,將變量的所在的 緩沖行 塞滿。
disruptor 無鎖框架就是這么干的。

Disruptor框架是如何解決偽共享問題的?

在Disruptor中有一個重要的類Sequence,該類包裝了一個volatile修飾的long類型數據value,無論是Disruptor中的基於數組實現的緩沖區RingBuffer,還是生產者,消費者,都有各自獨立的Sequence,RingBuffer緩沖區中,Sequence標示着寫入進度,例如每次生產者要寫入數據進緩沖區時,都要調用RingBuffer.next()來獲得下一個可使用的相對位置。對於生產者和消費者來說,Sequence標示着它們的事件序號。

例子

/**
 * 停車場問題.
 * 1) 事件對象Event
 * 2)三個消費者Handler
 * 3)一個生產者Processer
 * 4)執行Main方法
 */
public class DisruptorCar {
    private static final Integer NUM = 1; // 1,10,100,1000

    /**
     * 測試入口 ,
     * 一個生產者(汽車進入停車場);
     * 三個消費者(一個記錄汽車信息,一個發送消息給系統,一個發送消息告知司機)
     * 前兩個消費者同步執行,都有結果了再執行第三個消費者
     */
    @Test
     public  void main() throws InterruptedException {
        long beginTime = System.currentTimeMillis();
        int bufferSize = 2048; // 2的N次方
        try {
            // 創建線程池,負責處理Disruptor的四個消費者
            ExecutorService executor = Executors.newFixedThreadPool(4);

            // 初始化一個 Disruptor
            Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() {
                @Override
                public MyInParkingDataEvent newInstance() {
                    return new MyInParkingDataEvent(); // Event 初始化工廠
                }
            }, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());

            // 使用disruptor創建消費者組 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler
            EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith(
                    new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler());

            // 當上面兩個消費者處理結束后在消耗 smsHandler
            MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler();
            handlerGroup.then(myParkingDataSmsHandler);

            // 啟動Disruptor
            disruptor.start();

            CountDownLatch countDownLatch = new CountDownLatch(1); // 一個生產者線程准備好了就可以通知主線程繼續工作了
            // 生產者生成數據
            executor.submit(new MyInParkingDataEventPublisher(countDownLatch, disruptor));
            countDownLatch.await(); // 等待生產者結束

            disruptor.shutdown();
            executor.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("總耗時:" + (System.currentTimeMillis() - beginTime));
    }

    public class MyInParkingDataEvent {

        private String carLicense; // 車牌號

        public String getCarLicense() {
            return carLicense;
        }

        public void setCarLicense(String carLicense) {
            this.carLicense = carLicense;
        }

    }

    /**
     * Handler 第一個消費者,負責保存進場汽車的信息
     */
    public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent>, WorkHandler<MyInParkingDataEvent> {

        @Override
        public void onEvent(MyInParkingDataEvent myInParkingDataEvent) throws Exception {
            long threadId = Thread.currentThread().getId(); // 獲取當前線程id
            String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
            System.out.println(String.format("Thread Id %s 保存 %s 到數據庫中 ....", threadId, carLicense));
        }

        @Override
        public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
                throws Exception {
            this.onEvent(myInParkingDataEvent);
        }

    }

    /**
     * 第二個消費者,負責發送通知告知工作人員(Kafka是一種高吞吐量的分布式發布訂閱消息系統)
     */
    public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent> {

        @Override
        public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
                throws Exception {
            long threadId = Thread.currentThread().getId(); // 獲取當前線程id
            String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
            System.out.println(String.format("Thread Id %s 發送 %s 進入停車場信息給 kafka系統...", threadId, carLicense));
        }

    }

    /**
     * 第三個消費者,sms短信服務,告知司機你已經進入停車場,計費開始。
     */
    public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent> {

        @Override
        public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
                throws Exception {
            long threadId = Thread.currentThread().getId(); // 獲取當前線程id
            String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
            System.out.println(String.format("Thread Id %s 給  %s 的車主發送一條短信,並告知他計費開始了 ....", threadId, carLicense));
        }

    }

    /**
     * 生產者,進入停車場的車輛
     */
    public class MyInParkingDataEventPublisher implements Runnable {

        private CountDownLatch countDownLatch; // 用於監聽初始化操作,等初始化執行完畢后,通知主線程繼續工作
        private Disruptor<MyInParkingDataEvent> disruptor;

        public MyInParkingDataEventPublisher(CountDownLatch countDownLatch,
                                             Disruptor<MyInParkingDataEvent> disruptor) {
            this.countDownLatch = countDownLatch;
            this.disruptor = disruptor;
        }

        @Override
        public void run() {
            MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator();
            try {
                for (int i = 0; i < NUM; i++) {
                    disruptor.publishEvent(eventTranslator);
                    Thread.sleep(1000); // 假設一秒鍾進一輛車
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown(); // 執行完畢后通知 await()方法
                System.out.println(NUM + "輛車已經全部進入進入停車場!");
            }
        }

    }

    class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> {

        @Override
        public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) {
            this.generateData(myInParkingDataEvent);
        }

        private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) {
            myInParkingDataEvent.setCarLicense("車牌號: 鄂A-" + (int) (Math.random() * 100000)); // 隨機生成一個車牌號
            System.out.println("Thread Id " + Thread.currentThread().getId() + " 寫完一個event");
            return myInParkingDataEvent;
        }

    }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM