Disruptor框架


知識點1:什么是Disruptor

Disruptor使用觀察者模式, 主動將消息發送給消費者, 而不是等消費者從隊列中取; 在無鎖的情況下, 實現queue(環形, RingBuffer)的並發操作, 性能遠高於BlockingQueue

 

知識點2:Disruptor的設計方案

Disruptor通過以下設計來解決隊列速度慢的問題:

環形數組結構

為了避免垃圾回收,采用數組而非鏈表。同時,數組對處理器的緩存機制更加友好。

元素位置定位

數組長度2^n,通過位運算,加快定位的速度。下標采取遞增的形式。不用擔心index溢出的問題。index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。

無鎖設計

每個生產者或者消費者線程,會先申請可以操作的元素在數組中的位置,申請到之后,直接在該位置寫入或者讀取數據。

下面忽略數組的環形結構,介紹一下如何實現無鎖設計。整個過程通過原子變量CAS,保證操作的線程安全。

 

知識點3:Disruptor實現特征

另一個關鍵的實現低延遲的細節就是在Disruptor中利用無鎖的算法,所有內存的可見性和正確性都是利用內存屏障或者CAS操作。使用CAS來保證多線程安全,與大部分並發隊列使用的鎖相比,CAS顯然要快很多。CAS是CPU級別的指令,更加輕量,不必像鎖一樣需要操作系統提供支持,所以每次調用不需要在用戶態與內核態之間切換,也不需要上下文切換。

只有一個用例中鎖是必須的,那就是BlockingWaitStrategy(阻塞等待策略),唯一的實現方法就是使用Condition實現消費者在新事件到來前等待。許多低延遲系統使用忙等待去避免Condition的抖動,然而在系統忙等待的操作中,性能可能會顯著降低,尤其是在CPU資源嚴重受限的情況下,例如虛擬環境下的WEB服務器。

 

知識點4:Disruptor實現生產與消費

1、Pom Maven依賴信息

<dependencies>
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>
    

2、首先聲明一個Event來包含需要傳遞的數據:

//定義事件event  通過Disruptor 進行交換的數據類型。
public class LongEvent {
    private Long value;
    public Long getValue() {
        return value;
    }
 
    public void setValue(Long value) {
        this.value = value;
    }
}

 

3、需要讓Disruptor為我們創建事件,我們同時還聲明了一個EventFactory來實例化Event對象

public class LongEventFactory implements EventFactory<LongEvent> {
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

 

4、事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中存儲的數據打印到終端:

public class LongEventHandler implements EventHandler<LongEvent>  {
 
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
         System.out.println("消費者:"+event.getValue());
    }
}

 

5、定義生產這發送事件

public class LongEventProducer {
    public final RingBuffer<LongEvent> ringBuffer;
    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
 
    public void onData(ByteBuffer byteBuffer) {
        // 1.ringBuffer 事件隊列 下一個槽
        long sequence = ringBuffer.next();
        Long data = null;
        try {
            //2.取出空的事件隊列
            LongEvent longEvent = ringBuffer.get(sequence);
            data = byteBuffer.getLong(0);
            //3.獲取事件隊列傳遞的數據
            longEvent.setValue(data);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        } finally {
            System.out.println("生產這准備發送數據");
            //4.發布事件
            ringBuffer.publish(sequence);
        }
    }
 
}

 

6、main函數執行調用

public class DisruptorMain {
 
    public static void main(String[] args) {
        // 1.創建一個可緩存的線程 提供線程來出發Consumer 的事件處理
        ExecutorService executor = Executors.newCachedThreadPool();
        // 2.創建工廠
        EventFactory<LongEvent> eventFactory = new LongEventFactory();
        // 3.創建ringBuffer 大小
        int ringBufferSize = 1024 * 1024; // ringBufferSize大小一定要是2的N次方
        // 4.創建Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor,
                ProducerType.SINGLE, new YieldingWaitStrategy());
        // 5.連接消費端方法
        disruptor.handleEventsWith(new LongEventHandler());
        // 6.啟動
        disruptor.start();
        // 7.創建RingBuffer容器
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        // 8.創建生產者
        LongEventProducer producer = new LongEventProducer(ringBuffer);
        // 9.指定緩沖區大小
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (int i = 1; i <= 100; i++) {
            byteBuffer.putLong(0, i);
            producer.onData(byteBuffer);
        }
        //10.關閉disruptor和executor
        disruptor.shutdown();
        executor.shutdown();
    }
 
}

 

 

知識點5:什么是ringbuffer

它是一個環(首尾相接的環),你可以把它用做在不同上下文(線程)間傳遞數據的buffer。

                                                                             

基本來說,ringbuffer擁有一個序號,這個序號指向數組中下一個可用的元素。(校對注:如下圖右邊的圖片表示序號,這個序號指向數組的索引4的位置。)

                                                                             

隨着你不停地填充這個buffer(可能也會有相應的讀取),這個序號會一直增長,直到繞過這個環。

                                                                             

要找到數組中當前序號指向的元素,可以通過mod操作:

以上面的ringbuffer為例(java的mod語法):12 % 10 = 2。很簡單吧。  事實上,上圖中的ringbuffer只有10個槽完全是個意外。如果槽的個數是2的N次方更有利於基於二進制

1、優點

之所以ringbuffer采用這種數據結構,是因為它在可靠消息傳遞方面有很好的性能。這就夠了,不過它還有一些其他的優點。

首先,因為它是數組,所以要比鏈表快,而且有一個容易預測的訪問模式。(譯者注:數組內元素的內存地址的連續性存儲的)。這是對CPU緩存友好的—也就是說,在硬件級別,數組中的元素是會被預加載的,因此在ringbuffer當中,cpu無需時不時去主存加載數組中的下一個元素。(校對注:因為只要一個元素被加載到緩存行,其他相鄰的幾個元素也會被加載進同一個緩存行)

其次,你可以為數組預先分配內存,使得數組對象一直存在(除非程序終止)。這就意味着不需要花大量的時間用於垃圾回收。此外,不像鏈表那樣,需要為每一個添加到其上面的對象創造節點對象—對應的,當刪除節點時,需要執行相應的內存清理操作。

2、RingBuffer底層實現

RingBuffer是一個首尾相連的環形數組,所謂首尾相連,是指當RingBuffer上的指針越過數組是上界后,繼續從數組頭開始遍歷。因此,RingBuffer中至少有一個指針,來表示RingBuffer中的操作位置。另外,指針的自增操作需要做並發控制,Disruptor和本文的OptimizedQueue都使用CAS的樂觀並發控制來保證指針自增的原子性,關於樂觀並發控制之后會着重介紹。

Disruptor中的RingBuffer上只有一個指針,表示當前RingBuffer上消息寫到了哪里,此外,每個消費者會維護一個sequence表示自己在RingBuffer上讀到哪里,從這個角度講,Disruptor中的RingBuffer上實際有消費者數+1個指針。由於我們要實現的是一個單消息單消費的阻塞隊列,只要維護一個讀指針(對應消費者)和一個寫指針(對應生產者)即可,無論哪個指針,每次讀寫操作后都自增一次,一旦越界,即從數組頭開始繼續讀寫

 

知識點6:Disruptor的核心概念

1、RingBuffer

如其名,環形的緩沖區。曾經 RingBuffer 是 Disruptor 中的最主要的對象,但從3.0版本開始,其職責被簡化為僅僅負責對通過 Disruptor 進行交換的數據(事件)進行存儲和更新。在一些更高級的應用場景中,Ring Buffer 可以由用戶的自定義實現來完全替代。

2、SequenceDisruptor

通過順序遞增的序號來編號管理通過其進行交換的數據(事件),對數據(事件)的處理過程總是沿着序號逐個遞增處理。一個 Sequence 用於跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。雖然一個 AtomicLong 也可以用於標識進度,但定義 Sequence 來負責該問題還有另一個目的,那就是防止不同的 Sequence 之間的CPU緩存偽共享(Flase Sharing)問題。(注:這是 Disruptor 實現高性能的關鍵點之一,網上關於偽共享問題的介紹已經汗牛充棟,在此不再贅述)。

Sequencer

Sequencer 是 Disruptor 的真正核心。此接口有兩個實現類 SingleProducerSequencer、MultiProducerSequencer ,它們定義在生產者和消費者之間快速、正確地傳遞數據的並發算法。

3、Sequence Barrier

用於保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用。 Sequence Barrier 還定義了決定 Consumer 是否還有可處理的事件的邏輯。

4、Wait Strategy

定義 Consumer 如何進行等待下一個事件的策略。 (注:Disruptor 定義了多種不同的策略,針對不同的場景,提供了不一樣的性能表現)

5、Event

在 Disruptor 的語義中,生產者和消費者之間進行交換的數據被稱為事件(Event)。它不是一個被 Disruptor 定義的特定類型,而是由 Disruptor 的使用者定義並指定。

6、EventProcessor

EventProcessor 持有特定消費者(Consumer)的 Sequence,並提供用於調用事件處理實現的事件循環(Event Loop)。

7、EventHandler

Disruptor 定義的事件處理接口,由用戶實現,用於處理事件,是 Consumer 的真正實現。

8、Producer

即生產者,只是泛指調用 Disruptor 發布事件的用戶代碼,Disruptor 沒有定義特定接口或類型。

RingBuffer——Disruptor底層數據結構實現,核心類,是線程間交換數據的中轉地;

Sequencer——序號管理器,負責消費者/生產者各自序號、序號柵欄的管理和協調;

Sequence——序號,聲明一個序號,用於跟蹤ringbuffer中任務的變化和消費者的消費情況;

SequenceBarrier——序號柵欄,管理和協調生產者的游標序號和各個消費者的序號,確保生產者不會覆蓋消費者未來得及處理的消息,確保存在依賴的消費者之間能夠按照正確的順序處理;

EventProcessor——事件處理器,監聽RingBuffer的事件,並消費可用事件,從RingBuffer讀取的事件會交由實際的生產者實現類來消費;它會一直偵聽下一個可用的序號,直到該序號對應的事件已經准備好。

EventHandler——業務處理器,是實際消費者的接口,完成具體的業務邏輯實現,第三方實現該接口;代表着消費者。

Producer——生產者接口,第三方線程充當該角色,producer向RingBuffer寫入事件。

 

 

原文鏈接:https://blog.csdn.net/weixin_37645032/article/details/103264928

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM