前提概要
簡單回顧 jdk 里的隊列:
阻塞隊列:
ArrayBlockingQueue主要通過:數組(Object[])+ 計數器(count)+ ReetrantLock的Condition (notEmpty:非空、notFull:非飽和)進行阻塞。
入隊操作:
- 操作不阻塞:
- add:添加失敗,則會直接進行返回。
- offer:添加失敗后(滿了)直接拋出異常,注意:offer(E o, long timeout, TimeUnit unit):可以設定等待的時間,如果在指定的時間內,還不能往隊列中加入BlockingQueue,則返回失敗。
- 操作阻塞:
- put:滿了,通過Condition:notFull.await()阻塞當前數據信息,當出隊和刪除元素時喚醒 put 操作。
出隊操作:
- 操作不阻塞:
- poll:當空時直接返回 null。poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間內,隊列一旦有數據可取,則立即返回隊列中的數據。否則知道時間,超時還沒有數據可取,返回失敗。
- remove:刪除元素情況相關元素信息控制,InterruptException異常
- 操作阻塞:
- take:當空時,notEmpty.await()(當有元素入隊時喚醒)。
- drainTo():一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數),通過該方法,可以提升獲取數據效率;不需要多次分批加鎖或釋放鎖。
與ArrayBlockingQueue相對的是LinkedBlockingQueue:Node 實現、加鎖(讀鎖、寫鎖分離)、可選的有界隊列。需要考慮實際使用中的內存問題,防止溢出。
實際應用
線程池隊列
Excutors 默認是使用 LinkedBlockingQueue,但是在實際應用中,更應該手動創建線程池使用有界隊列,防止生產者生產過快,導致內存溢出。
延遲隊列(ScheduleService也是采用了延時隊列哦!):
DelayQueue : PriorityQueue (優先級隊列) + Lock.condition (延遲等待) + leader (避免不必要的空等待)。
主要方法:
-
getDelay() 延遲時間。
-
compareTo() 通過該方法比較從PriorityQueue里取值。
入隊:
與BlockingQueue很相似,add、put、offer:入隊時會將換喚醒等待中的線程,進行一次出隊處理。
出隊:
-
如果隊列里無數據,元素入隊時會被喚醒。
-
如果隊列里有數據,會阻塞至時間滿足。
- take-阻塞:
- poll-滿足隊列有數據並且 delay 時間小於0時候會取出元素,否則立即返回 null 可能會搶占成為 leader。
應用場景:
-
延時任務:設置任務延遲多久執行;需要設置過期值的處理,例如緩存過期。
-
實現方式:每次 getDelay() 方法提供一個緩存創建時間與當前時間的差值,出隊時 compareTo() 方法取差值最小的。每次入隊時都會重新取出隊列里差值最小的值進行處理。
-
使用隊列更多的是像生產者、消費者這種場景,這種場景大多數情況又對處理速度有着要求,所以我們會使用多線程技術。
-
使用多線程就可能會出現並發,為了避免出錯,我們會選擇線程安全的隊列。
-
ArrayBlockingQueue、LinkedBlockingQueue 或者是 ConcurrentLinkedQueue。前倆者是通過加鎖取實現,后面一種是通過 cas 去實現線程安全。
-
要考慮到生產者過快可能造出的內存溢出的問題,所以看起來 ArrayBlockingQueue 是最符合要求的。
-
但是因為加鎖效率又會變慢,所以就引出了:Disruptor服務框架 !
Disruptor簡介介紹
- Disruptor的源碼Git倉庫地址:https://github.com/LMAX-Exchange/disruptor
- Disruptor的概念定義:異步體系的線程間的高性能消息框架
- Disruptor的核心思想:把多線程並發寫的線程安全問題轉化為線程本地寫,即:不需要做同步,不許要進行加鎖操作。
Disruptor優點介紹
- 非常輕量,但性能卻非常強悍,得益於其優秀的設計和對計算機底層原理的運用
- 單線程每秒能處理超600W的數據(Disruptor能在1秒內將600W數據發送給消費者,現在的硬件水平會遠遠在這個水平之上了!)
- 基於事件驅動模型,不用消費者主動拉取消息
- 比JDK的ArrayBlockingQueue性能高一個數量級
為什么這么快
- 無鎖序號柵欄
- 緩存行填充,消除偽共享
- 內存預分配
- 環形隊列RingBuffer
Disruptor核心概念
-
RingBuffer(環形隊列):基於數組的內存級別緩存,是創建sequencer(序號)與定義WaitStrategy(拒絕策略)的入口。
-
Disruptor(總體執行入口):對RingBuffer的封裝,持有RingBuffer、消費者線程池Executor、消費之集合ConsumerRepository等引用。
-
Sequence(序號分配器):對RingBuffer中的元素進行序號標記,通過順序遞增的方式來管理進行交換的數據(事件/Event),一個Sequence可以跟蹤標識某個事件的處理進度,同時還能消除偽共享。
-
Sequencer(數據傳輸器):Sequencer里面包含了Sequence,是Disruptor的核心,Sequencer有兩個實現類:SingleProducerSequencer(單生產者實現)、MultiProducerSequencer(多生產者實現),Sequencer主要作用是實現生產者和消費者之間快速、正確傳遞數據的並發算法
-
SequenceBarrier(消費者屏障):用於控制RingBuffer的Producer和Consumer之間的平衡關系,並且決定了Consumer是否還有可處理的事件的邏輯。
-
WaitStrategy(消費者等待策略):決定了消費者如何等待生產者將Event生產進Disruptor,WaitStrategy有多種實現策略,分別是:
- BlockingWaitStrategy(最穩定的策略):阻塞方式,效率較低,但對cpu消耗小,內部使用的是典型的鎖和條件變量機制(java的ReentrantLock),來處理線程的喚醒,這個策略是disruptor等待策略中最慢的一種,但是是最保守使用消耗cpu的一種用法,並且在不同的部署環境下最能保持性能一致。 但是,隨着我們可以根據部署的服務環境優化出額外的性能。
- BusySpinWaitStrategy(性能最好的策略):自旋方式,無鎖,BusySpinWaitStrategy是性能最高的等待策略,但是受部署環境的制約依賴也越強。 僅當event處理線程數少於物理核心數的時候才應該采用這種等待策略。 例如,超線程不可開啟。
- LiteBlockingWaitStrategy(幾乎不用,最接近原生的策略機制):BlockingWaitStrategy的變體版本,目前感覺不建議使用
- LiteTimeoutBlockingWaitStrategy:LiteBlockingWaitStrategy的超時版本
- PhasedBackoffWaitStrategy(最低CPU配置的策略):自旋 + yield + 自定義策略,當吞吐量和低延遲不如CPU資源重要,CPU資源緊缺,可以使用此策略。
- SleepingWaitStrategy:自旋休眠方式(無鎖),性能和BlockingWaitStrategy差不多,但是這個對生產者線程影響最小,它使用一個簡單的loop繁忙等待循環,但是在循環體中間它調用了LockSupport.parkNanos(1)。
- 一般情況在linux系統這樣會使得線程停頓大約60微秒。不過這樣做的好處是,生產者線程不需要額外的累加計數器,也不需要產生條件變量信號量開銷。
- 負面影響是,在生產者線程與消費者線程之間傳遞event數據的延遲變高。所以SleepingWaitStrategy適合在不需要低延遲, 但需要很低的生產者線程影響的情形。一個典型的案例是異步日志記錄功能。
- TimeoutBlockingWaitStrategy:BlockingWaitStrategy的超時阻塞方式
- YieldingWaitStrategy(充分進行實現CPU吞吐性能策略):自旋線程切換競爭方式(Thread.yield()),最快的方式,適用於低延時的系統,在要求極高性能且事件處理線數小於CPU邏輯核心數的場景中推薦使用此策略,它會充分使用壓榨cpu來達到降低延遲的目標。
- 通過不斷的循環等待sequence去遞增到合適的值。 在循環體內,調用Thread.yield()來允許其他的排隊線程執行。 這是一種在需要極高性能並且event handler線程數少於cpu邏輯內核數的時候推薦使用的策略。
- 這里說一下YieldingWaitStrategy使用要小心,不是特別要求性能的情況下,要謹慎使用,否則會引起服務起cpu飆升的情況,因為他的內部實現是在線程做100次遞減然后Thread.yield(),可能會壓榨cpu性能來換取速度。
注意:超線程是intel研發的一種cpu技術,可以使得一個核心提供兩個邏輯線程,比如4核心超線程后有8個線程。
- Event:從生產者到消費者過程中所處理的數據單元,Event由使用者自定義。
- EventHandler:由用戶自定義實現,就是我們寫消費者邏輯的地方,代表了Disruptor中的一個消費者的接口。
- EventProcessor:這是個事件處理器接口,實現了Runnable,處理主要事件循環,處理Event,擁有消費者的Sequence,這個接口有2個重要實現:
- WorkProcessor:多線程處理實現,在多生產者多消費者模式下,確保每個sequence只被一個processor消費,在同一個WorkPool中,確保多個WorkProcessor不會消費同樣的sequence
- BatchEventProcessor:單線程批量處理實現,包含了Event loop有效的實現,並回調到了一個EventHandler接口的實現對象,這接口實際上是通過重寫run方法,輪詢獲取數據對象,並把數據經過等待策略交給消費者去處理。
Disruptor整體架構
接下來我們來看一下 Disruptor 是如何做到無阻塞、多生產、多消費的。
- 構建 Disruptor 的各個參數以及 ringBuffer 的構造:
- EventFactory:創建事件(任務)的工廠類。
- ringBufferSize:容器的長度。
- Executor:消費者線程池,執行任務的線程。
- ProductType:生產者類型:單生產者、多生產者。
- WaitStrategy:等待策略。
- RingBuffer:存放數據的容器。
- EventHandler:事件處理器。
Disruptor使用方式
maven依賴:
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
生產單消費簡單模式案例:
Event數據模型
import lombok.Data;
@Data
public class SampleEventModel {
private String data;
}
Event事件模型Factory工廠類
import com.lmax.disruptor.EventFactory;
/**
* 消息對象生產工廠
*/
public class SampleEventModelFactory implements EventFactory<SampleEventModel> {
@Override
public SampleEventModel newInstance() {
//返回空的消息對象數據Event
return new SampleEventModel();
}
}
EventHandler處理器操作
import com.lmax.disruptor.EventHandler;
/**
* 消息事件處理器
*/
public class SampleEventHandler implements EventHandler<SampleEventModel> {
/**
* 事件驅動模式
*/
@Override
public void onEvent(SampleEventModel event, long sequence, boolean endOfBatch) throws Exception {
// do ...
System.out.println("消費者消費處理數據:" + event.getData());
}
}
EventProducer工廠生產者服務處理器操作
import com.lmax.disruptor.RingBuffer;
/**
* 消息發送
*/
public class SampleEventProducer {
private RingBuffer<SampleEventModel> ringBuffer;
public SampleEventProducer(RingBuffer<SampleEventModel> ringBuffer) {
this.ringBuffer = ringBuffer;
}
/**
* 發布數據信息
* @param data
*/
public void publish(String data){
//從ringBuffer獲取可用sequence序號
long sequence = ringBuffer.next();
try {
//根據sequence獲取sequence對應的Event
//這個Event是一個沒有賦值具體數據的對象
TestEvent testEvent = ringBuffer.get(sequence);
testEvent.setData(data);
} finally {
//提交發布
ringBuffer.publish(sequence);
}
}
}
EventProducer工廠生產者服務處理器操作
public class TestMain {
public static void main(String[] args) {
SampleEventModelFactory eventFactory = new SampleEventModelFactory();
int ringBufferSize = 1024 * 1024;
//這個線程池最好自定義
ExecutorService executor = Executors.newCachedThreadPool();
//實例化disruptor
Disruptor<SampleEventModel> disruptor = new Disruptor<SampleEventModel>(
eventFactory, //消息工廠
ringBufferSize, //ringBuffer容器最大容量長度
executor, //線程池,最好自定義一個
ProducerType.SINGLE, //單生產者模式
new BlockingWaitStrategy() //等待策略
);
//添加消費者監聽 把TestEventHandler綁定到disruptor
disruptor.handleEventsWith(new SampleEventHandler());
//啟動disruptor
disruptor.start();
//獲取實際存儲數據的容器RingBuffer
RingBuffer<SampleEventModel> ringBuffer = disruptor.getRingBuffer();
//生產發送數據
SampleEventProducer producer = new SampleEventProducer(ringBuffer);
for(long i = 0; i < 100; i ++){
producer.publish(i);
}
disruptor.shutdown();
executor.shutdown();
}
}
Disruptor的原理分析
- 使用循環數組代替隊列生產者消費者模型自然是離不開隊列的,使用預先填充數據的方式來避免 GC;
- 使用CPU緩存行填充的方式來避免極端情況下的數據爭用導致的性能下降;
- 多線程編程中盡量避免鎖爭用的編碼技巧。
Disruptor為我們提供了一個思路和實踐,基本的循環數組實現,定義一個數組,長度為2的次方冪。
循環數組
-
設定一個數字標志表示當前的可用的位置(可以從0開始)。
- 頭標志位表示下一個可以插入的位置。
- 頭標志位不能大於尾標志位一個數組長度(因為這樣就插入的位置和讀取的位置就重疊了會導致數據丟失)。
- 尾標志位表示下一個可以讀取的位置。
- 尾標志位不能等於頭標志位(因為這樣讀取的數據實際上是上一輪的舊數據) 預先填充提高性能,我們知道在java中如果創造大量的對象使用后棄用,JVM 會在適當的時候進行GC操作。
- 頭標志位表示下一個可以插入的位置。
-
當這個數字標志不斷增長到大於數組長度時進行與數組長度的並運算,得到的新數字依然在數組的長度范圍內,就又可以插入。
-
這樣就好像一直插入直到數組末尾又再次從頭開始,故而稱之為循環數組。 一般的循環數組有頭尾兩個標志位。這點和隊列很像。
循環數組(初始化數據信息)
在循環數組中,可以事先在數組中填充好數據。一旦有新數據的產生,要做的就是修改數組中某一位中的一些屬性值。這樣可以避免頻繁創建數據和棄用數據導致的 GC。
這點比起隊列是要好的。 只保留一個標志位,多線程在隊列也好,循環數組也好,必然存在對標志位的競爭。無論是使用鎖來避免競爭,還是使用 CAS 來進行無鎖算法。
只要爭用的情況存在,並且線程較多,都會出現對資源的不斷消耗。爭用的對象越多,爭用中消耗掉的資源也就越多。
為了避免這樣的情況,減少爭用的資源就是一個手段。比如在循環數組中只保留一個標志位,也就是下一個可以寫入數據位置的標志位。而尾部標志位則在各個消費者線程中保存(具體的編程手法后續細講)。
循環數組在單線程
-
循環數組在單線程中的使用,如果確定只有一個生產者,也就是說只有一個寫線程。則在循環數組中的使用會更加簡化。
-
具體來說單線程更新數組上的標志位,那這種情況,標志位就無需采用CAS寫的方式來確定下一個可寫入的位置,直接就是在單線程內進行普通的更新即可。
循環數組在多線程
-
循環數組在多線程中的使用,如果存在多個生產者,則可寫入的標志位需要用CAS 算法來進行爭奪,避免鎖的使用。
-
多個線程通過CAS得到唯一的不沖突的下一個可寫序號,由於需要獲得序號后才能進行寫入,而寫入完成才可以讓消費者線程進行消費。
-
所以才獲得序號后,完成寫入前,必須有一種方式讓消費者檢測是否完成。
-
以避免消費者拿到還未填入輸入的數組位。 為了達到這個目標,存在簡單—效率低和復雜—效率高兩種方式。
簡單但是可能效率低的方式使用兩個標志位。
-
prePut:表示下一個可以供生產者放入的位置;
- 多個生產者通過 CAS 獲得 prePut 的不同的值,在獲得的序號並且完成數據寫入后,將 put 的值以 CAS 方式遞增(比如獲得的序號是7,只有 put 是6的時候才允許設置成功),稱之為發布。
- 這種方式存在一個缺點,如果多個線程並發寫入,獲取 prePut 的值不會堵塞,假設其中一個生產者在寫入數據的時候稍慢,則其他的線程寫入完畢也無法完成發布。就會導致循環等待,浪費了 CPU 性能。
-
put:表示最后一個生產者已經放入的位置。
-
復雜但是可能效率高的方式,在上面的方式中,主要的爭奪環節集中在多線程發布中,序號大的線程發布需要等到序號小的線程發布完成后才能發布。那我們的優化的點也在這個地方。
-
這樣就可以避免發布的爭奪。 但是又來帶來一個問題,用什么數字來表示是否已經發布完成?如果只是0和1,那么寫過1輪以后,標志數組位上就都是1了。又無法區分。
-
所以標志數組上的數字應該在循環數組的每一輪循環的值都不同。
比如一開始都是-1,第一輪中是0的表示已發布,第二輪中是0表示沒發布,是1的表示已發布。
緩存行填充
要了解緩存行填充消除偽共享,首先要了解什么是系統緩存行:
-
CPU 為了更快的執行代碼。於是當從內存中讀取數據時,並不是只讀自己想要的部分。而是讀取足夠的字節來填入高速緩存行。根據不同的 CPU ,高速緩存行大小不同。如 X86 是 32BYTES ,而 ALPHA 是 64BYTES 。並且始終在第 32 個字節或第 64 個字節處對齊。這樣,當 CPU 訪問相鄰的數據時,就不必每次都從內存中讀取,提高了速度。 因為訪問內存要比訪問高速緩存用的時間多得多。
-
這個緩存是CPU內部自己的緩存,內部的緩存單位是行,叫做緩存行。在多核環境下會出現CPU之間的內存同步問題(比如一個核加載了一份緩存,另外一個核也要用到同一份數據),如果每個核每次需要時都往內存中存取(一個在讀緩存,一個在寫緩存時,造成數據不一致),這會帶來比較大的性能損耗。
-
數據在緩存中不是以獨立的項來存儲的,如不是一個單獨的變量,也不是一個單獨的指針。緩存是由緩存行組成的,通常是64字節(譯注:這篇文章發表時常用處理器的緩存行是64字節的,比較舊的處理器緩存行是32字節),並且它有效地引用主內存中的一塊地址。一個Java的long類型是8字節,因此在一個緩存行中可以存8個long類型的變量。
-
當數組中的一個值被加載到緩存中,它會額外加載另外7個。因此你能非常快地遍歷這個數組。事實上,你可以非常快速的遍歷在連續的內存塊中分配的任意數據結構。
-
因此如果你數據結構中的項在內存中不是彼此相鄰的,你將得不到免費緩存加載所帶來的優勢。並且在這些數據結構中的每一個項都可能會出現緩存未命中。
-
設想你的long類型的數據不是數組的一部分。設想它只是一個單獨的變量。讓我們稱它為head,這么稱呼它其實沒有什么原因。然后再設想在你的類中有另一個變量緊挨着它。讓我們直接稱它為tail。現在,當你加載head到緩存的時候,你也免費加載了tail