高性能隊列Disruptor框架的詳細說明與實戰使用


Disruptor的使用

1.簡介

The LMAX Disruptor is a high performance inter-thread messaging library. It grew out of LMAX's research into concurrency, performance and non-blocking algorithms and today forms a core part of their Exchange's infrastructure.

(LMAX Disruptor是一個高性能的線程間消息傳遞庫。它源於LMAX對並發性、性能和非阻塞算法的研究,如今已成為其Exchange基礎架構的核心部分。)

-- 引用自GITHUB介紹

Disruptor是一個高性能的異步處理框架,或者可以認為是最快的消息框架(輕量的JMS),也可以認為是一個觀察者模式的實現,或者事件監聽模式的實現。以下是介紹wiki地址:

https://github.com/LMAX-Exchange/disruptor/wiki

2.Disruptor的設計方案

Disruptor通過以下設計來解決隊列速度慢的問題:
環形數組結構
為了避免垃圾回收,采用數組而非鏈表。同時,數組對處理器的緩存機制更加友好。
元素位置定位
數組長度2^n,通過位運算,加快定位的速度。下標采取遞增的形式。不用擔心index溢出的問題。index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。
無鎖設計
每個生產者或者消費者線程,會先申請可以操作的元素在數組中的位置,申請到之后,直接在該位置寫入或者讀取數據。
下面忽略數組的環形結構,介紹一下如何實現無鎖設計。整個過程通過原子變量CAS,保證操作的線程安全。

3.Disruptor實現特征

另一個關鍵的實現低延遲的細節就是在Disruptor中利用無鎖的算法,所有內存的可見性和正確性都是利用內存屏障或者CAS操作。使用CAS來保證多線程安全,與大部分並發隊列使用的鎖相比,CAS顯然要快很多。CAS是CPU級別的指令,更加輕量,不必像鎖一樣需要操作系統提供支持,所以每次調用不需要在用戶態與內核態之間切換,也不需要上下文切換。
只有一個用例中鎖是必須的,那就是BlockingWaitStrategy(阻塞等待策略),唯一的實現方法就是使用Condition實現消費者在新事件到來前等待。許多低延遲系統使用忙等待去避免Condition的抖動,然而在系統忙等待的操作中,性能可能會顯著降低,尤其是在CPU資源嚴重受限的情況下,例如虛擬環境下的WEB服務器。

4.Disruptor實現生產者消費者模型

這里我們按照原作者Demo介紹制作一個放入LongValue的生產者和消費者模型,相關的代碼如下所示:

maven依賴
	<dependencies>
		<dependency>
			<groupId>com.lmax</groupId>
			<artifactId>disruptor</artifactId>
			<version>3.2.1</version>
		</dependency>
	</dependencies>
LongEvent
//定義事件event  通過Disruptor 進行交換的數據類型。
public class LongEvent  {

	private Long value;

	public Long getValue() {
		return value;
	}

	public void setValue(Long value) {
		this.value = value;
	}

}
LongEventFactory
public class LongEventFactory implements EventFactory<LongEvent> {

	public LongEvent newInstance() {

		return new LongEvent();
	}

}
LongEventHandler
// 消費者獲得數據
public class LongEventHandler implements EventHandler<LongEvent> {

    @Override
    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
        System.out.println("消費者獲得數據:" + longEvent.getValue());
    }
}
LongEventProducer
// 生產者
public class LongEventProducer {

    private RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer byteBuffer) {
        // 獲取事件隊列的下表位置
        long sequence = ringBuffer.next();
        try {
            // 取出空隊列
            LongEvent longEvent = ringBuffer.get(sequence);
            // 給空隊列賦值
            longEvent.setValue(byteBuffer.getLong(0));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("生產者發送數據....");
            ringBuffer.publish(sequence);
        }

    }

}
MainTest
public class MainTest {

    public static void main(String[] args) {

        // 1. 創建線程池
        ExecutorService executor = Executors.newCachedThreadPool();

        // 2. 創建工廠
        LongEventFactory longEventFactory = new LongEventFactory();

        // 3.創建ringbuffer 大小
        int ringbuffer = 1024 * 1024; // 2的N次方

        // 4. 創建disruptor
        Disruptor<LongEvent> longEventDisruptor = new Disruptor<>(
                longEventFactory, ringbuffer, executor,
                ProducerType.MULTI, new YieldingWaitStrategy()
        );

        // 5. 連接消費者
        longEventDisruptor.handleEventsWith(new LongEventHandler());
        // 6. 啟動
        longEventDisruptor.start();

        // 7.創建ringbuffer容器
        RingBuffer<LongEvent> ringBuffer = longEventDisruptor.getRingBuffer();

        // 8.創建生產者
        LongEventProducer longEventProducer = new LongEventProducer(ringBuffer);

        // 9. 指定緩沖區的大小
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (int i = 0; i < 10; i++) {
            byteBuffer.putLong(0,i);
            longEventProducer.onData(byteBuffer);
        }
        executor.shutdown();
        longEventDisruptor.shutdown();

    }
}

執行結果如下:
生產者發送數據....
生產者發送數據....
生產者發送數據....
生產者發送數據....
消費者獲得數據:0
生產者發送數據....
消費者獲得數據:1
生產者發送數據....
消費者獲得數據:2
生產者發送數據....
消費者獲得數據:3
生產者發送數據....
消費者獲得數據:4
生產者發送數據....
消費者獲得數據:5
生產者發送數據....
消費者獲得數據:6
消費者獲得數據:7
消費者獲得數據:8
消費者獲得數據:9


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM