disruptor筆記之七:等待策略


歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;

《disruptor筆記》系列鏈接

  1. 快速入門
  2. Disruptor類分析
  3. 環形隊列的基礎操作(不用Disruptor類)
  4. 事件消費知識點小結
  5. 事件消費實戰
  6. 常見場景
  7. 等待策略
  8. 知識點補充(終篇)

本篇概覽

本文是《disruptor筆記》的第七篇,咱們一起閱讀源碼,學習一個重要的知識點:等待策略,由於Disruptor的源碼短小精干、簡單易懂,因此本篇是個輕松愉快的源碼學習之旅;

提前小結

如果您時間不充裕,可以通過以下提前小結的內容,對等待策略有個大體的認識:

  1. BlockingWaitStrategy:用了ReentrantLock的等待&&喚醒機制實現等待邏輯,是默認策略,比較節省CPU
  2. BusySpinWaitStrategy:持續自旋,JDK9之下慎用(最好別用)
  3. DummyWaitStrategy:返回的Sequence值為0,正常環境是用不上的
  4. LiteBlockingWaitStrategy:基於BlockingWaitStrategy,在沒有鎖競爭的時候會省去喚醒操作,但是作者說測試不充分,不建議使用
  5. TimeoutBlockingWaitStrategy:帶超時的等待,超時后會執行業務指定的處理邏輯
  6. LiteTimeoutBlockingWaitStrategy:基於TimeoutBlockingWaitStrategy,在沒有鎖競爭的時候會省去喚醒操作
  7. SleepingWaitStrategy:三段式,第一階段自旋,第二階段執行Thread.yield交出CPU,第三階段睡眠執行時間,反復的的睡眠
  8. YieldingWaitStrategy:二段式,第一階段自旋,第二階段執行Thread.yield交出CPU
  9. PhasedBackoffWaitStrategy:四段式,第一階段自旋指定次數,第二階段自旋指定時間,第三階段執行Thread.yield交出CPU,第四階段調用成員變量的waitFor方法,這個成員變量可以被設置為BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy這三個中的一個

關於等待策略

  • 回顧一下前面的文章中實例化Disruptor的代碼:
disruptor = new Disruptor<>(new OrderEventFactory(),
                BUFFER_SIZE,
                new CustomizableThreadFactory("event-handler-"));
  • 展開上述構造方法,會見到創建RingBuffer的代碼,默認使用了BlockingWaitStrategy作為等待策略:
    public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize)
    {
        return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
    }
  • 繼續展開上面的createMultiProducer方法,可見每個Sequencer(注意不是Sequence)都有自己的watStrategy成員變量:

在這里插入圖片描述

  • 這個waitStrategy的最終用途是創建SequenceBarrier的時候,傳給SequenceBarrier做成員變量:

在這里插入圖片描述

  • 在看看SequenceBarrier是如何使用waitStrategy的,一共兩處用到,第一處如下圖紅框,原來是waitFor方法內部會用到,這個waitFor咱們前面已經了解過,對消費者來說,等待環形隊列的指定位置有可用數據時,就是調用SequenceBarrier的waitFor完成的:

在這里插入圖片描述

  • SequenceBarrier第二處用到waitStrategy是喚醒的時候:
    @Override
    public void alert()
    {
        alerted = true;
        waitStrategy.signalAllWhenBlocking();
    }
  • 現在咱們知道了WaitStrategy的使用場景,接下來看看這個接口有哪些具體實現吧,這樣咱們在編程中就知道如何選擇才最適合自己

BlockingWaitStrategy

  • 作為默認的等待策略,BlockingWaitStrategy還有個特點就是代碼量小(不到百行),很容易理解,其實就是用ReentrantLock+Condition來實現等待和喚醒操作的,如下圖紅框:

在這里插入圖片描述

  • 如果您更傾向於節省CPU資源,對高吞吐量和低延時的要求相對低一些,那么BlockingWaitStrategy就適合您了;

BusySpinWaitStrategy(慎用)

  • 前面的BlockingWaitStrategy有個特點,就是一旦環形隊列指定位置來了數據,由於線程是等待狀態(底層調用了native的UNSAFE.park方法),因此還要喚醒后才能執行業務邏輯,在一些場景中希望數據一到就盡快消費,此時BusySpinWaitStrategy就很合適了,代碼太簡單,全部貼出:
public final class BusySpinWaitStrategy implements WaitStrategy
{
    @Override
    public long waitFor(
        final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;

        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();
            ThreadHints.onSpinWait();
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking()
    {
    }
}
  • 上述代碼顯示,整個while循環的關鍵就是ThreadHints.onSpinWait做了什么,源碼如下,這里要格外注意,如果ON_SPIN_WAIT_METHOD_HANDLE為空,意味着外面的while循環是個非常消耗CPU的自旋
    public static void onSpinWait()
    {
        if (null != ON_SPIN_WAIT_METHOD_HANDLE)
        {
            try
            {
                ON_SPIN_WAIT_METHOD_HANDLE.invokeExact();
            }
            catch (final Throwable ignore)
            {
            }
        }
    }
  • ON_SPIN_WAIT_METHOD_HANDLE為空是很可怕的事情,咱們來看看它是何方神聖?代碼還是在ThreadHints.java中,如下所示,真相一目了然,它就是Thread類的onSpinWait方法,如果Thread類沒有onSpinWait方法,那么使用BusySpinWaitStrategy作為等待策略就有很高的代價了,環形隊列里沒有數據時消費線程會執行自旋,很耗費CPU:
static
    {
        final MethodHandles.Lookup lookup = MethodHandles.lookup();

        MethodHandle methodHandle = null;
        try
        {
            methodHandle = lookup.findStatic(Thread.class, "onSpinWait", methodType(void.class));
        }
        catch (final Exception ignore)
        {
        }

        ON_SPIN_WAIT_METHOD_HANDLE = methodHandle;
    }
  • 好吧,還剩兩個問題:Thread類有沒有onSpinWait方法還不能確定嗎?這個onSpinWait方法是何方神聖?

  • 去看JDK官方文檔,如下圖,原來這方法是從JDK9才有的,所以對於JDK8使用者來說來說,選用BusySpinWaitStrategy就意味着要面對沒做啥事兒的while循環了:

在這里插入圖片描述

  • 第二個問題,onSpinWait方法干了些啥?前面的官方文檔,以欣宸的英語水平顯然是無法理解的,去看stackoverflow吧,如下圖,簡單的說,就是告訴CPU當前線程處於循環查詢的狀態,CPU得知后就會調度更多CPU資源給其他線程:

在這里插入圖片描述

  • 至此真像大白:環形隊列的條件就緒后,BusySpinWaitStrategy策略是通過whlie死循環來做到快速響應的,如果JDK是9或者更高版本,這個死循環帶來的CPU損耗由Thread.onSpinWait幫助緩解,如果JDK版本低於9,這里就是個簡單的while死循環,至於這種死循環有多消耗CPU,您可以寫段簡單代碼感受一下...

  • 難怪Disruptor源碼中會提醒最好是將使用此實例的線程綁定到指定CPU核

在這里插入圖片描述

DummyWaitStrategy

固定返回0,個人覺得這個策略在正常開發中用不上,因為環形隊列可用位置始終是0的話,不論是生產還是消費都難以實現:

在這里插入圖片描述

LiteBlockingWaitStrategy

  • 看名字,LiteBlockingWaitStrategy是BlockingWaitStrategy策略的輕量級實現,在鎖沒有競爭的時候(例如獨立消費的場景),會省略掉喚醒操作,不過如下圖紅框所示,作者說他沒有充分驗證過正確性,因此建議只用於體驗,太好了,這個策略我不學了!!!

在這里插入圖片描述

TimeoutBlockingWaitStrategy

  • 顧名思義,TimeoutBlockingWaitStrategy表示只等待某段時長,超過了就算超時,其代碼和BlockingWaitStrategy類似,只是等待的時候有個時長限制,如下圖,一目了然:

在這里插入圖片描述

  • 其實我對拋出異常后的處理很感興趣,去看看吧,外面是熟悉的BatchEventProcessor類,熟悉的processEvents方法,如下圖,每次超時異常都交給notifyTimeout處理,而外部的主流程不受影響,依舊不斷的從環形隊列中等待和獲取數據:

在這里插入圖片描述

  • 進入notifyTImeout方法,可見實際上是交給成員變量timeoutHandler去處理的,而且處理過程中發生的任何異常都會被捕獲,不會拋出去影響外部調用:

在這里插入圖片描述

  • 再來看看成員變量是哪來的,如下圖,真相大白,咱們開發的EventHandler實現類,如果也實現了Timeouthandler,就被當做成員變量timeoutHandler了:

在這里插入圖片描述

  • 至此TimeoutBlockingWaitStrategy也搞清楚了:用於有時間限制的場景,每次等待超時后都會調用業務定制的超時處理邏輯,這個邏輯寫到EventHandler實現類中,這個實現類要實現Timeouthandler接口

LiteTimeoutBlockingWaitStrategy

  • LiteTimeoutBlockingWaitStrategy與TimeoutBlockingWaitStrategy的關系,就像BlockingWaitStrategy與LiteBlockingWaitStrategy的關系:作為TimeoutBlockingWaitStrategy的變體,有TimeoutBlockingWaitStrategy的超時處理特性,而且沒有鎖競爭的時候,省略掉喚醒操作;
  • 作者說LiteBlockingWaitStrategy可用於體驗,但正確性並未經過充分驗證,但是在LiteTimeoutBlockingWaitStrategy的注釋中沒有看到這種說法,看樣子這是個靠譜的等待策略,可以用,用在有超時處理的需求,而且沒有鎖競爭的場景(例如獨立消費)

SleepingWaitStrategy

  • 和前面幾個不同的是,SleepingWaitStrategy沒有用到鎖,這意味這無需調用signalAllWhenBlocking方法做喚醒處理,相當於省去了生產線程的通知操作,官方源碼注釋有這么句話引起了我的興趣,如下圖紅框,大意是該策略在性能和CPU資源消耗之間取得了平衡,接下來去看看關鍵代碼,來了解這個特性:

在這里插入圖片描述

  • 如下圖,等到可用數據的過程是個死循環:

在這里插入圖片描述

  • 接下來是關鍵代碼了,如下圖,可見整個等待過程分為三段:計數器高於100時就只有一個減一的操作(最快響應),計數器在100到0之間時每次都交出CPU執行時間(最省資源),其他時候就睡眠固定時間:

在這里插入圖片描述

YieldingWaitStrategy

  • 看過SleepingWaitStrategy之后,再看YieldingWaitStrategy就很容易理解了,和SleepingWaitStrategy相比,YieldingWaitStrategy先做指定次數的自旋,然后不斷的交出CPU時間:

在這里插入圖片描述

  • 由於在不斷的執行Thread.yield()方法,因此該策略雖然很消耗CPU,不過一旦其他線程有CPU需求,很容易從這個線程得到;

PhasedBackoffWaitStrategy

  • 最后是PhasedBackoffWaitStrategy,該策略的特點是將整個等待過程分成下圖的四段,四個方塊代表一個時間線上的四個階段:

在這里插入圖片描述

  • 這里說明一下上圖的四個階段:
  1. 首先是自旋指定的次數,默認10000次;
  2. 自旋過后,開始帶計時的自旋,執行的時長是spinTimeoutNanos的值;
  3. 執行時長達到spinTimeoutNanos的值后,開始執行Thread.yield()交出CPU資源,這個邏輯的執行時長是yieldTimeoutNanos-spinTimeoutNanos
  4. 執行時長達到yieldTimeoutNanos-spinTimeoutNanos的值后,開始調用fallbackStrategy.waitFor,這個調用沒有時間或者次數限制;
  • 現在問題來了fallbackStrategy是何方神聖?PhasedBackoffWaitStrategy類准備了三個靜態方法,咱們可以按需選用,讓fallbackStrategy是BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy這三個中的一個:
public static PhasedBackoffWaitStrategy withLock(
        long spinTimeout,
        long yieldTimeout,
        TimeUnit units)
    {
        return new PhasedBackoffWaitStrategy(
            spinTimeout, yieldTimeout,
            units, new BlockingWaitStrategy());
    }

public static PhasedBackoffWaitStrategy withLiteLock(
        long spinTimeout,
        long yieldTimeout,
        TimeUnit units)
    {
        return new PhasedBackoffWaitStrategy(
            spinTimeout, yieldTimeout,
            units, new LiteBlockingWaitStrategy());
    }

    public static PhasedBackoffWaitStrategy withSleep(
        long spinTimeout,
        long yieldTimeout,
        TimeUnit units)
    {
        return new PhasedBackoffWaitStrategy(
            spinTimeout, yieldTimeout,
            units, new SleepingWaitStrategy(0));
    }
  • 至此,Disruptor的九種等待策略就全部分析完畢了,除了選用等待策略的時候更加得心應手,還有個收獲就是積攢了閱讀優秀源碼的經驗,在讀源碼的路上更加有信心了;

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 數據庫+中間件系列
  6. DevOps系列

歡迎關注公眾號:程序員欣宸

微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM