歡迎訪問我的GitHub
https://github.com/zq2599/blog_demos
內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;
《disruptor筆記》系列鏈接
本篇概覽
本文是《disruptor筆記》的第七篇,咱們一起閱讀源碼,學習一個重要的知識點:等待策略,由於Disruptor的源碼短小精干、簡單易懂,因此本篇是個輕松愉快的源碼學習之旅;
提前小結
如果您時間不充裕,可以通過以下提前小結的內容,對等待策略有個大體的認識:
- BlockingWaitStrategy:用了ReentrantLock的等待&&喚醒機制實現等待邏輯,是默認策略,比較節省CPU
- BusySpinWaitStrategy:持續自旋,JDK9之下慎用(最好別用)
- DummyWaitStrategy:返回的Sequence值為0,正常環境是用不上的
- LiteBlockingWaitStrategy:基於BlockingWaitStrategy,在沒有鎖競爭的時候會省去喚醒操作,但是作者說測試不充分,不建議使用
- TimeoutBlockingWaitStrategy:帶超時的等待,超時后會執行業務指定的處理邏輯
- LiteTimeoutBlockingWaitStrategy:基於TimeoutBlockingWaitStrategy,在沒有鎖競爭的時候會省去喚醒操作
- SleepingWaitStrategy:三段式,第一階段自旋,第二階段執行Thread.yield交出CPU,第三階段睡眠執行時間,反復的的睡眠
- YieldingWaitStrategy:二段式,第一階段自旋,第二階段執行Thread.yield交出CPU
- PhasedBackoffWaitStrategy:四段式,第一階段自旋指定次數,第二階段自旋指定時間,第三階段執行Thread.yield交出CPU,第四階段調用成員變量的waitFor方法,這個成員變量可以被設置為BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy這三個中的一個
關於等待策略
- 回顧一下前面的文章中實例化Disruptor的代碼:
disruptor = new Disruptor<>(new OrderEventFactory(),
BUFFER_SIZE,
new CustomizableThreadFactory("event-handler-"));
- 展開上述構造方法,會見到創建RingBuffer的代碼,默認使用了BlockingWaitStrategy作為等待策略:
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize)
{
return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
}
- 繼續展開上面的createMultiProducer方法,可見每個Sequencer(注意不是Sequence)都有自己的watStrategy成員變量:
- 這個waitStrategy的最終用途是創建SequenceBarrier的時候,傳給SequenceBarrier做成員變量:
- 在看看SequenceBarrier是如何使用waitStrategy的,一共兩處用到,第一處如下圖紅框,原來是waitFor方法內部會用到,這個waitFor咱們前面已經了解過,對消費者來說,等待環形隊列的指定位置有可用數據時,就是調用SequenceBarrier的waitFor完成的:
- SequenceBarrier第二處用到waitStrategy是喚醒的時候:
@Override
public void alert()
{
alerted = true;
waitStrategy.signalAllWhenBlocking();
}
- 現在咱們知道了WaitStrategy的使用場景,接下來看看這個接口有哪些具體實現吧,這樣咱們在編程中就知道如何選擇才最適合自己
BlockingWaitStrategy
- 作為默認的等待策略,BlockingWaitStrategy還有個特點就是代碼量小(不到百行),很容易理解,其實就是用ReentrantLock+Condition來實現等待和喚醒操作的,如下圖紅框:
- 如果您更傾向於節省CPU資源,對高吞吐量和低延時的要求相對低一些,那么BlockingWaitStrategy就適合您了;
BusySpinWaitStrategy(慎用)
- 前面的BlockingWaitStrategy有個特點,就是一旦環形隊列指定位置來了數據,由於線程是等待狀態(底層調用了native的UNSAFE.park方法),因此還要喚醒后才能執行業務邏輯,在一些場景中希望數據一到就盡快消費,此時BusySpinWaitStrategy就很合適了,代碼太簡單,全部貼出:
public final class BusySpinWaitStrategy implements WaitStrategy
{
@Override
public long waitFor(
final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
ThreadHints.onSpinWait();
}
return availableSequence;
}
@Override
public void signalAllWhenBlocking()
{
}
}
- 上述代碼顯示,整個while循環的關鍵就是ThreadHints.onSpinWait做了什么,源碼如下,這里要格外注意,如果ON_SPIN_WAIT_METHOD_HANDLE為空,意味着外面的while循環是個非常消耗CPU的自旋:
public static void onSpinWait()
{
if (null != ON_SPIN_WAIT_METHOD_HANDLE)
{
try
{
ON_SPIN_WAIT_METHOD_HANDLE.invokeExact();
}
catch (final Throwable ignore)
{
}
}
}
- ON_SPIN_WAIT_METHOD_HANDLE為空是很可怕的事情,咱們來看看它是何方神聖?代碼還是在ThreadHints.java中,如下所示,真相一目了然,它就是Thread類的onSpinWait方法,如果Thread類沒有onSpinWait方法,那么使用BusySpinWaitStrategy作為等待策略就有很高的代價了,環形隊列里沒有數據時消費線程會執行自旋,很耗費CPU:
static
{
final MethodHandles.Lookup lookup = MethodHandles.lookup();
MethodHandle methodHandle = null;
try
{
methodHandle = lookup.findStatic(Thread.class, "onSpinWait", methodType(void.class));
}
catch (final Exception ignore)
{
}
ON_SPIN_WAIT_METHOD_HANDLE = methodHandle;
}
-
好吧,還剩兩個問題:Thread類有沒有onSpinWait方法還不能確定嗎?這個onSpinWait方法是何方神聖?
-
去看JDK官方文檔,如下圖,原來這方法是從JDK9才有的,所以對於JDK8使用者來說來說,選用BusySpinWaitStrategy就意味着要面對沒做啥事兒的while循環了:
- 第二個問題,onSpinWait方法干了些啥?前面的官方文檔,以欣宸的英語水平顯然是無法理解的,去看stackoverflow吧,如下圖,簡單的說,就是告訴CPU當前線程處於循環查詢的狀態,CPU得知后就會調度更多CPU資源給其他線程:
-
至此真像大白:環形隊列的條件就緒后,BusySpinWaitStrategy策略是通過whlie死循環來做到快速響應的,如果JDK是9或者更高版本,這個死循環帶來的CPU損耗由Thread.onSpinWait幫助緩解,如果JDK版本低於9,這里就是個簡單的while死循環,至於這種死循環有多消耗CPU,您可以寫段簡單代碼感受一下...
-
難怪Disruptor源碼中會提醒最好是將使用此實例的線程綁定到指定CPU核:
DummyWaitStrategy
固定返回0,個人覺得這個策略在正常開發中用不上,因為環形隊列可用位置始終是0的話,不論是生產還是消費都難以實現:
LiteBlockingWaitStrategy
- 看名字,LiteBlockingWaitStrategy是BlockingWaitStrategy策略的輕量級實現,在鎖沒有競爭的時候(例如獨立消費的場景),會省略掉喚醒操作,不過如下圖紅框所示,作者說他沒有充分驗證過正確性,因此建議只用於體驗,太好了,這個策略我不學了!!!
TimeoutBlockingWaitStrategy
- 顧名思義,TimeoutBlockingWaitStrategy表示只等待某段時長,超過了就算超時,其代碼和BlockingWaitStrategy類似,只是等待的時候有個時長限制,如下圖,一目了然:
- 其實我對拋出異常后的處理很感興趣,去看看吧,外面是熟悉的BatchEventProcessor類,熟悉的processEvents方法,如下圖,每次超時異常都交給notifyTimeout處理,而外部的主流程不受影響,依舊不斷的從環形隊列中等待和獲取數據:
- 進入notifyTImeout方法,可見實際上是交給成員變量timeoutHandler去處理的,而且處理過程中發生的任何異常都會被捕獲,不會拋出去影響外部調用:
- 再來看看成員變量是哪來的,如下圖,真相大白,咱們開發的EventHandler實現類,如果也實現了Timeouthandler,就被當做成員變量timeoutHandler了:
- 至此TimeoutBlockingWaitStrategy也搞清楚了:用於有時間限制的場景,每次等待超時后都會調用業務定制的超時處理邏輯,這個邏輯寫到EventHandler實現類中,這個實現類要實現Timeouthandler接口
LiteTimeoutBlockingWaitStrategy
- LiteTimeoutBlockingWaitStrategy與TimeoutBlockingWaitStrategy的關系,就像BlockingWaitStrategy與LiteBlockingWaitStrategy的關系:作為TimeoutBlockingWaitStrategy的變體,有TimeoutBlockingWaitStrategy的超時處理特性,而且沒有鎖競爭的時候,省略掉喚醒操作;
- 作者說LiteBlockingWaitStrategy可用於體驗,但正確性並未經過充分驗證,但是在LiteTimeoutBlockingWaitStrategy的注釋中沒有看到這種說法,看樣子這是個靠譜的等待策略,可以用,用在有超時處理的需求,而且沒有鎖競爭的場景(例如獨立消費)
SleepingWaitStrategy
- 和前面幾個不同的是,SleepingWaitStrategy沒有用到鎖,這意味這無需調用signalAllWhenBlocking方法做喚醒處理,相當於省去了生產線程的通知操作,官方源碼注釋有這么句話引起了我的興趣,如下圖紅框,大意是該策略在性能和CPU資源消耗之間取得了平衡,接下來去看看關鍵代碼,來了解這個特性:
- 如下圖,等到可用數據的過程是個死循環:
- 接下來是關鍵代碼了,如下圖,可見整個等待過程分為三段:計數器高於100時就只有一個減一的操作(最快響應),計數器在100到0之間時每次都交出CPU執行時間(最省資源),其他時候就睡眠固定時間:
YieldingWaitStrategy
- 看過SleepingWaitStrategy之后,再看YieldingWaitStrategy就很容易理解了,和SleepingWaitStrategy相比,YieldingWaitStrategy先做指定次數的自旋,然后不斷的交出CPU時間:
- 由於在不斷的執行Thread.yield()方法,因此該策略雖然很消耗CPU,不過一旦其他線程有CPU需求,很容易從這個線程得到;
PhasedBackoffWaitStrategy
- 最后是PhasedBackoffWaitStrategy,該策略的特點是將整個等待過程分成下圖的四段,四個方塊代表一個時間線上的四個階段:
- 這里說明一下上圖的四個階段:
- 首先是自旋指定的次數,默認10000次;
- 自旋過后,開始帶計時的自旋,執行的時長是spinTimeoutNanos的值;
- 執行時長達到spinTimeoutNanos的值后,開始執行Thread.yield()交出CPU資源,這個邏輯的執行時長是yieldTimeoutNanos-spinTimeoutNanos;
- 執行時長達到yieldTimeoutNanos-spinTimeoutNanos的值后,開始調用fallbackStrategy.waitFor,這個調用沒有時間或者次數限制;
- 現在問題來了fallbackStrategy是何方神聖?PhasedBackoffWaitStrategy類准備了三個靜態方法,咱們可以按需選用,讓fallbackStrategy是BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy這三個中的一個:
public static PhasedBackoffWaitStrategy withLock(
long spinTimeout,
long yieldTimeout,
TimeUnit units)
{
return new PhasedBackoffWaitStrategy(
spinTimeout, yieldTimeout,
units, new BlockingWaitStrategy());
}
public static PhasedBackoffWaitStrategy withLiteLock(
long spinTimeout,
long yieldTimeout,
TimeUnit units)
{
return new PhasedBackoffWaitStrategy(
spinTimeout, yieldTimeout,
units, new LiteBlockingWaitStrategy());
}
public static PhasedBackoffWaitStrategy withSleep(
long spinTimeout,
long yieldTimeout,
TimeUnit units)
{
return new PhasedBackoffWaitStrategy(
spinTimeout, yieldTimeout,
units, new SleepingWaitStrategy(0));
}
- 至此,Disruptor的九種等待策略就全部分析完畢了,除了選用等待策略的時候更加得心應手,還有個收獲就是積攢了閱讀優秀源碼的經驗,在讀源碼的路上更加有信心了;
你不孤單,欣宸原創一路相伴
歡迎關注公眾號:程序員欣宸
微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos