想了解一個項目,最好的辦法就是,把它的源碼搞到本地自己搗鼓。
在網上看了 N 多人對 Disruptor 速度的吹捧,M 多人對它的機制分析,就連 Disruptor 官方文檔中,也 NB 哄哄自詡:
At LMAX we have built an order matching engine, real-time risk management,
and a highly available in-memory transaction processing system all on this pattern to great success.
Each of these systems has set new performance standards that, as far as we can tell, are unsurpassed.
很少見到哪個開源項目,能夠對自己的項目如此接近自大的自信。也難怪,不然該項目怎么會獲得 2011 Duke's 程序框架創新獎呢。
但是,在學習它之前,我還是懷疑它是不是真的有那么快(快如閃電)?為什么會那么快?
有沒有,不能光聽別人說,我相信自己的眼睛和腦瓜勝過耳朵,且看且分析。
於是 down 了 3.2 版本的源碼開始搗鼓。
以下對 Disruptor 源碼的分析,都是基於這個版本的。
1. Disruptor 初印象
Disruptor 的源碼非常精簡,沒有任何配置文件,所有源文件類加起來也就 58 個(不同版本可能不一樣),用代碼行統計工具算了下,一共 6306 行(好像我挺無聊的)。對於一個能做到如此成功的開源工具來說,能有這么精短的代碼量,確實很不錯。
Disruptor 代碼共分為四個包:
1). com.lmax.disruptor: 大部分文件存放於這個目錄下,包括 Disruptor 中重要的類文件,包括:EventProcessor、RingBuffer、Sequence、Sequencer、WaitStrategy 等
2). com.lmax.disruptor.collections: 該目錄下只有一個類:Histogram,它不是 Disruptor 運行的必須類,其實我也沒用過它,從源碼注釋來看,該類的作用是,在一個對性能要求很高的、有多個消費者的系統中,Histogram 可以用來記錄系統耗各個組件的耗時情況,並以直方圖的形式展示出來。初學 Disruptor 可以不用管關心它。
3). com.lmax.disruptor.dsl: 該包中保存了消費者和生產者的一些信息,核心類文件 Disruptor 也存放在該目錄下。
4). com.lmax.disruptor.util: 該包中存放了幾個輔助操作類,如 Util 類,DaemonThreadFactory 類,PaddedLong 類,該類用來做緩沖行填充的。
2. Disruptor demo
閑話不多說,來個 hello 級別的 demo 應該是最好的入門手段了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
package
com.lmax.test;
import
java.util.concurrent.ExecutorService;
import
java.util.concurrent.Executors;
import
com.lmax.disruptor.EventHandler;
import
com.lmax.disruptor.RingBuffer;
import
com.lmax.disruptor.dsl.Disruptor;
public
class
Sample {
@SuppressWarnings
(
"unchecked"
)
public
static
void
main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
Disruptor<ValueEvent> disruptor =
new
Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY,
4
, exec);
final
EventHandler<ValueEvent> handler1 =
new
EventHandler<ValueEvent>() {
// event will eventually be recycled by the Disruptor after it wraps
public
void
onEvent(
final
ValueEvent event,
final
long
sequence,
final
boolean
endOfBatch)
throws
Exception {
System.out.println(
"handler1: Sequence: "
+ sequence +
" ValueEvent: "
+ event.getValue());
}
};
// final EventHandler<ValueEvent> handler2 = new EventHandler<ValueEvent>() {
// // event will eventually be recycled by the Disruptor after it wraps
// public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception {
// System.out.println("handler2: Sequence: " + sequence + " ValueEvent: " + event.getValue());
// }
// };
// disruptor.handleEventsWith(handler1, handler2);
disruptor.handleEventsWith(handler1);
RingBuffer<ValueEvent> ringBuffer = disruptor.start();
int
bufferSize = ringBuffer.getBufferSize();
System.out.println(
"bufferSize = "
+ bufferSize);
for
(
long
i =
0
; i <
1000
; i++) {
long
seq = ringBuffer.next();
try
{
String uuid = String.valueOf(i);
ValueEvent valueEvent = ringBuffer.get(seq);
valueEvent.setValue(uuid);
}
finally
{
ringBuffer.publish(seq);
}
}
disruptor.shutdown();
exec.shutdown();
}
}
|
定義 ValueEvent 類,該類作為填充 RingBuffer 的消息,生產者向該消息中填充數據(就是修改 value 屬性值,后文用生產消息代替),消費者從消息體中獲取數據(獲取 value 值,后文用消費消息代替):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
package
com.lmax.test;
import
com.lmax.disruptor.EventFactory;
/**
* WARNING: This is a mutable object which will be recycled by the RingBuffer.
* You must take a copy of data it holds before the framework recycles it.
*/
public
final
class
ValueEvent {
private
String value;
public
String getValue() {
return
value;
}
public
void
setValue(String value) {
this
.value = value;
}
public
final
static
EventFactory<ValueEvent> EVENT_FACTORY =
new
EventFactory<ValueEvent>() {
public
ValueEvent newInstance() {
return
new
ValueEvent();
}
};
}
|
Sample.java 代碼分析:
第13行:
創建ExecutorService對象。
所有消費者線程都由該對象啟動。
第14行:
創建 Disruptor 對象。
Disruptor 類是 Disruptor 項目的核心類,另一個核心類之一是 RingBuffer。
如果把 Disruptor 比作計算機的 cpu ,作為調度中心的話,那么 RingBuffer ,就是計算機的 Memory 。
第一個參數,是一個 EventFactory 對象,它負責創建 ValueEvent 對象,並填充到 RingBuffer 中;
第二個參數,指定 RingBuffer 的大小。這個參數應該是2的冪,否則程序會拋出異常:
Exception in thread "main" java.lang.IllegalArgumentException: bufferSize must be a power of 2
at com.lmax.disruptor.AbstractSequencer.<init>(AbstractSequencer.java:51)
at com.lmax.disruptor.MultiProducerSequencer.<init>(MultiProducerSequencer.java:60)
at com.lmax.disruptor.RingBuffer.createMultiProducer(RingBuffer.java:79)
at com.lmax.disruptor.RingBuffer.createMultiProducer(RingBuffer.java:94)
at com.lmax.disruptor.dsl.Disruptor.<init>(Disruptor.java:74)
at com.lmax.test.Sample.main(Sample.java:14)
第三個參數,就是之前創建的 ExecutorService 對象。
伴隨着Disruptor的創建,RingBuffer 對象也被創建,RingBuffer是一個環形緩沖區,其實它是用一個 Object[] 實現的(這個后面會詳說),RingBuffer 創建之后,會創建bufferSize數量的消息(這里說的消息,就是EventValue對象,下同)填充至RingBuffer,也許有人對此有疑問:生產者還沒有啟動,怎么已經將消息創建了?其實是這樣的:Disruptor在啟動前預創建所有消息,以后生產者 "生產消息",只是修改 RingBuffer 中某個消息的內容,當然,這里說的 "某個",不是隨隨便便從 RingBuffer 中取出一個,而是通過某個策略順序訪問,並和消費者保持協調。生產者修改完消息后,在該消息上打個標記,表示該消息已經生產了,消費者可以來消費了,消費者通過這個標記來確定是否可以讀消息了。消費者讀完消息后,也會在消息上打一個標記,表示生產者可以再次在該消息上繼續生產了。如此循環。我個人認為,理解 Disruptor 最大的難點就在於,要搞清楚生產者和消費者間是如何協同工作的,一旦理解了這一點,就掌握了 Disruptor 的靈魂,本文后續主要精力將集中在對該問題的討論上。
以下是向 RingBuffer 填充消息的代碼:
1
2
3
4
5
6
7
|
private
void
fill(EventFactory<E> eventFactory)
{
for
(
int
i =
0
; i < entries.length; i++)
{
entries[i] = eventFactory.newInstance();
}
}
|
同時,生產者對象也會被創建,Disruptor中支持兩種生產者類型:多生產者和單生產者,分別用:MultiProducerSequencer 和 SingleProducerSequencer 表示,默認使用 MultiProducerSequencer:
1
2
3
4
5
6
7
8
|
public
static
<E> RingBuffer<E> createMultiProducer(EventFactory<E> factory,
int
bufferSize,
WaitStrategy waitStrategy)
{
MultiProducerSequencer sequencer =
new
MultiProducerSequencer(bufferSize, waitStrategy);
return
new
RingBuffer<E>(factory, sequencer);
}
|
第 16 - 21 行,通過實現 EventHandler 接口,創建了一個 EventHandler 對象,用來處理消費者拿到的消息。
第 23 - 27 行,注釋起來了,創建另一個 EventHandler 對象,多消費者情況下,需要創建多個 EventHandler。 EventHandler 對象和消費者一一對應。
第 29 和 30 行,將 EventHandler 對象傳入 Disruptor ,Disruptor 依據 EventHandler 參數個數,創建相等數量消費者對象。
至此,准備工作完成:
作為管控中心的 Disruptor 對象已創建;
作為消息存儲中心的 RingBuffer 對象已創建;
生產者對象已創建;
消費者對象和與它關聯的事件處理對象(EventHandler)對象已創建。
接下來就是啟動 Disruptor 系統了,讓消費者跑起來。
第 31 行,start() 方法啟動消費者線程:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public
RingBuffer<T> start()
{
Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(
true
);
ringBuffer.addGatingSequences(gatingSequences);
checkOnlyStartedOnce();
for
(ConsumerInfo consumerInfo : consumerRepository)
{
consumerInfo.start(executor);
}
return
ringBuffer;
}
|
每個消費者線程都有一個等待策略:以確定當無消息可消費時,消費者是阻塞還是輪詢。
Disruptor 中定義了幾種不同等待策略:BlockingWaitStrategy、TimeoutBlockingWaitStrategy、SleepingWaitStrategy等。
第 36 - 45 行,生產10條消息
生產者線程(main線程)通過 next 方法,獲取 RingBuffer 可寫入的消息索引號 seq;
通過 seq 檢索消息;
修改消息的 value 屬性;
通過 publish 方法,告知消費者線程,當前索引位置的消息可被消費了。
第 47 - 48 行,停止 Disruptor系統(停止消費者線程)。
運行結果:
bufferSize = 4
handler1: Sequence: 0 ValueEvent: 0
handler1: Sequence: 1 ValueEvent: 1
handler1: Sequence: 2 ValueEvent: 2
handler1: Sequence: 3 ValueEvent: 3
handler1: Sequence: 4 ValueEvent: 4
handler1: Sequence: 5 ValueEvent: 5
handler1: Sequence: 6 ValueEvent: 6
handler1: Sequence: 7 ValueEvent: 7
handler1: Sequence: 8 ValueEvent: 8
handler1: Sequence: 9 ValueEvent: 9
3. 核心問題
3.1 環形緩沖區(RingBuffer)和索引映射
Disruptor 中緩沖區用數組實現:
1
|
private
final
Object[] entries;
|
該數組大小由 bufferSize 參數指定,以 bufferSize = 4 為例:
很明顯不是個環形。
環形數組應該是這樣的:
0點-3點區域表示 entries[0];
3點-6點區域表示 entries[1];
6點-9點區域表示 entries[2];
9點-0點區域表示 entries[3];
區域內數字表示數組索引號,
數組容量為 4, 所以索引號必須在[0, 3]區間內。
Disruptor 通過 & 映射索引:
映射后的索引號 = 實際索引號 & (bufferSize - 1)
代碼:
1
2
3
4
|
public
E get(
long
sequence)
{
return
(E)entries[(
int
)sequence & indexMask];
}
|
其中:indexMask = bufferSize - 1。
如:
6 & 3 = 2;實際索引號為 6,映射索引號為 2
8 & 3 = 0;實際索引號為 8,映射索引號為 0
3.2 緩存相關問題(Cache Line padding)
引入三個概念:
(1)偽共享
(2)緩存行
(3)緩存行填充
詳細講解可參考這里,這里簡單簡說明一下。
3.2.1 CPU 緩存結構
如下圖:一個cpu上有一個 L3 Cache,在四個 core 上共享,每個 core 上由獨立的 L2 Cache和 L1 Cache,其中 L1 Cache 又分為 D-Cache(數據緩存)和 L-Cache(指令緩存)。
3.2.2 緩存行
緩存是由緩存行構成的,緩存行一般32-256個字節,常見的64字節,如下圖,int 型變量 a 和 b 位於同一個緩存行:
3.2.3 緩存行填充
下圖,變量 X、Y 在 L3 Cache 中被存放於同一個緩存行:
如果存在多核,因為一級和二級緩存不會在核間共享,所以每個核的一級和二級緩存上,又保存和 X 和 Y 的緩存行映像。
一旦cpu core 更改了 X 或 Y,都要經過一級緩存、二級緩存、三級緩存,寫回Local Mem。
如:core1 修改了 X,同時core2 修改了 Y,則在緩存更新上必然存在沖突:core1 看到的Y,core2 看到的X,都已被對方修改。
所以,一旦core1 修改了X,就會引起 core2 中的緩存行失效,core2 須等到 core1 將修改同步到 L3 Cache,然后從L3 緩存中讀出正確的 X 值,執行Y的修改,再將結果寫入L3 Cache。
事情看起來就有點奇怪了:獨立的變量 X 和 Y,在兩個獨立的 cpu core 上由互不相干的線程執行修改,兩個 core 上的線程卻發生了同步!同步必然影響線程執行效率,所以這也被稱作多線程殺手,而且這個殺手藏得很深。
導致以上兩個獨立線程同步的原因,就是 X 和 Y 被放在了同一個緩存行上,兩個線程存現了偽共享:緩存行共享。
很自然的就會想到將 X 和 Y 放到兩個不同的緩存行中,沒有了緩存行共享,就不存在同步了。
那怎樣做到一個緩存行只保存一個變量?
這個問題就像給你兩個麻袋,兩個蘋果,怎樣做,可以保證兩個蘋果被放到不同的麻袋中?
填充是一種解決辦法:將一個蘋果和一大堆土豆捆綁到一起,只要土豆的數量夠多,就能保證這一個蘋果和這些土豆能把一個麻袋裝的滿滿的,另一個蘋果只能裝到另一個麻袋里了。
緩存行填充的思想和這個一樣(蘋果是變量,麻袋是緩存行):變量 X 和 一些不會被訪問到的變量捆綁到一起(通常用數組實現,基於數組元素內存地址的連續性),保證 X 和這些變量能把一個緩存行填充滿;Y 也按照同樣的方式做。
4. 生產者和消費者協調策略
4.1 相關說明
4.2 生產者
為生產者引入 current 屬性,表示當前坐標,current + 1 即表示 下次訪問的數組坐標,即 next 屬性。
生產者剛啟動時:
current = -1; next = -1 + 1 = 0;
生產者開始工作:
第一次生產者向 RingBuffer[0] 生產數據;
第二次生產者向 RingBuffer[1] 生產數據;
第三次生產者向 RingBuffer[2] 生產數據;
第四次生產者向 RingBuffer[3] 生產數據;
因為RingBuffer[4] 被映射到 RingBuffer[0],所以第五次生產者准備向RingBuffer[0]寫數據,問題來了:消費者尚未消費 RingBuffer[0],生產者不能訪問RingBuffer[0]:
為方便生產者判斷下一個數組元素是否可訪問,引入兩個變量:wrapPoint,sequence,其中:
1). sequence 記錄消費者已消費過的數組元素的索引(未被映射的),初始值為 -1 表示尚未消費任何元素。
2). wrapPoint = next - bufferSize; wrapPoint 在 next 上向后偏移 bufferSize 個單位,如 next = 4 時,wrapPoint = 0。wrapPoint是一個未被映射的索引。那這個 wrapPoint 是用來干嘛的?別急,先看下圖:
此圖是生產者向RingBuffer 中寫滿數據(next 由 0 變到 3),以及准備繼續寫入數據這段時間內幾個變量的變化情況。因消費者一直未消費,所以sequence值保持為 -1。
生產者寫入數據前,判斷是否可向 next 位置寫入數據的依據是:如果 wrapPoint <= sequence,表示可寫,否則,不可寫。
所以我覺得可以這樣理解wrapPoint的含義:因為RingBuffer的容量是 bufferSize,所以生產者一開始就有bufferSize大小的數組元素可寫,但寫滿之后,就必須看消費者的臉色了,消費者不消費,生產者就不能往里面寫(因為這個數組是環形的,生產者再寫就要把之前的數據覆蓋了)。既然這樣,就必須確定一個基准點,用來判斷消費者是否超過了消費者。生產者要回退 bufferSize 個單位,才能和消費者站在一個點上進行比較,所以 wrapPoint = next - bufferSize 就是這樣來的。next = 4 時,wrapPoint = 0,而 sequence = -1,表示消費者落后於生產者,生產者就必須等待。消費者消費完一個元素,會將自身的 sequence 加1,變為0,表示0索引位置的已經消費,生產者下次比較時發現 wrapPoint == sequence,就可以繼續生產了。
多消費者情況下,有些消費者消費的可能比較慢,這樣,生產者就必須等待最慢的消費者:
1
|
long
gatingSequence = Util.getMinimumSequence(gatingSequences, current);
|
生產者代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
do
{
current = cursor.get();
next = current + n;
long
wrapPoint = next - bufferSize;
long
cachedGatingSequence = gatingSequenceCache.get();
if
(wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
// eclipseek. 獲取最慢的消費者的位置
long
gatingSequence = Util.getMinimumSequence(gatingSequences, current);
// eclipseek. 表示生產者從后面追過消費者,這個是不允許的。這里等待1納秒,在重新開始
if
(wrapPoint > gatingSequence)
{
LockSupport.parkNanos(
1
);
// TODO, should we spin based on the wait strategy?
continue
;
}
gatingSequenceCache.set(gatingSequence);
}
// eclipseek. 生成者可以正常去搶位置,compareAndSet不能保證一定成功,所以
// 可以看到 while (true),其實是會不斷去嘗試,直到成功.
else
if
(cursor.compareAndSet(current, next))
{
break
;
}
}
while
(
true
);
|
4.3 消費者
消費者核心代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
T event =
null
;
long
nextSequence = sequence.get() + 1L;
try
{
while
(
true
)
{
try
{
// nextSequence:消費者期望處理的下一個數據的序號
final
long
availableSequence = sequenceBarrier.waitFor(nextSequence);
// 消費者將超過生產者
if
(nextSequence > availableSequence)
{
Thread.yield();
}
while
(nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
sequence.set(availableSequence);
}
catch
(
final
TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch
(
final
AlertException ex)
{
if
(!running.get())
{
break
;
}
}
catch
(
final
Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
|
上面說過,每個消費者都有 sequence ,初始值為-1,消費者剛進入時,nextSequence = sequence + 1 = 0,nextSequence 表示期望消費的元素索引號。
以后消費者通過 waitFor 方法獲取實際可消費的元素索引(期望和實際是兩碼事),實際可消費的元素索引依賴於生產者,消費者之所以能做出正確的判斷,是因為消費者能看到生產者的 current 屬性!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
public
long
waitFor(
long
sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws
AlertException, InterruptedException
{
long
availableSequence;
if
((availableSequence = cursorSequence.get()) < sequence)
{
// eclipseek. 用到了lock.lock();意思就是同時只允許一個個消費者排隊去搶,
// 下一個消費者要等待上一個消費者處理完一個之后才能搶.
lock.lock();
try
{
while
((availableSequence = cursorSequence.get()) < sequence)
{
barrier.checkAlert();
processorNotifyCondition.await();
}
}
finally
{
lock.unlock();
}
}
while
((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
}
return
availableSequence;
}
|
if 語句中:cursorSequence 就是生產者的 current,sequence 是消費者期望的消費索引號。這段代碼表明,如果實際可消費的索引號小於消費者期望消費的所以號,消費者就進入等待狀態。后續生產者通過 publish 方法將消費者喚醒。
上面消費者核心代碼中,消費者消費完后,執行了:
sequence.set(availableSequence);
這條語句,就是重設自己的 sequence,是為了讓生產者能及時看到,以便生產者確定可寫入數組元素的索引。
另外消費者還執行一段重要的代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public
long
waitFor(
final
long
sequence)
throws
AlertException, InterruptedException, TimeoutException
{
checkAlert();
// eclipseek. waitStrategy 的默認實現是 BlockingWaitStrategy
long
availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence,
this
);
if
(availableSequence < sequence)
{
return
availableSequence;
}
// eclipseek. 檢查生產者的位置信息的標志是否正常.這個是和生產者的publish方法聯系起來的.
return
sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
|
waitFor方法第一個參數是消費者期望消費的索引序列號,cursorSequence是生產者的current,返回值availableSequence是實際可消費的索引號,這個值返回后,生產者還要做檢查,就是通過最下面的 getHighestPublishedSequence方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@Override
public
boolean
isAvailable(
long
sequence)
{
int
index = calculateIndex(sequence);
int
flag = calculateAvailabilityFlag(sequence);
long
bufferAddress = (index * SCALE) + BASE;
return
UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
}
@Override
public
long
getHighestPublishedSequence(
long
lowerBound,
long
availableSequence)
{
for
(
long
sequence = lowerBound; sequence <= availableSequence; sequence++)
{
if
(!isAvailable(sequence))
{
return
sequence -
1
;
}
}
return
availableSequence;
}
|
這里要說下 availableBuffer 變量,這是一個 bufferSize 大小的 int 數組,初始值為[-1, -1, -1, -1]。前面說過,索引號映射之后,一定是在[0, 3] 范圍內,分別代表RingBuffer底層數組的 RingBuffer[0],RingBuffer[1],RingBuffer[2],RingBuffer[3],每次生產者向該數組中對應位置寫入一個值,availableBuffer 數組對應位置的值加1,如:
生產者向 RingBuffer[0] 生產數據,則availableBuffer變為[0, -1, -1, -1];
生產者向 RingBuffer[1] 生產數據,則availableBuffer變為[0, 0, -1, -1];
生產者向 RingBuffer[2] 生產數據,則availableBuffer變為[0, 0, 0, -1];
生產者向 RingBuffer[3] 生產數據,則availableBuffer變為[0, 0, 0, 0];
生產者向 RingBuffer[4] (映射后就是Ringbuffer[0])生產數據,則availableBuffer變為[1, 0, 0, 0];
生產者向 RingBuffer[5] (映射后就是Ringbuffer[1])生產數據,則availableBuffer變為[1, 1, 0, 0];
生產者向 RingBuffer[6] (映射后就是Ringbuffer[2])生產數據,則availableBuffer變為[1, 1, 1, 0];
生產者向 RingBuffer[7] (映射后就是Ringbuffer[3])生產數據,則availableBuffer變為[1, 1, 1, 1];
生產者向 RingBuffer[8] (映射后就是Ringbuffer[0])生產數據,則availableBuffer變為[2, 1, 1, 1];
...
那 available 數組有什么作用呢?其實這是一種驗證策略,以防消費者跑到生產者前面去消費那些生產者還沒有生產的消息。具體是這樣實現的:
生產者:
生產者在索引為 sequence 的位置處生產了元素,則修改 availeable[x] 元素的值:
availeable[x] = flag = (int) (sequence >>> indexShift);;
其中:
x = sequence & (bufferSize -1);
indexShift = log2(bufferSize);
消費者:
1
2
3
4
5
6
7
8
|
@Override
public
boolean
isAvailable(
long
sequence)
{
int
index = calculateIndex(sequence);
int
flag = calculateAvailabilityFlag(sequence);
long
bufferAddress = (index * SCALE) + BASE;
return
UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
}
|
消費者以同樣的方式,計算 sequence(可用索引坐標)對應的映射坐標index,以及flag,通過比較 available[index] 是否等於 flag,即可判斷取到的 sequence 是不是有效的。
簡單來說,算法思想是這樣的:
生產者和消費者共享 bufferSize 大小的數組 available,生產者生產了元素,就修改available數組 "對應元素" 的值,消費者拿到課消費元素索引時,計算出一個值,然后比較這個值是否和 available 數組“對應元素”的值是否相等,如果相等,就說明該索引確實是可用的。
有點像生產者加密,消費者解密的意思。
參考文獻:
1. 寫Java也得了解CPU–CPU緩存
2. http://maoyidao.iteye.com/blog/1663193
附件列表