知識點1:什么是Disruptor
Disruptor使用觀察者模式, 主動將消息發送給消費者, 而不是等消費者從隊列中取; 在無鎖的情況下, 實現queue(環形, RingBuffer)的並發操作, 性能遠高於BlockingQueue
知識點2:Disruptor的設計方案
Disruptor通過以下設計來解決隊列速度慢的問題:
環形數組結構
為了避免垃圾回收,采用數組而非鏈表。同時,數組對處理器的緩存機制更加友好。
元素位置定位
數組長度2^n,通過位運算,加快定位的速度。下標采取遞增的形式。不用擔心index溢出的問題。index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。
無鎖設計
每個生產者或者消費者線程,會先申請可以操作的元素在數組中的位置,申請到之后,直接在該位置寫入或者讀取數據。
下面忽略數組的環形結構,介紹一下如何實現無鎖設計。整個過程通過原子變量CAS,保證操作的線程安全。
知識點3:Disruptor實現特征
另一個關鍵的實現低延遲的細節就是在Disruptor中利用無鎖的算法,所有內存的可見性和正確性都是利用內存屏障或者CAS操作。使用CAS來保證多線程安全,與大部分並發隊列使用的鎖相比,CAS顯然要快很多。CAS是CPU級別的指令,更加輕量,不必像鎖一樣需要操作系統提供支持,所以每次調用不需要在用戶態與內核態之間切換,也不需要上下文切換。
只有一個用例中鎖是必須的,那就是BlockingWaitStrategy(阻塞等待策略),唯一的實現方法就是使用Condition實現消費者在新事件到來前等待。許多低延遲系統使用忙等待去避免Condition的抖動,然而在系統忙等待的操作中,性能可能會顯著降低,尤其是在CPU資源嚴重受限的情況下,例如虛擬環境下的WEB服務器。
知識點4:Disruptor實現生產與消費
1、Pom Maven依賴信息
<dependencies> <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.2.1</version> </dependency> </dependencies>
2、首先聲明一個Event來包含需要傳遞的數據:
//定義事件event 通過Disruptor 進行交換的數據類型。 public class LongEvent { private Long value; public Long getValue() { return value; } public void setValue(Long value) { this.value = value; } }
3、需要讓Disruptor為我們創建事件,我們同時還聲明了一個EventFactory來實例化Event對象
public class LongEventFactory implements EventFactory<LongEvent> { public LongEvent newInstance() { return new LongEvent(); } }
4、事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中存儲的數據打印到終端:
public class LongEventHandler implements EventHandler<LongEvent> { public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println("消費者:"+event.getValue()); } }
5、定義生產這發送事件
public class LongEventProducer { public final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer byteBuffer) { // 1.ringBuffer 事件隊列 下一個槽 long sequence = ringBuffer.next(); Long data = null; try { //2.取出空的事件隊列 LongEvent longEvent = ringBuffer.get(sequence); data = byteBuffer.getLong(0); //3.獲取事件隊列傳遞的數據 longEvent.setValue(data); try { Thread.sleep(10); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } finally { System.out.println("生產這准備發送數據"); //4.發布事件 ringBuffer.publish(sequence); } } }
6、main函數執行調用
public class DisruptorMain { public static void main(String[] args) { // 1.創建一個可緩存的線程 提供線程來出發Consumer 的事件處理 ExecutorService executor = Executors.newCachedThreadPool(); // 2.創建工廠 EventFactory<LongEvent> eventFactory = new LongEventFactory(); // 3.創建ringBuffer 大小 int ringBufferSize = 1024 * 1024; // ringBufferSize大小一定要是2的N次方 // 4.創建Disruptor Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); // 5.連接消費端方法 disruptor.handleEventsWith(new LongEventHandler()); // 6.啟動 disruptor.start(); // 7.創建RingBuffer容器 RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); // 8.創建生產者 LongEventProducer producer = new LongEventProducer(ringBuffer); // 9.指定緩沖區大小 ByteBuffer byteBuffer = ByteBuffer.allocate(8); for (int i = 1; i <= 100; i++) { byteBuffer.putLong(0, i); producer.onData(byteBuffer); } //10.關閉disruptor和executor disruptor.shutdown(); executor.shutdown(); } }
知識點5:什么是ringbuffer
它是一個環(首尾相接的環),你可以把它用做在不同上下文(線程)間傳遞數據的buffer。
基本來說,ringbuffer擁有一個序號,這個序號指向數組中下一個可用的元素。(校對注:如下圖右邊的圖片表示序號,這個序號指向數組的索引4的位置。)
隨着你不停地填充這個buffer(可能也會有相應的讀取),這個序號會一直增長,直到繞過這個環。
要找到數組中當前序號指向的元素,可以通過mod操作:
以上面的ringbuffer為例(java的mod語法):12 % 10 = 2。很簡單吧。 事實上,上圖中的ringbuffer只有10個槽完全是個意外。如果槽的個數是2的N次方更有利於基於二進制
1、優點
之所以ringbuffer采用這種數據結構,是因為它在可靠消息傳遞方面有很好的性能。這就夠了,不過它還有一些其他的優點。
首先,因為它是數組,所以要比鏈表快,而且有一個容易預測的訪問模式。(譯者注:數組內元素的內存地址的連續性存儲的)。這是對CPU緩存友好的—也就是說,在硬件級別,數組中的元素是會被預加載的,因此在ringbuffer當中,cpu無需時不時去主存加載數組中的下一個元素。(校對注:因為只要一個元素被加載到緩存行,其他相鄰的幾個元素也會被加載進同一個緩存行)
其次,你可以為數組預先分配內存,使得數組對象一直存在(除非程序終止)。這就意味着不需要花大量的時間用於垃圾回收。此外,不像鏈表那樣,需要為每一個添加到其上面的對象創造節點對象—對應的,當刪除節點時,需要執行相應的內存清理操作。
2、RingBuffer底層實現
RingBuffer是一個首尾相連的環形數組,所謂首尾相連,是指當RingBuffer上的指針越過數組是上界后,繼續從數組頭開始遍歷。因此,RingBuffer中至少有一個指針,來表示RingBuffer中的操作位置。另外,指針的自增操作需要做並發控制,Disruptor和本文的OptimizedQueue都使用CAS的樂觀並發控制來保證指針自增的原子性,關於樂觀並發控制之后會着重介紹。
Disruptor中的RingBuffer上只有一個指針,表示當前RingBuffer上消息寫到了哪里,此外,每個消費者會維護一個sequence表示自己在RingBuffer上讀到哪里,從這個角度講,Disruptor中的RingBuffer上實際有消費者數+1個指針。由於我們要實現的是一個單消息單消費的阻塞隊列,只要維護一個讀指針(對應消費者)和一個寫指針(對應生產者)即可,無論哪個指針,每次讀寫操作后都自增一次,一旦越界,即從數組頭開始繼續讀寫
知識點6:Disruptor的核心概念
1、RingBuffer
如其名,環形的緩沖區。曾經 RingBuffer 是 Disruptor 中的最主要的對象,但從3.0版本開始,其職責被簡化為僅僅負責對通過 Disruptor 進行交換的數據(事件)進行存儲和更新。在一些更高級的應用場景中,Ring Buffer 可以由用戶的自定義實現來完全替代。
2、SequenceDisruptor
通過順序遞增的序號來編號管理通過其進行交換的數據(事件),對數據(事件)的處理過程總是沿着序號逐個遞增處理。一個 Sequence 用於跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。雖然一個 AtomicLong 也可以用於標識進度,但定義 Sequence 來負責該問題還有另一個目的,那就是防止不同的 Sequence 之間的CPU緩存偽共享(Flase Sharing)問題。(注:這是 Disruptor 實現高性能的關鍵點之一,網上關於偽共享問題的介紹已經汗牛充棟,在此不再贅述)。
Sequencer
Sequencer 是 Disruptor 的真正核心。此接口有兩個實現類 SingleProducerSequencer、MultiProducerSequencer ,它們定義在生產者和消費者之間快速、正確地傳遞數據的並發算法。
3、Sequence Barrier
用於保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用。 Sequence Barrier 還定義了決定 Consumer 是否還有可處理的事件的邏輯。
4、Wait Strategy
定義 Consumer 如何進行等待下一個事件的策略。 (注:Disruptor 定義了多種不同的策略,針對不同的場景,提供了不一樣的性能表現)
5、Event
在 Disruptor 的語義中,生產者和消費者之間進行交換的數據被稱為事件(Event)。它不是一個被 Disruptor 定義的特定類型,而是由 Disruptor 的使用者定義並指定。
6、EventProcessor
EventProcessor 持有特定消費者(Consumer)的 Sequence,並提供用於調用事件處理實現的事件循環(Event Loop)。
7、EventHandler
Disruptor 定義的事件處理接口,由用戶實現,用於處理事件,是 Consumer 的真正實現。
8、Producer
即生產者,只是泛指調用 Disruptor 發布事件的用戶代碼,Disruptor 沒有定義特定接口或類型。
RingBuffer——Disruptor底層數據結構實現,核心類,是線程間交換數據的中轉地;
Sequencer——序號管理器,負責消費者/生產者各自序號、序號柵欄的管理和協調;
Sequence——序號,聲明一個序號,用於跟蹤ringbuffer中任務的變化和消費者的消費情況;
SequenceBarrier——序號柵欄,管理和協調生產者的游標序號和各個消費者的序號,確保生產者不會覆蓋消費者未來得及處理的消息,確保存在依賴的消費者之間能夠按照正確的順序處理;
EventProcessor——事件處理器,監聽RingBuffer的事件,並消費可用事件,從RingBuffer讀取的事件會交由實際的生產者實現類來消費;它會一直偵聽下一個可用的序號,直到該序號對應的事件已經准備好。
EventHandler——業務處理器,是實際消費者的接口,完成具體的業務邏輯實現,第三方實現該接口;代表着消費者。
Producer——生產者接口,第三方線程充當該角色,producer向RingBuffer寫入事件。
原文鏈接:https://blog.csdn.net/weixin_37645032/article/details/103264928