文章很長,建議收藏起來,慢慢讀! Java 高並發 發燒友社群:瘋狂創客圈 奉上以下珍貴的學習資源:
-
免費贈送 經典圖書:《Java高並發核心編程(卷1)》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
-
免費贈送 經典圖書:《Java高並發核心編程(卷2)》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
-
免費贈送 經典圖書:《Netty Zookeeper Redis 高並發實戰》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
-
免費贈送 經典圖書:《SpringCloud Nginx高並發核心編程》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
-
免費贈送 資源寶庫: Java 必備 百度網盤資源大合集 價值>10000元 加尼恩領取
推薦2:史上最全 Java 面試題 21 個專題
史上最全 Java 面試題 21 個專題 | 阿里、京東、美團、頭條.... 隨意挑、橫着走!!! |
---|---|
1: JVM面試題(史上最強、持續更新、吐血推薦) | https://www.cnblogs.com/crazymakercircle/p/14365820.html |
2:Java基礎面試題(史上最全、持續更新、吐血推薦) | https://www.cnblogs.com/crazymakercircle/p/14366081.html |
3:死鎖面試題(史上最強、持續更新) | https://www.cnblogs.com/crazymakercircle/p/14323919.html |
4:設計模式面試題 (史上最全、持續更新、吐血推薦) | https://www.cnblogs.com/crazymakercircle/p/14367101.html |
5:架構設計面試題 (史上最全、持續更新、吐血推薦) | https://www.cnblogs.com/crazymakercircle/p/14367907.html |
還有 10 + 篇必刷、必刷 的面試題 | 更多 ....., 請參見【 瘋狂創客圈 高並發 總目錄 】 |
推薦3: 瘋狂創客圈 高質量 博文
springCloud 高質量 博文 | |
---|---|
nacos 實戰(史上最全) | sentinel (史上最全+入門教程) |
springcloud + webflux 高並發實戰 | Webflux(史上最全) |
SpringCloud gateway (史上最全) | spring security (史上最全) |
還有 10 + 篇 必刷、必刷 的高質量 博文 | 更多 ....., 請參見【 瘋狂創客圈 高並發 總目錄 】 |
無鎖編程(Lock Free)框架 系列文章:
- 1 前置知識:偽共享 原理&實戰
- 2 disruptor 使用和原理 圖解
- 3 akka 使用和原理 圖解
- 4 camel 使用和 原理 圖解
1 disruptor 是什么?
Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題(在性能測試中發現竟然與I/O操作處於同樣的數量級)。
基於Disruptor開發的系統單線程能支撐每秒600萬訂單,2010年在QCon演講后,獲得了業界關注。2011年,企業應用軟件專家Martin Fowler專門撰寫長文介紹。同年它還獲得了Oracle官方的Duke大獎。
目前,包括Apache Storm、Camel、Log4j 2在內的很多知名項目都應用了Disruptor以獲取高性能。
需要特別指出的是,這里所說的隊列是系統內部的內存隊列,而不是Kafka這樣的分布式隊列。另外,本文所描述的Disruptor特性限於3.3.4。
2 Java內置隊列的問題
介紹Disruptor之前,我們先來看一看常用的線程安全的內置隊列有什么問題。Java的內置隊列如下表所示。
隊列 | 有界性 | 鎖 | 數據結構 |
---|---|---|---|
ArrayBlockingQueue | bounded | 加鎖 | arraylist |
LinkedBlockingQueue | optionally-bounded | 加鎖 | linkedlist |
ConcurrentLinkedQueue | unbounded | 無鎖 | linkedlist |
LinkedTransferQueue | unbounded | 無鎖 | linkedlist |
PriorityBlockingQueue | unbounded | 加鎖 | heap |
DelayQueue | unbounded | 加鎖 | heap |
隊列的底層一般分成三種:數組、鏈表和堆。其中,堆一般情況下是為了實現帶有優先級特性的隊列,暫且不考慮。
從數組和鏈表兩種數據結構來看,基於數組線程安全的隊列,比較典型的是ArrayBlockingQueue,它主要通過加鎖的方式來保證線程安全;基於鏈表的線程安全隊列分成LinkedBlockingQueue和ConcurrentLinkedQueue兩大類,前者也通過鎖的方式來實現線程安全,而后者以及上面表格中的LinkedTransferQueue都是通過原子變量compare and swap(以下簡稱“CAS”)這種不加鎖的方式來實現的。
但是對 volatile類型的變量進行 CAS 操作,存在偽共享問題,具體請參考專門的文章:
Disruptor 使用了類似上面的方案,解決了偽共享問題。
3 Disruptor框架是如何解決偽共享問題的?
在Disruptor中有一個重要的類Sequence,該類包裝了一個volatile修飾的long類型數據value,無論是Disruptor中的基於數組實現的緩沖區RingBuffer,還是生產者,消費者,都有各自獨立的Sequence,RingBuffer緩沖區中,Sequence標示着寫入進度,例如每次生產者要寫入數據進緩沖區時,都要調用RingBuffer.next()來獲得下一個可使用的相對位置。對於生產者和消費者來說,Sequence標示着它們的事件序號,來看看Sequence類的源碼:
class LhsPadding {
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding {
protected volatile long value;
}
class RhsPadding extends Value {
protected long p9, p10, p11, p12, p13, p14, p15;
}
public class Sequence extends RhsPadding {
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
static {
UNSAFE = Util.getUnsafe();
try {
VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
} catch(final Exception e) {
throw new RuntimeException(e);
}
}
```
public Sequence() {
this(INITIAL_VALUE);
}
public Sequence(final long initialValue) {
UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
}
```
}
從第1到11行可以看到,真正使用到的變量value,它的前后空間都由8個long型的變量填補了,對於一個大小為64字節的緩存行,它剛好被填補滿(一個long型變量value,8個字節加上前/后個7long型變量填補,7*8=56,56+8=64字節)。這樣做每次把變量value讀進高速緩存中時,都能把緩存行填充滿(對於大小為64個字節的緩存行來說,如果緩存行大小大於64個字節,那么還是會出現偽共享問題),保證每次處理數據時都不會與其他變量發生沖突。
Disruptor 的使用場景
Disruptor的最常用的場景就是“生產者-消費者”場景,對場景的就是“一個生產者、多個消費者”的場景,並且要求順序處理。
當前業界開源組件使用Disruptor的包括Log4j2、Apache Storm等,它可以用來作為高性能的有界內存隊列,基於生產者消費者模式,實現一個/多個生產者對應多個消費者。它也可以認為是觀察者模式的一種實現,或者發布訂閱模式。
舉個例子,我們從MySQL的BigLog文件中順序讀取數據,然后寫入到ElasticSearch(搜索引擎)中。在這種場景下,BigLog要求一個文件一個生產者,那個是一個生產者。而寫入到ElasticSearch,則嚴格要求順序,否則會出現問題,所以通常意義上的多消費者線程無法解決該問題,如果通過加鎖,則性能大打折扣。
實戰:Disruptor 的 使用實例
我們從一個簡單的例子開始學習Disruptor:生產者傳遞一個long類型的值給消費者,而消費者消費這個數據的方式僅僅是把它打印出來。
定義一個Event
首先定義一個Event來包含需要傳遞的數據:
public class LongEvent {
private long value;
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
}
由於需要讓Disruptor為我們創建事件,我們同時還聲明了一個EventFactory來實例化Event對象。
public class LongEventFactory implements EventFactory {
@Override
public Object newInstance() {
return new LongEvent();
}
}
定義事件處理器(disruptor會回調此處理器的方法)
我們還需要一個事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中存儲的數據打印到終端:
/**
*/public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
System.out.println(longEvent.getValue());
}
}
定義事件源: 事件發布器 發布事件
事件都會有一個生成事件的源,這個例子中假設事件是由於磁盤IO或者network讀取數據的時候觸發的,事件源使用一個ByteBuffer來模擬它接受到的數據,也就是說,事件源會在IO讀取到一部分數據的時候觸發事件(觸發事件不是自動的,程序員需要在讀取到數據的時候自己觸發事件並發布):
public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
/**
* onData用來發布事件,每調用一次就發布一次事件事件
* 它的參數會通過事件傳遞給消費者
*
* @param bb
*/public void onData(ByteBuffer bb) {
//可以把ringBuffer看做一個事件隊列,那么next就是得到下面一個事件槽
long sequence = ringBuffer.next();
try {
//用上面的索引取出一個空的事件用於填充
LongEvent event = ringBuffer.get(sequence);// for the sequence
event.setValue(bb.getLong(0));
} finally {
//發布事件
ringBuffer.publish(sequence);
}
}
}
很明顯的是:當用一個簡單隊列來發布事件的時候會牽涉更多的細節,這是因為事件對象還需要預先創建。
發布事件最少需要兩步:
獲取下一個事件槽,發布事件(發布事件的時候要使用try/finnally保證事件一定會被發布)。
如果我們使用RingBuffer.next()獲取一個事件槽,那么一定要發布對應的事件。如果不能發布事件,那么就會引起Disruptor狀態的混亂。尤其是在多個事件生產者的情況下會導致事件消費者失速,從而不得不重啟應用才能會恢復。
Disruptor 3.0提供了lambda式的API。這樣可以把一些復雜的操作放在Ring Buffer,所以在Disruptor3.0以后的版本最好使用Event Publisher或者Event Translator(事件轉換器)來發布事件。
Disruptor3.0以后的事件轉換器(填充事件的業務數據)
public class LongEventProducerWithTranslator {
//一個translator可以看做一個事件初始化器,publicEvent方法會調用它
//填充Event
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
event.setValue(bb.getLong(0));
}
};
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer bb) {
ringBuffer.publishEvent(TRANSLATOR, bb);
}
}
上面寫法的另一個好處是,Translator可以分離出來並且更加容易單元測試。Disruptor提供了不同的接口(EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, 等等)去產生一個Translator對象。很明顯,Translator中方法的參數是通過RingBuffer來傳遞的。
組裝起來
最后一步就是把所有的代碼組合起來完成一個完整的事件處理系統。Disruptor在這方面做了簡化,使用了DSL風格的代碼(其實就是按照直觀的寫法,不太能算得上真正的DSL)。雖然DSL的寫法比較簡單,但是並沒有提供所有的選項。如果依靠DSL已經可以處理大部分情況了。
注意:這里沒有使用時間轉換器,而是使用簡單的 事件發布器。
public class LongEventMain {
public static void main(String[] args) throws InterruptedException {
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor);
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
bb.putLong(0, l);
//發布事件
producer.onData(bb);
Thread.sleep(1000);
}
}
}
在Java 8使用Disruptor
Disruptor在自己的接口里面添加了對於Java 8 Lambda的支持。大部分Disruptor中的接口都符合Functional Interface的要求(也就是在接口中僅僅有一個方法)。所以在Disruptor中,可以廣泛使用Lambda來代替自定義類。
public class LongEventMainJava8 {
/**
* 用lambda表達式來注冊EventHandler和EventProductor
* @param args
* @throws InterruptedException
*/public static void main(String[] args) throws InterruptedException {
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
// 可以使用lambda來注冊一個EventHandler
disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event.getValue()));
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++) {
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence, buffer) -> event.setValue(buffer.getLong(0)), bb);
Thread.sleep(1000);
}
}
}
由於在Java 8中方法引用也是一個lambda,因此還可以把上面的代碼改成下面的代碼:
public class LongEventWithMethodRef {
public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println(event.getValue());
}
public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
{
event.setValue(buffer.getLong(0));
}
public static void main(String[] args) throws Exception
{
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
// Connect the handler
disruptor.handleEventsWith(LongEventWithMethodRef::handleEvent);
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent(LongEventWithMethodRef::translate, bb);
Thread.sleep(1000);
}
}
}
Disruptor如何實現高性能?
Disruptor實現高性能主要體現了去掉了鎖,采用CAS算法,同時內部通過環形隊列實現有界隊列。
-
環形數據結構
為了避免垃圾回收,采用數組而非鏈表。同時,數組對處理器的緩存機制更加友好。 -
元素位置定位
數組長度2^n,通過位運算,加快定位的速度。下標采取遞增的形式。不用擔心index溢出的問題。index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。 -
無鎖設計
每個生產者或者消費者線程,會先申請可以操作的元素在數組中的位置,申請到之后,直接在該位置寫入或者讀取數據。整個過程通過原子變量CAS,保證操作的線程安全。
使用Disruptor,主要用於對性能要求高、延遲低的場景,它通過“榨干”機器的性能來換取處理的高性能。如果你的項目有對性能要求高,對延遲要求低的需求,並且需要一個無鎖的有界隊列,來實現生產者/消費者模式,那么Disruptor是你的不二選擇。
原理:Disruptor 的內部Ring Buffer環形隊列
RingBuffer是什么
RingBuffer 是一個環(首尾相連的環),用做在不同上下文(線程)間傳遞數據的buffer。
RingBuffer 擁有一個序號,這個序號指向數組中下一個可用元素。
Disruptor使用環形隊列的優勢:
Disruptor框架就是一個使用CAS操作的內存隊列,與普通的隊列不同,Disruptor框架使用的是一個基於數組實現的環形隊列,無論是生產者向緩沖區里提交任務,還是消費者從緩沖區里獲取任務執行,都使用CAS操作。
使用環形隊列的優勢:
第一,簡化了多線程同步的復雜度。學數據結構的時候,實現隊列都要兩個指針head和tail來分別指向隊列的頭和尾,對於一般的隊列是這樣,想象下,如果有多個生產者同時往緩沖區隊列中提交任務,某一生產者提交新任務后,tail指針都要做修改的,那么多個生產者提交任務,頭指針不會做修改,但會對tail指針產生沖突,例如某一生產者P1要做寫入操作,在獲得tail指針指向的對象值V后,執行compareAndSet()方法前,tail指針被另一生產者P2修改了,這時生產者P1執行compareAndSet()方法,發現tail指針指向的值V和期望值E不同,導致沖突。同樣,如果多個消費者不斷從緩沖區中獲取任務,不會修改尾指針,但會造成隊列頭指針head的沖突問題(因為隊列的FIFO特點,出列會從頭指針出開始)。
環形隊列的一個特點就是只有一個指針,只通過一個指針來實現出列和入列操作。如果使用兩個指針head和tail來管理這個隊列,有可能會出現“偽共享”問題(偽共享問題在下面我會詳細說),因為創建隊列時,head和tail指針變量常常在同一個緩存行中,多線程修改同一緩存行中的變量就容易出現偽共享問題。
第二,由於使用的是環形隊列,那么隊列創建時大小就被固定了,Disruptor框架中的環形隊列本來也就是基於數組實現的,使用數組的話,減少了系統對內存空間管理的壓力,因為它不像鏈表,Java會定期回收鏈表中一些不再引用的對象,而數組不會出現空間的新分配和回收問題。
原理:Disruptor的等待策略
Disruptor默認的等待策略是BlockingWaitStrategy。這個策略的內部適用一個鎖和條件變量來控制線程的執行和等待(Java基本的同步方法)。BlockingWaitStrategy是最慢的等待策略,但也是CPU使用率最低和最穩定的選項。然而,可以根據不同的部署環境調整選項以提高性能。
SleepingWaitStrategy
和BlockingWaitStrategy一樣,SpleepingWaitStrategy的CPU使用率也比較低。它的方式是循環等待並且在循環中間調用LockSupport.parkNanos(1)來睡眠,(在Linux系統上面睡眠時間60µs).然而,它的優點在於生產線程只需要計數,而不執行任何指令。並且沒有條件變量的消耗。但是,事件對象從生產者到消費者傳遞的延遲變大了。SleepingWaitStrategy最好用在不需要低延遲,而且事件發布對於生產者的影響比較小的情況下。比如異步日志功能。
YieldingWaitStrategy
YieldingWaitStrategy是可以被用在低延遲系統中的兩個策略之一,這種策略在減低系統延遲的同時也會增加CPU運算量。YieldingWaitStrategy策略會循環等待sequence增加到合適的值。循環中調用Thread.yield()允許其他准備好的線程執行。如果需要高性能而且事件消費者線程比邏輯內核少的時候,推薦使用YieldingWaitStrategy策略。例如:在開啟超線程的時候。
BusySpinW4aitStrategy
BusySpinWaitStrategy是性能最高的等待策略,同時也是對部署環境要求最高的策略。這個性能最好用在事件處理線程比物理內核數目還要小的時候。例如:在禁用超線程技術的時候。
原理:並行模式
單一寫者模式
在並發系統中提高性能最好的方式之一就是單一寫者原則,對Disruptor也是適用的。如果在你的代碼中僅僅有一個事件生產者,那么可以設置為單一生產者模式來提高系統的性能。
public class singleProductorLongEventMain {
public static void main(String[] args) throws Exception {
//.....// Construct the Disruptor with a SingleProducerSequencer
Disruptor<LongEvent> disruptor = new Disruptor(factory,
bufferSize,
ProducerType.SINGLE, // 單一寫者模式,
executor);//.....
}
}
一次生產,串行消費
比如:現在觸發一個注冊Event,需要有一個Handler來存儲信息,一個Hanlder來發郵件等等。
/**
* 串行依次執行
* <br/>
* p --> c11 --> c21
* @param disruptor
*/
public static void serial(Disruptor<LongEvent> disruptor){
disruptor.handleEventsWith(new C11EventHandler()).then(new C21EventHandler());
disruptor.start();
}
菱形方式執行
public static void diamond(Disruptor<LongEvent> disruptor){
disruptor.handleEventsWith(new C11EventHandler(),new C12EventHandler()).then(new C21EventHandler());
disruptor.start();
}
鏈式並行計算
public static void chain(Disruptor<LongEvent> disruptor){
disruptor.handleEventsWith(new C11EventHandler()).then(new C12EventHandler());
disruptor.handleEventsWith(new C21EventHandler()).then(new C22EventHandler());
disruptor.start();
}
相互隔離模式
public static void parallelWithPool(Disruptor<LongEvent> disruptor){
disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler());
disruptor.handleEventsWithWorkerPool(new C21EventHandler(),new C21EventHandler());
disruptor.start();
}
航道模式
串行依次執行,同時C11,C21分別有2個實例
/**
* 串行依次執行,同時C11,C21分別有2個實例
* <br/>
* p --> c11 --> c21
* @param disruptor
*/
public static void serialWithPool(Disruptor<LongEvent> disruptor){
disruptor.handleEventsWithWorkerPool(new C11EventHandler(),new C11EventHandler()).then(new C21EventHandler(),new C21EventHandler());
disruptor.start();
}
回到◀瘋狂創客圈▶
瘋狂創客圈 - Java高並發研習社群,為大家開啟大廠之門