disruptor (史上最全)


文章很長,建議收藏起來,慢慢讀! Java 高並發 發燒友社群:瘋狂創客圈 奉上以下珍貴的學習資源:


推薦2:史上最全 Java 面試題 21 個專題

史上最全 Java 面試題 21 個專題 阿里、京東、美團、頭條.... 隨意挑、橫着走!!!
1: JVM面試題(史上最強、持續更新、吐血推薦) https://www.cnblogs.com/crazymakercircle/p/14365820.html
2:Java基礎面試題(史上最全、持續更新、吐血推薦) https://www.cnblogs.com/crazymakercircle/p/14366081.html
3:死鎖面試題(史上最強、持續更新) https://www.cnblogs.com/crazymakercircle/p/14323919.html
4:設計模式面試題 (史上最全、持續更新、吐血推薦) https://www.cnblogs.com/crazymakercircle/p/14367101.html
5:架構設計面試題 (史上最全、持續更新、吐血推薦) https://www.cnblogs.com/crazymakercircle/p/14367907.html
還有 10 +必刷、必刷 的面試題 更多 ....., 請參見【 瘋狂創客圈 高並發 總目錄

推薦3: 瘋狂創客圈 高質量 博文

springCloud 高質量 博文
nacos 實戰(史上最全) sentinel (史上最全+入門教程)
springcloud + webflux 高並發實戰 Webflux(史上最全)
SpringCloud gateway (史上最全) spring security (史上最全)
還有 10 +必刷、必刷 的高質量 博文 更多 ....., 請參見【 瘋狂創客圈 高並發 總目錄

無鎖編程(Lock Free)框架 系列文章:

1 disruptor 是什么?

Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題(在性能測試中發現竟然與I/O操作處於同樣的數量級)。

基於Disruptor開發的系統單線程能支撐每秒600萬訂單,2010年在QCon演講后,獲得了業界關注。2011年,企業應用軟件專家Martin Fowler專門撰寫長文介紹。同年它還獲得了Oracle官方的Duke大獎。

目前,包括Apache Storm、Camel、Log4j 2在內的很多知名項目都應用了Disruptor以獲取高性能。

需要特別指出的是,這里所說的隊列是系統內部的內存隊列,而不是Kafka這樣的分布式隊列。另外,本文所描述的Disruptor特性限於3.3.4。

2 Java內置隊列的問題

介紹Disruptor之前,我們先來看一看常用的線程安全的內置隊列有什么問題。Java的內置隊列如下表所示。

隊列 有界性 數據結構
ArrayBlockingQueue bounded 加鎖 arraylist
LinkedBlockingQueue optionally-bounded 加鎖 linkedlist
ConcurrentLinkedQueue unbounded 無鎖 linkedlist
LinkedTransferQueue unbounded 無鎖 linkedlist
PriorityBlockingQueue unbounded 加鎖 heap
DelayQueue unbounded 加鎖 heap

隊列的底層一般分成三種:數組、鏈表和堆。其中,堆一般情況下是為了實現帶有優先級特性的隊列,暫且不考慮。

從數組和鏈表兩種數據結構來看,基於數組線程安全的隊列,比較典型的是ArrayBlockingQueue,它主要通過加鎖的方式來保證線程安全;基於鏈表的線程安全隊列分成LinkedBlockingQueue和ConcurrentLinkedQueue兩大類,前者也通過鎖的方式來實現線程安全,而后者以及上面表格中的LinkedTransferQueue都是通過原子變量compare and swap(以下簡稱“CAS”)這種不加鎖的方式來實現的。

但是對 volatile類型的變量進行 CAS 操作,存在偽共享問題,具體請參考專門的文章:

偽共享 (圖解)

Disruptor 使用了類似上面的方案,解決了偽共享問題。

3 Disruptor框架是如何解決偽共享問題的?

在Disruptor中有一個重要的類Sequence,該類包裝了一個volatile修飾的long類型數據value,無論是Disruptor中的基於數組實現的緩沖區RingBuffer,還是生產者,消費者,都有各自獨立的Sequence,RingBuffer緩沖區中,Sequence標示着寫入進度,例如每次生產者要寫入數據進緩沖區時,都要調用RingBuffer.next()來獲得下一個可使用的相對位置。對於生產者和消費者來說,Sequence標示着它們的事件序號,來看看Sequence類的源碼:

  class LhsPadding {
	protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
	protected volatile long value;
}

class RhsPadding extends Value {
	protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding {
	static final long INITIAL_VALUE = -1L;
	private static final Unsafe UNSAFE;
	private static final long VALUE_OFFSET;
	static {
		UNSAFE = Util.getUnsafe();
		try {
			VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
		} catch(final Exception e) {
			 throw new RuntimeException(e);
		}
	}
	

​```
public Sequence() {
	this(INITIAL_VALUE);
}

public Sequence(final long initialValue) {
	UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
}
​```

}

從第1到11行可以看到,真正使用到的變量value,它的前后空間都由8個long型的變量填補了,對於一個大小為64字節的緩存行,它剛好被填補滿(一個long型變量value,8個字節加上前/后個7long型變量填補,7*8=56,56+8=64字節)。這樣做每次把變量value讀進高速緩存中時,都能把緩存行填充滿(對於大小為64個字節的緩存行來說,如果緩存行大小大於64個字節,那么還是會出現偽共享問題),保證每次處理數據時都不會與其他變量發生沖突。

Disruptor 的使用場景

Disruptor的最常用的場景就是“生產者-消費者”場景,對場景的就是“一個生產者、多個消費者”的場景,並且要求順序處理。

當前業界開源組件使用Disruptor的包括Log4j2、Apache Storm等,它可以用來作為高性能的有界內存隊列,基於生產者消費者模式,實現一個/多個生產者對應多個消費者。它也可以認為是觀察者模式的一種實現,或者發布訂閱模式。

舉個例子,我們從MySQL的BigLog文件中順序讀取數據,然后寫入到ElasticSearch(搜索引擎)中。在這種場景下,BigLog要求一個文件一個生產者,那個是一個生產者。而寫入到ElasticSearch,則嚴格要求順序,否則會出現問題,所以通常意義上的多消費者線程無法解決該問題,如果通過加鎖,則性能大打折扣。

實戰:Disruptor 的 使用實例

我們從一個簡單的例子開始學習Disruptor:生產者傳遞一個long類型的值給消費者,而消費者消費這個數據的方式僅僅是把它打印出來。

定義一個Event

首先定義一個Event來包含需要傳遞的數據:

public class LongEvent { 
    private long value;
    public long getValue() { 
        return value; 
    } 
 
    public void setValue(long value) { 
        this.value = value; 
    } 
} 

由於需要讓Disruptor為我們創建事件,我們同時還聲明了一個EventFactory來實例化Event對象。

public class LongEventFactory implements EventFactory { 
    @Override 
    public Object newInstance() { 
        return new LongEvent(); 
    } 
} 

定義事件處理器(disruptor會回調此處理器的方法)

我們還需要一個事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中存儲的數據打印到終端:

/** 
 */public class LongEventHandler implements EventHandler<LongEvent> { 
    @Override 
    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception { 
        System.out.println(longEvent.getValue()); 
    } 
} 

定義事件源: 事件發布器 發布事件

事件都會有一個生成事件的源,這個例子中假設事件是由於磁盤IO或者network讀取數據的時候觸發的,事件源使用一個ByteBuffer來模擬它接受到的數據,也就是說,事件源會在IO讀取到一部分數據的時候觸發事件(觸發事件不是自動的,程序員需要在讀取到數據的時候自己觸發事件並發布):

public class LongEventProducer { 
    private final RingBuffer<LongEvent> ringBuffer;
    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { 
        this.ringBuffer = ringBuffer; 
    } 
 
    /** 
     * onData用來發布事件,每調用一次就發布一次事件事件 
     * 它的參數會通過事件傳遞給消費者 
     * 
     * @param bb 
     */public void onData(ByteBuffer bb) { 
            //可以把ringBuffer看做一個事件隊列,那么next就是得到下面一個事件槽
            long sequence = ringBuffer.next();
            
        try { 
            //用上面的索引取出一個空的事件用於填充 
            LongEvent event = ringBuffer.get(sequence);// for the sequence 
            event.setValue(bb.getLong(0)); 
        } finally { 
            //發布事件 
            ringBuffer.publish(sequence); 
        } 
    } 
} 

很明顯的是:當用一個簡單隊列來發布事件的時候會牽涉更多的細節,這是因為事件對象還需要預先創建。

發布事件最少需要兩步:

獲取下一個事件槽,發布事件(發布事件的時候要使用try/finnally保證事件一定會被發布)。

如果我們使用RingBuffer.next()獲取一個事件槽,那么一定要發布對應的事件。如果不能發布事件,那么就會引起Disruptor狀態的混亂。尤其是在多個事件生產者的情況下會導致事件消費者失速,從而不得不重啟應用才能會恢復。

Disruptor 3.0提供了lambda式的API。這樣可以把一些復雜的操作放在Ring Buffer,所以在Disruptor3.0以后的版本最好使用Event Publisher或者Event Translator(事件轉換器)來發布事件。

Disruptor3.0以后的事件轉換器(填充事件的業務數據)

public class LongEventProducerWithTranslator { 
    //一個translator可以看做一個事件初始化器,publicEvent方法會調用它
    //填充Event
    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = 
            new EventTranslatorOneArg<LongEvent, ByteBuffer>() { 
                public void translateTo(LongEvent event, long sequence, ByteBuffer bb) { 
                    event.setValue(bb.getLong(0)); 
                } 
            };
            
    private final RingBuffer<LongEvent> ringBuffer;
    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { 
        this.ringBuffer = ringBuffer; 
    } 
 
    public void onData(ByteBuffer bb) { 
        ringBuffer.publishEvent(TRANSLATOR, bb); 
    } 
} 

上面寫法的另一個好處是,Translator可以分離出來並且更加容易單元測試。Disruptor提供了不同的接口(EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, 等等)去產生一個Translator對象。很明顯,Translator中方法的參數是通過RingBuffer來傳遞的。

組裝起來

最后一步就是把所有的代碼組合起來完成一個完整的事件處理系統。Disruptor在這方面做了簡化,使用了DSL風格的代碼(其實就是按照直觀的寫法,不太能算得上真正的DSL)。雖然DSL的寫法比較簡單,但是並沒有提供所有的選項。如果依靠DSL已經可以處理大部分情況了。

注意:這里沒有使用時間轉換器,而是使用簡單的 事件發布器。

public class LongEventMain { 
    public static void main(String[] args) throws InterruptedException { 
        // Executor that will be used to construct new threads for consumers 
        Executor executor = Executors.newCachedThreadPool();
        // The factory for the event 
        LongEventFactory factory = new LongEventFactory();
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;
        // Construct the Disruptor 
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor);
        
        // Connect the handler 
        disruptor.handleEventsWith(new LongEventHandler());
        // Start the Disruptor, starts all threads running 
        disruptor.start();
        // Get the ring buffer from the Disruptor to be used for publishing. 
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); 
 
        LongEventProducer producer = new LongEventProducer(ringBuffer); 
 
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) { 
            bb.putLong(0, l); 
            
            //發布事件 
            producer.onData(bb); 
            Thread.sleep(1000); 
        } 
    } 
} 

在Java 8使用Disruptor

Disruptor在自己的接口里面添加了對於Java 8 Lambda的支持。大部分Disruptor中的接口都符合Functional Interface的要求(也就是在接口中僅僅有一個方法)。所以在Disruptor中,可以廣泛使用Lambda來代替自定義類。

public class LongEventMainJava8 { 
    /** 
     * 用lambda表達式來注冊EventHandler和EventProductor 
     * @param args 
     * @throws InterruptedException 
     */public static void main(String[] args) throws InterruptedException { 
        // Executor that will be used to construct new threads for consumers 
        Executor executor = Executors.newCachedThreadPool();
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;// Construct the Disruptor 
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
        // 可以使用lambda來注冊一個EventHandler 
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event.getValue()));
        // Start the Disruptor, starts all threads running 
        disruptor.start();
        // Get the ring buffer from the Disruptor to be used for publishing. 
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); 
 
        LongEventProducer producer = new LongEventProducer(ringBuffer); 
 
        ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++) { 
            bb.putLong(0, l); 
            ringBuffer.publishEvent((event, sequence, buffer) -> event.setValue(buffer.getLong(0)), bb); 
            Thread.sleep(1000); 
        } 
    } 
} 

由於在Java 8中方法引用也是一個lambda,因此還可以把上面的代碼改成下面的代碼:

public class LongEventWithMethodRef { 
    public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) 
    { 
        System.out.println(event.getValue()); 
    } 
 
    public static void translate(LongEvent event, long sequence, ByteBuffer buffer) 
    { 
        event.setValue(buffer.getLong(0)); 
    } 
 
    public static void main(String[] args) throws Exception 
    { 
        // Executor that will be used to construct new threads for consumers 
        Executor executor = Executors.newCachedThreadPool();
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;
        // Construct the Disruptor 
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
        // Connect the handler 
        disruptor.handleEventsWith(LongEventWithMethodRef::handleEvent);
        // Start the Disruptor, starts all threads running 
        disruptor.start();
        // Get the ring buffer from the Disruptor to be used for publishing. 
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); 
 
        LongEventProducer producer = new LongEventProducer(ringBuffer); 
 
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) 
        { 
            bb.putLong(0, l); 
            ringBuffer.publishEvent(LongEventWithMethodRef::translate, bb); 
            Thread.sleep(1000); 
        } 
    } 
} 

Disruptor如何實現高性能?

Disruptor實現高性能主要體現了去掉了鎖,采用CAS算法,同時內部通過環形隊列實現有界隊列。

  • 環形數據結構
    為了避免垃圾回收,采用數組而非鏈表。同時,數組對處理器的緩存機制更加友好。

  • 元素位置定位
    數組長度2^n,通過位運算,加快定位的速度。下標采取遞增的形式。不用擔心index溢出的問題。index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。

  • 無鎖設計
    每個生產者或者消費者線程,會先申請可以操作的元素在數組中的位置,申請到之后,直接在該位置寫入或者讀取數據。整個過程通過原子變量CAS,保證操作的線程安全。

使用Disruptor,主要用於對性能要求高、延遲低的場景,它通過“榨干”機器的性能來換取處理的高性能。如果你的項目有對性能要求高,對延遲要求低的需求,並且需要一個無鎖的有界隊列,來實現生產者/消費者模式,那么Disruptor是你的不二選擇。

原理:Disruptor 的內部Ring Buffer環形隊列

RingBuffer是什么

RingBuffer 是一個環(首尾相連的環),用做在不同上下文(線程)間傳遞數據的buffer。
RingBuffer 擁有一個序號,這個序號指向數組中下一個可用元素。

Disruptor使用環形隊列的優勢:

Disruptor框架就是一個使用CAS操作的內存隊列,與普通的隊列不同,Disruptor框架使用的是一個基於數組實現的環形隊列,無論是生產者向緩沖區里提交任務,還是消費者從緩沖區里獲取任務執行,都使用CAS操作。

使用環形隊列的優勢:

第一,簡化了多線程同步的復雜度。學數據結構的時候,實現隊列都要兩個指針head和tail來分別指向隊列的頭和尾,對於一般的隊列是這樣,想象下,如果有多個生產者同時往緩沖區隊列中提交任務,某一生產者提交新任務后,tail指針都要做修改的,那么多個生產者提交任務,頭指針不會做修改,但會對tail指針產生沖突,例如某一生產者P1要做寫入操作,在獲得tail指針指向的對象值V后,執行compareAndSet()方法前,tail指針被另一生產者P2修改了,這時生產者P1執行compareAndSet()方法,發現tail指針指向的值V和期望值E不同,導致沖突。同樣,如果多個消費者不斷從緩沖區中獲取任務,不會修改尾指針,但會造成隊列頭指針head的沖突問題(因為隊列的FIFO特點,出列會從頭指針出開始)。

環形隊列的一個特點就是只有一個指針,只通過一個指針來實現出列和入列操作。如果使用兩個指針head和tail來管理這個隊列,有可能會出現“偽共享”問題(偽共享問題在下面我會詳細說),因為創建隊列時,head和tail指針變量常常在同一個緩存行中,多線程修改同一緩存行中的變量就容易出現偽共享問題。

第二,由於使用的是環形隊列,那么隊列創建時大小就被固定了,Disruptor框架中的環形隊列本來也就是基於數組實現的,使用數組的話,減少了系統對內存空間管理的壓力,因為它不像鏈表,Java會定期回收鏈表中一些不再引用的對象,而數組不會出現空間的新分配和回收問題。

原理:Disruptor的等待策略

Disruptor默認的等待策略是BlockingWaitStrategy。這個策略的內部適用一個鎖和條件變量來控制線程的執行和等待(Java基本的同步方法)。BlockingWaitStrategy是最慢的等待策略,但也是CPU使用率最低和最穩定的選項。然而,可以根據不同的部署環境調整選項以提高性能。

SleepingWaitStrategy

和BlockingWaitStrategy一樣,SpleepingWaitStrategy的CPU使用率也比較低。它的方式是循環等待並且在循環中間調用LockSupport.parkNanos(1)來睡眠,(在Linux系統上面睡眠時間60µs).然而,它的優點在於生產線程只需要計數,而不執行任何指令。並且沒有條件變量的消耗。但是,事件對象從生產者到消費者傳遞的延遲變大了。SleepingWaitStrategy最好用在不需要低延遲,而且事件發布對於生產者的影響比較小的情況下。比如異步日志功能。

YieldingWaitStrategy

YieldingWaitStrategy是可以被用在低延遲系統中的兩個策略之一,這種策略在減低系統延遲的同時也會增加CPU運算量。YieldingWaitStrategy策略會循環等待sequence增加到合適的值。循環中調用Thread.yield()允許其他准備好的線程執行。如果需要高性能而且事件消費者線程比邏輯內核少的時候,推薦使用YieldingWaitStrategy策略。例如:在開啟超線程的時候。

BusySpinW4aitStrategy

BusySpinWaitStrategy是性能最高的等待策略,同時也是對部署環境要求最高的策略。這個性能最好用在事件處理線程比物理內核數目還要小的時候。例如:在禁用超線程技術的時候。

原理:並行模式

單一寫者模式

在並發系統中提高性能最好的方式之一就是單一寫者原則,對Disruptor也是適用的。如果在你的代碼中僅僅有一個事件生產者,那么可以設置為單一生產者模式來提高系統的性能。

這里寫圖片描述

public class singleProductorLongEventMain { 
    public static void main(String[] args) throws Exception { 
        //.....// Construct the Disruptor with a SingleProducerSequencer 
 
        Disruptor<LongEvent> disruptor = new Disruptor(factory, 
                bufferSize, 
                ProducerType.SINGLE, // 單一寫者模式, 
                executor);//..... 
    } 
} 

一次生產,串行消費

比如:現在觸發一個注冊Event,需要有一個Handler來存儲信息,一個Hanlder來發郵件等等。

/**
  * 串行依次執行
  * <br/>
  * p --> c11 --> c21
  * @param disruptor
  */
 public static void serial(Disruptor<LongEvent> disruptor){
     disruptor.handleEventsWith(new C11EventHandler()).then(new C21EventHandler());
     disruptor.start();
 }

菱形方式執行

 public static void diamond(Disruptor<LongEvent> disruptor){
     disruptor.handleEventsWith(new C11EventHandler(),new C12EventHandler()).then(new C21EventHandler());
     disruptor.start();
 }

鏈式並行計算

 public static void chain(Disruptor<LongEvent> disruptor){
     disruptor.handleEventsWith(new C11EventHandler()).then(new C12EventHandler());
     disruptor.handleEventsWith(new C21EventHandler()).then(new C22EventHandler());
     disruptor.start();
 }

相互隔離模式

這里寫圖片描述

 public static void parallelWithPool(Disruptor<LongEvent> disruptor){
     disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler());
     disruptor.handleEventsWithWorkerPool(new C21EventHandler(),new C21EventHandler());
     disruptor.start();
 }

航道模式

這里寫圖片描述串行依次執行,同時C11,C21分別有2個實例

/**
  * 串行依次執行,同時C11,C21分別有2個實例
   * <br/>
   * p --> c11 --> c21
   * @param disruptor
   */
  public static void serialWithPool(Disruptor<LongEvent> disruptor){
      disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler()).then(new C21EventHandler(),new C21EventHandler());
      disruptor.start();
  }

回到◀瘋狂創客圈

瘋狂創客圈 - Java高並發研習社群,為大家開啟大廠之門


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM