高性能隊列設計


本文已整理致我的 github 地址 https://github.com/allentofight/easy-cs,歡迎大家 star 支持一下

這是一個困擾我司由來已久的問題,近年來隨着我司業務的急遽發展,單表數據量越來越大,這樣會導致讀寫性能急遽下降,自然而然的我們想到了分庫分表,不過眾所周知分庫分表規則比較復雜,而且業務代碼可能需要大改(由於數據分布在不同的庫表里,業務需要判斷到底去哪些表取數,並且取完后需要將數據再聚合在一起返回前端),所以經過橫向對比我們采用了阿里開源的分庫分表中間件 Cobar,這樣的話一來 Cobar 根據我們設定的規則分庫分表了,二來原來調用 SQL 的地方只需改成調用 Cobar 即可,Cobar 會自動根據我們寫的 SQL 去各個分庫分表里查詢並將結果返回給我們,我們業務層的代碼幾乎不需要改動(即對應用是透明的)

如圖示:使用 cobar 進行分庫分表后,可以看到業務代碼幾乎不需要改動

所以問題是?

使用 Cobar 確實解決了分庫分表和對業務代碼侵入性的問題,但由於又引入了一個中間層,導致可用性降低,為了防止 Cobar 不可用等造成的影響,我們需要監控 Cobar 的各項性能指標,如 SQL 執行時間,是否失敗,返回行數等,這樣方便我們分析 Cobar 的各項指標,這就是我們常說的 SQL 審計(記錄數據庫發生的各種事件),那怎么樣才能高效記錄這些事件而又不對執行業務代碼的線程造成影響呢?

要記錄上報這些審計事件,肯定不能在執行業務代碼的線程里執行,因為這些事件屬於業務無關的代碼,如果在業務線程里執行,一來和業務代碼藕合,二來如果這些審計事件傳輸(記錄審計事件總要通過磁盤或網絡記錄下來)遇到瓶頸會對正常的業務邏輯造成嚴重影響。我們可以修改一下 cobar 代碼,在 cobar 里的執行邏輯中拿到這些事件后,把這些事件先緩存在隊列中,讓另外的線程從這些隊列里慢慢取出(消費),然后再將這些數據上報,這樣業務線程可以立即返回執行其他正常的業務邏輯。

2 (1)
2 (1)

注:虛線部分為對 cobar 中間件的改造,業務調用是無感知的 如圖示,主要步驟如上圖所示

  1. 客戶端請求后執行 cobar
  2. cobar 執行后將「執行時間」,「是否失敗」,「返回行數」等寫入隊列
  3. 寫入隊列后業務線程立即返回,然后可以執行正常的業務邏輯
  4. 后台線程則不斷取出 event 通過 UDP 傳給另外一個機器,寫入 kafka 進行上報

小伙子不錯啊,一看這架構圖就知道有點東西,但我這里有點疑問,在第二步中,為啥不把 SQL 審計的那些指標直接寫入 kafka 呢,如下

kafaka 不是號稱寫入性能可達幾十甚至上百萬嗎,像上述這樣實現架構上實現不是更簡單嗎

這是個很好的問題,有以下兩個原因

  1. 我們是對 cobar 工程本身進行修改,然后將其打成 jar 包再集成到應用程序中來的,如果采用上面的設計,那就意味着要在 cobar 工程中引入對 kafka 的依賴,而我們只想對 cobar 作少量的修改,不想依賴太多第三方的庫
  2. 這也是最重要的, 引入 kafka 本身會導致可用性降低,有可能會阻塞業務線程,在 kafka Producer 中,設計了一個消息緩沖池,客戶端發送的消息首先會被存儲到緩沖池中,同時 Producer 啟動后還會啟動一個 sender 線程不斷地獲取緩沖池中的消息將其發送到 Broker 中

如圖示,我們在構建 kafka producer 時,會有一個自定義緩沖池大小的參數 buffer.memory,默認大小為 32M,因此緩沖池的大小是有限制,那如果這個緩存池滿了怎么辦,RecordAccumulator 是 Kafaka 緩沖池的核心類,官方對其注釋寫得非常清楚

The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.

也就是說如果緩存池滿 了,消息追加調用將會被阻塞,直到有空閑的內存塊,這樣的話只要 Kafka 集群負載很高或者網絡稍有波動,Sender 線程從緩沖池撈取消息的速度趕不上客戶端發送的速度,就會造成客戶端(也就是 Cobar 執行線程)發送被阻塞,這樣的話可能導致線上幾乎所有接口調用雪崩,系統不可用,導致嚴重的災難!

而原來的設計看似復雜,但實際上符合軟件設計中的分層原則,這樣的設計有兩個好處,如下圖示:

  1. 首先 Cobar 執行線程將審計信息丟給隊列后立即返回,我們 只要設計這樣的不阻塞高效的隊列即可
  2. 后台線程取出后通過 UDP 傳給另外的 agent,Cobar 執行線程所在的 JVM 和 agent 的 JVM 是不同的(畢竟部署在不同的機器上),做到了 JVM 的隔離,也更安全

小伙子果然有兩把刷子,單獨拎出上圖的 cobar 執行線程與后台線程通過隊列通信的模塊,可以看到它就是個典型的生產者消費者模型 所以現在問題的關鍵就轉化為如何設計這樣的隊列了,它要滿足兩個條件

  1. cobar 執行線程寫入隊列無論如何都不會阻塞
  2. 寫入隊列要足夠快(吞吐率要高),畢竟我司是大廠,巔峰期可能會達到每秒幾萬的 QPS

沒錯,不愧是大佬,一眼看透問題的本質,只要能設計好這樣的隊列,所有問題就迎刃而解了

恩,那你就來談談如何設計這樣的隊列吧。

雖然我心中已經有數,但為了展示我高超的吹牛對隊列熟練運用的能力,我決定由淺入深地來講解一下隊列的演進史,這樣可以把隊列的選型了解得明明白白

你高興就好。

高性能隊列的實現思路

1.隊列的表示:數組 or 鏈表?

隊列(Queue)是一種線性表,是一種先進先出的數據結構,主要由數組和鏈表組成,隊列只允許在后端(稱為 rear)進行插入操作,在前端(稱為 front)進行刪除操作

隊列的兩種表現形式
隊列的兩種表現形式

這兩者優缺點都很明顯,但總的來說數組的執行效率更高,為啥,這里簡單介紹下 CPU 的運行原理,由於內存太慢,所以在 CPU 和 內存間設置了多道緩存(L1,L2,L3三級緩存,其中 L1,L2 緩存為每個 CPU 核獨有的, L3是共享的)以提升 CPU 的執行效率,CPU 執行取數時,先從 L1查找,沒有再從 L2查找,L2 沒有則從 L3,查找,L3還是沒有的話就會從內存加載。

但需要注意的是,CPU 從內存加載數據時並不是以字節為單位加載,它是以 cacheline 的形式來加載的,cacheline 是從內存加載或寫入的最小單位,在 X86 架構中,一個cacheline 由內存里連續的 64 個 byte組成的,而數組是在內存里連續分配的,所以它一次性能被加載多個數據到 cache 中,而鏈表中 node 的空間是分散在非連續的內存地址空間中,所以總的來說數組由於利用了 cache line 的連續加載特性對緩存更友好,性能會更好。

鏈表對擴容更友好?

這應該是不少人支持使用鏈表的一個重要原因了,如果空間不夠大,需要擴容怎么辦,對於鏈表來說,很簡單,在 rear 結點后新增一個節點,將 rear 結點的 next 指針指向它即可,非常方便,但對於數組來說就沒那么容易了,它需要先生成一個原來 n(一般是 2) 倍大小的新數組,再把老數組里的數據給移過去,如下圖所示

如果光從擴容這一角度來看,確實鏈表更優秀,但我們不要忘了消費者消費完后是要把鏈表對應的節點給釋放掉的,在高並發下,就會造成頻繁的 GC,造成嚴重的性能影響

估計有人就會反駁了,如果數組中的元素被消費完了,難道不要被移除?這樣的話豈不是也會存在高並發下的頻繁 GC?總不能一開始給這個數組分配一個無限大的空間吧,這樣的話就成了無界隊列,這樣的話還沒等你數組填滿就 OOM 了。

這是個好問題,實際上對於數組來說,我們可以使用一個小 trick,既可以讓它變成有界(即固定大小,無需擴容)數組,也可以避免頻繁 GC,更可以避免數組擴容帶來的性能問題,怎么做,將線性數組改造成循環數組(RingBuffer)

2
2

如圖示,假設數組的初始化大小為 7,當生產者把數組的七個元素都填滿時(此時 0,1,2三個元素已經被消費者消費完了),如果生產者還想再填充數據,由於 0,1,2對應的三位元素已經被消費了,屬於過期無效的元素了,所以生產者可以從頭開始往里填充元素,只要不超過消費者的進度即可,同理,如果消費者對應的指針達到數組的末端,下一次消費也就回到數組下標 0 開始消費,只要不超過生產者進度即可。

我們將將數組的首尾拼接就形成了一個 ringbuffer

ringbuffer
ringbuffer

有人會說繞圈了怎么定位數組的具體下標?對數組大小取模即可,生產者/消費者對應的數組下標都是累加的,以以上情況為例,當前生產者的下標為 6,下一個下標就是 7,而當前數組大小為 7,於是 7 對應的數組下標即為 7%7 = 0,與實際相符。但需要注意的是取模操作是個很昂貴的操作,所以我們可以用位運算來代替,但位運算要求數組的大小為 2^n(想想為什么),於是取模操作可以用 index & (2^n -1 ) 來代替(index 為 生產者/消費者對應的下標)。

綜上,設計一個這樣大小為 2^n(這里有兩層含義,一是大小為 2^n,二是數組有界) 且由數組表示的 ringbuffer 有「對緩存友好」,「對 GC 友好」兩個重要特性,在高並發下對性能的提升是非常有幫助的。

2. 無鎖

只選好 ringbuffer 是不夠的,在高並發下,多個生產者/消費者極有可能爭用 ringbuffer 的 同一個 index,如下圖示:

為了避免這種情況,最容易想到的是加鎖,但顯然加鎖會存在嚴重的性能問題:

  1. 線程如果爭用不到鎖失敗,會阻塞(由用戶態進入內核態),喚醒時又會從內核態進入用戶態,我們知道這種不斷地在用戶態和內核態間進行切換的操作是非常昂貴的

  2. 線程 cache miss 的開銷,一開始線程可能被分配在 cpu core 1 上,但經過阻塞,喚醒后又被調度到了 cpu core 2 上,導致原來在 core 1 上加載的緩存無用武之地, 線程在 core 2 又得重新從內存加載數據

  3. 如果業務線程被阻塞了,那很可能出現對應的服務都無法使用的情況(畢竟業務線程負責接收返回用戶的請求),會造成嚴重的線上問題

綜上所述使用鎖是不行的,一種有效的方式是使用 CAS 自旋不斷嘗試獲取對應的 index,這種方式也就是我們所說的無鎖的方式

也許有人擔心 CAS 性能比較低,我們可以用 JDK 8 之后對 atomic 類引入的 unsafe 的 getAndAddInt 方法來替換,如下:

public final int incrementAndGet() {

 return unsafe.getAndAddInt(this, valueOffset, 1) + 1;

}

它使用的是 CPU 的 fetch-and-add 指令,性能上比 CAS 要強很多(實測是 6 倍於 CAS 的性能,見文末參考鏈接),skywalking 的隊列就是用的 getAndInt 來代替 CAS,達到了很高的性能。

不愧是通過了我們簡歷篩選的男人,那么 JDK 里的內置隊列能否符合這樣的條件呢

不能,我們先來看看 JDK 內置隊列有哪些,以及它們有哪些特點

image-20210621091619682
image-20210621091619682

可以看到,最接近高性能隊列「有界環形數組」和「無鎖」這兩個特性的只有 ArrayBlockingQueue 這一個隊列,但很可惜它是有鎖的,而且它是生產者和消費者共用同一把鎖(可重入鎖 Reentrantlock),這意味着生產與消費無法同時進行,顯然無法滿足我們的要求。我們可以對 ArrayBlockingQueue 進行改造,把此鎖干掉,用無鎖比如 CAS 的方式來實現,這樣不就解決了嗎,高性能隊列 disruptor就是這么干的

小伙子果然不錯,居然用過 disruptor,看得出來你對 disruptor 還挺了解的,說得頭頭是道,那我考你兩個問題,這兩個問題如果答得不錯,直接發 offer

  1. disruptor 確實滿足了你說的「使用了大小為 2^n 的 ringbuffer 數組」和「無鎖」兩個特性,除此之外它還利用了哪些優化的點,在我看來你的這兩個點確實重要,但最關鍵也是我最在意的一個點還沒有提到
  2. disruptor 是阻塞的嗎,也就是說如果 ringbuffer 滿了會不會阻塞向 ringbuffer 投遞事件的業務線程

第一題,disruptor 在初始化的時候為數組填充了所有事件類對象,這些對象不會消亡,也就避免了 GC,不同的事件只是值不一樣而已,所以新增的事件會復用數組中的對象,只是將其屬性值修改而已,另外這么做還有一個好處:避免 CPU 在高並發時分配大量的對象造成 CPU 飆升

第二題,是阻塞的,所以如果要把 disruptor 應用到我們開頭的設計架構中,可能 cobar 執行線程會有阻塞風險,想要不阻塞可以使用 skywalking 的隊列,它是非阻塞的,性能也很給力(正常 cobar 壓測5.5w/s,而用 skywalking 的隊列只損失 1.8%,可以接受)

第一題你的回答是沒問題的,但不是我想要的答案,我給你提示一下,在 ArrayBlockingQueue 中,連續定義了這三個變量

/** items index for next take, poll, peek or remove */
int takeIndex;

    /** items index for next put, offer, or add */
int putIndex;

    /** Number of elements in the queue */
int count;

在多線程的情況下會發生什么?

第二題回答完全錯誤,實際上 disruptor 是有非阻塞方法的,你用它的話性能幾乎沒有損失 (disruptor 最大特點是高性能,其 LMAX 架構可以獲得每秒6百萬訂單,用1微秒的延遲獲得吞吐量為 100K+)

你看看,明明可以用更好的 disruptor,卻要用損失了性能 1.8% 的次優隊列,很遺憾,這次面試就到此為止吧,不過鑒於你之前的表現不錯,也讓我轉身了,我不妨指點你一下,log4j2 也用了 disruptor,你想想 log4j2 是如此常見的組件,是各大公司的標配,如果日志生產的 ringbuffer 隊列滿了,按你的說法豈不是會影響業務線程?建議回去好好研讀下 log4j2 還有 disruptor 的源碼,半年后再見

少扯這些沒用的,好好回去修煉內功吧,慢走不送

小伙子,又見面了,上次臨走時問的兩個問題思考的如何了

第一個問題,按大佬的提示我查閱了一些資料,其實大佬想聽的是偽共享和緩存行填充吧

確實如此,那你給我解釋下什么是偽共享,又是如何用緩存行填充來解決偽共享的呢

在前文中我們已經知道了 cacheline 的原理,內存的最小讀取和寫入單元是 64 字節的 cacheline,而 ArrayBlockingQueue 的如下三個屬性是連接定義的,

int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;

表現在內存中就是連續分配的空間,這樣的話它們極有可能一起作為一個 cacheline 加載到 cache 中

假設線程 A 執行入隊操作,就會修改 putIndex,根據 CPU 的緩存一致性協議(如 MESI 協議),修改 putIndex 會導致其所在的 cacheline 都失效,此時假設線程 B 執行出出隊操作,需要讀取 takeIndex,但由於其所在的 cacheline 已經失效,所以 CPU-2 必須重新去內存中讀取 takeIndex 所在的 cacheline,而我們知道 CPU 中的三級緩存與內存效率相差幾十上百倍,這樣的話在多線程環境下由於 cacheline 頻繁失效毫無疑問會造成嚴重的性能問題,這就是我們所說的偽共享

恩,解釋得不錯,那怎么解決呢

解決方式就要按大佬所說的緩存行填充來解決,在 takeIndex, 前后添加 7 個 long 類型的屬性變量,就可以保證 takeIndex 和 putIndex 不在同一個 cacheline 上了,這樣針對 takeIndex 和 putIndex 的修改就不會互相影響對方了

3
3

不錯,能否舉例一下 disruptor 中對緩存行填充以避免偽共享的問題,在 disruptor 用到很多這樣的例子

我們先來看下 ringbuffer 的定義


abstract class RingBufferPad
{
   protected long p1, p2, p3, p4, p5, p6, p7;
}
 

abstract class RingBufferFields<Eextends RingBufferPad
{   
 private final long indexMask;
 private final Object[] entries;
 protected final int bufferSize;
 protected final Sequencer sequencer;
}

public final class RingBuffer<Eextends RingBufferFields<Eimplements CursoredEventSequencer<E>, EventSink<E>
{
   protected long p1, p2, p3, p4, p5, p6, p7;
}

可以看到在 RingBufferFields 前后分別填充了 7 個 long 類型的變量,這 14 個變量沒有任何實際用wt途,只是用來做緩存行填充之用。

可以看到填充之后無論怎么加載緩存行,緩存行都沒有要更新的數據,另外注意到 RingBufferFields 里面定義的變量都是 final 的,意味着第一次寫入之后就不會再被修改,也就意味着一旦加載入 CPU Cache 中,就不會被換出 Cache 了,也就是說這些值都一直會是 CPU Cache 的訪問速度,而不是內存的訪問速度

第一個問題過關了,接下來說說第二個問題吧, disruptor 的隊列滿了之后會阻塞業務線程嗎

disruptor 的 ringbuffer 提供了兩個方法,一個是 publishEvent,這個方法如果在 ringbuffer 滿了之后繼續往里添加事件是會阻塞的,而另一個 tryPublishEvent 方法則不會,隊列滿了之后會返回 false,業務線程還是可以繼續執行滴,log4j2 就是用了這個方法這樣根據其返回值為 false 即可判斷 ringbuffer 滿了,我們就可以做相當的處理了(比如丟棄 log 事件)

看得出來小伙子確實用心讀了源碼,過去半年表現進步明顯哦,恭喜你順利進入二面

總結

disruptor 是 LMAX 公司開源的一款高性能隊列,每秒支持 600w 的吞吐量,相當於用 1 微秒的延遲就獲取了 100K+ 的吞吐量,性能極高,簡單總結一下它主要用到了以下三個手段:

  1. 環形數組:ringBuffer,由於數組在內存空間中是連續分配的,而內存換入換出的最小單元是 64kb 的 cacheline,所以一次性會把數組的多個元素寫入 CPU Cache 中以提高效率,另外對數組的大小是有要求的,為了支持位運算取模以提高效率,必須將數組大小設置為2^n。
  2. 數組對象預填充,在 disruptor 初始化時為就為數組的每個坑位都初始化填充了事件對象,這些對象不會消失也就避免了頻繁 GC,之后生產者要新增事件對象,也會復用相應坑位的事件對象,只是修改其對象屬性而已,通過預分配對象的方式,避免了高並發下頻繁分配對象導致的 CPU 和內存飆升。
  3. 緩存行填充避免偽共享,這也是 diruptor 最大的亮點,在 disruptor 中你可以看到很多這樣的例子,利用緩存行填充可以保證不會造成 cacheline 失效從而造成頻繁從內存讀取導致的性能瓶頸
  4. 使用 CAS 自旋而不是 Reentrantlock 這樣重量級鎖來獲取生產者/消費者的 index,避免了鎖的開銷,提升並發能力。
  5. 使用 volatile 而不是鎖來修改同一個變量,在 disruptor 中生產者會競爭數組的 index 坑位(用的 Sequence類的 value 值),disruptor 使用了 volatile 而不是鎖來保證了變量的可見性

另外 disruptor 提供了阻塞(publishEvent) 和非阻塞(tryPublishEvent)兩種方法,針對我們文章開頭 cobar 的使用場景,建議使用 tryPublishEvent 這種非阻塞的方式來向 ringbuffer 投遞事件,不然一旦阻塞會導致線上 cobar 執行線程停頓的重大故障!

disruptor 到底有多強悍,可能看聽數據大家沒有感覺,那我們來看一張針對 log4j2 的壓測圖

log4j2 總共有三種工作機制,全局異步(Loggers all async),Appender 異步( Async Appender)以及同步(Sync),可以看到全局異步最快,幾乎是 Appender 異步的 10 倍以上,為什么同樣提異步,性能相差這么大,因為全局異步用的是 disruptor ,而 Appender 異步用的是 ArrayBlockingQueue,可以看到 disruptor 被稱為高性能隊列之王的名頭可不是蓋的。

另外除了 disruptor,大家也可以看看 skywaking 的隊列,它的性能雖然沒有 disruptor 這么強,但對一般的業務場景也是足夠的(在我司 cobar 壓測 5.5w/s,使用了 skywalking 后僅損失了 1.8%,也是很強悍的),它也是阻塞的,另外它在隊列滿的時候可以選擇「阻塞」,「覆蓋」,「忽略」三種策略,我們選擇了覆蓋。

更多精品文章,歡迎大家掃碼關注「碼海」


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM