【開發總結】Disruptor 使用簡介
在極客時間看到王寶令老師關於 Disruptor 的一篇文章,覺得很有意思。看完之后又在網上找到一些其他關於Disruptor 的資料看了一下。
現在寫篇文章總結一下。
使用
Disruptor 百度翻譯是干擾者,分裂器的意思。
在這里它其實是一個高性能隊列,一個queue。所以我有點想不通為什么名字取成這樣。有清楚的同學可以知會我一生。
Disruptor 的使用相對Java集合類中的隊列,會更加復雜。
第一步,引入jar包.
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.2</version> </dependency>
第二步,生成 Disruptor 對象
第三步,設置隊列中消息消費的handler.
第四步,啟動 Disruptor 線程。
第五步,獲取ringbuffer。生產者通過向 Disruptor 的ringbuffer 來發布消息的。所以事先要先獲取ringbuffer。
第六步,發布消息。
/** * @description: * @author: lkb * @create: 2020-10-28 19:46 */ @Slf4j public class MyTest { public static void main(String[] args) { //指定RingBuffer大小, //必須是2的N次方 int bufferSize = 1024; //構建Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>( LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE); //注冊事件處理器 disruptor.handleEventsWith( (event, sequence, endOfBatch) -> System.out.println("E: " + event)); //啟動Disruptor disruptor.start(); //獲取RingBuffer RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); //生產Event ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); //生產者生產消息 ringBuffer.publishEvent( (event, sequence, buffer) -> event.set(buffer.getLong(0)), bb); try { Thread.sleep(1000); } catch (InterruptedException e) { log.error(e.getMessage(), e); } } } }
其中LongEvent 是一個普通的POJO對象
public class LongEvent { private long value; public void set(long value) { this.value = value; } }
Disruptor 的使用是典型的生產者-消費者模式。
Java集合中的隊列更符合我們對隊列的操作習慣。以ArrayBlockingQueue為例,我們可以把ArrayBlockingQueue想象為一個隊列管道,生產者線程生產完數據后,將數據丟到隊列中,消費者線程從另外一端取出數據,進行消費。
而Disruptor 相對其他傳統的隊列而言更像一個“大家長”,生成者需要通過這位“大家長”的ringbuffer將消息發送出去,消費者需要將處理操作注冊到“大家長”這里。
這樣的隊列操作不太符合我們的習慣,所以使用上會不那么順手。
高效的秘訣
在不順手的情況下,為什么還是有很多系統用到它呢?原因在於它非常高效。
上面是網上找到的性能對比圖。可以看到Disruptor性能上是非常高的。
那它是如何實現高效的呢?
- 內存分配更加合理,使用 RingBuffer 數據結構,數組元素在初始化時一次性全部創建,提升緩存命中率;
- 對象循環利用,避免頻繁 GC。
- 能夠避免偽共享,提升緩存利用率。
- 采用無鎖算法,避免頻繁加鎖、解鎖的性能消耗。支持批量消費,消費者可以無鎖方式消費多個消息。
對於第四點,相信大家都很清楚。鎖操作涉及到操作系統狀態切換,這個操作是非常耗時耗資源的。無鎖操作可以避免狀態切換。
對於前面三點,涉及到一個非常重要的概念,就是緩存。CPU有三級緩存。離CPU越近的緩存,速度越快,但是容量越小。因為CPU的速度遠遠大於其他硬件的速度,設置緩存能夠減小CPU和其他硬件的速度差。這個緩存和生產者消費者中間的隊列有異曲同工之妙。
為了提高緩存的命中率,硬件通過局部性原理,在加載一個數據的同時將它周圍的數據也加載進去。
程序的局部性原理指的是在一段時間內程序的執行會限定在一個局部范圍內。這里的“局部性”可以從兩個方面來理解,一個是時間局部性,另一個是空間局部性。時間局部性指的是程序中的某條指令一旦被執行,不久之后這條指令很可能再次被執行;如果某條數據被訪問,不久之后這條數據很可能再次被訪問。而空間局部性是指某塊內存一旦被訪問,不久之后這塊內存附近的內存也很可能被訪問。
上訴的第1、2條,通過將數據設置進連續相鄰的內存位置,CPU在讀取了一個數據的時候,發現第二個數據已經因為“局部性”原理加載進緩存,就不需要再次去尋址,直接從緩存中獲取數據。
第1,2條是對緩存的高效利用,第3條就是對緩存低效使用的規避。
有一種緩存低效使用的方式是“偽共享”。內存是按照緩存行進行管理的。緩存行的大小通常是64個字節。
例如一個緩存行存儲了兩個對象,對其中一個對象的操作會使得整個緩存行失效。也就是說即使對象B被加入了緩存,但是因為其他對象的操作無效了。
第3條中,Disruptor 中通過將對象包裹,讓一個對象充滿整個緩存行,避免了偽共享的問題。
還有一點就是,相對於其他阻塞隊列,Disruptor 的等待策略更多,功能更加強大。
通過對緩存的利用和無鎖操作,Disruptor 成為一個高效隊列。
一些思考
Disruptor 的一些思想其實在其他框架上也是常見的。
避免偽共享問題上,MySQL 8.0 版本直接將查詢緩存的整塊功能刪掉了;在高效利用緩存上,線程池、隊列等都多算緩存概念的受益者;避免鎖操作上,Java的底層的各種鎖優化,也是利用這點,比如輕量級鎖。
為什么這么多框架會不約而同地想到這些問題呢?
因為計算機、操作系統是非常成熟的,底層都是非常相似的架構。了解計算機底層原理,對這些知識才能觸類旁通。所以,啥不說,計算機基礎課,我打算再上一遍。