Redis集群 - 圖解 - 秒懂(史上最全)


文章很長,而且持續更新,建議收藏起來,慢慢讀! 高並發 發燒友社群:瘋狂創客圈(總入口) 奉上以下珍貴的學習資源:


推薦:入大廠 、做架構、大力提升Java 內功 的 精彩博文

入大廠 、做架構、大力提升Java 內功 必備的精彩博文 2021 秋招漲薪1W + 必備的精彩博文
1:Redis 分布式鎖 (圖解-秒懂-史上最全) 2:Zookeeper 分布式鎖 (圖解-秒懂-史上最全)
3: Redis與MySQL雙寫一致性如何保證? (面試必備) 4: 面試必備:秒殺超賣 解決方案 (史上最全)
5:面試必備之:Reactor模式 6: 10分鍾看懂, Java NIO 底層原理
7:TCP/IP(圖解+秒懂+史上最全) 8:Feign原理 (圖解)

Java 面試題 30個專題 , 史上最全 , 面試必刷 阿里、京東、美團... 隨意挑、橫着走!!!
1: JVM面試題(史上最強、持續更新、吐血推薦) 2:Java基礎面試題(史上最全、持續更新、吐血推薦
3:架構設計面試題 (史上最全、持續更新、吐血推薦) 4:設計模式面試題 (史上最全、持續更新、吐血推薦)
17、分布式事務面試題 (史上最全、持續更新、吐血推薦) 一致性協議 (史上最全)
29、多線程面試題(史上最全) 30、HR面經,過五關斬六將后,小心陰溝翻船!
9.網絡協議面試題(史上最全、持續更新、吐血推薦) 更多專題, 請參見【 瘋狂創客圈 高並發 總目錄

SpringCloud 精彩博文
nacos 實戰(史上最全) sentinel (史上最全+入門教程)
SpringCloud gateway (史上最全) 更多專題, 請參見【 瘋狂創客圈 高並發 總目錄

Redis集群 - 圖解 - 秒懂(史上最全)

說明:

本文,以史上最為清晰的筆法,介紹清楚了Redis集群。

看完本文,涉及到Redis集群的架構類面試題目,按照本文的思路去回答,一定是120分。

Redis的架構模式分類

  • 單節點模式

  • 主從模式

  • 哨兵模式

  • 集群模式

單節點模式

img點擊並拖拽以移動

特點:簡單

問題:

1、內存容量有限 2、處理能力有限 3、無法高可用。

主從模式

img

點擊並拖拽以移動

Redis 的復制(replication)功能允許用戶根據一個 Redis 服務器來創建任意多個該服務器的復制品,其中被復制的服務器為主服務器(master),而通過復制創建出來的服務器復制品則為從服務器(slave)。 只要主從服務器之間的網絡連接正常,主從服務器兩者會具有相同的數據,主服務器就會一直將發生在自己身上的數據更新同步 給從服務器,從而一直保證主從服務器的數據相同。

特點:

1、master/slave 角色

2、master/slave 數據相同

3、降低 master 讀壓力, 讀取工作轉交從庫

哨兵模式

img

哨兵本身也有單點故障的問題,可以使用多個哨兵進行監控,哨兵不僅會監控redis集群,哨兵之間也會相互監控。每一個哨兵都是一個獨立的進程,作為進程,它會獨立運行。

img

Redis sentinel 是一個分布式系統中監控 redis 主從服務器,並在主服務器下線時自動進行故障轉移。

Redis sentinel 其中三個特性:

  • 監控(Monitoring):

Sentinel 會不斷地檢查你的主服務器和從服務器是否運作正常。

  • 提醒(Notification):

當被監控的某個 Redis 服務器出現問題時, Sentinel 可以通過 API 向管理員或者其他應用程序發送通知。

  • 自動故障遷移(Automatic failover):

當一個主服務器不能正常工作時, Sentinel 會開始一次自動故障遷移操作。

特點:

  • 1、保證高可用

  • 2、監控各個節點

  • 3、自動故障遷移

缺點:

主從模式,切換需要時間丟數據

沒有解決 master 寫的壓力

集群模式

集群模式方案主要包括以下幾個:

  • 客戶端分片
  • 代理分片
  • 服務端分片
  • 代理模式和服務端分片相結合的模式

代理分片包括:

  • Codis

  • Twemproxy

服務端分片包括:

  • Redis Cluster

它們還可以用是否中心化來划分,其中客戶端分片、Redis Cluster屬於無中心化的集群方案,Codis、Tweproxy屬於中心化的集群方案。

是否中心化是指客戶端訪問多個Redis節點時,是直接訪問還是通過一個中間層Proxy來進行操作,直接訪問的就屬於無中心化的方案,通過中間層Proxy訪問的就屬於中心化的方案,它們有各自的優劣,下面分別來介紹。

集群的必要性

所謂的集群,就是通過添加服務器的數量,提供相同的服務,從而讓服務器達到一個穩定、高效的狀態。

問題:我們已經部署好了redis,並且能啟動一個redis,實現數據的讀寫,為什么還要學習redis集群?

答:

(1)單個redis存在不穩定性。當redis服務宕機了,就沒有可用的服務了。

(2)單個redis的讀寫能力是有限的。

總結:redis集群是為了強化redis的讀寫能力。

如何學習redis集群

說明:

 (1)redis集群中,每一個redis稱之為一個節點。
 (2)redis集群中,有兩種類型的節點:主節點(master)、從節點(slave)。
  (3)redis集群,是基於redis主從復制實現。

所以,學習redis集群,就是從學習redis主從模式開始的。

而學習主從模式,需要從redis主從復制開始。

redis主從復制

主從復制,是指將一台Redis服務器的數據,復制到其他的Redis服務器。前者稱為主節點(master),后者稱為從節點(slave);數據的復制是單向的,只能由主節點到從節點。

默認情況下,每台Redis服務器都是主節點;且一個主節點可以有多個從節點(或沒有從節點),但一個從節點只能有一個主節點。

主從復制的使用場景

主從復制的作用

主從復制的作用主要包括:

  1. 數據冗余:主從復制實現了數據的熱備份,是持久化之外的一種數據冗余方式。
  2. 故障恢復:當主節點出現問題時,可以由從節點提供服務,實現快速的故障恢復;實際上是一種服務的冗余。
  3. 負載均衡:在主從復制的基礎上,配合讀寫分離,可以由主節點提供寫服務,由從節點提供讀服務(即寫Redis數據時應用連接主節點,讀Redis數據時應用連接從節點),分擔服務器負載;尤其是在寫少讀多的場景下,通過多個從節點分擔讀負載,可以大大提高Redis服務器的並發量。
  4. 高可用基石:除了上述作用以外,主從復制還是哨兵和集群能夠實施的基礎,因此說主從復制是Redis高可用的基礎。

主從(master-slave)架構涉及到主從復制

單機的 redis,能夠承載的 QPS 大概就在上萬到幾萬不等。對於緩存來說,一般都是用來支撐讀高並發的。

因此架構做成主從(master-slave)架構,一主多從,主負責寫,並且將數據復制到其它的 slave 節點,從節點負責讀。所有的讀請求全部走從節點。這樣也可以很輕松實現水平擴容,支撐讀高並發。

img點擊並拖拽以移動

為了更直觀的理解主從復制,在介紹其內部原理之前,先說明我們需要如何操作才能開啟主從復制。

建立復制

需要注意,主從復制的開啟,完全是在從節點發起的;不需要我們在主節點做任何事情。

從節點開啟主從復制,有3種方式:

(1)配置文件

在從服務器的配置文件中加入:slaveof

(2)啟動命令

redis-server啟動命令后加入 --slaveof

(3)客戶端命令

Redis服務器啟動后,直接通過客戶端執行命令:slaveof ,則該Redis實例成為從節點。

上述3種方式是等效的,下面以客戶端命令的方式為例,看一下當執行了slaveof后,Redis主節點和從節點的變化。

主從復制實例

准備工作:啟動兩個節點

方便起見,實驗所使用的主從節點是在一台機器上的不同Redis實例,其中主節點監聽6379端口,從節點監聽6380端口;從節點監聽的端口號可以在配置文件中修改:

img

啟動后可以看到:

img

兩個Redis節點啟動后(分別稱為6379節點和6380節點),默認都是主節點。

建立復制關系

此時在6380節點執行slaveof命令,使之變為從節點:

img

觀察效果

下面驗證一下,在主從復制建立后,主節點的數據會復制到從節點中。

(1)首先在從節點查詢一個不存在的key:

img

(2)然后在主節點中增加這個key:

img

(3)此時在從節點中再次查詢這個key,會發現主節點的操作已經同步至從節點:

img

(4)然后在主節點刪除這個key:

img

(5)此時在從節點中再次查詢這個key,會發現主節點的操作已經同步至從節點:

img

斷開復制

通過slaveof 命令建立主從復制關系以后,可以通過slaveof no one斷開。需要注意的是,從節點斷開復制后,不會刪除已有的數據,只是不再接受主節點新的數據變化。

從節點執行slaveof no one后,打印日志如下所示;可以看出斷開復制后,從節點又變回為主節點。

img

主節點打印日志如下:

img

核心原理: 主從復制的核心原理

上面一節中,介紹了如何操作可以建立主從關系;本小節將介紹主從復制的實現原理。

1 當啟動一個 slave node 的時候,它會發送一個 PSYNC 命令給 master node。

2 如果這是 slave node 初次連接到 master node,那么會觸發一次 full resynchronization 全量復制。

此時 master 會啟動一個后台線程,開始生成一份 RDB 快照文件,同時還會將從客戶端 client 新收到的所有寫命令緩存在內存中。RDB 文件生成完畢后, master 會將這個 RDB 發送給 slave,

slave接收到RDB , 會先寫入本地磁盤,然后再從本地磁盤加載到內存中,

3 接着 master 會將內存中緩存的寫命令發送到 slave,slave 也會同步這些數據。

如果slave node跟 master node 有網絡故障,斷開了連接,會自動重連,連接之后 master node 僅會復制給 slave 部分缺少的數據。

img點擊並拖拽以移動

主從復制過程大體可以分為3個階段:

  • 連接建立階段(即准備階段)
  • 數據同步階段
  • 命令傳播階段;

下面分別進行介紹。

連接建立階段

該階段的主要作用是在主從節點之間建立連接,為數據同步做好准備。

步驟1:保存主節點信息

從節點服務器內部維護了兩個字段,即masterhost和masterport字段,用於存儲主節點的ip和port信息。

需要注意的是,slaveof是異步命令,從節點完成主節點ip和port的保存后,向發送slaveof命令的客戶端直接返回OK,實際的復制操作在這之后才開始進行。

這個過程中,可以看到從節點打印日志如下:

img

步驟2:建立socket連接

從節點每秒1次調用復制定時函數replicationCron(),如果發現了有主節點可以連接,便會根據主節點的ip和port,創建socket連接。

如果連接成功,則:

  • 從節點:

為該socket建立一個專門處理復制工作的文件事件處理器,負責后續的復制工作,如接收RDB文件、接收命令傳播等。

  • 主節點:

接收到從節點的socket連接后(即accept之后),為該socket創建相應的客戶端狀態,並將從節點看做是連接到主節點的一個客戶端,后面的步驟會以從節點向主節點發送命令請求的形式來進行。

這個過程中,從節點打印日志如下:

img

步驟3:發送ping命令

從節點成為主節點的客戶端之后,發送ping命令進行首次請求,目的是:檢查socket連接是否可用,以及主節點當前是否能夠處理請求。

從節點發送ping命令后,可能出現3種情況:

(1)返回pong:說明socket連接正常,且主節點當前可以處理請求,復制過程繼續。

(2)超時:一定時間后從節點仍未收到主節點的回復,說明socket連接不可用,則從節點斷開socket連接,並重連。

(3)返回pong以外的結果:如果主節點返回其他結果,如正在處理超時運行的腳本,說明主節點當前無法處理命令,則從節點斷開socket連接,並重連。

在主節點返回pong情況下,從節點打印日志如下:

img

步驟4:身份驗證

如果從節點中設置了masterauth選項,則從節點需要向主節點進行身份驗證;沒有設置該選項,則不需要驗證。從節點進行身份驗證是通過向主節點發送auth命令進行的,auth命令的參數即為配置文件中的masterauth的值。

如果主節點設置密碼的狀態,與從節點masterauth的狀態一致(一致是指都存在,且密碼相同,或者都不存在),則身份驗證通過,復制過程繼續;如果不一致,則從節點斷開socket連接,並重連。

步驟5:發送從節點端口信息

身份驗證之后,從節點會向主節點發送其監聽的端口號(前述例子中為6380),主節點將該信息保存到該從節點對應的客戶端的slave_listening_port字段中;該端口信息除了在主節點中執行info Replication時顯示以外,沒有其他作用。

數據同步階段

主從節點之間的連接建立以后,便可以開始進行數據同步,該階段可以理解為從節點數據的初始化。

具體執行的方式是:從節點向主節點發送psync命令(Redis2.8以前是sync命令),開始同步。

數據同步階段是主從復制最核心的階段,根據主從節點當前狀態的不同,可以分為全量復制和部分復制。

在Redis2.8以前,從節點向主節點發送sync命令請求同步數據,此時的同步方式是全量復制;

在Redis2.8及以后,從節點可以發送psync命令請求同步數據,此時根據主從節點當前狀態的不同,同步方式可能是全量復制或部分復制。后文介紹以Redis2.8及以后版本為例。

  1. 全量復制:用於初次復制或其他無法進行部分復制的情況,將主節點中的所有數據都發送給從節點,是一個非常重型的操作。
  2. 部分復制:用於網絡中斷等情況后的復制,只將中斷期間主節點執行的寫命令發送給從節點,與全量復制相比更加高效。需要注意的是,如果網絡中斷時間過長,導致主節點沒有能夠完整地保存中斷期間執行的寫命令,則無法進行部分復制,仍使用全量復制。

全量復制

Redis通過psync命令進行全量復制的過程如下:

(1)從節點判斷無法進行部分復制,向主節點發送全量復制的請求;或從節點發送部分復制的請求,但主節點判斷無法進行部分復制;具體判斷過程需要在講述了部分復制原理后再介紹。

(2)主節點收到全量復制的命令后,執行bgsave,在后台生成RDB文件,並使用一個緩沖區(稱為復制緩沖區)記錄從現在開始執行的所有寫命令

(3)主節點的bgsave執行完成后,將RDB文件發送給從節點;從節點首先清除自己的舊數據,然后載入接收的RDB文件,將數據庫狀態更新至主節點執行bgsave時的數據庫狀態

(4)主節點將前述復制緩沖區中的所有寫命令發送給從節點,從節點執行這些寫命令,將數據庫狀態更新至主節點的最新狀態

(5)如果從節點開啟了AOF,則會觸發bgrewriteaof的執行,從而保證AOF文件更新至主節點的最新狀態

下面是執行全量復制時,主從節點打印的日志;可以看出日志內容與上述步驟是完全對應的。

主節點的打印日志如下:

img

從節點打印日志如下圖所示:

img

其中,有幾點需要注意:從節點接收了來自主節點的89260個字節的數據;從節點在載入主節點的數據之前要先將老數據清除;從節點在同步完數據后,調用了bgrewriteaof。

通過全量復制的過程可以看出,全量復制是非常重型的操作:

(1)主節點通過bgsave命令fork子進程進行RDB持久化,該過程是非常消耗CPU、內存(頁表復制)、硬盤IO的;

(2)主節點通過網絡將RDB文件發送給從節點,對主從節點的帶寬都會帶來很大的消耗

(3)從節點清空老數據、載入新RDB文件的過程是阻塞的,無法響應客戶端的命令;如果從節點執行bgrewriteaof,也會帶來額外的消耗

部分復制

由於全量復制在主節點數據量較大時效率太低,因此Redis2.8開始提供部分復制,用於處理網絡中斷時的數據同步。

部分復制的實現,依賴於三個重要的概念:

(1)復制偏移量

主節點和從節點分別維護一個復制偏移量(offset),代表的是主節點向從節點傳遞的字節數;主節點每次向從節點傳播N個字節數據時,主節點的offset增加N;從節點每次收到主節點傳來的N個字節數據時,從節點的offset增加N。

offset用於判斷主從節點的數據庫狀態是否一致:如果二者offset相同,則一致;如果offset不同,則不一致,此時可以根據兩個offset找出從節點缺少的那部分數據。例如,如果主節點的offset是1000,而從節點的offset是500,那么部分復制就需要將offset為501-1000的數據傳遞給從節點。而offset為501-1000的數據存儲的位置,就是下面要介紹的復制積壓緩沖區。

(2)復制積壓緩沖區

復制積壓緩沖區是由主節點維護的、固定長度的、先進先出(FIFO)隊列,默認大小1MB;當主節點開始有從節點時創建,其作用是備份主節點最近發送給從節點的數據。注意,無論主節點有一個還是多個從節點,都只需要一個復制積壓緩沖區。

在命令傳播階段,主節點除了將寫命令發送給從節點,還會發送一份給復制積壓緩沖區,作為寫命令的備份;除了存儲寫命令,復制積壓緩沖區中還存儲了其中的每個字節對應的復制偏移量(offset)。由於復制積壓緩沖區定長且是先進先出,所以它保存的是主節點最近執行的寫命令;時間較早的寫命令會被擠出緩沖區。

由於該緩沖區長度固定且有限,因此可以備份的寫命令也有限,當主從節點offset的差距過大超過緩沖區長度時,將無法執行部分復制,只能執行全量復制。反過來說,為了提高網絡中斷時部分復制執行的概率,可以根據需要增大復制積壓緩沖區的大小(通過配置repl-backlog-size);例如如果網絡中斷的平均時間是60s,而主節點平均每秒產生的寫命令(特定協議格式)所占的字節數為100KB,則復制積壓緩沖區的平均需求為6MB,保險起見,可以設置為12MB,來保證絕大多數斷線情況都可以使用部分復制。

從節點將offset發送給主節點后,主節點根據offset和緩沖區大小決定能否執行部分復制:

  • 如果offset偏移量之后的數據,仍然都在復制積壓緩沖區里,則執行部分復制;
  • 如果offset偏移量之后的數據已不在復制積壓緩沖區中(數據已被擠出),則執行全量復制。

(3)服務器運行ID(runid)

每個Redis節點(無論主從),在啟動時都會自動生成一個隨機ID(每次啟動都不一樣),由40個隨機的十六進制字符組成;runid用來唯一識別一個Redis節點。通過info Server命令,可以查看節點的runid:

img

主從節點初次復制時,主節點將自己的runid發送給從節點,從節點將這個runid保存起來;當斷線重連時,從節點會將這個runid發送給主節點;主節點根據runid判斷能否進行部分復制:

  • 如果從節點保存的runid與主節點現在的runid相同,說明主從節點之前同步過,主節點會繼續嘗試使用部分復制(到底能不能部分復制還要看offset和復制積壓緩沖區的情況);
  • 如果從節點保存的runid與主節點現在的runid不同,說明從節點在斷線前同步的Redis節點並不是當前的主節點,只能進行全量復制。

psync命令的執行

在了解了復制偏移量、復制積壓緩沖區、節點運行id之后,本節將介紹psync命令的參數和返回值,從而說明psync命令執行過程中,主從節點是如何確定使用全量復制還是部分復制的。

psync命令的執行過程可以參見下圖(圖片來源:《Redis設計與實現》):

img點擊並拖拽以移動

(1)首先,從節點根據當前狀態,決定如何調用psync命令:

  • 如果從節點之前未執行過slaveof或最近執行了slaveof no one,則從節點發送命令為psync ? -1,向主節點請求全量復制;
  • 如果從節點之前執行了slaveof,則發送命令為psync ,其中runid為上次復制的主節點的runid,offset為上次復制截止時從節點保存的復制偏移量。

(2)主節點根據收到的psync命令,及當前服務器狀態,決定執行全量復制還是部分復制:

  • 如果主節點版本低於Redis2.8,則返回-ERR回復,此時從節點重新發送sync命令執行全量復制;
  • 如果主節點版本夠新,且runid與從節點發送的runid相同,且從節點發送的offset之后的數據在復制積壓緩沖區中都存在,則回復+CONTINUE,表示將進行部分復制,從節點等待主節點發送其缺少的數據即可;
  • 如果主節點版本夠新,但是runid與從節點發送的runid不同,或從節點發送的offset之后的數據已不在復制積壓緩沖區中(在隊列中被擠出了),則回復+FULLRESYNC ,表示要進行全量復制,其中runid表示主節點當前的runid,offset表示主節點當前的offset,從節點保存這兩個值,以備使用。

命令傳播階段

數據同步階段完成后,主從節點進入命令傳播階段;在這個階段主節點將自己執行的寫命令發送給從節點,從節點接收命令並執行,從而保證主從節點數據的一致性。

在命令傳播階段,除了發送寫命令,主從節點還維持着心跳機制:PING和REPLCONF ACK。

redis主從模式

redis主從模式架構

在軟件架構中,master-slave(主從模式)是使用比較多的一種架構方式;

img點擊並拖拽以移動

主(master)和 從(slave)部署在不同的服務器上,當主節點服務器寫入數據時會同步到從節點的服務器上,一般主節點負責寫入數據,從節點負責讀取數據

所以,適用於讀寫分離的高並發場景:客戶端可以通過主寫入,通過從讀取。

img

主從集群的搭建

單機配置一主多從

  1. 主服務器通過默認的redis.conf啟動redis-server
  2. 復制主服務器的redis.conf為兩個新的文件redis_slave1.conf和redis_slave2.conf
  3. 分別添加如下配置
# 端口
# 或者port 6381
port 6382
# AOF和快照文件文件夾
# dir /usr/local/var/db/redis_slave1/
dir /usr/local/var/db/redis_slave2/
# 從節點要跟隨的主節點
slaveof 127.0.0.1 6379
# 如果設置了密碼,就要設置
masterauth master-password
  1. 從服務器分別通過redis_slave1.conf和redis_slave2.conf啟動

集群的運行結果(部分)

# 主節點
32314:M 13 Feb 2019 18:19:34.807 * Replica 127.0.0.1:6381 asks for synchronization
32314:M 13 Feb 2019 18:19:34.807 * Full resync requested by replica 127.0.0.1:6381
32314:M 13 Feb 2019 18:19:34.807 * Starting BGSAVE for SYNC with target: disk
32314:M 13 Feb 2019 18:19:34.807 * Background saving started by pid 33175
33175:C 13 Feb 2019 18:19:34.808 * DB saved on disk
32314:M 13 Feb 2019 18:19:34.838 * Background saving terminated with success
32314:M 13 Feb 2019 18:19:34.839 * Synchronization with replica 127.0.0.1:6381 succeeded

32314:M 13 Feb 2019 18:22:01.275 * Replica 127.0.0.1:6382 asks for synchronization
32314:M 13 Feb 2019 18:22:01.275 * Full resync requested by replica 127.0.0.1:6382
32314:M 13 Feb 2019 18:22:01.275 * Starting BGSAVE for SYNC with target: disk
32314:M 13 Feb 2019 18:22:01.276 * Background saving started by pid 33436
33436:C 13 Feb 2019 18:22:01.277 * DB saved on disk
32314:M 13 Feb 2019 18:22:01.359 * Background saving terminated with success
32314:M 13 Feb 2019 18:22:01.360 * Synchronization with replica 127.0.0.1:6382 succeeded
# 從節點
33174:S 13 Feb 2019 18:19:34.806 * MASTER <-> REPLICA sync started
33174:S 13 Feb 2019 18:19:34.806 * Non blocking connect for SYNC fired the event.
33174:S 13 Feb 2019 18:19:34.807 * Master replied to PING, replication can continue...
33174:S 13 Feb 2019 18:19:34.807 * Partial resynchronization not possible (no cached master)
33174:S 13 Feb 2019 18:19:34.807 * Full resync from master: deb0cb0abde947bba19c5224a3664e27c90a6b65:0
33174:S 13 Feb 2019 18:19:34.839 * MASTER <-> REPLICA sync: receiving 175 bytes from master
33174:S 13 Feb 2019 18:19:34.839 * MASTER <-> REPLICA sync: Flushing old data
33174:S 13 Feb 2019 18:19:34.839 * MASTER <-> REPLICA sync: Loading DB in memory
33174:S 13 Feb 2019 18:19:34.839 * MASTER <-> REPLICA sync: Finished with success
33174:S 13 Feb 2019 18:19:34.839 * Background append only file rewriting started by pid 33176
33174:S 13 Feb 2019 18:19:34.863 * AOF rewrite child asks to stop sending diffs.
33176:C 13 Feb 2019 18:19:34.863 * Parent agreed to stop sending diffs. Finalizing AOF...
33176:C 13 Feb 2019 18:19:34.863 * Concatenating 0.00 MB of AOF diff received from parent.
33176:C 13 Feb 2019 18:19:34.863 * SYNC append only file rewrite performed
33174:S 13 Feb 2019 18:19:34.909 * Background AOF rewrite terminated with success
33174:S 13 Feb 2019 18:19:34.910 * Residual parent diff successfully flushed to the rewritten AOF (0.00 MB)
33174:S 13 Feb 2019 18:19:34.910 * Background AOF rewrite finished successfully

不同機器上配置一主多從

如果redis-server在不同的機器上,只需要以下兩個配置即可

# 從節點要跟隨的主節點
slaveof 127.0.0.1 6379
# 如果設置了密碼,就要設置
masterauth master-password

通過命令設置從服務器

  1. 可以通過向運行中的從服務器發送SLAVEOF命令來將其設置為從服務器。
  2. 如果用戶使用的是 SLAVEOF配置選項,那么Redis在啟動時首先會載入當前可用的任何快照文件或者AOF文件,然后連接主服務器並執行上述的復制過程。如果用戶使用的是SLAVEOF命令,那么Redis會立即嘗試連接主服務器,並在連接成功之后,開始上述復制過程。

優點

讀寫分離,提高效率

數據熱備份,提供多個副本

問題:

  • master無法保證高可用

主節點故障,集群則無法進行工作,可用性比較低,從節點升主節點需要人工手動干預

  • 沒有解決 master 寫的壓力

單點容易造成性能低下,主節點的寫受到限制(只有一個主節點)

  • 主節點的存儲能力受到限制

只有一個主節點

  • 全量同步可能會造成毫秒或者秒級的卡頓現象

redission訪問redis集群

redission作為redis 官方推薦的java客戶端。 redission使用netty4.x作為網絡層。 redission使用異步io方式操作

redission的讀寫操作源碼分析

從一個讀、寫操作的代碼作為分析代碼,如下:

//創建redis客戶端
Redisson redisson =(Redisson) Redisson.create();
//創建RBucket對象
RBucket<String> bucket = redisson.getBucket("key1");
//設置對象
bucket.set("someValue");
//獲取結果
String bucketObject = bucket.get();
System.out.println("bucketObject:"+bucketObject); 

下面詳細介紹Redission提供的RBucket 接口和RedissonBucket類。

RBucket 接口

/**
 * Any object holder
 * @author Nikita Koksharov
 */
public interface RBucket<V> extends RExpirable, RBucketAsync<V>
{
    V get();
    void set(V value);
    void set(V value, long timeToLive, TimeUnit timeUnit);
}

RBucket提供set()和sget()方法用於保存和獲取對象。

RedissonBucket 實現類

RedissonBucket 是對RBucket對象的實現。

RedissonBucket實現set()和get()同步方法和異步方法。

public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
    protected RedissonBucket(CommandAsyncExecutor connectionManager, String name) {
        super(connectionManager, name);
    }
    
    protected RedissonBucket(Codec codec, CommandAsyncExecutor connectionManager, String name) 
    {
        super(codec, connectionManager, name);
    }
    
    @Override
    public V get() {
        return get(getAsync());
    }
    
    protected final <V> V get(RFuture<V> future) {
        return commandExecutor.get(future);
    }
    
    @Override
    public Future<V> getAsync() {
        return commandExecutor.readAsync(getName(), codec, RedisCommands.GET, getName());
    }
    
    @Override
    public void set(V value) {
        get(setAsync(value));
    }
    @Override
    public Future<Void> setAsync(V value) {
        return commandExecutor.writeAsync(getName(), codec, RedisCommands.SET, getName(), value);
    }
    
    @Override
    public void set(V value, long timeToLive, TimeUnit timeUnit) {
        get(setAsync(value, timeToLive, timeUnit));
    }
    @Override
    public Future<Void> setAsync(V value, long timeToLive, TimeUnit timeUnit) {
        return commandExecutor.writeAsync(getName(), codec, RedisCommands.SETEX, getName(), timeUnit.toSeconds(timeToLive), value);
    }

}


RedissonBucket 對象又繼承了基類RedissonExpirable,RedissonExpirable繼承了基類RedissionObject對象。


abstract class RedissonExpirable extends RedissonObject implements RExpirable {

    RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {
        super(connectionManager, name);
    }

    RedissonExpirable(Codec codec, CommandAsyncExecutor connectionManager, String name) {
        super(codec, connectionManager, name);
    }

    @Override
    public boolean expire(long timeToLive, TimeUnit timeUnit) {
        return commandExecutor.get(expireAsync(timeToLive, timeUnit));
    }

...
}
//RedissonExpirable繼承了基類RedissionObject對象。
abstract class RedissonObject implements RObject 
{
    final CommandAsyncExecutor commandExecutor;  //連接池
    private final String name;//作為鍵
    final Codec codec;  //編碼解碼器

}

在RedissionObject 包含name 作為鍵,codec 作為編碼解碼器,commandExecutor 用來執行操作的連接池。

Redisstion的類型

對於Redisson的任何操作,都需要獲取到操作句柄類RedissonObject,RedissonObject根據不同的數據類型有不同的RedissonObject實現類,RedissonObject的類繼承關系圖如下:
img
例如想設置redis服務端的key=key的值value=123,你需要查詢Redis命令和Redisson對象匹配列表,找到如下對應關系:
img
然后我們就知道調用代碼這么寫:

Config config = new Config();// 創建配置  
  config.useMasterSlaveServers() // 指定使用主從部署方式  
  .setMasterAddress("redis://192.168.29.24:6379")  // 設置redis主節點  
  .addSlaveAddress("redis://192.168.29.24:7000") // 設置redis從節點  
  .addSlaveAddress("redis://192.168.29.24:7001"); // 設置redis從節點  
RedissonClient redisson = Redisson.create(config);// 創建客戶端(發現這一操作非常耗時,基本在2秒-4秒左右) 

//任何Redisson操作首先需要獲取對應的操作句柄
//RBucket是操作句柄之一,實現類是RedissonBucket
RBucket<String> rBucket = redissonClient.getBucket("key");

//通過操作句柄rBucket進行讀操作
rBucket.get();

//通過操作句柄rBucket進行寫操作
rBucket.set("123");

​ 至於其它的redis命令對應的redisson操作對象,都可以官網的Redis命令和Redisson對象匹配列表 查到。

解密:redission的set()方法流程

RedissionBucket執行set()方法

當RedissionBucket執行set()方法時,這將是Redission整個流程的核心。

  • RedissonBucket 執行set()方法,並通過get(RFuture future)方法獲取RFuture異步任務的返回值
public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> 
{
    @Override
    public void set(V value) {
        get(setAsync(value));
    }
}
  • setAsync(value)異步寫入的方法

真正執行set操作,調用commandExecutor 來完成,參數為主要封裝redis的鍵值,命令以及所需要的參數

public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> 
{
    @Override
    public Future<Void> setAsync(V value) {
        return commandExecutor.writeAsync(getName(), codec, RedisCommands.SET, getName(), value);
    }
}
  • CommandAsyncService 線程池的 writeAsync 異步寫入

CommandAsyncService 首先通過netty框架創建一個promise接口,並且返回。

public class CommandAsyncService implements CommandAsyncExecutor 
{
    public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) 
	{
	    //通過netty創建一個Promise接口並且返回
        Promise<R> mainPromise = connectionManager.newPromise();
        NodeSource source = getNodeSource(key);
        async(false, source, codec, command, params, mainPromise, 0);
        return mainPromise;
    }
}

在此,RBucket 執行一個set操作,由netty創建一個promise對象,並且RBucket通過get()同步方法獲取promise完成時的結果。

前面的異步轉同步的方法:

protected final <V> V get(RFuture<V> future) {
    return commandExecutor.get(future);
}
  • CommandAsyncService 線程池的同步get方法
public class CommandAsyncService implements CommandAsyncExecutor 
{
    @Override
    public <V> V get(RFuture<V> future) {
        try {
            future.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (future.isSuccess()) {
            return future.getNow();
        }

        throw convertException(future);
    }
}    

在Redission引擎中,核心流程處理在於

CommandAsyncService.async() 創建異步任務

這個關鍵方法。CommandAsyncService.async()方法內部,一直在優化和改變。不過代碼,一直朝着更加清晰的地方發展,這更加有利於我們的學習。

protected <V, R> void async(final boolean readOnlyMode,
				final NodeSource source, 
				final Codec codec,
				final RedisCommand<V>  command,
				final Object[] params,
				final Promise<R> mainPromise,  
				final int attempt) 
	{
	     RedisExecutor<V, R> executor =
             new RedisExecutor<>(readOnlyMode, source,
                                 codec, command, params, mainPromise, 
                                 ignoreRedirect, connectionManager, objectBuilder);
        executor.execute();
	}

RedisExecutor 命令執行器

public RedisExecutor(boolean readOnlyMode, NodeSource source, 
		Codec codec, 		RedisCommand<V> command,
		Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect, 
		ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) {
			super();
			this.readOnlyMode = readOnlyMode;
			this.source = source;
			this.codec = codec;
			this.command = command;
			this.params = params;
			this.mainPromise = mainPromise;
			this.ignoreRedirect = ignoreRedirect;
			this.connectionManager = connectionManager;
			this.objectBuilder = objectBuilder;
			
			this.attempts = connectionManager.getConfig().getRetryAttempts();
			this.retryInterval = connectionManager.getConfig().getRetryInterval();
			this.responseTimeout = connectionManager.getConfig().getTimeout();
}

命令執行方法

public void execute() {
    if (mainPromise.isCancelled()) {
      free();
      return;
    }

    if (!connectionManager.getShutdownLatch().acquire()) {
      free();
      mainPromise.tryFailure(new RedissonShutdownException("Redisson is shutdown"));
      return;
    }

    codec = getCodec(codec);

    //獲取連接
    RFuture<RedisConnection> connectionFuture = getConnection();

    RPromise<R> attemptPromise = new RedissonPromise<R>();
    mainPromiseListener = (r, e) -> {
        if (mainPromise.isCancelled() && connectionFuture.cancel(false)) {
            log.debug("Connection obtaining canceled for {}", command);
            timeout.cancel();
            if (attemptPromise.cancel(false)) {
                free();
            }
         }
        };

        if (attempt == 0) {
           mainPromise.onComplete((r, e) -> {
            if (this.mainPromiseListener != null) {
                    this.mainPromiseListener.accept(r, e);
            }
          });
    }

    scheduleRetryTimeout(connectionFuture, attemptPromise);

    connectionFuture.onComplete((connection, e) -> {
          if (connectionFuture.isCancelled()) {
            connectionManager.getShutdownLatch().release();
            return;
          }

         if (!connectionFuture.isSuccess()) {
                connectionManager.getShutdownLatch().release();
                exception = convertException(connectionFuture);
                return;
         }

        if (attemptPromise.isDone() || mainPromise.isDone()) {
            releaseConnection(attemptPromise, connectionFuture);
            return;
        }

        sendCommand(attemptPromise, connection);

        writeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    checkWriteFuture(writeFuture, attemptPromise, connection);
                }
        });

       releaseConnection(attemptPromise, connectionFuture);
    });

     attemptPromise.onComplete((r, e) -> {
          checkAttemptPromise(attemptPromise, connectionFuture);
     });
}


在RedisExecutor.execute()方法內部,獲取一個連接

 //獲取連接
 RFuture<RedisConnection> connectionFuture = getConnection();

獲取一個連接 后,這里也使用了異步方式, 對connectionFuture 進行監聽。

當connectFuture完成時,然后進行寫操作。然后sendCommand 發送命令。

  connectionFuture.onComplete((connection, e) -> {
          if (connectionFuture.isCancelled()) {
            connectionManager.getShutdownLatch().release();
            return;
          }

         if (!connectionFuture.isSuccess()) {
                connectionManager.getShutdownLatch().release();
                exception = convertException(connectionFuture);
                return;
         }

        if (attemptPromise.isDone() || mainPromise.isDone()) {
            releaseConnection(attemptPromise, connectionFuture);
            return;
        }

        sendCommand(attemptPromise, connection);

        writeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    checkWriteFuture(writeFuture, attemptPromise, connection);
                }
        });

       releaseConnection(attemptPromise, connectionFuture);
    });

當Redission接受到Redis服務器消息時,會取出發送時的信息。這個時候會設置attempPromise完成時通知,在attempPromise監聽器中會通知MainPromise.這個時候MainPromise就會獲取到通知。

protected void completeResponse(CommandData<Object, Object> data, Object result, Channel channel) {
    if (data != null 
        && !data.getPromise().trySuccess(result) 
       		 && data.cause() instanceof RedisTimeoutException) {
        log.warn("response has been skipped due to timeout! channel: {}, command: {}", channel, LogHelper.toString(data));
    }
}

Redssion的處理流水線

 @Override
    protected void initChannel(Channel ch) throws Exception {
        initSsl(config, ch);
        
        if (type == Type.PLAIN) {
            ch.pipeline().addLast(new RedisConnectionHandler(redisClient));
        } else {
            ch.pipeline().addLast(new RedisPubSubConnectionHandler(redisClient));
        }
        
        ch.pipeline().addLast(
            connectionWatchdog,
            CommandEncoder.INSTANCE,
            CommandBatchEncoder.INSTANCE,
            new CommandsQueue());
        
        if (pingConnectionHandler != null) {
            ch.pipeline().addLast(pingConnectionHandler);
        }
        
        if (type == Type.PLAIN) {
            ch.pipeline().addLast(new CommandDecoder(config.getExecutor(), config.isDecodeInExecutor()));
        } else {
            ch.pipeline().addLast(new CommandPubSubDecoder(config.getExecutor(), config.isKeepPubSubOrder(), config.isDecodeInExecutor()));
        }
    }
    

redission命令發送處理器


public class CommandsQueue extends ChannelDuplexHandler {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof QueueCommand) {
            QueueCommand data = (QueueCommand) msg;
            QueueCommandHolder holder = queue.peek();
            if (holder != null && holder.getCommand() == data) {
                super.write(ctx, msg, promise);
            } else {
                queue.add(new QueueCommandHolder(data, promise));
                sendData(ctx.channel());
            }
        } else {
            super.write(ctx, msg, promise);
        }
    }
    

private void sendData(Channel ch) {
        QueueCommandHolder command = queue.peek();
        if (command != null && command.trySend()) {
            QueueCommand data = command.getCommand();
            List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
            if (!pubSubOps.isEmpty()) {
                for (CommandData<Object, Object> cd : pubSubOps) {
                    for (Object channel : cd.getParams()) {
                        ch.pipeline().get(CommandPubSubDecoder.class).addPubSubCommand((ChannelName) channel, cd);
                    }
                }
            } else {
                ch.attr(CURRENT_COMMAND).set(data);
            }

            command.getChannelPromise().addListener(listener);
            ch.writeAndFlush(data, command.getChannelPromise());
        }
    }

}    

NioSocketChannel 執行最終的寫操作

 public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
 
 @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        SocketChannel ch = javaChannel();
        int writeSpinCount = config().getWriteSpinCount();
        do {
            if (in.isEmpty()) {
                // All written so clear OP_WRITE
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }

            // Ensure the pending writes are made of ByteBufs only.
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            int nioBufferCnt = in.nioBufferCount();

            // Always us nioBuffers() to workaround data-corruption.
            // See https://github.com/netty/netty/issues/2761
            switch (nioBufferCnt) {
                case 0:
                    // We have something else beside ByteBuffers to write so fallback to normal writes.
                    writeSpinCount -= doWrite0(in);
                    break;
                case 1: {
                    // Only one ByteBuf so use non-gathering write
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    ByteBuffer buffer = nioBuffers[0];
                    int attemptedBytes = buffer.remaining();
                    final int localWrittenBytes = ch.write(buffer);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
                default: {
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    // We limit the max amount to int above so cast is safe
                    long attemptedBytes = in.nioBufferSize();
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                    adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                            maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
            }
        } while (writeSpinCount > 0);

        incompleteWrite(writeSpinCount < 0);
    }

ChannelOutboundBuffer回調Promise的接口

ChannelOutboundBuffer寫完之后,回調Promise的接口。

Future 有兩種模式:將來式和回調式。而回調式會出現回調地獄的問題,由此衍生出了 Promise 模式來解決這個問題。這才是 Future 模式和 Promise 模式的相關性。

 
 public final class ChannelOutboundBuffer {
 /**
     * Removes the fully written entries and update the reader index of the partially written entry.
     * This operation assumes all messages in this buffer is {@link ByteBuf}.
     */
    public void removeBytes(long writtenBytes) {
        for (;;) {
            Object msg = current();
            if (!(msg instanceof ByteBuf)) {
                assert writtenBytes == 0;
                break;
            }

            final ByteBuf buf = (ByteBuf) msg;
            final int readerIndex = buf.readerIndex();
            final int readableBytes = buf.writerIndex() - readerIndex;

            if (readableBytes <= writtenBytes) {
                if (writtenBytes != 0) {
                    progress(readableBytes);
                    writtenBytes -= readableBytes;
                }
                remove();
            } else { // readableBytes > writtenBytes
                if (writtenBytes != 0) {
                    buf.readerIndex(readerIndex + (int) writtenBytes);
                    progress(writtenBytes);
                }
                break;
            }
        }
        clearNioBuffers();
    }
    
       /**
     * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
     * flushed message exists at the time this method is called it will return {@code false} to signal that no more
     * messages are ready to be handled.
     */
    public boolean remove() {
        Entry e = flushedEntry;
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;

        ChannelPromise promise = e.promise;
        int size = e.pendingSize;

        removeEntry(e);

        if (!e.cancelled) {
            // only release message, notify and decrement if it was not canceled before.
            ReferenceCountUtil.safeRelease(msg);
            safeSuccess(promise);
            decrementPendingOutboundBytes(size, false, true);
        }

        // recycle the entry
        e.recycle();

        return true;
    }

回到寫完后的異步回調接口

RedisExecutor 發送命令sendCommand

public class RedisExecutor<V, R> {

    protected void sendCommand(RPromise<R> attemptPromise, RedisConnection connection) {
        if (source.getRedirect() == Redirect.ASK) {
            List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
            RPromise<Void> promise = new RedissonPromise<Void>();
            list.add(new CommandData<Void, Void>(promise, codec, RedisCommands.ASKING, new Object[]{}));
            list.add(new CommandData<V, R>(attemptPromise, codec, command, params));
            RPromise<Void> main = new RedissonPromise<Void>();
            writeFuture = connection.send(new CommandsData(main, list, false));
        } else {
            if (log.isDebugEnabled()) {
                log.debug("acquired connection for command {} and params {} from slot {} using node {}... {}",
                        command, LogHelper.toString(params), source, connection.getRedisClient().getAddr(), connection);
            }
            writeFuture = connection.send(new CommandData<V, R>(attemptPromise, codec, command, params));
        }
    }
    

來自於 AbstractChannelHandlerContext 的異步回調:

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {


@Override
public ChannelFuture writeAndFlush(Object msg) {
				return writeAndFlush(msg, newPromise());
}

}

RedisExecutor啟動超時重試定時器

public class RedisExecutor<V, R> {
        public void execute() {
              writeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    checkWriteFuture(writeFuture, attemptPromise, connection);
                }
            });
            
        }
    
    private void checkWriteFuture(ChannelFuture future, RPromise<R> attemptPromise, RedisConnection connection) {
        if (future.isCancelled() || attemptPromise.isDone()) {
            return;
        }

        if (!future.isSuccess()) {
            exception = new WriteRedisConnectionException(
                    "Unable to send command! Node source: " + source + ", connection: " + connection + 
                    ", command: " + LogHelper.toString(command, params)
                    + " after " + attempt + " retry attempts", future.cause());
            if (attempt == attempts) {
                if (!attemptPromise.tryFailure(exception)) {
                    log.error(exception.getMessage());
                }
            }
            return;
        }

        timeout.cancel();

        scheduleResponseTimeout(attemptPromise, connection);
    }
    
    
     private void scheduleResponseTimeout(RPromise<R> attemptPromise, RedisConnection connection) {
        long timeoutTime = responseTimeout;
        if (command != null 
                && (RedisCommands.BLOCKING_COMMAND_NAMES.contains(command.getName())
                        || RedisCommands.BLOCKING_COMMANDS.contains(command))) {
            Long popTimeout = null;
            if (RedisCommands.BLOCKING_COMMANDS.contains(command)) {
                boolean found = false;
                for (Object param : params) {
                    if (found) {
                        popTimeout = Long.valueOf(param.toString()) / 1000;
                        break;
                    }
                    if ("BLOCK".equals(param)) {
                        found = true; 
                    }
                }
            } else {
                popTimeout = Long.valueOf(params[params.length - 1].toString());
            }
            
            handleBlockingOperations(attemptPromise, connection, popTimeout);
            if (popTimeout == 0) {
                return;
            }
            timeoutTime += popTimeout * 1000;
            // add 1 second due to issue https://github.com/antirez/redis/issues/874
            timeoutTime += 1000;
        }

        long timeoutAmount = timeoutTime;
        TimerTask timeoutTask = new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                if (attempt < attempts) {
                    if (!attemptPromise.cancel(false)) {
                        return;
                    }

                    attempt++;
                    if (log.isDebugEnabled()) {
                        log.debug("attempt {} for command {} and params {}",
                                attempt, command, LogHelper.toString(params));
                    }
                    
                    mainPromiseListener = null;

                    execute();
                    return;
                }
                
                attemptPromise.tryFailure(
                        new RedisResponseTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured"
                                + " after " + attempts + " retry attempts. Command: " 
                                + LogHelper.toString(command, params) + ", channel: " + connection.getChannel()));
            }
        };

        timeout = connectionManager.newTimeout(timeoutTask, timeoutTime, TimeUnit.MILLISECONDS);
    }
        

CommandDecoder 命令解碼器

public class CommandDecoder extends ReplayingDecoder<State> {

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get();

        if (state() == null) {
            state(new State());
        }
        
        if (data == null) {
            while (in.writerIndex() > in.readerIndex()) {
                int endIndex = skipCommand(in);

                try {
                    decode(ctx, in, data);
                } catch (Exception e) {
                    in.readerIndex(endIndex);
                    throw e;
                }
            }
        } else {
            int endIndex = 0;
            if (!(data instanceof CommandsData)) {
                endIndex = skipCommand(in);
            }
            
            try {
                decode(ctx, in, data);
            } catch (Exception e) {
                if (!(data instanceof CommandsData)) {
                    in.readerIndex(endIndex);
                }
                throw e;
            }
        }
    }
    
    
    private void decode(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data) throws Exception {


         decodeCommand(ctx.channel(), in, data);
 
    }
    
    protected void decodeCommand(Channel channel, ByteBuf in, QueueCommand data) throws Exception {
     if (data instanceof CommandData) {
        CommandData<Object, Object> cmd = (CommandData<Object, Object>) data;
        try {
           decode(in, cmd, null, channel, false, null);
           sendNext(channel, data);
        } catch (Exception e) {
            cmd.tryFailure(e);
           sendNext(channel);
            throw e;
        }
    }
  }
  
      protected void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, boolean skipConvertor, List<CommandData<?, ?>> commandsData) throws IOException {
        int code = in.readByte();
        if (code == '+') {
            String result = readString(in);

            handleResult(data, parts, result, skipConvertor, channel);
        } else if (code == '-') {
           String error = readString(in);
        } else if (code == ':') {
            Long result = readLong(in);
            handleResult(data, parts, result, false, channel);
        } else if (code == '$') {
         } else {
            String dataStr = in.toString(0, in.writerIndex(), CharsetUtil.UTF_8);
            throw new IllegalStateException("Can't decode replay: " + dataStr);
        }
    }

    private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result, boolean skipConvertor, Channel channel) {
        if (data != null && !skipConvertor) {
            result = data.getCommand().getConvertor().convert(result);
        }
        if (parts != null) {
            parts.add(result);
        } else {
            completeResponse(data, result, channel);
        }
    }
    
    
  protected void completeResponse(CommandData<Object, Object> data, Object result, Channel channel) {
        if (data != null 
        		&& !data.getPromise().trySuccess(result) 
        			&& data.cause() instanceof RedisTimeoutException) {
            log.warn("response has been skipped due to timeout! channel: {}, command: {}", channel, LogHelper.toString(data));
        }
 }

回到最初的mainPromise 回調

public class CommandAsyncService implements CommandAsyncExecutor 
{
    public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) 
	{
	    //通過netty創建一個Promise接口並且返回
        Promise<R> mainPromise = connectionManager.newPromise();
        NodeSource source = getNodeSource(key);
        async(false, source, codec, command, params, mainPromise, 0);
        return mainPromise;
    }
}
public class RedissonPromise<T> extends CompletableFuture<T> implements RPromise<T> {

    private final Promise<T> promise = ImmediateEventExecutor.INSTANCE.newPromise();

    @Override
    public boolean trySuccess(T result) {
        if (promise.trySuccess(result)) {
            complete(result);
            return true;
        }
        return false;
    }
}

取消重試定時器

        mainPromiseListener = (r, e) -> {
            if (mainPromise.isCancelled() && connectionFuture.cancel(false)) {
                log.debug("Connection obtaining canceled for {}", command);
                timeout.cancel();
                if (attemptPromise.cancel(false)) {
                    free();
                }
            }
        };

同步,異步,阻塞,非阻塞實戰

同步,異步,阻塞,非阻塞的理解需要花費很大的精力,從 IO 模型和內核進行深入地理解,才能分清區別。在日常開發中往往沒必要過於糾結到底是何種調用,但得對調用的特性有所了解,比如是否占用主線程的時間片,出現異常怎么捕獲,超時怎么解決等等

Future 和 Promise的區別

Future 的結果由異步計算的結果確定。請注意,必須使用Callable或Runnable初始化FutureTask(經典的Future),沒有無參數構造函數,並且Future和FutureTask都是從外部只讀的(FutureTask的set方法受保護)。該值將從內部設置為計算結果。

img點擊並拖拽以移動

Promise和 Future 是非常相似的概念,不同之處在於,Future 是針對尚不存在的結果的只讀容器,而 Promise 可以被寫入(通常只能寫入一次)。

Promise 的結果可以由你(或實際上任何人)隨時設置,因為它具有公共設置方法。你向客戶代碼發送一個 Promise,並在以后根據需要執行。

img點擊並拖拽以移動

在Java 8中,Promise最終被稱為CompletableFuture,它的javadoc解釋了:可以明確完成(設置其值和狀態)並可以用作CompletionStage的Future,它支持在完成時觸發的相關功能和操作。

在Java 8中,可以創建CompletableFuture和SettableFuture,而無需執行任何任務,並且可以隨時設置它們的值。注意,CompletableFuture不是“純粹的”Promise,可以使用諸如FutureTask之類的任務對其進行初始化,並且它最有用的功能是與處理鏈的步驟無關。

連接的獲取

RedissonClient一主兩從部署時連接池組成

​ 對如下圖的主從部署(1主2從)
img redisson純java操作代碼如下

Config config = new Config();// 創建配置
  config.useMasterSlaveServers() // 指定使用主從部署方式
  //.setReadMode(ReadMode.SLAVE) 默認值SLAVE,讀操作只在從節點進行
  //.setSubscriptionMode(SubscriptionMode.SLAVE) 默認值SLAVE,訂閱操作只在從節點進行
  //.setMasterConnectionMinimumIdleSize(10) 默認值10,針對每個master節點初始化10個連接
  //.setMasterConnectionPoolSize(64) 默認值64,針對每個master節點初始化10個連接,最大可以擴展至64個連接
  //.setSlaveConnectionMinimumIdleSize(10) 默認值10,針對每個slave節點初始化10個連接
  //.setSlaveConnectionPoolSize(64) 默認值,針對每個slave節點初始化10個連接,最大可以擴展至64個連接
  //.setSubscriptionConnectionMinimumIdleSize(1) 默認值1,在SubscriptionMode=SLAVE時候,針對每個slave節點初始化1個連接
  //.setSubscriptionConnectionPoolSize(50) 默認值50,在SubscriptionMode=SLAVE時候,針對每個slave節點初始化1個連接,最大可以擴展至50個連接
  .setMasterAddress("redis://192.168.29.24:6379")  // 設置redis主節點
  .addSlaveAddress("redis://192.168.29.24:7000") // 設置redis從節點
  .addSlaveAddress("redis://192.168.29.24:7001"); // 設置redis從節點
RedissonClient redisson = Redisson.create(config);// 創建客戶端(發現這一操作非常耗時,基本在2秒-4秒左右)

​ 上面代碼執行完畢后,如果在redis服務端所在服務器執行以下linux命令:

#6379上建立了10個連接
netstat -ant |grep 6379|grep  ESTABLISHED
#7000上建立了11個連接
netstat -ant |grep 7000|grep  ESTABLISHED
#7001上建立了11個連接
netstat -ant |grep 7001|grep  ESTABLISHED

​ 你會發現redisson連接到redis服務端總計建立了32個連接,其中masterpool占據10個連接,slavepool占據20個連接,另外pubSubConnectionPool占據2個連接

連接池中池化對象分布

連接池中池化對象分布如下圖:

img

從上圖可以看出,連接池是針對每個IP端口都有一個獨立的池,連接池也按照主從進行划分,具體如下:

  • MasterConnectionPool:

    默認針對每個不同的IP+port組合,初始化10個對象,最大可擴展至64個,因為只有一個master,所以上圖創建了10個連接;

  • MasterPubSubConnectionPool:

    默認針對每個不同的IP+port組合,初始化1個對象,最大可擴展至50個,因為默認SubscriptionMode=SubscriptionMode.SLAVE,所以master上不會創建連接池,所以上圖MasterPubSubConnectionPool里沒有創建任何連接;

  • SlaveConnectionPool:

    默認針對每個不同的IP+port組合,初始化10個對象,最大可擴展至64個,因為有兩個slave,每個slave上圖創建了10個連接,總計創建了20個連接;

  • PubSubConnectionPool:

    默認針對每個不同的IP+port組合,初始化1個對象,最大可擴展至50個,因為有兩個slave,每個slave上圖創建了1個連接,總計創建了2個連接。

Redisson的4類連接池

​ 這里我們來詳細介紹下Redisson的連接池實現類,Redisson里有4種連接池,它們是

  • MasterConnectionPool、

  • MasterPubSubConnectionPool、

  • SlaveConnectionPool

  • PubSubConnectionPool,

    Redisson里有4種連接池,它們的父類都是ConnectionPool,其類繼承關系圖如下:

    img
    通過上圖我們了解了ConnectionPool類的繼承關系圖

用一張圖來解釋ConnectionPool干了些啥,如下圖:

img
都到這里了,不介意再送一張圖了解各種部署方式下的連接池分布了,如下圖:
img

ConnectionPool類的組成

通過上圖我們了解了ConnectionPool類的繼承關系圖,再來一張圖來了解下ConnectionPool.java類的組成,如下:
img

好了,再來圖就有點啰嗦了,注釋ConnectionPool.java代碼如下:

abstract class ConnectionPool<T extends RedisConnection> {  
    private final Logger log = LoggerFactory.getLogger(getClass());  
    //維持着連接池對應的redis節點信息  
    //比如1主2從部署MasterConnectionPool里的entries只有一個主節點(192.168.29.24 6379)  
    //比如1主2從部署MasterPubSubConnectionPool里的entries為空,因為SubscriptionMode=SubscriptionMode.SLAVE  
    //比如1主2從部署SlaveConnectionPool里的entries有3個節點(192.168.29.24 6379,192.168.29.24 7000,192.168.29.24 7001,但是注意192.168.29.24 6379凍結屬性freezed=true不會參與讀操作除非2個從節點全部宕機才參與讀操作)  
    //比如1主2從部署PubSubConnectionPool里的entries有2個節點(192.168.29.24 7000,192.168.29.24 7001),因為SubscriptionMode=SubscriptionMode.SLAVE,主節點不會加入  
    protected final List<ClientConnectionsEntry> entries = new CopyOnWriteArrayList<ClientConnectionsEntry>(); 
    
    //持有者RedissonClient的組件ConnectionManager  
    final ConnectionManager connectionManager;  
    
    //持有者RedissonClient的組件ConnectionManager里的MasterSlaveServersConfig  
    final MasterSlaveServersConfig config;  
    
    //持有者RedissonClient的組件ConnectionManager里的MasterSlaveEntry  
    final MasterSlaveEntry masterSlaveEntry;  
  
    //構造函數  
    public ConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) {  
        this.config = config;  
        this.masterSlaveEntry = masterSlaveEntry;  
        this.connectionManager = connectionManager;  
    }  
  
    //連接池中需要增加對象時候調用此方法  
    public RFuture<Void> add(final ClientConnectionsEntry entry) {  
        final RPromise<Void> promise = connectionManager.newPromise();  
        promise.addListener(new FutureListener<Void>() {  
            @Override  
            public void operationComplete(Future<Void> future) throws Exception {  
                entries.add(entry);  
            }  
        });  
        initConnections(entry, promise, true);  
        return promise;  
    }  
  
    //初始化連接池中最小連接數  
    private void initConnections(final ClientConnectionsEntry entry, final RPromise<Void> initPromise, boolean checkFreezed) {  
        final int minimumIdleSize = getMinimumIdleSize(entry);  
  
        if (minimumIdleSize == 0 || (checkFreezed && entry.isFreezed())) {  
            initPromise.trySuccess(null);  
            return;  
        }  
  
        final AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize);  
        int startAmount = Math.min(50, minimumIdleSize);  
        final AtomicInteger requests = new AtomicInteger(startAmount);  
        for (int i = 0; i < startAmount; i++) {  
            createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);  
        }  
    }  
  
    //創建連接對象到連接池中  
    private void createConnection(final boolean checkFreezed, final AtomicInteger requests, final ClientConnectionsEntry entry, final RPromise<Void> initPromise,  
            final int minimumIdleSize, final AtomicInteger initializedConnections) {  
  
        if ((checkFreezed && entry.isFreezed()) || !tryAcquireConnection(entry)) {  
            int totalInitializedConnections = minimumIdleSize - initializedConnections.get();  
            Throwable cause = new RedisConnectionException(  
                    "Unable to init enough connections amount! Only " + totalInitializedConnections + " from " + minimumIdleSize + " were initialized. Server: "  
                                        + entry.getClient().getAddr());  
            initPromise.tryFailure(cause);  
            return;  
        }  
          
        acquireConnection(entry, new Runnable() {  
              
            @Override  
            public void run() {  
                RPromise<T> promise = connectionManager.newPromise();  
                createConnection(entry, promise);  
                promise.addListener(new FutureListener<T>() {  
                    @Override  
                    public void operationComplete(Future<T> future) throws Exception {  
                        if (future.isSuccess()) {  
                            T conn = future.getNow();  
  
                            releaseConnection(entry, conn);  
                        }  
  
                        releaseConnection(entry);  
  
                        if (!future.isSuccess()) {  
                            int totalInitializedConnections = minimumIdleSize - initializedConnections.get();  
                            String errorMsg;  
                            if (totalInitializedConnections == 0) {  
                                errorMsg = "Unable to connect to Redis server: " + entry.getClient().getAddr();  
                            } else {  
                                errorMsg = "Unable to init enough connections amount! Only " + totalInitializedConnections   
                                        + " from " + minimumIdleSize + " were initialized. Redis server: " + entry.getClient().getAddr();  
                            }  
                            Throwable cause = new RedisConnectionException(errorMsg, future.cause());  
                            initPromise.tryFailure(cause);  
                            return;  
                        }  
  
                        int value = initializedConnections.decrementAndGet();  
                        if (value == 0) {  
                            log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr());  
                            if (!initPromise.trySuccess(null)) {  
                                throw new IllegalStateException();  
                            }  
                        } else if (value > 0 && !initPromise.isDone()) {  
                            if (requests.incrementAndGet() <= minimumIdleSize) {  
                                createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);  
                            }  
                        }  
                    }  
                });  
            }  
        });  
  
    }  
  
    //連接池中租借出連接對象  
    public RFuture<T> get(RedisCommand<?> command) {  
        for (int j = entries.size() - 1; j >= 0; j--) {  
            final ClientConnectionsEntry entry = getEntry();  
            if (!entry.isFreezed()   
                    && tryAcquireConnection(entry)) {  
                return acquireConnection(command, entry);  
            }  
        }  
          
        List<InetSocketAddress> failedAttempts = new LinkedList<InetSocketAddress>();  
        List<InetSocketAddress> freezed = new LinkedList<InetSocketAddress>();  
        for (ClientConnectionsEntry entry : entries) {  
            if (entry.isFreezed()) {  
                freezed.add(entry.getClient().getAddr());  
            } else {  
                failedAttempts.add(entry.getClient().getAddr());  
            }  
        }  
  
        StringBuilder errorMsg = new StringBuilder(getClass().getSimpleName() + " no available Redis entries. ");  
        if (!freezed.isEmpty()) {  
            errorMsg.append(" Disconnected hosts: " + freezed);  
        }  
        if (!failedAttempts.isEmpty()) {  
            errorMsg.append(" Hosts disconnected due to `failedAttempts` limit reached: " + failedAttempts);  
        }  
  
        RedisConnectionException exception = new RedisConnectionException(errorMsg.toString());  
        return connectionManager.newFailedFuture(exception);  
    }  
  
    //連接池中租借出連接對象執行操作RedisCommand  
    public RFuture<T> get(RedisCommand<?> command, ClientConnectionsEntry entry) {  
        if ((!entry.isFreezed() || entry.getFreezeReason() == FreezeReason.SYSTEM) &&   
                tryAcquireConnection(entry)) {  
            return acquireConnection(command, entry);  
        }  
  
        RedisConnectionException exception = new RedisConnectionException(  
                "Can't aquire connection to " + entry);  
        return connectionManager.newFailedFuture(exception);  
    }  
    
    //通過向redis服務端發送PING看是否返回PONG來檢測連接
    private void ping(RedisConnection c, final FutureListener<String> pingListener) {  
        RFuture<String> f = c.async(RedisCommands.PING);  
        f.addListener(pingListener);  
    }  
  
    //歸還連接對象到連接池  
    public void returnConnection(ClientConnectionsEntry entry, T connection) {  
        if (entry.isFreezed()) {  
            connection.closeAsync();  
        } else {  
            releaseConnection(entry, connection);  
        }  
        releaseConnection(entry);  
    }  
  
    //釋放連接池中連接對象  
    protected void releaseConnection(ClientConnectionsEntry entry) {  
        entry.releaseConnection();  
    }  
  
    //釋放連接池中連接對象  
    protected void releaseConnection(ClientConnectionsEntry entry, T conn) {  
        entry.releaseConnection(conn);  
    }  
}

Redisson初始化connectionManager

那么這些連接池是在哪里初始化的?如何初始化的?讀操作和寫操作如何獲取連接的?

Redisson接口

​ RedissonClient.java是一個接口類,它的實現類是Redisson.java,對於Redisson.java的介紹先以一張Redisson的4大組件關系圖開始,如下圖:
img
​ 對Redisson.java的代碼注釋如下:

/**  
* 根據配置Config創建redisson操作類RedissonClient  
* @param config for Redisson  
* @return Redisson instance  
*/  
public static RedissonClient create(Config config) {  
    //調用構造方法  
    Redisson redisson = new Redisson(config);  
    if (config.isRedissonReferenceEnabled()) {  
        redisson.enableRedissonReferenceSupport();  
    }  
    return redisson;  
}  

/**  
* Redisson構造方法  
* @param config for Redisson  
* @return Redisson instance  
*/  
protected Redisson(Config config) {  
    //賦值變量config  
    this.config = config;  
    //產生一份對於傳入config的備份  
    Config configCopy = new Config(config);  

    //根據配置config的類型(主從模式、單機模式、哨兵模式、集群模式、亞馬遜雲模式、微軟雲模式)而進行不同的初始化  
    connectionManager = ConfigSupport.createConnectionManager(configCopy);  
    //連接池對象回收調度器  
    evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());  
    //Redisson的對象編碼類  
    codecProvider = configCopy.getCodecProvider();  
    //Redisson的ResolverProvider,默認為org.redisson.liveobject.provider.DefaultResolverProvider  
    resolverProvider = configCopy.getResolverProvider();  
}

其中與連接池相關的就是ConnectionManager.

ConnectionManager的初始化轉交工具類ConfigSupport.java進行,ConfigSupport.java會根據部署方式(主從模式、單機模式、哨兵模式、集群模式、亞馬遜雲模式、微軟雲模式)的不同而分別進行。

ConfigSupport.java

​ 這里現將ConfigSupport.java創建ConnectionManager的核心代碼注釋如下:

/**  
* 據配置config的類型(主從模式、單機模式、哨兵模式、集群模式、亞馬遜雲模式、微軟雲模式)而進行不同的初始化  
* @param configCopy for Redisson  
* @return ConnectionManager instance  
*/  
public static ConnectionManager createConnectionManager(Config configCopy) {  
    if (configCopy.getMasterSlaveServersConfig() != null) {//配置configCopy類型為主從模式  
        validate(configCopy.getMasterSlaveServersConfig());  
        return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy);  
    } else if (configCopy.getSingleServerConfig() != null) {//配置configCopy類型為單機模式  
        validate(configCopy.getSingleServerConfig());  
        return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy);  
    } else if (configCopy.getSentinelServersConfig() != null) {//配置configCopy類型為哨兵模式  
        validate(configCopy.getSentinelServersConfig());  
        return new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy);  
    } else if (configCopy.getClusterServersConfig() != null) {//配置configCopy類型為集群模式  
        validate(configCopy.getClusterServersConfig());  
        return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy);  
    } else if (configCopy.getElasticacheServersConfig() != null) {//配置configCopy類型為亞馬遜雲模式  
        validate(configCopy.getElasticacheServersConfig());  
        return new ElasticacheConnectionManager(configCopy.getElasticacheServersConfig(), configCopy);  
    } else if (configCopy.getReplicatedServersConfig() != null) {//配置configCopy類型為微軟雲模式  
        validate(configCopy.getReplicatedServersConfig());  
        return new ReplicatedConnectionManager(configCopy.getReplicatedServersConfig(), configCopy);  
    } else if (configCopy.getConnectionManager() != null) {//直接返回configCopy自帶的默認ConnectionManager  
        return configCopy.getConnectionManager();  
    }else {  
        throw new IllegalArgumentException("server(s) address(es) not defined!");  
    }  
}

​ 上面可以看到根據傳入的配置Config.java的不同,會分別創建不同的ConnectionManager的實現類。

ConnectionManager的6個實現類

​ 這里開始介紹ConnectionManager,ConnectionManager.java是一個接口類,它有6個實現類,分別對應着不同的部署模式(主從模式、單機模式、哨兵模式、集群模式、亞馬遜雲模式、微軟雲模式)

如下如所示:

img

MasterSlaveConnectionManager.java

這里以主從部署方式進行講解,先通過一張圖了解MasterSlaveConnectionManager的組成:

img
上圖中最終要的組件要數MasterSlaveEntry,在后面即將進行介紹,這里注釋

MasterSlaveConnectionManager.java的核心代碼如下:

/**   
* MasterSlaveConnectionManager的構造方法 
* @param cfg for MasterSlaveServersConfig 
* @param config for Config   
*/    
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) {
    //調用構造方法
    this(config);
    //
    initTimer(cfg);
    this.config = cfg;
    //初始化MasterSlaveEntry
    initSingleEntry();
}
/**   
* MasterSlaveConnectionManager的構造方法 
* @param cfg for Config 
*/    
public MasterSlaveConnectionManager(Config cfg) {
    //讀取redisson的jar中的文件META-INF/MANIFEST.MF,打印出Bundle-Version對應的Redisson版本信息
    Version.logVersion();
    //EPOLL是linux的多路復用IO模型的增強版本,這里如果啟用EPOLL,就讓redisson底層netty使用EPOLL的方式,否則配置netty里的NIO非阻塞方式
    if (cfg.isUseLinuxNativeEpoll()) {
        if (cfg.getEventLoopGroup() == null) {
            //使用linux IO非阻塞模型EPOLL
            this.group = new EpollEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
        } else {
            this.group = cfg.getEventLoopGroup();
        }
        this.socketChannelClass = EpollSocketChannel.class;
    } else {
        if (cfg.getEventLoopGroup() == null) {
            //使用linux IO非阻塞模型NIO
            this.group = new NioEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
        } else {
            this.group = cfg.getEventLoopGroup();
        }
        this.socketChannelClass = NioSocketChannel.class;
    }
    if (cfg.getExecutor() == null) {
        //線程池大小,對於2U 2CPU 8cores/cpu,意思是有2塊板子,每個板子上8個物理CPU,那么總計物理CPU個數為16
        //對於linux有個超線程概念,意思是每個物理CPU可以虛擬出2個邏輯CPU,那么總計邏輯CPU個數為32
        //這里Runtime.getRuntime().availableProcessors()取的是邏輯CPU的個數,所以這里線程池大小會是64
        int threads = Runtime.getRuntime().availableProcessors() * 2;
        if (cfg.getThreads() != 0) {
            threads = cfg.getThreads();
        }
        executor = Executors.newFixedThreadPool(threads, new DefaultThreadFactory("redisson"));
    } else {
        executor = cfg.getExecutor();
    }

    this.cfg = cfg;
    this.codec = cfg.getCodec();
    //一個可以獲取異步執行任務返回值的回調對象,本質是對於java的Future的實現,監控MasterSlaveConnectionManager的shutdown進行一些必要的處理
    this.shutdownPromise = newPromise();
    //一個持有MasterSlaveConnectionManager的異步執行服務
    this.commandExecutor = new CommandSyncService(this);
}
/**   
* 初始化定時調度器
* @param config for MasterSlaveServersConfig 
*/   
protected void initTimer(MasterSlaveServersConfig config) {
    //讀取超時時間配置信息
    int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout(), config.getReconnectionTimeout()};
    Arrays.sort(timeouts);
    int minTimeout = timeouts[0];
    //設置默認超時時間
    if (minTimeout % 100 != 0) {
        minTimeout = (minTimeout % 100) / 2;
    } else if (minTimeout == 100) {
        minTimeout = 50;
    } else {
        minTimeout = 100;
    }
    //創建定時調度器
    timer = new HashedWheelTimer(Executors.defaultThreadFactory(), minTimeout, TimeUnit.MILLISECONDS, 1024);
    
    // to avoid assertion error during timer.stop invocation
    try {
        Field leakField = HashedWheelTimer.class.getDeclaredField("leak");
        leakField.setAccessible(true);
        leakField.set(timer, null);
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    //檢測MasterSlaveConnectionManager的空閑連接的監視器IdleConnectionWatcher,會清理不用的空閑的池中連接對象
    connectionWatcher = new IdleConnectionWatcher(this, config);
}

/**   
* 創建MasterSlaveConnectionManager的MasterSlaveEntry  
*/    
protected void initSingleEntry() {
    try {
        //主從模式下0~16383加入到集合slots  
        HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
        slots.add(singleSlotRange);

        MasterSlaveEntry entry;
        if (config.checkSkipSlavesInit()) {//ReadMode不為MASTER並且SubscriptionMode不為MASTER才執行
            entry = new SingleEntry(slots, this, config);
            RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
            f.syncUninterruptibly();
        } else {//默認主從部署ReadMode=SLAVE,SubscriptionMode=SLAVE,這里會執行
            entry = createMasterSlaveEntry(config, slots);
        }
        //將每個分片0~16383都指向創建的MasterSlaveEntry
        for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) {
            addEntry(slot, entry);
        }
        //DNS相關
        if (config.getDnsMonitoringInterval() != -1) {
            dnsMonitor = new DNSMonitor(this, Collections.singleton(config.getMasterAddress()), 
                    config.getSlaveAddresses(), config.getDnsMonitoringInterval());
            dnsMonitor.start();
        }
    } catch (RuntimeException e) {
        stopThreads();
        throw e;
    }
}
/**   
* MasterSlaveEntry的構造方法 
* @param config for MasterSlaveServersConfig  
* @param slots for HashSet<ClusterSlotRange> 
* @return MasterSlaveEntry
*/    
protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config, HashSet<ClusterSlotRange> slots) {
    //創建MasterSlaveEntry
    MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);
    //從節點連接池SlaveConnectionPool和PubSubConnectionPool的默認的最小連接數初始化
    List<RFuture<Void>> fs = entry.initSlaveBalancer(java.util.Collections.<URI>emptySet());
    for (RFuture<Void> future : fs) {
        future.syncUninterruptibly();
    }
    主節點連接池MasterConnectionPool和MasterPubSubConnectionPool的默認的最小連接數初始化
    RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
    f.syncUninterruptibly();
    return entry;
}

​ 上面個人覺得有兩處代碼值得我們特別關注,特別說明如下:

  • entry.initSlaveBalancer:從節點連接池SlaveConnectionPoolPubSubConnectionPool的默認的最小連接數初始化。
  • entry.setupMasterEntry:主節點連接池MasterConnectionPoolMasterPubSubConnectionPool的默認的最小連接數初始化。

MasterSlaveEntry.java

​ 用一張圖來解釋MasterSlaveEntry的組件如下:
img

MasterSlaveEntry.java里正是我們一直在尋找着的四個連接池

  • MasterConnectionPool、

  • MasterPubSubConnectionPool、

  • SlaveConnectionPool和

  • PubSubConnectionPool,

MasterSlaveEntry.java的核心代碼如下:

/**   
* MasterSlaveEntry的構造方法 
* @param slotRanges for Set<ClusterSlotRange>   
* @param connectionManager for ConnectionManager   
* @param config for MasterSlaveServersConfig 
*/    
public MasterSlaveEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) {  
    //主從模式下0~16383加入到集合slots  
    for (ClusterSlotRange clusterSlotRange : slotRanges) {  
        for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) {  
            slots.add(i);  
        }  
    }  
    //賦值MasterSlaveConnectionManager給connectionManager  
    this.connectionManager = connectionManager;  
    //賦值config  
    this.config = config;  
  
    //創建LoadBalancerManager  
    //其實LoadBalancerManager里持有者從節點的SlaveConnectionPool和PubSubConnectionPool  
    //並且此時連接池里還沒有初始化默認的最小連接數  
    slaveBalancer = new LoadBalancerManager(config, connectionManager, this);  
    //創建主節點連接池MasterConnectionPool,此時連接池里還沒有初始化默認的最小連接數  
    writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this);  
    //創建主節點連接池MasterPubSubConnectionPool,此時連接池里還沒有初始化默認的最小連接數  
    pubSubConnectionHolder = new MasterPubSubConnectionPool(config, connectionManager, this);  
}  
  
/**   
* 從節點連接池SlaveConnectionPool和PubSubConnectionPool的默認的最小連接數初始化 
* @param disconnectedNodes for Collection<URI> 
* @return List<RFuture<Void>>   
*/   
public List<RFuture<Void>> initSlaveBalancer(Collection<URI> disconnectedNodes) {  
    //這里freezeMasterAsSlave=true  
    boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty() && !config.checkSkipSlavesInit() && disconnectedNodes.size() < config.getSlaveAddresses().size();  
  
    List<RFuture<Void>> result = new LinkedList<RFuture<Void>>();  
    //把主節點當作從節點處理,因為默認ReadMode=ReadMode.SLAVE,所以這里不會添加針對該節點的連接池  
    RFuture<Void> f = addSlave(config.getMasterAddress(), freezeMasterAsSlave, NodeType.MASTER);  
    result.add(f);  
    //讀取從節點的地址信息,然后針對每個從節點地址創建SlaveConnectionPool和PubSubConnectionPool  
    //SlaveConnectionPool【初始化10個RedisConnection,最大可以擴展至64個】  
    //PubSubConnectionPool【初始化1個RedisPubSubConnection,最大可以擴展至50個】  
    for (URI address : config.getSlaveAddresses()) {  
        f = addSlave(address, disconnectedNodes.contains(address), NodeType.SLAVE);  
        result.add(f);  
    }  
    return result;  
}  
  
/**   
* 從節點連接池SlaveConnectionPool和PubSubConnectionPool的默認的最小連接數初始化 
* @param address for URI 
* @param freezed for boolean 
* @param nodeType for NodeType 
* @return RFuture<Void> 
*/   
private RFuture<Void> addSlave(URI address, boolean freezed, NodeType nodeType) {  
    //創建到從節點的連接RedisClient  
    RedisClient client = connectionManager.createClient(NodeType.SLAVE, address);  
    ClientConnectionsEntry entry = new ClientConnectionsEntry(client,  
            this.config.getSlaveConnectionMinimumIdleSize(),  
            this.config.getSlaveConnectionPoolSize(),  
            this.config.getSubscriptionConnectionMinimumIdleSize(),  
            this.config.getSubscriptionConnectionPoolSize(), connectionManager, nodeType);  
    //默認只有主節點當作從節點是會設置freezed=true  
    if (freezed) {  
        synchronized (entry) {  
            entry.setFreezed(freezed);  
            entry.setFreezeReason(FreezeReason.SYSTEM);  
        }  
    }  
    //調用slaveBalancer來對從節點連接池SlaveConnectionPool和PubSubConnectionPool的默認的最小連接數初始化  
    return slaveBalancer.add(entry);  
}  
  
/**   
* 主節點連接池MasterConnectionPool和MasterPubSubConnectionPool的默認的最小連接數初始化 
* @param address for URI 
* @return RFuture<Void> 
*/   
public RFuture<Void> setupMasterEntry(URI address) {  
    //創建到主節點的連接RedisClient  
    RedisClient client = connectionManager.createClient(NodeType.MASTER, address);  
    masterEntry = new ClientConnectionsEntry(  
            client,   
            config.getMasterConnectionMinimumIdleSize(),   
            config.getMasterConnectionPoolSize(),  
            config.getSubscriptionConnectionMinimumIdleSize(),  
            config.getSubscriptionConnectionPoolSize(),   
            connectionManager,   
            NodeType.MASTER);  
    //如果配置的SubscriptionMode=SubscriptionMode.MASTER就初始化MasterPubSubConnectionPool  
    //默認SubscriptionMode=SubscriptionMode.SLAVE,MasterPubSubConnectionPool這里不會初始化最小連接數  
    if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {  
        //MasterPubSubConnectionPool【初始化1個RedisPubSubConnection,最大可以擴展至50個】  
        RFuture<Void> f = writeConnectionHolder.add(masterEntry);  
        RFuture<Void> s = pubSubConnectionHolder.add(masterEntry);  
        return CountListener.create(s, f);  
    }  
    //調用MasterConnectionPool使得連接池MasterConnectionPool里的對象最小個數為10個  
    //MasterConnectionPool【初始化10個RedisConnection,最大可以擴展至64個】  
    return writeConnectionHolder.add(masterEntry);  
}

​ 上面代碼個人覺得有四個地方值得我們特別關注,它們是一個連接池創建對象的入口,列表如下:

  • writeConnectionHolder.add(masterEntry):其實writeConnectionHolder的類型就是MasterConnectionPool,這里是連接池MasterConnectionPool里添加對象
  • pubSubConnectionHolder.add(masterEntry):其實pubSubConnectionHolder的類型是MasterPubSubConnectionPool,這里是連接池MasterPubSubConnectionPool添加對象
  • slaveConnectionPool.add(entry):這里是連接池SlaveConnectionPool里添加對象
  • pubSubConnectionPool.add(entry):這里是連接池PubSubConnectionPool里添加對象

LoadBalancerManager.java

​ 圖解LoadBalancerManager.java的內部組成如下:
img

​ LoadBalancerManager.java里面有着從節點相關的兩個重要的連接池SlaveConnectionPoolPubSubConnectionPool,這里注釋LoadBalancerManager.java的核心代碼如下:

/**   
* LoadBalancerManager的構造方法 
* @param config for MasterSlaveServersConfig  
* @param connectionManager for ConnectionManager   
* @param entry for MasterSlaveEntry 
*/    
public LoadBalancerManager(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) {  
    //賦值connectionManager  
    this.connectionManager = connectionManager;  
    //創建連接池SlaveConnectionPool  
    slaveConnectionPool = new SlaveConnectionPool(config, connectionManager, entry);  
    //創建連接池PubSubConnectionPool  
    pubSubConnectionPool = new PubSubConnectionPool(config, connectionManager, entry);  
}  

/**   
* LoadBalancerManager的連接池SlaveConnectionPool和PubSubConnectionPool里池化對象添加方法,也即池中需要對象時,調用此方法添加 
* @param entry for ClientConnectionsEntry 
* @return RFuture<Void> 
*/    
public RFuture<Void> add(final ClientConnectionsEntry entry) {  
    final RPromise<Void> result = connectionManager.newPromise();  
    //創建一個回調監聽器,在池中對象創建失敗時,進行2次嘗試  
    FutureListener<Void> listener = new FutureListener<Void>() {  
        AtomicInteger counter = new AtomicInteger(2);  
        @Override  
        public void operationComplete(Future<Void> future) throws Exception {  
            if (!future.isSuccess()) {  
                result.tryFailure(future.cause());  
                return;  
            }  
            if (counter.decrementAndGet() == 0) {  
                String addr = entry.getClient().getIpAddr();  
                ip2Entry.put(addr, entry);  
                result.trySuccess(null);  
            }  
        }  
    };  
    //調用slaveConnectionPool添加RedisConnection對象到池中  
    RFuture<Void> slaveFuture = slaveConnectionPool.add(entry);  
    slaveFuture.addListener(listener);  
   
    //調用pubSubConnectionPool添加RedisPubSubConnection對象到池中  
    RFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry);  
    pubSubFuture.addListener(listener);  
    return result;  
}

​ 至此,我們已經了解了開篇提到的四個連接池是在哪里創建的。

連接池的初始化

ConnectionPool.java里獲取讀寫操作的連接,是遍歷ConnectionPool里維持的ClientConnectionsEntry列表,找到一非凍結的ClientConnectionsEntry;

然后調用ClientConnectionsEntry里的freeConnectionsCounter嘗試將值減1,如果成功,說明連接池中可以獲取到連接,那么就從ClientConnectionsEntry里獲取一個連接出來,如果拿不到連接,會調用ClientConnectionsEntry創建一個新連接放置到連接池中,並返回此連接

回顧一下 ClientConnectionsEntry的組成圖:

img

上面的代碼說明如果 ClientConnectionsEntry里的 freeConnections有空閑連接,那么直接返回該連接,如果沒有那么調用 RedisClient.connectAsync創建一個新的連接

/**   
* 真正從連接池中獲取連接
* @param entry for ClientConnectionsEntry
* @param promise for RPromise<T>
*/ 
private void connectTo(ClientConnectionsEntry entry, RPromise<T> promise) {
    if (promise.isDone()) {
        releaseConnection(entry);
        return;
    }
    //從連接池中取出一個連接
    T conn = poll(entry);
    if (conn != null) {
        if (!conn.isActive()) {
            promiseFailure(entry, promise, conn);
            return;
        }

        connectedSuccessful(entry, promise, conn);
        return;
    }
    //如果仍然獲取不到連接,可能連接池中連接對象都被租借了,這里開始創建一個新的連接對象放到連接池中
    createConnection(entry, promise);
}

/**   
* 從連接池中獲取連接
* @param entry for ClientConnectionsEntry
* @return T
*/ 
protected T poll(ClientConnectionsEntry entry) {
    return (T) entry.pollConnection();
}

/**   
* 調用ClientConnectionsEntry創建一個連接放置到連接池中並返回此連接
* @param entry for ClientConnectionsEntry
* @param promise for RPromise<T>
*/ 
private void createConnection(final ClientConnectionsEntry entry, final RPromise<T> promise) {
    //調用ClientConnectionsEntry創建一個連接放置到連接池中並返回此連接
    RFuture<T> connFuture = connect(entry);
    connFuture.addListener(new FutureListener<T>() {
        @Override
        public void operationComplete(Future<T> future) throws Exception {
            if (!future.isSuccess()) {
                promiseFailure(entry, promise, future.cause());
                return;
            }

            T conn = future.getNow();
            if (!conn.isActive()) {
                promiseFailure(entry, promise, conn);
                return;
            }

            connectedSuccessful(entry, promise, conn);
        }
    });
}

從freeConnections里獲取一個連接並返回給讀寫操作使用

/**   
*  ClientConnectionsEntry里從freeConnections里獲取一個連接並返回給讀寫操作使用
*/ 
public RedisConnection pollConnection() {
    return freeConnections.poll();
}

新創建一個連接對象返回給讀寫操作使用

/**   
*  ClientConnectionsEntry里新創建一個連接對象返回給讀寫操作使用
*/ 
public RFuture<RedisConnection> connect() {
    //調用RedisClient利用netty連接redis服務端,將返回的netty的outboundchannel包裝成RedisConnection並返回
    RFuture<RedisConnection> future = client.connectAsync();
    future.addListener(new FutureListener<RedisConnection>() {
        @Override
        public void operationComplete(Future<RedisConnection> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
            
            RedisConnection conn = future.getNow();
            onConnect(conn);
            log.debug("new connection created: {}", conn);
        }
    });
    return future;
}

RedisClient管理Netty連接

使用java里的網絡編程框架Netty連接redis服務端 RedisClient.java

package org.redisson.client;
...
/**
 * 使用java里的網絡編程框架Netty連接redis服務端
 * 作者: Nikita Koksharov
 */
public class RedisClient {
    private final Bootstrap bootstrap;//Netty的工具類Bootstrap,用於連接建立等作用
    private final Bootstrap pubSubBootstrap;//Netty的工具類Bootstrap,用於連接建立等作用
    private final InetSocketAddress addr;//socket連接的地址
    //channels是netty提供的一個全局對象,里面記錄着當前socket連接上的所有處於可用狀態的連接channel
    //channels會自動監測里面的channel,當channel斷開時,會主動踢出該channel,永遠保留當前可用的channel列表
    private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    private ExecutorService executor;//REACOTR模型的java異步執行線程池
    private final long commandTimeout;//超時時間
    private Timer timer;//定時器
    private boolean hasOwnGroup;
    private RedisClientConfig config;//redis連接配置信息

    //構造方法
    public static RedisClient create(RedisClientConfig config) {
        if (config.getTimer() == null) {
            config.setTimer(new HashedWheelTimer());
        }
        return new RedisClient(config);
    }
    //構造方法
    private RedisClient(RedisClientConfig config) {
        this.config = config;
        this.executor = config.getExecutor();
        this.timer = config.getTimer();
        
        addr = new InetSocketAddress(config.getAddress().getHost(), config.getAddress().getPort());
        
        bootstrap = createBootstrap(config, Type.PLAIN);
        pubSubBootstrap = createBootstrap(config, Type.PUBSUB);
        
        this.commandTimeout = config.getCommandTimeout();
    }

    //java的網路編程框架Netty工具類Bootstrap初始化
    private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
        Bootstrap bootstrap = new Bootstrap()
                        .channel(config.getSocketChannelClass())
                        .group(config.getGroup())
                        .remoteAddress(addr);
        //注冊netty相關socket數據處理RedisChannelInitializer
        bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
        //設置超時時間
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
        return bootstrap;
    }
    
    //構造方法
    @Deprecated
    public RedisClient(String address) {
        this(URIBuilder.create(address));
    }
    
    //構造方法
    @Deprecated
    public RedisClient(URI address) {
        this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), address);
        hasOwnGroup = true;
    }

    //構造方法
    @Deprecated
    public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, URI address) {
        this(timer, executor, group, address.getHost(), address.getPort());
    }
    
    //構造方法
    @Deprecated
    public RedisClient(String host, int port) {
        this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, 10000, 10000);
        hasOwnGroup = true;
    }

    //構造方法
    @Deprecated
    public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, String host, int port) {
        this(timer, executor, group, NioSocketChannel.class, host, port, 10000, 10000);
    }
    
    //構造方法
    @Deprecated
    public RedisClient(String host, int port, int connectTimeout, int commandTimeout) {
        this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, connectTimeout, commandTimeout);
    }

    //構造方法
    @Deprecated
    public RedisClient(final Timer timer, ExecutorService executor, EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port, 
                        int connectTimeout, int commandTimeout) {
        RedisClientConfig config = new RedisClientConfig();
        config.setTimer(timer).setExecutor(executor).setGroup(group).setSocketChannelClass(socketChannelClass)
        .setAddress(host, port).setConnectTimeout(connectTimeout).setCommandTimeout(commandTimeout);
        
        this.config = config;
        this.executor = config.getExecutor();
        this.timer = config.getTimer();
        
        addr = new InetSocketAddress(config.getAddress().getHost(), config.getAddress().getPort());
        
        //java的網路編程框架Netty工具類Bootstrap初始化
        bootstrap = createBootstrap(config, Type.PLAIN);
        pubSubBootstrap = createBootstrap(config, Type.PUBSUB);
        
        this.commandTimeout = config.getCommandTimeout();
    }

    //獲取連接的IP地址
    public String getIpAddr() {
        return addr.getAddress().getHostAddress() + ":" + addr.getPort();
    }
    //獲取socket連接的地址
    public InetSocketAddress getAddr() {
        return addr;
    }
    //獲取超時時間
    public long getCommandTimeout() {
        return commandTimeout;
    }
    //獲取netty的線程池
    public EventLoopGroup getEventLoopGroup() {
        return bootstrap.config().group();
    }
    //獲取redis連接配置
    public RedisClientConfig getConfig() {
        return config;
    }
    //獲取連接RedisConnection
    public RedisConnection connect() {
        try {
            return connectAsync().syncUninterruptibly().getNow();
        } catch (Exception e) {
            throw new RedisConnectionException("Unable to connect to: " + addr, e);
        }
    }
    //啟動netty去連接redis服務端,設置java的Future嘗試將netty連接上的OutBoundChannel包裝成RedisConnection並返回RedisConnection
    public RFuture<RedisConnection> connectAsync() {
        final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>();
        //netty連接redis服務端
        ChannelFuture channelFuture = bootstrap.connect();
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(final ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    //將netty連接上的OutBoundChannel包裝成RedisConnection並返回RedisConnection
                    final RedisConnection c = RedisConnection.getFrom(future.channel());
                    c.getConnectionPromise().addListener(new FutureListener<RedisConnection>() {
                        @Override
                        public void operationComplete(final Future<RedisConnection> future) throws Exception {
                            bootstrap.config().group().execute(new Runnable() {
                                @Override
                                public void run() {
                                    if (future.isSuccess()) {
                                        if (!f.trySuccess(c)) {
                                            c.closeAsync();
                                        }
                                    } else {
                                        f.tryFailure(future.cause());
                                        c.closeAsync();
                                    }
                                }
                            });
                        }
                    });
                } else {
                    bootstrap.config().group().execute(new Runnable() {
                        public void run() {
                            f.tryFailure(future.cause());
                        }
                    });
                }
            }
        });
        return f;
    }
    //獲取訂閱相關連接RedisPubSubConnection
    public RedisPubSubConnection connectPubSub() {
        try {
            return connectPubSubAsync().syncUninterruptibly().getNow();
        } catch (Exception e) {
            throw new RedisConnectionException("Unable to connect to: " + addr, e);
        }
    }

    //啟動netty去連接redis服務端,設置java的Future嘗試將netty連接上的OutBoundChannel包裝成RedisPubSubConnection並返回RedisPubSubConnection
    public RFuture<RedisPubSubConnection> connectPubSubAsync() {
        final RPromise<RedisPubSubConnection> f = new RedissonPromise<RedisPubSubConnection>();
        //netty連接redis服務端
        ChannelFuture channelFuture = pubSubBootstrap.connect();
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(final ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    //將netty連接上的OutBoundChannel包裝成RedisPubSubConnection並返回RedisPubSubConnection
                    final RedisPubSubConnection c = RedisPubSubConnection.getFrom(future.channel());
                    c.<RedisPubSubConnection>getConnectionPromise().addListener(new FutureListener<RedisPubSubConnection>() {
                        @Override
                        public void operationComplete(final Future<RedisPubSubConnection> future) throws Exception {
                            bootstrap.config().group().execute(new Runnable() {
                                @Override
                                public void run() {
                                    if (future.isSuccess()) {
                                        if (!f.trySuccess(c)) {
                                            c.closeAsync();
                                        }
                                    } else {
                                        f.tryFailure(future.cause());
                                        c.closeAsync();
                                    }
                                }
                            });
                        }
                    });
                } else {
                    bootstrap.config().group().execute(new Runnable() {
                        public void run() {
                            f.tryFailure(future.cause());
                        }
                    });
                }
            }
        });
        return f;
    }

    //關閉netty網絡連接
    public void shutdown() {
        shutdownAsync().syncUninterruptibly();
        if (hasOwnGroup) {
            timer.stop();
            executor.shutdown();
            try {
                executor.awaitTermination(15, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            bootstrap.config().group().shutdownGracefully();
            
        }
    }

    //異步關閉netty網絡連接
    public ChannelGroupFuture shutdownAsync() {
        for (Channel channel : channels) {
            RedisConnection connection = RedisConnection.getFrom(channel);
            if (connection != null) {
                connection.setClosed(true);
            }
        }
        return channels.close();
    }

    @Override
    public String toString() {
        return "[addr=" + addr + "]";
    }
}

讀寫負載流程圖

ConnectionPool.java里獲取讀寫操作的連接,是遍歷ConnectionPool里維持的ClientConnectionsEntry列表,找到一非凍結的ClientConnectionsEntry,然后調用ClientConnectionsEntry里的freeConnectionsCounter嘗試將值減1,如果成功,說明連接池中可以獲取到連接,那么就從ClientConnectionsEntry里獲取一個連接出來,如果拿不到連接,會調用ClientConnectionsEntry創建一個新連接放置到連接池中,並返回此連接,

img點擊並拖拽以移動

CommandAsyncExecutor 開始獲取連接

讀寫操作首先都需要獲取到一個連接對象,在上面的分析中我們知道讀寫操作都是通過CommandAsyncExecutor.java里的如下代碼獲取連接對象:

    protected RFuture<RedisConnection> getConnection() {
        //開始從connectionManager獲取池中的連接  
        //這里采用異步方式,創建一個RFuture對象,等待池中連接,一旦獲得連接,然后進行讀和寫操作  
        final RFuture<RedisConnection> connectionFuture;  
        if (readOnlyMode) {
        //對於讀操作默認readOnlyMode=true,這里會執行  
         connectionFuture = connectionManager.connectionReadOp(source, command);  
        } else {
        //對於寫操作默認readOnlyMode=false,這里會執行  
         connectionFuture = connectionManager.connectionWriteOp(source, command);  
        }  
    }
    

上面:

  • 讀操作(readOnlyMode=true)調用了 connectionManager.connectionReadOp從連接池獲取連接對象,
  • 寫操作(readOnlyMode=false)調用了connectionManager.connectionWriteOp從連接池獲取連接對象,

從connectionManager獲取連接

我們繼續跟進 connectionManager關於connectionReadOp和connectionWriteOp

/**   
* 讀操作通過ConnectionManager從連接池獲取連接對象
* @param source for NodeSource 
* @param command for RedisCommand<?> 
* @return RFuture<RedisConnection>
*/ 
public RFuture<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command) {
    //這里之前分析過source=NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
    MasterSlaveEntry entry = source.getEntry();
    if (entry == null && source.getSlot() != null) {//這里不會執行source里slot=null
        entry = getEntry(source.getSlot());
    }
    if (source.getAddr() != null) {//這里不會執行source里addr=null
        entry = getEntry(source.getAddr());
        if (entry == null) {
            for (MasterSlaveEntry e : getEntrySet()) {
                if (e.hasSlave(source.getAddr())) {
                    entry = e;
                    break;
                }
            }
        }
        if (entry == null) {
            RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
            return RedissonPromise.newFailedFuture(ex);
        }
        
        return entry.connectionReadOp(command, source.getAddr());
    }
    
    if (entry == null) {//這里不會執行source里entry不等於null
        RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
        return RedissonPromise.newFailedFuture(ex);
    }
    //MasterSlaveEntry里從連接池獲取連接對象
    return entry.connectionReadOp(command);
}
/**   
* 寫操作通過ConnectionManager從連接池獲取連接對象
* @param source for NodeSource 
* @param command for RedisCommand<?> 
* @return RFuture<RedisConnection>
*/ 
public RFuture<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command) {
    //這里之前分析過source=NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
    MasterSlaveEntry entry = source.getEntry();
    if (entry == null) {
        entry = getEntry(source);
    }
    if (entry == null) {//這里不會執行source里entry不等於null
        RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
        return RedissonPromise.newFailedFuture(ex);
    }
    //MasterSlaveEntry里從連接池獲取連接對象
    return entry.connectionWriteOp(command);
}

上面調用 ConnectionManager從連接池獲取連接對象,但是 ConnectionManager卻將獲取連接操作轉交 MasterSlaveEntry處理

回顧一下ConnectionManager 的組成

img

從ConnectionPool獲取連接

最終的獲取連接對象都轉交到了從連接池 ConnectionPool

ConnectionPool.java里獲取讀寫操作的連接,是遍歷ConnectionPool里維持的ClientConnectionsEntry列表,找到一非凍結的ClientConnectionsEntry,

img

然后,ClientConnectionsEntry里的freeConnectionsCounter嘗試將值減1,

如果成功,說明連接池中可以獲取到連接,那么就從ClientConnectionsEntry里獲取一個連接出來,如果拿不到連接,會調用ClientConnectionsEntry創建一個新連接放置到連接池中,並返回此連接

/**   
* 讀寫操作從ConnectionPool.java連接池里獲取連接對象
* @param command for RedisCommand<?> 
* @return RFuture<T>
*/ 
public RFuture<T> get(RedisCommand<?> command) {
	List<ClientConnectionsEntry> entriesCopy = new LinkedList<ClientConnectionsEntry>(entries);
	for (Iterator<ClientConnectionsEntry> iterator = entriesCopy.iterator(); iterator.hasNext();) {


		//ClientConnectionsEntry里對應的redis節點為非凍結節點,也即freezed=false
		
		ClientConnectionsEntry entry = iterator.next();
		if (!((!entry.isFreezed() || entry.isMasterForRead()) 
				&& tryAcquireConnection(entry))) {
			iterator.remove();
		}
	}
	while (!entriesCopy.isEmpty()) {
		//遍歷entriesCopy列表
		//遍歷的算法默認為RoundRobinLoadBalancer
		ClientConnectionsEntry entry = config.getLoadBalancer().getEntry(entriesCopy);
		return acquireConnection(command, entry);
	}

	//記錄失敗節點信息
	List<InetSocketAddress> failed = new LinkedList<InetSocketAddress>();
	//記錄凍結的節點信息
	List<InetSocketAddress> freezed = new LinkedList<InetSocketAddress>();
		for (ClientConnectionsEntry entry : entries) {
		if (entry.isFailed()) {
			failed.add(entry.getClient().getAddr());
		} else if (entry.isFreezed()) {
			freezed.add(entry.getClient().getAddr());
		}
	}

	StringBuilder errorMsg = new StringBuilder(getClass().getSimpleName() + " no available Redis entries. ");
	if (!freezed.isEmpty()) {
			errorMsg.append(" Disconnected hosts: " + freezed);
	}
	if (!failed.isEmpty()) {
			errorMsg.append(" Hosts disconnected due to errors during `failedSlaveCheckInterval`: " + failed);
	}

	RedisConnectionException exception = new RedisConnectionException(errorMsg.toString());
	return RedissonPromise.newFailedFuture(exception);
}


/**   
* 讀寫操作從ConnectionPool.java連接池里獲取連接對象
* @param command for RedisCommand<?> 
* @param entry for ClientConnectionsEntry
* @return RFuture<T>
*/ 
private RFuture<T> acquireConnection(RedisCommand<?> command, final ClientConnectionsEntry entry) {
    //創建一個異步結果獲取RPromise
    final RPromise<T> result = connectionManager.newPromise();
    //獲取連接前首先將ClientConnectionsEntry里的空閑連接信號freeConnectionsCounter值減1
    //該操作成功后將調用這里的回調函數AcquireCallback<T>
    AcquireCallback<T> callback = new AcquireCallback<T>() {
        @Override
        public void run() {
            result.removeListener(this);
            //freeConnectionsCounter值減1成功,說明獲取可以獲取到連接
            //這里才是真正獲取連接的操作
            connectTo(entry, result);
        }
        
        @Override
        public void operationComplete(Future<T> future) throws Exception {
            entry.removeConnection(this);
        }
    };
    //異步結果獲取RPromise綁定到上面的回調函數callback
    result.addListener(callback);
    //嘗試將ClientConnectionsEntry里的空閑連接信號freeConnectionsCounter值減1,如果成功就調用callback從連接池獲取連接
    acquireConnection(entry, callback);
    //返回異步結果獲取RPromise
    return result;
}

/**   
* 真正從連接池中獲取連接
* @param entry for ClientConnectionsEntry
* @param promise for RPromise<T>
*/ 
private void connectTo(ClientConnectionsEntry entry, RPromise<T> promise) {
    if (promise.isDone()) {
        releaseConnection(entry);
        return;
    }
    //從連接池中取出一個連接
    T conn = poll(entry);
    if (conn != null) {
        if (!conn.isActive()) {
            promiseFailure(entry, promise, conn);
            return;
        }

        connectedSuccessful(entry, promise, conn);
        return;
    }
    //如果仍然獲取不到連接,可能連接池中連接對象都被租借了,這里開始創建一個新的連接對象放到連接池中
    createConnection(entry, promise);
}

/**   
* 從連接池中獲取連接
* @param entry for ClientConnectionsEntry
* @return T
*/ 
protected T poll(ClientConnectionsEntry entry) {
    return (T) entry.pollConnection();
}

/**   
* 調用ClientConnectionsEntry創建一個連接放置到連接池中並返回此連接
* @param entry for ClientConnectionsEntry
* @param promise for RPromise<T>
*/ 
private void createConnection(final ClientConnectionsEntry entry, final RPromise<T> promise) {
    //調用ClientConnectionsEntry創建一個連接放置到連接池中並返回此連接
    RFuture<T> connFuture = connect(entry);
    connFuture.addListener(new FutureListener<T>() {
        @Override
        public void operationComplete(Future<T> future) throws Exception {
            if (!future.isSuccess()) {
                promiseFailure(entry, promise, future.cause());
                return;
            }

            T conn = future.getNow();
            if (!conn.isActive()) {
                promiseFailure(entry, promise, conn);
                return;
            }

            connectedSuccessful(entry, promise, conn);
        }
    });
}

凍結判斷條件

每個ConnectionPool持有的ClientConnectionsEntry對象凍結判斷條件

​ 一個節點被判斷為凍結,必須同時滿足以下條件:

  • 該節點有slave節點,並且slave從節點個數大於0;
  • 設置的配置ReadMode為false並且SubscriptionMode不為MASTER;
  • 該節點的從節點至少有一個存活着,也即如果有從節點宕機,宕機的從節點的個數小於該節點總的從節點個數

主從模式的缺陷

問題:

主從模式下,當主節點宕機了,整個集群就沒有可寫的節點了。

解決方案:

由於從節點上備份了主節點的所有數據,那在主節點宕機的情況下,如果能夠將從節點變成一個主節點,是不是就可以解決這個問題了呢?是的,這個就是Sentinel哨兵的作用。

總之,當主服務器宕機后,需要手動把一台從服務器切換為主服務器,這就需要人工干預,費事費力,還會造成一段時間內服務不可用。哨兵模式就是為了解決此類問題而產生的。

Sentinel(哨兵)模式

Sentinel(哨兵)是用於監控redis集群中Master狀態的工具,是Redis 的高可用性解決方案,sentinel哨兵模式已經被集成在redis2.4之后的版本中。sentinel系統可以監視一個或者多個redis master服務,以及這些master服務的所有slave;當某個master服務下線時,自動將該master下的slave升級為master服務替代已下線的master服務繼續處理請求。

哨兵的任務

Redis 的 Sentinel 系統用於管理多個 Redis 服務器(instance), 該系統執行以下三個任務:

監控(Monitoring):

Sentinel 會不斷地檢查你的主服務器和從服務器是否運作正常。

提醒(Notification):

當被監控的某個 Redis 服務器出現問題時, Sentinel 可以通過 API 向管理員或者其他應用程序發送通知。

自動故障遷移(Automatic failover):

當一個主服務器不能正常工作時, Sentinel 會開始一次自動故障遷移操作, 它會進行選舉,將其中一個從服務器升級為新的主服務器, 並讓失效主服務器的其他從服務器改為復制新的主服務器; 當客戶端試圖連接失效的主服務器時, 集群也會向客戶端返回新主服務器的地址, 使得集群可以使用新主服務器代替失效服務器。

監控(Monitoring)

(1)Sentinel可以監控任意多個Master和該Master下的Slaves。(即多個主從模式)

(2)同一個哨兵下的、不同主從模型,彼此之間相互獨立。

(3)Sentinel會不斷檢查Master和Slaves是否正常。

自動故障切換(Automatic failover)

Sentinel網絡

監控同一個Master的Sentinel會自動連接,組成一個分布式的Sentinel網絡,互相通信並交換彼此關於被監視服務器的信息。

下圖中,三個監控s1的Sentinel,自動組成Sentinel網絡結構。

img

點擊並拖拽以移動

當一個集群中的master失效之后,sentinel可以選舉出一個新的master用於自動接替master的工作,集群中的其他redis服務器自動指向新的master同步數據。

一般建議sentinel采取奇數台,防止某一台sentinel無法連接到master導致誤切換。

img點擊並拖拽以移動

疑問:為什么要使用sentinel網絡呢?

答:當只有一個sentinel的時候,如果這個sentinel掛掉了,那么就無法實現自動故障切換了。在sentinel網絡中,只要還有一個sentinel活着,就可以實現故障切換。

有關sentinel的幾個要點:

  • sentinel本身是監督者的身份,沒有存儲功能。在整個體系中一個sentinel者或一群sentinels與主從服務架構體系是監督與被監督的關系。

  • 作為一個sentinel在整個架構體系中有就可能有如下三種交互:sentinel與主服務器、sentinel與從服務器、sentinel與其他sentinel。

  • 既然是交互,交互所需要的基本內容對於這三種場景還是一樣的,首先要構建這樣的一個交互網絡無可避免,需要節點的注冊與發現、節點之間的通信連接、節點保活、節點之間的通信協議等。

  • 因為角色不同所以在這個架構體系中承擔的功能也不一樣。所以交互的內容也不一樣。
    在理解了以上幾點之后,我們一步步從構建sentinel網絡體系到這整個體系結構是如何來保證其高可用性來分析。

故障切換的過程

(1) 判定客觀下線

假如有三個哨兵和一主兩從的節點,下面是一主多從

哨兵之間會互相監測運行狀態,並且會交換一下節點監測的狀態,同時哨兵也會監測主從節點的狀態。

如果檢測到某一個節點沒有正常回復,並且距離上次正常回復的時間超過了某個閾值,那么就認為該節點為主觀下線在這里插入圖片描述

這個時候其他哨兵也會來監測該節點是不是真的主觀下線,如果有足夠多數量的哨兵都認為它確實主觀下線了,那么它就會被標記為客觀下線

在這里插入圖片描述

這個時候哨兵開始啟動故障切換的流程。

關於主從節點的切換有兩個環節,第一個是哨兵要選舉出領頭人Sentinel Leader來負責下線機器的故障轉移,第二是從Slave中選出主節點,

在這里插入圖片描述

(2)選舉Sentinel Leader

當一個Master服務器客觀下線后,監控這個Master服務器的所有Sentinel將會選舉出一個Sentinel Leader。並由Sentinel Leader對客觀下線的Master進行故障轉移。

每一個Sentinel節點都可以成為Leader,當一個Sentinel節點確認redis集群的主節點主觀下線后,會請求其他Sentinel節點要求將自己選舉為Leader。被請求的Sentinel節點如果沒有同意過其他Sentinel節點的選舉請求,則同意該請求(選舉票數+1),否則不同意。

如果一個Sentinel節點獲得的選舉票數達到Leader最低票數(quorum/Sentinel節點數/2+1的最大值),則該Sentinel節點選舉為Leader;否則重新進行選舉。

img點擊並拖拽以移動

為什么Sentinel集群至少3節點

一個Sentinel節選舉成為Leader的最低票數為quorumSentinel節點數/2+1的最大值,如果Sentinel集群只有2個Sentinel節點,則

Sentinel節點數/2 + 1
= 2/2 + 1
= 2

即Leader最低票數至少為2,當該Sentinel集群中由一個Sentinel節點故障后,僅剩的一個Sentinel節點是永遠無法成為Leader。

也可以由此公式可以推導出,Sentinel集群允許1個Sentinel節點故障則需要3個節點的集群;允許2個節點故障則需要5個節點集

如果超過半數,則當前Sentinel就會被選為領頭Sentinel並進行故障轉移。

(3)故障轉移

故障轉移包括以下三步:

  1. 在已下線的Master主機下面挑選一個Slave將其轉換為主服務器。
  2. 讓其余所有Slave服務器復制新的Master服務器。
  3. 讓已下線的Master服務器變成新的Master服務器的Slave。當已下線的服務器在此上線后將復新的Master的數據。
(4)選舉新的主服務器

領頭Sentinel會在所有Slave中選出新的Master,發送SLAVEOF no one命令,將這個服務器確定為主服務器。

領頭Sentinel會將已下線Master的所有從服務器報錯在一個列表中,按照規則進行挑選。

  1. 刪除列表中所有處於下線或者短線狀態的Slave。
  2. 刪除列表中所有最近5s內沒有回復過領頭Sentinel的INFO命令的Slave。
  3. 刪除所有與下線Master連接斷開超過down-after-milliseconds * 10毫秒的Slave。
  4. 領頭Sentinel將根據Slave優先級,對列表中剩余的Slave進行排序,並選出其中優先級最高的Slave。如果有多個具有相同優先級的Slave,那么領頭Sentinel將按照Slave復制偏移量,選出其中偏移量最大的Slave。如果有多個優先級最高,偏移量最大的Slave,那么根據運行ID最小原則選出新的Master。

確定新的Master之后,領頭Sentinel會以每秒一次的頻率向新的Master發送SLAVEOF no one命令,當得到確切的回復role由slave變為master之后,當前服務器順利升級為Master服務器。

(5)將舊的Master變成Slave

當已下線的Master重新上線后,領頭Sentinel會向此服務器發送SLAVEOF命令,將當前服務器變成新的Master的Slave。

(6)原Master重新上線

當原Master節點重新上線后,自動轉為當前Master節點的從節點。

故障切換的例子

1.Sentinel集群包括三個sentinel節點sentinel1、sentinel2、seninel3,sentinel集群各節點之間互相監控哨兵運行狀態。

img點擊並拖拽以移動

2.Sentinel集群各節點分別與Redis主節點進行ping命令,以檢查Redis主節點的運行狀態。

3.假設Sentinel集群檢測到Redis主節點Master宕機,在指定時間內未恢復,Sentinel集群通過投票(半數原則),最終確定Sentinel leader。投票通過之后,Sentinel leader就會對Redis集群做故障轉移操作。

3.1 首先,是Sentinel集群從各slave節點中挑選一台優先級最高的slave節點提升為Master節點。

3.2,其次,新的Master節點向原Master的所有從節點發送slaveof命令,讓它們作為新Master的slave節點,並將新的Master節點數據復制數據各個slave節點上,故障轉移完成。

3.3 最后,Sentinel集群會繼續監視老的Master節點,老的Master恢復上線后,Sentinel會將它設置為新Master的slave節點。

3.4 故障轉移后的拓撲圖如下所示,在圖中,slave節點slave-1被選舉成為新的Master的節點。

img點擊並拖拽以移動

哨兵模式部署

需求

前提:已經存在一個正在運行的主從模式。另外,配置三個Sentinel實例,監控同一個Master節點。

配置Sentinel

(1)在/usr/local目錄下,創建/redis/sentinels/目錄

cd /usr/local/redis
mkdir sentinels

(2)在sentinels目錄下,依次創建s1、s2、s3三個子目錄中

cd sentinels
mkdir s1 s2 s3

(3)依次拷貝redis解壓目錄下的sentinel.conf文件,到這三個子目錄中

cp /usr/local/src/redis-5.0.4/sentinel.conf s1/
cp /usr/local/src/redis-5.0.4/sentinel.conf s2/
cp /usr/local/src/redis-5.0.4/sentinel.conf s3/

(4)依次修改s1、s2、s3子目錄中的sentinel.conf文件,修改端口,並指定要監控的主節點。(從節點不需要指定,sentinel會自動識別)

# s1 哨兵配置
port 26379
sentinel monitor mymaster 127.0.0.1 6380 2 

# mymaster為主節點別名,127.0.0.1為主節點IP,6380為主節點端口,2為觸發故障切換的最少哨兵數量

# s2 哨兵配置
port 26380
sentinel monitor mymaster 127.0.0.1 6380 2

# s3 哨兵配置
port 26381
sentinel monitor mymaster 127.0.0.1 6380 2

(5)再打開三個shell窗口,在每一個窗口中,啟動一個哨兵實例,並觀察日志輸出

[root@node0719 sentinels]# redis-sentinel ./s1/sentinel.conf

[root@node0719 sentinels]# redis-sentinel ./s2/sentinel.conf

[root@node0719 sentinels]# redis-sentinel ./s3/sentinel.conf

哨兵模式測試

(1)先關閉6380節點。發現,確實重新指定了一個主節點

(2)再次上線6380節點。發現,6380節點成為了新的主節點的從節點。

哨兵模式建議:

1 如果監控同一業務,可以選擇一套 Sentinel 集群監控多組 Redis 集群方案

2 sentinel monitor配置中的建議設置成 Sentinel 節點的一半加 1,當 Sentinel 部署在多個 IDC 的時候,單個 IDC 部署的 Sentinel 數量不建議超過(Sentinel 數量 – quorum)。

3 部署的各個節點服務器時間盡量要同步,否則日志的時序性會混亂。

4 Redis 建議使用 pipeline 和 multi-keys 操作,減少 RTT 次數,提高請求效率。

哨兵模式優點

Sentinel哨兵模式,確實能實現自動故障切換。提供穩定的服務

哨兵模式缺點:

  1. 是一種中心化的集群實現方案:

    始終只有一個Redis主機來接收和處理寫請求,寫操作受單機瓶頸影響。

  2. 集群里所有節點保存的都是全量數據,浪費內存空間

    沒有真正實現分布式存儲。數據量過大時,主從同步嚴重影響master的性能。

  3. 故障轉移期間不能寫

    Redis主機宕機后,哨兵模式正在投票選舉的情況之外,因為投票選舉結束之前,誰也不知道主機和從機是誰,此時Redis也會開啟保護機制,禁止寫操作,直到選舉出了新的Redis主機。

客戶端分片的集群模式

客戶端分片主要是說,我們只需要部署多個Redis節點,具體如何使用這些節點,主要工作在客戶端。

客戶端分片是把分片的邏輯放在Redis客戶端實現,客戶端通過固定的Hash算法,針對不同的key計算對應的Hash值,然后對不同的Redis節點進行讀寫。

比如:jedis已支持Redis Sharding功能,即ShardedJedis,通過Redis客戶端預先定義好的路由規則(使用一致性哈希),把對Key的訪問轉發到不同的Redis實例中,查詢數據時把返回結果匯集。這種方案的模式如圖所示。

img

客戶端分片集群模式 需要業務開發人員事先評估業務的請求量和數據量,然后讓DBA部署足夠的節點交給開發人員使用即可。

特點

這實際上是一種靜態分片技術。Redis 實例的增減,都得手工調整分片程序。

基於此分片機制的開源產品,現在仍不多見。

優點

1 部署非常方便,業務需要多少個節點DBA直接部署交付即可,剩下的事情就需要業務開發人員根據節點數量來編寫key的請求路由邏輯,制定一個規則,一般采用固定的Hash算法,把不同的key寫入到不同的節點上,然后再根據這個規則進行數據讀取。

2 性能較好。這種分片機制的性能比代理式更好(少了一個中間分發環節)。

3 解決了主從模式的分布式存儲問題,數據不在集中在一個節點上。 通過分片的方式,存儲在多個節點上。

缺點

1 業務開發人員使用Redis的成本較高,需要編寫路由規則的代碼來使用多個節點,而且如果事先對業務的數據量評估不准確,后期的擴容和遷移成本非常高,因為節點數量發生變更后,Hash算法對應的節點也就不再是之前的節點了。客戶端代碼升級麻煩,對研發人員的個人依賴性強——需要有較強的程序開發能力做后盾。如果主力程序員離職,可能新的負責人,會選擇重寫一遍。

所以后來又衍生出了一致性哈希算法,就是為了解決當節點數量變更時,盡量減少數據的遷移和性能問題。

2 對數據業務數據量比較穩定.。這種客戶端分片的方案一般用於業務數據量比較穩定,后期不會有大幅度增長的業務場景下使用,只需要前期評估好業務數據量即可。

3 可運維性較差。出現故障,定位和解決都得研發和運維配合着解決,故障時間變長。這種方案,難以進行標准化運維,不太適合中小公司(除非有足夠的 DevOPS)。

服務端分片模式(redis集群模式)

Redis 的哨兵模式雖然已經可以實現高可用,讀寫分離 ,但是存在幾個方面的不足:

  • 哨兵模式下每台 Redis 服務器都存儲相同的數據,很浪費內存空間;數據量太大,主從同步時嚴重影響了master性能。
  • 哨兵模式是中心化的集群實現方案,每個從機和主機的耦合度很高,master宕機到salve選舉master恢復期間服務不可用。
  • 哨兵模式始終只有一個Redis主機來接收和處理寫請求,寫操作還是受單機瓶頸影響,沒有實現真正的分布式架構。

Redis Cluster(Redis集群)簡介

RedisCluster 是 Redis 的親兒子,它是 Redis 作者自己提供的 Redis 集群化方案。 相對於 Codis 的不同,它是去中心化的,如圖所示,該集群有三個 Redis 節點組成, 每個節點負責整個集群的一部分數據,每個節點負責的數據多少可能不一樣。這三個節點相 互連接組成一個對等的集群,它們之間通過一種特殊的二進制協議相互交互集群信息。

redis在3.0上加入了 Cluster 集群模式,實現了 Redis 的分布式存儲,也就是說每台 Redis 節點上存儲不同的數據。cluster模式為了解決單機Redis容量有限的問題,將數據按一定的規則分配到多台機器,內存/QPS不受限於單機,可受益於分布式集群高擴展性。

Redis Cluster 將所有數據划分為 16384 的 slots,它比 Codis 的 1024 個槽划分得更為精細,每個節點負責其中一部分槽位。槽位的信息存儲於每個節點中,它不像 Codis,它不 需要另外的分布式存儲來存儲節點槽位信息。

Redis Cluster是一種服務器Sharding技術(分片和路由都是在服務端實現),采用多主多從,每一個分區都是由一個Redis主機和多個從機組成,片區和片區之間是相互平行的。

Redis Cluster集群采用了P2P的模式,完全去中心化。

img

如上圖,官方推薦,集群部署至少要 3 台以上的master節點,最好使用 3 主 3 從六個節點的模式。

集群搭建需要的環境

Redis集群 原理與實戰 - 圖解- 秒懂

Redis集群數據的分片

Redis集群不是使用一致性哈希,而是使用哈希槽。整個redis集群有16384個哈希槽,決定一個key應該分配到那個槽的算法是:計算該key的CRC16結果再模16834。

集群中的每個節點負責一部分哈希槽,比如集群中有3個節點,則:

  • 節點A存儲的哈希槽范圍是:0 – 5500
  • 節點B存儲的哈希槽范圍是:5501 – 11000
  • 節點C存儲的哈希槽范圍是:11001 – 16384

這樣的分布方式方便節點的添加和刪除。比如,需要新增一個節點D,只需要把A、B、C中的部分哈希槽數據移到D節點。同樣,如果希望在集群中刪除A節點,只需要把A節點的哈希槽的數據移到B和C節點,當A節點的數據全部被移走后,A節點就可以完全從集群中刪除。

因為把哈希槽從一個節點移到另一個節點是不需要停機的,所以,增加或刪除節點,或更改節點上的哈希槽,也是不需要停機的。

如果多個key都屬於一個哈希槽,集群支持通過一個命令(或事務, 或lua腳本)同時操作這些key。通過“哈希標簽”的概念,用戶可以讓多個key分配到同一個哈希槽。哈希標簽在集群詳細文檔中有描述,這里做個簡單介紹:如果key含有大括號”{}”,則只有大括號中的字符串會參與哈希,比如”this{foo}”和”another{foo}”這2個key會分配到同一個哈希槽,所以可以在一個命令中同時操作他們。

槽位為什么是16384(2^14)個?

分片SLOT的計算公式

SLOT=CRC16.crc16(key.getBytes()) % MAX_SLOT

在這里插入圖片描述

但是可能這個槽並不歸隨機找的這個節點管,節點如果發現不歸自己管,就會返回一個MOVED ERROR通知,引導客戶端去正確的節點訪問,這個時候客戶端就會去正確的節點操作數據。

在這里插入圖片描述

對於客戶端請求的key,根據公式HASH_SLOT=CRC16(key) mod 16384,計算出映射到哪個分片上,然后Redis會去相應的節點進行操作!

CRC16算法產生的hash值有16bit,該算法可以產生2^16-=65536個值。換句話說,值是分布在0~65535之間。那作者在做mod運算的時候,為什么不mod 65536,而選擇 mod 16384?

在redis節點發送心跳包時需要把所有的槽放到這個心跳包里,以便讓節點知道當前集群信息,

img點擊並拖拽以移動

交換的數據信息,由消息體和消息頭組成。消息體無外乎是一些節點標識啊,IP啊,端口號啊,發送時間啊。這與本文關系不是太大,我不細說。我們來看消息頭,結構如下

img點擊並拖拽以移動

消息頭里面有個myslots的char數組,長度為16383/8,這其實是一個bitmap,每一個位代表一個槽,如果該位為1,表示這個槽是屬於這個節點的。在消息頭中,最占空間的是myslots[CLUSTER_SLOTS/8]

這塊的大小是:
16384÷8÷1024=2kb

16384=16k,在發送心跳包時使用char進行bitmap壓縮后是2k(2 * 8 (8 bit) * 1024(1k) = 16K)個char,也就是說使用2k個char的空間創建了16k的槽數。

雖然使用CRC16算法最多可以分配65535(2^16-1)個槽位,65535=65k,壓縮后就是8k(8 * 8 (8 bit) * 1024(1k) =65K),也就是說需要需要8k的心跳包,作者認為這樣做不太值得;

集群節點越多,心跳包的消息體內攜帶的數據越多。如果節點過1000個,也會導致網絡擁堵。因此redis作者,不建議redis cluster節點數量超過1000個。
那么,對於節點數在1000以內的redis cluster集群,16384個槽位夠用了。沒有必要拓展到65536個。並且一般情況下一個redis集群不會有超過1000個master節點,所以16k的槽位是個比較合適的選擇。

Redis集群的一致性保證

Redis集群不能保證強一致性。一些已經向客戶端確認寫成功的操作,會在某些不確定的情況下丟失。

產生寫操作丟失的第一個原因,是因為主從節點之間使用了異步的方式來同步數據。

一個寫操作是這樣一個流程:

  • 1)客戶端向主節點B發起寫的操作

  • 2)主節點B回應客戶端寫操作成功

  • 3)主節點B向它的從節點B1,B2,B3同步該寫操作

從上面的流程可以看出來,主節點B並沒有等從節點B1,B2,B3寫完之后再回復客戶端這次操作的結果。所以,如果主節點B在通知客戶端寫操作成功之后,但同步給從節點之前,主節點B故障了,其中一個沒有收到該寫操作的從節點會晉升成主節點,該寫操作就這樣永遠丟失了。

就像傳統的數據庫,在不涉及到分布式的情況下,它每秒寫回磁盤。為了提高一致性,可以在寫盤完成之后再回復客戶端,但這樣就要損失性能。這種方式就等於Redis集群使用同步復制的方式。

基本上,在性能和一致性之間,需要一個權衡。

如果真的需要,Redis集群支持同步復制的方式,通過WAIT指令來實現,這可以讓丟失寫操作的可能性降到很低。但就算使用了同步復制的方式,Redis集群依然不是強一致性的,在某些復雜的情況下,比如從節點在與主節點失去連接之后被選為主節點,不一致性還是會發生。

這種不一致性發生的情況是這樣的,當客戶端與少數的節點(至少含有一個主節點)網絡聯通,但他們與其他大多數節點網絡不通。比如6個節點,A,B,C是主節點,A1,B1,C1分別是他們的從節點,一個客戶端稱之為Z1。

當網絡出問題時,他們被分成2組網絡,組內網絡聯通,但2組之間的網絡不通,假設A,C,A1,B1,C1彼此之間是聯通的,另一邊,B和Z1的網絡是聯通的。Z1可以繼續往B發起寫操作,B也接受Z1的寫操作。當網絡恢復時,如果這個時間間隔足夠短,集群仍然能繼續正常工作。如果時間比較長,以致B1在大多數的這邊被選為主節點,那剛才Z1發給B的寫操作都將丟失。

注意,Z1給B發送寫操作是有一個限制的,如果時間長度達到了大多數節點那邊可以選出一個新的主節點時,少數這邊的所有主節點都不接受寫操作。

這個時間的配置,稱之為節點超時(node timeout),對集群來說非常重要,當達到了這個節點超時的時間之后,主節點被認為已經宕機,可以用它

服務端分片Redis 集群的缺點

另外,官方 Redis 集群方案將 Sentinel 功能內置到 Redis 內,這導致在節點數較多(大於 100)時在 Gossip 階段會產生大量的 PING/INFO/CLUSTER INFO 流量,根據 issue 中提到的情況,200 個使用 3.2.8 版本節點搭建的 Redis 集群,在沒有任何客戶端請求的情況下,每個節點仍然會產生 40Mb/s 的流量,雖然到后期 Redis 官方嘗試對其進行壓縮修復,但按照 Redis 集群機制,節點較多的情況下無論如何都會產生這部分流量,對於使用大內存機器但是使用千兆網卡的用戶這是一個值得注意的地方。

最后,每個 Key 對應的 Slot 的存儲開銷,在規模較大的時候會占用較多內存,4.x 版本以前甚至會達到實際使用內存的數倍,雖然 4.x 版本使用 rax 結構進行存儲,但是仍然占據了大量內存,從非官方集群方案遷移到官方集群方案時,需要注意這部分多出來的內存。

還有,官方集群方案的高可用策略僅有主從一種,高可用級別跟 Slave 的數量成正相關,如果只有一個 Slave,則只能允許一台物理機器宕機, Redis 4.2 roadmap 提到了 cache-only mode,提供類似於 Twemproxy 的自動剔除后重分片策略,但是截至目前仍未實現。

Redis Cluster集群具有如下幾個特點

  • 集群完全去中心化,采用多主多從;所有的redis節點彼此互聯(PING-PONG機制),內部使用二進制協議優化傳輸速度和帶寬。
  • 客戶端與 Redis 節點直連,不需要中間代理層。客戶端不需要連接集群所有節點,連接集群中任何一個可用節點即可。
  • 每一個分區都是由一個Redis主機和多個從機組成,分片和分片之間是相互平行的。
  • 每一個master節點負責維護一部分槽,以及槽所映射的鍵值數據;集群中每個節點都有全量的槽信息,通過槽每個node都知道具體數據存儲到哪個node上。

redis cluster主要是針對海量數據+高並發+高可用的場景,海量數據,如果你的數據量很大,那么建議就用redis cluster,數據量不是很大時,使用sentinel就夠了。redis cluster的性能和高可用性均優於哨兵模式

Redis Cluster采用虛擬哈希槽分區而非一致性hash算法,預先分配一些卡槽,所有的鍵根據哈希函數映射到這些槽內,每一個分區內的master節點負責維護一部分槽以及槽所映射的鍵值數據。關於Redis Cluster的詳細實現原理請參考:Redis Cluster數據分片實現原理

優點:

  1. 無中心架構;
  2. 數據按照 slot 存儲分布在多個節點,節點間數據共享,可動態調整數據分布;
  3. 可擴展性:可線性擴展到 1000 多個節點,節點可動態添加或刪除;
  4. 高可用性:部分節點不可用時,集群仍可用。通過增加 Slave 做 standby 數據副本,能夠實現故障自動 failover,節點之間通過 gossip 協議交換狀態信息,用投票機制完成 Slave 到 Master 的角色提升;
  5. 降低運維成本,提高系統的擴展性和可用性。

缺點:

  1. Client 實現復雜,驅動要求實現 Smart Client,緩存 slots mapping 信息並及時更新,提高了開發難度,客戶端的不成熟影響業務的穩定性。目前僅 JedisCluster 相對成熟,異常處理部分還不完善,比如常見的“max redirect exception”。
  2. 節點會因為某些原因發生阻塞(阻塞時間大於 clutser-node-timeout),被判斷下線,這種 failover 是沒有必要的。
  3. 數據通過異步復制,不保證數據的強一致性。
  4. 多個業務使用同一套集群時,無法根據統計區分冷熱數據,資源隔離性較差,容易出現相互影響的情況。
  5. Slave 在集群中充當“冷備”,不能緩解讀壓力,當然可以通過 SDK 的合理設計來提高 Slave 資源的利用率。
  6. Key 批量操作限制,如使用 mset、mget 目前只支持具有相同 slot 值的 Key 執行批量操作。對於映射為不同 slot 值的 Key 由於 Keys 不支持跨 slot 查詢,所以執行 mset、mget、sunion 等操作支持不友好。
  7. Key 事務操作支持有限,只支持多 key 在同一節點上的事務操作,當多個 Key 分布於不同的節點上時無法使用事務功能。
  8. Key 作為數據分區的最小粒度,不能將一個很大的鍵值對象如 hash、list 等映射到不同的節點。
  9. 不支持多數據庫空間,單機下的 redis 可以支持到 16 個數據庫,集群模式下只能使用 1 個數據庫空間,即 db 0。
  10. 復制結構只支持一層,從節點只能復制主節點,不支持嵌套樹狀復制結構。
  11. 避免產生 hot-key,導致主庫節點成為系統的短板。
  12. 避免產生 big-key,導致網卡撐爆、慢查詢等。
  13. 重試時間應該大於 cluster-node-time 時間。
  14. Redis Cluster 不建議使用 pipeline 和 multi-keys 操作,減少 max redirect 產生的場景。

代理分片 集群模式

這種方案,將分片工作交給專門的代理程序來做。代理程序接收到來自業務程序的數據請求,根據路由規則,將這些請求分發給正確的 Redis 實例並返回給業務程序。

其基本原理是:通過中間件的形式,Redis客戶端把請求發送到代理 proxy,代理 proxy 根據路由規則發送到正確的Redis實例,最后 代理 proxy 把結果匯集返回給客戶端。

redis代理分片用得最多的就是Twemproxy,由Twitter開源的Redis代理,其基本原理是:通過中間件的形式,Redis客戶端把請求發送到Twemproxy,Twemproxy根據路由規則發送到正確的Redis實例,最后Twemproxy把結果匯集返回給客戶端。

img

這種機制下,一般會選用第三方代理程序(而不是自己研發),因為后端有多個 Redis 實例,所以這類程序又稱為分布式中間件。

這樣的好處是,業務程序不用關心后端 Redis 實例,運維起來也方便。雖然會因此帶來些性能損耗,但對於 Redis 這種內存讀寫型應用,相對而言是能容忍的。

Twemproxy 代理分片

Twemproxy 是一個 Twitter 開源的一個 redis 和 memcache 快速/輕量級代理服務器; Twemproxy 是一個快速的單線程代理程序,支持 Memcached ASCII 協議和 redis 協議。

Twemproxy是由Twitter開源的集群化方案,它既可以做Redis Proxy,還可以做Memcached Proxy。

它的功能比較單一,只實現了請求路由轉發,沒有像Codis那么全面有在線擴容的功能,它解決的重點就是把客戶端分片的邏輯統一放到了Proxy層而已,其他功能沒有做任何處理。

img

Tweproxy推出的時間最久,在早期沒有好的服務端分片集群方案時,應用范圍很廣,而且性能也極其穩定。

但它的痛點就是無法在線擴容、縮容,這就導致運維非常不方便,而且也沒有友好的運維UI可以使用。

Twemproxy的優點:

  • 客戶端像連接Redis實例一樣連接Twemproxy,不需要改任何的代碼邏輯。
  • 支持無效Redis實例的自動刪除。
  • Twemproxy與Redis實例保持連接,減少了客戶端與Redis實例的連接數。
  • 后端 Sharding 分片邏輯對業務透明,業務方的讀寫方式和操作單個 Redis 一致
  • 多種 hash 算法:MD5、CRC16、CRC32、CRC32a、hsieh、murmur

Twemproxy的不足:

  • 由於Redis客戶端的每個請求都經過Twemproxy代理才能到達Redis服務器,這個過程中會產生性能損失。

  • 沒有友好的監控管理后台界面,不利於運維監控。

  • Twemproxy最大的痛點在於,無法平滑地擴容/縮容。對於運維人員來說,當因為業務需要增加Redis實例時工作量非常大。

  • 增加了新的 proxy,需要維護其高可用。

Twemproxy作為最被廣泛使用、最久經考驗、穩定性最高的Redis代理,在業界被廣泛使用。

知乎的redis集群

知乎存儲平台團隊基於開源Redis 組件打造的知乎 Redis 平台,經過不斷的研發迭代,目前已經形成了一整套完整自動化運維服務體系,提供很多強大的功能。

  • 1)機器內存總量約 70TB,實際使用內存約 40TB;
  • 2)平均每秒處理約 1500 萬次請求,峰值每秒約 2000 萬次請求;
  • 3)每天處理約 1 萬億余次請求;
  • 4)單集群每秒處理最高每秒約 400 萬次請求;
  • 5)集群實例與單機實例總共約 800 個;
  • 6)實際運行約 16000 個 Redis 實例;
  • 7)Redis 使用官方 3.0.7 版本,少部分實例采用 4.0.11 版本。

知乎的Redis應用類型

根據業務的需求,我們將Redis實例區分為單機(Standalone)和集群(Cluster)兩種類型,

  • 單機實例通常用於容量與性能要求不高的小型存儲,

  • 集群則用來應對對性能和容量要求較高的場景。

這里僅僅介紹知乎的Twemproxy 代理Redis集群實踐情況。

知乎的Twemproxy 代理Redis集群

由 Twitter 開源的 Twemproxy 具有如下特點:
  • 1)單核模型造成性能瓶頸;
  • 2)傳統擴容模式僅支持停機擴容。
知乎的Twemproxy 代理Redis集群

知乎對Twemproxy 進行了局部的定制。其總體方案如下:

在方案早期使用數量固定的物理機部署 Twemproxy,通過物理機上的 Agent 啟動實例(對 Twemproxy 進行健康檢查與故障恢復),由於 Twemproxy 僅提供全量的使用計數,所以 Agent 運行時還會進行定時的差值計算來計算 Twemproxy 的 requests_per_second 等指標。

后來為了更好地故障檢測和資源調度,知乎的Twemproxy 引入了 Kubernetes,將 Twemproxy 和 Agent 放入同一個 Pod 的兩個容器內,底層 Docker 網段的配置使每個 Pod 都能獲得獨立的 IP,方便管理。

早期使用dns進行負載均衡

我們使用 DNS A Record 來進行客戶端的資源發現,每個 Twemproxy 采用相同的端口號,一個 DNS A Record 后面掛接多個 IP 地址對應多個 Twemproxy 實例。 初期,這種方案簡單易用,但是到了后期流量日益上漲,單集群 Twemproxy 實例個數很快就超過了 20 個。

由於 DNS 采用的 UDP 協議有 512 字節的包大小限制,單個 A Record 只能掛接 20 個左右的 IP 地址,超過這個數字就會轉換為 TCP 協議,客戶端不做處理就會報錯,導致客戶端啟動失敗。

知乎的端口復用的Twemproxy 部署

之后知乎修改了 Twemproxy 源碼, 加入 SO_REUSEPORT 支持。

通過端口復用,實現同一個容器內由 Starter 啟動多個 Twemproxy 實例並綁定到同一個端口,由操作系統進行負載均衡,對外仍然暴露一個端口,但是內部已經由操作系統將訪問均攤到了多個 Twemproxy 上。

Twemproxy with SO_REUSEPORT on Kubernetes:

img點擊並拖拽以移動

同一個容器內由 Starter 啟動多個 Twemproxy 實例並綁定到同一個端口,由操作系統進行負載均衡,對外仍然暴露一個端口,但是內部已經由系統均攤到了多個 Twemproxy 上。

同時 Starter 會定時去每個 Twemproxy 的 stats 端口獲取 Twemproxy 運行狀態進行聚合,此外 Starter 還承載了信號轉發的職責。

原有的 Agent 不需要用來啟動 Twemproxy 實例,所以 Monitor 調用 Starter 獲取聚合后的 stats 信息進行差值計算,最終對外界暴露出實時的運行狀態信息。

Linux的SO_REUSEPORT特性

1、前言

  昨天總結了一下Linux下網絡編程“驚群”現象,給出Nginx處理驚群的方法,使用互斥鎖。為例發揮多核的優勢,目前常見的網絡編程模型就是多進程或多線程,根據accpet的位置,分為如下場景:

  (1)單進程或線程創建socket,並進行listen和accept,接收到連接后創建進程和線程處理連接

  (2)單進程或線程創建socket,並進行listen,預先創建好多個工作進程或線程accept()在同一個服務器套接字、

在多核時代,一般主流的web服務器都使用 SO_REUSEADDR模式。 以下是比較典型的多進程/多線程服務器模型。

7d82ae846eb6955ee29953c1180cb39f.png

這兩種模型解充分發揮了多核CPU的優勢,雖然可以做到線程和CPU核綁定,但都會存在:

  • 單一listener工作進程胡線程在高速的連接接入處理時會成為瓶頸
  • 多個線程之間競爭獲取服務套接字
  • 緩存行跳躍
  • 很難做到CPU之間的負載均衡
  • 隨着核數的擴展,性能並沒有隨着提升

Linux kernel 3.9帶來了SO_REUSEPORT特性

Linux kernel 3.9帶來了SO_REUSEPORT特性,可以解決以上大部分問題。

首先需要單線程listen一個端口上,然后由多個工作進程/線程去accept()在同一個服務器套接字上。

第一個性能瓶頸,單線程listener,在處理高速率海量連接時,一樣會成為瓶頸

第二個性能瓶頸,多線程訪問server socket鎖競爭嚴重。

那么怎么解決? 這里先別扯什么分布式調度,集群xxx的 , 就拿單機來說問題。在Linux kernel 3.9帶來了SO_REUSEPORT特性,她可以解決上面(單進程listen,多工作進程accept() )的問題.

4407d33c228ace2cf42569680286da9b.png

看圖說話,對比SO_REUSADDR的模型,我想你應該看懂SO_REUSEPORT是個什么東西了。 SO_REUSEPORT是支持多個進程或者線程綁定到同一端口,提高服務器程序的吞吐性能,具體來說解決了下面的幾個問題:

允許多個套接字 bind()/listen() 同一個TCP/UDP端口

每一個線程擁有自己的服務器套接字

在服務器套接字上沒有了鎖的競爭,因為每個進程一個服務器套接字

內核層面實現負載均衡

安全層面,監聽同一個端口的套接字只能位於同一個用戶下面

SO_REUSEADDR和SO_REUSEPORT的區別。

SO_REUSEADDR提供如下四個功能:

SO_REUSEADDR允許啟動一個監聽服務器並捆綁其眾所周知端口,即使以前建立的將此端口用做他們的本地端口的連接仍存在。這通常是重啟監聽服務器時出現,若不設置此選項,則bind時將出錯。

SO_REUSEADDR允許在同一端口上啟動同一服務器的多個實例,只要每個實例捆綁一個不同的本地IP地址即可。對於TCP,我們根本不可能啟動捆綁相同IP地址和相同端口號的多個服務器。

SO_REUSEADDR允許單個進程捆綁同一端口到多個套接口上,只要每個捆綁指定不同的本地IP地址即可。這一般不用於TCP服務器。

SO_REUSEADDR允許完全重復的捆綁:當一個IP地址和端口綁定到某個套接口上時,還允許此IP地址和端口捆綁到另一個套接口上。一般來說,這個特性僅在支持多播的系統上才有,而且只對UDP套接口而言(TCP不支持多播)。

SO_REUSEPORT選項有如下語義:

此選項允許完全重復捆綁,但僅在想捆綁相同IP地址和端口的套接口都指定了此套接口選項才行。

如果被捆綁的IP地址是一個多播地址,則SO_REUSEADDR和SO_REUSEPORT等效。

SO_REUSEPORT解決了什么問題

SO_REUSEPORT支持多個進程或者線程綁定到同一端口,提高服務器程序的性能,解決的問題:

  • 允許多個套接字 bind()/listen() 同一個TCP/UDP端口
    • 每一個線程擁有自己的服務器套接字
    • 在服務器套接字上沒有了鎖的競爭
  • 內核層面實現負載均衡
  • 安全層面,監聽同一個端口的套接字只能位於同一個用戶下面

其核心的實現主要有三點:

  • 擴展 socket option,增加 SO_REUSEPORT 選項,用來設置 reuseport。
  • 修改 bind 系統調用實現,以便支持可以綁定到相同的 IP 和端口
  • 修改處理新建連接的實現,查找 listener 的時候,能夠支持在監聽相同 IP 和端口的多個 sock 之間均衡選擇。

有了SO_RESUEPORT后,每個進程可以自己創建socket、bind、listen、accept相同的地址和端口,各自是獨立平等的。讓多進程監聽同一個端口,各個進程中accept socket fd不一樣,有新連接建立時,內核只會喚醒一個進程來accept,並且保證喚醒的均衡性。

原有的 Agent 不需要用來啟動 Twemproxy 實例,所以 Monitor 調用 Starter 獲取聚合后的 stats 信息進行差值計算,最終對外界暴露出實時的運行狀態信息。

1)MIGRATE 造成的阻塞問題:
MIGRATE
調研后發現,MIGRATE 命令實現分為三個階段:

對於小 Key,該時間可以忽略不計,但如果一旦 Key 的內存使用過大,一個 MIGRATE 命令輕則導致P95 尖刺,重則直接觸發集群內的 Failover,造成不必要的切換

同樣,方案初期時的 Codis 采用的是相同的 MIGRATE 方案,但是使用 Proxy 控制 Redis 進行遷移操作而非第三方腳本(如 redis-trib.rb),基於同步的類似 MIGRATE 的命令,實際跟 Redis 官方集群方案存在同樣的問題。

對此,Redis 作者在 Redis 4.2 的 中提到了 Non blocking MIGRATE 但是截至目前,Redis 5.0 即將正式發布,仍未看到有關改動,社區中已經有相關的 ,該功能可能會在 5.2 或者 6.0 之后並入 master 分支,對此我們將持續觀望。

什么是P95 尖刺

假設有100個請求,按照響應時間從小到大排列,位置為X的值,即為PX值。

P1就是響應時間最小的請求,P10就是排名第十的請求,P100就是響應時間最長的請求。

在真正使用過程中,最常用的主要有P50(中位數)、P95、P99。

P50: 即中位數值。100個請求按照響應時間從小到大排列,位置為50的值,即為P50值。如果響應時間的P50值為200ms,代表我們有半數的用戶響應耗時在200ms之內,有半數的用戶響應耗時大於200ms。如果你覺得中位數值不夠精確,那么可以使用P95和P99.9

P95:響應耗時從小到大排列,順序處於95%位置的值即為P95值。

P99.9:許多大型的互聯網公司會采用P99.9值,也就是99.9%用戶耗時作為指標,意思就是1000個用戶里面,999個用戶的耗時上限,通過測量與優化該值,就可保證絕大多數用戶的使用體驗。 至於P99.99值,優化成本過高,而且服務響應由於網絡波動、系統抖動等不能解決之情況,因此大多數時候都不考慮該指標。

P95、P99.9百分位數值——服務響應時間的重要衡量指標

平均值之所以會成為大多數人使用衡量指標,其原因主要在於他的計算非常簡單。請求的總耗時/請求總數量就可以得到平均值。而P值的計算則相對麻煩一些。

按照傳統的方式,計算P值需要將響應耗時從小到大排序,然后取得對應百分位之值。

下面是一個供參考的:響應延遲折線圖

img點擊並拖拽以移動

需要客戶端自己完成讀寫分離

由於 Twemproxy 僅進行高性能的命令轉發,不進行讀寫分離,所以默認沒有讀寫分離功能,而在實際使用過程中,知乎團隊也沒有遇到集群讀寫分離的需求。

如果要進行讀寫分離,也有解決放哪,可以使用資源發現策略,在 Slave 節點上架設 Twemproxy 集群,由客戶端進行讀寫分離的路由。

img點擊並拖拽以移動

知乎Redis實例的擴容實踐

靜態擴容

對於單機實例,如果通過調度器觀察到對應的機器仍然有空閑的內存,僅需直接調整實例的 maxmemory 配置與報警即可。同樣,對於集群實例, 通過調度器觀察每個節點所在的機器,如果所有節點所在機器均有空閑內存,像擴容單機實例一樣直接更新 maxmemory 與報警。

動態擴容

但是當機器空閑內存不夠,或單機實例與集群的后端實例過大時,無法直接擴容,需要進行動態擴容:

1)對於單機實例,如果單實例超過 30GB 且沒有如 sinterstore 之類的多 Key 操作, 會將其擴容為集群實例;

2)對於集群實例,會進行橫向的重分片,稱之為 Resharding 過程。

Resharding 過程:

知乎技術分享:從單機到2000萬QPS並發的Redis高性能緩存實踐之路

原生 Twemproxy 集群方案並不支持擴容,知乎團隊開發了數據遷移工具來進行 Twemproxy 的擴容,遷移工具本質上是一個上下游之間的代理,將數據從上游按照新的分片方式搬運到下游。

原生 Redis 主從同步使用 SYNC/PSYNC 命令建立主從連接,收到 SYNC 命令的 Master 會 fork 出一個進程遍歷內存空間生成 RDB 文件並發送給 Slave,期間所有發送至 Master 的寫命令在執行的同時都會被緩存到內存的緩沖區內,當 RDB 發送完成后,Master 會將緩沖區內的命令及之后的寫命令轉發給 Slave 節點。

開發的遷移代理會向上游發送 SYNC 命令模擬上游實例的 Slave,代理收到 RDB 后進行解析,由於 RDB 中每個 Key 的格式與 RESTORE 命令的格式相同,所以我們使用生成 RESTORE 命令按照下游的 Key 重新計算哈希並使用 Pipeline 批量發送給下游。

等待 RDB 轉發完成后,我們按照新的后端生成新的 Twemproxy 配置,並按照新的 Twemproxy 配置建立 Canary 實例,從上游的 Redis 后端中取 Key 進行測試,測試 Resharding 過程是否正確,測試過程中的 Key 按照大小,類型,TTL 進行比較。

測試通過后,對於集群實例,使用生成好的配置替代原有 Twemproxy 配置並 restart/reload Twemproxy 代理,知乎團隊修改了 Twemproxy 代碼,加入了 config reload 功能,但是實際使用中發現直接重啟實例更加可控。而對於單機實例,由於單機實例和集群實例對於命令的支持不同,通常需要和業務方確定后手動重啟切換。

由於 Twemproxy 部署於 Kubernetes ,可以實現細粒度的灰度,如果客戶端接入了讀寫分離,可以先將讀流量接入新集群,最終接入全部流量。

這樣相對於 Redis 官方集群方案,除在上游進行 BGSAVE 時的 fork 復制頁表時造成的尖刺以及重啟時造成的連接閃斷,其余對於 Redis 上游造成的影響微乎其微。

這樣擴容存在的問題:

1)對上游發送 SYNC 后,上游 fork 時會造成尖刺:

  • 對於存儲實例,使用 Slave 進行數據同步,不會影響到接收請求的 Master 節點;

  • 對於緩存實例,由於沒有 Slave 實例,該尖刺無法避免,如果對於尖刺過於敏感,可以跳過 RDB 階段,直接通過 PSYNC 使用最新的 SET 消息建立下游的緩存。

2)切換過程中有可能寫到下游,而讀在上游:

  • 對於接入了讀寫分離的客戶端,會先切換讀流量到下游實例,再切換寫流量。

3)一致性問題,兩條具有先后順序的寫同一個 Key 命令在切換代理后端時會通過 1)寫上游同步到下游 2)直接寫到下游兩種方式寫到下游,此時,可能存在應先執行的命令卻通過 1)執行落后於通過 2)執行,導致命令先后順序倒置:

  • 這個問題在切換過程中無法避免,好在絕大部分應用沒有這種問題,如果無法接受,只能通過上游停寫排空 Resharding 代理保證先后順序;

  • 官方 Redis 集群方案和 Codis 會通過 blocking 的 migrate 命令來保證一致性,不存在這種問題。

實際使用過程中,如果上游分片安排合理,可實現數千萬次每秒的遷移速度,1TB 的實例 Resharding 只需要半小時左右。另外,對於實際生產環境來說,提前做好預期規划比遇到問題緊急擴容要快且安全得多。

知乎為什么沒有使用官方 Redis 集群方案

在 2015 年調研過多種集群方案,綜合評估多種方案后,最終選擇了看起來較為陳舊的 Twemproxy 而不是官方 Redis 集群方案與 Codis,具體原因如下:

1)MIGRATE 造成的阻塞問題:

Redis 官方集群方案使用 CRC16 算法計算哈希值並將 Key 分散到 16384 個 Slot 中,由使用方自行分配 Slot 對應到每個分片中,擴容時由使用方自行選擇 Slot 並對其進行遍歷,對 Slot 中每一個 Key 執行 MIGRATE 命令進行遷移。

調研后發現,MIGRATE 命令實現分為三個階段:

a)DUMP 階段:由源實例遍歷對應 Key 的內存空間,將 Key 對應的 Redis Object 序列化,序列化協議跟 Redis RDB 過程一致;

b)RESTORE 階段:由源實例建立 TCP 連接到對端實例,並將 DUMP 出來的內容使用 RESTORE 命令到對端進行重建,新版本的 Redis 會緩存對端實例的連接;

c)DEL 階段(可選):如果發生遷移失敗,可能會造成同名的 Key 同時存在於兩個節點,此時 MIGRATE 的 REPLACE 參數決定是是否覆蓋對端的同名 Key,如果覆蓋,對端的 Key 會進行一次刪除操作,4.0 版本之后刪除可以異步進行,不會阻塞主進程。

經過調研,認為這種模式MIGRATE 並不適合知乎的生產環境。

Redis 為了保證遷移的一致性, MIGRATE 所有操作都是同步操作,執行 MIGRATE 時,兩端的 Redis 均會進入時長不等的 BLOCK 狀態。對於小 Key,該時間可以忽略不計,但如果一旦 Key 的內存使用過大,一個 MIGRATE 命令輕則導致尖刺,重則直接觸發集群內的 Failover,造成不必要的切換

同時,遷移過程中訪問到處於遷移中間狀態的 Slot 的 Key 時,根據進度可能會產生 ASK 轉向,此時需要客戶端發送 ASKING 命令到 Slot 所在的另一個分片重新請求,請求時延則會變為原來的兩倍。

同樣,方案調研期間的 Codis 采用的是相同的 MIGRATE 方案,但是使用 Proxy 控制 Redis 進行遷移操作而非第三方腳本(如 redis-trib.rb),基於同步的類似 MIGRATE 的命令,實際跟 Redis 官方集群方案存在同樣的問題。

2)緩存模式下高可用方案不夠靈活:

還有,官方集群方案的高可用策略僅有主從一種,高可用級別跟 Slave 的數量成正相關,如果只有一個 Slave,則只能允許一台物理機器宕機, Redis 4.2 roadmap 提到了 cache-only mode,提供類似於 Twemproxy 的自動剔除后重分片策略,但是截至目前仍未實現。

3)內置 Sentinel 造成額外流量負載:

另外,官方 Redis 集群方案將 Sentinel 功能內置到 Redis 內,這導致在節點數較多(大於 100)時在 Gossip 階段會產生大量的 PING/INFO/CLUSTER INFO 流量,根據 issue 中提到的情況,200 個使用 3.2.8 版本節點搭建的 Redis 集群,在沒有任何客戶端請求的情況下,每個節點仍然會產生 40Mb/s 的流量,雖然到后期 Redis 官方嘗試對其進行壓縮修復,但按照 Redis 集群機制,節點較多的情況下無論如何都會產生這部分流量,對於使用大內存機器但是使用千兆網卡的用戶這是一個值得注意的地方。

4)slot 存儲開銷:

最后,每個 Key 對應的 Slot 的存儲開銷,在規模較大的時候會占用較多內存,4.x 版本以前甚至會達到實際使用內存的數倍,雖然 4.x 版本使用 rax 結構進行存儲,但是仍然占據了大量內存,從非官方集群方案遷移到官方集群方案時,需要注意這部分多出來的內存。

總之,官方 Redis 集群方案與 Codis 方案對於絕大多數場景來說都是非常優秀的解決方案,但是仔細調研發現並不是很適合集群數量較多且使用方式多樣化的知乎,

總之,場景不同側重點也會不一樣,方案也需要調整,沒有最有,只有最適合。

Codis代理分片

Codis 是一個分布式 Redis 解決方案, 對於上層的應用來說, 連接到 Codis Proxy 和連接原生的 Redis Server 沒有明顯的區別 (有一些命令不支持), 上層應用可以像使用單機的 Redis 一樣使用, Codis 底層會處理請求的轉發, 不停機的數據遷移等工作, 所有后邊的一切事情, 對於前面的客戶端來說是透明的, 可以簡單的認為后邊連接的是一個內存無限大的 Redis 服務,

現在美團、阿里等大廠已經開始用codis的集群功能了,

什么是Codis?

Twemproxy不能平滑增加Redis實例的問題帶來了很大的不便,於是豌豆莢自主研發了Codis,一個支持平滑增加Redis實例的Redis代理軟件,其基於Go和C語言開發,並於2014年11月在GitHub上開源 codis開源地址

Codis的架構圖:

img

在Codis的架構圖中,Codis引入了Redis Server Group,其通過指定一個主CodisRedis和一個或多個從CodisRedis,實現了Redis集群的高可用。當一個主CodisRedis掛掉時,Codis不會自動把一個從CodisRedis提升為主CodisRedis,這涉及數據的一致性問題(Redis本身的數據同步是采用主從異步復制,當數據在主CodisRedis寫入成功時,從CodisRedis是否已讀入這個數據是沒法保證的),需要管理員在管理界面上手動把從CodisRedis提升為主CodisRedis。

如果手動處理覺得麻煩,豌豆莢也提供了一個工具Codis-ha,這個工具會在檢測到主CodisRedis掛掉的時候將其下線並提升一個從CodisRedis為主CodisRedis。

Codis的預分片

Codis中采用預分片的形式,啟動的時候就創建了1024個slot,1個slot相當於1個箱子,每個箱子有固定的編號,范圍是1~1024。

Codis的分片算法

Codis proxy 代理通過一種算法把要操作的key經過計算后分配到各個組中,這個過程叫做分片。
在這里插入圖片描述

在Codis里面,它把所有的key分為1024個槽,每一個槽位都對應了一個分組,具體槽位的分配,可以進行自定義,現在如果有一個key進來,首先要根據CRC32算法,針對key算出32位的哈希值,然后除以1024取余,然后就能算出這個KEY屬於哪個槽,然后根據槽與分組的映射關系,就能去對應的分組當中處理數據了。

在這里插入圖片描述

CRC全稱是循環冗余校驗,主要在數據存儲和通信領域保證數據正確性的校驗手段,CRC校驗(循環冗余校驗)是數據通訊中最常采用的校驗方式。

我們繼續回過來,slot這個箱子用作存放Key,至於Key存放到哪個箱子,可以通過算法“crc32(key)%1024”獲得一個數字,這個數字的范圍一定是1~1024之間,Key就放到這個數字對應的slot。

例如,如果某個Key通過算法“crc32(key)%1024”得到的數字是5,就放到編碼為5的slot(箱子)。

slot和Server Group的關系

1個slot只能放1個Redis Server Group,不能把1個slot放到多個Redis Server Group中。1個Redis Server Group最少可以存放1個slot,最大可以存放1024個slot。因此,Codis中最多可以指定1024個Redis Server Group。

槽位和分組的映射關系就保存在codis proxy當中

codis-ha 高可用

槽位和分組的映射關系就保存在codis proxy當中,但是codis proxy它本身也存在單點問題,所以需要對proxy做一個集群。
在這里插入圖片描述

部署好集群之后,有一個問題,就是槽位的映射關系是保存在proxy里面的,不同proxy之間怎么同步映射關系?

在Codis中使用的是Zookeeper來保存映射關系,給 proxy 來同步配置信息,其實codis 支持的不止zookeeper,還有etcd和本地文件。

在zookeeper中保存的數據格式就是這個樣子。

在這里插入圖片描述

除了這個slot id, group_id 還會存儲一些其他的信息,比如分組信息、代理信息等。

如果codis proxy如果出現異常怎么處理,這個可能要利用一下k8s中pod的特性,在k8s里面可以設置pod冗余的數量,k8s會嚴格保證啟動的數量與設置一致,所以只需要一個進程監測Proxy的異常,並且把它干掉就可以了,k8s會自動拉起來一個新的proxy。
在這里插入圖片描述

codis給這個進程起名叫codis-ha,codis-ha實時監測codis proxy的運行狀態,如果有異常就會干掉,它包含了哨兵的功能,所以豌豆莢直接把哨兵去掉了。
在這里插入圖片描述

dashboard

但是codis-ha在Codis整個架構中是沒有辦法直接操作代理和服務,因為所有的代理和服務的操作都要經過dashboard處理。所以部署的時候會利用k8s的親和性將codis-ha與dashboard部署在同一個節點上。
在這里插入圖片描述

除了這些,codis自己開發了集群管理界面,集群管理可以通過界面化的方式更方便的管理集群,這個模塊叫codis-fe,我們可以看一下這個界面。
在這里插入圖片描述

最后就是redis客戶端了,客戶端是直接通過代理來訪問后端服務的。
在這里插入圖片描述

Codis的架構

Codis是一個分布式Redis解決方案,對於上層的應用來說,連接到Codis Proxy和連接原生的RedisServer沒有明顯的區別,有部分命令不支持。

Codis底層會處理請求的轉發,不停機的數據遷移等工作,所有后邊的一切事情,
對於前面的客戶端來說是透明的,可以簡單的認為后邊連接的是一個內存無限大的Redis服務.

Codis 由四部分組成:

  • Codis Proxy (codis-proxy),處理客戶端請求,支持Redis協議,因此客戶端訪問Codis Proxy跟訪問原生Redis沒有什么區別;
  • Codis Dashboard (codis-config),Codis 的管理工具,支持添加/刪除 Redis 節點、添加/刪除 Proxy 節點,發起數據遷移等操作。codis-config 本身還自帶了一個 http server,會啟動一個 dashboard,用戶可以直接在瀏覽器上觀察 Codis 集群的運行狀態;
  • Codis Redis (codis-server),Codis 項目維護的一個 Redis 分支,基於 2.8.21 開發,加入了 slot 的支持和原子的數據遷移指令;
  • ZooKeeper/Etcd,Codis 依賴 ZooKeeper 來存放數據路由表和 codis-proxy 節點的元信息,codis-config 發起的命令都會通過 ZooKeeper 同步到各個存活的 codis-proxy;

Codis 支持按照 Namespace 區分不同的產品,擁有不同的 product name 的產品,各項配置都不會沖突。

img

安裝與部署

  1. 安裝go;

  2. 安裝codis

    go get -u -d github.com/CodisLabs/codis
    cd $GOPATH/src/github.com/CodisLabs/codis
    make
    
  3. 安裝zookeeper;

  4. 啟動dashboard

    bin/codis-config dashboard
    
  5. 初始化slots,在zk上創建slot相關信息

    bin/codis-config slot init
    
  6. 啟動codis-redis,跟官方redis server方法一樣;

  7. 添加redis server group,每個 Group 作為一個 Redis 服務器組存在,只允許有一個 master, 可以有多個 slave,group id 僅支持大於等於1的整數。如: 添加兩個 server group, 每個 group 有兩個 redis 實例,group的id分別為1和2, redis實例為一主一從。

  8. bin/codis-config server add 1 localhost:6379 master
    bin/codis-config server add 1 localhost:6380 slave
    bin/codis-config server add 2 localhost:6479 master
    bin/codis-config server add 2 localhost:6480 slave
    
  9. 設置server group 服務的 slot 范圍,如設置編號為[0, 511]的 slot 由 server group 1 提供服務, 編號 [512, 1023] 的 slot 由 server group 2 提供服務

    bin/codis-config slot range-set 0 511 1 online
    bin/codis-config slot range-set 512 1023 2 online
    
  10. 啟動codis-proxy,

    bin/codis-proxy -c config.ini -L ./log/proxy.log  --cpu=8 --addr=0.0.0.0:19000 --http-addr=0.0.0.0:11000
    

    剛啟動的 codis-proxy 默認是處於 offline狀態的, 然后設置 proxy 為 online 狀態, 只有處於 online 狀態的 proxy 才會對外提供服務

    bin/codis-config -c config.ini proxy online <proxy_name>  <---- proxy的id, 如 proxy_1
    

Codis的優缺點

(1)優點

對客戶端透明,與codis交互方式和redis本身交互一樣
支持在線數據遷移,遷移過程對客戶端透明有簡單的管理和監控界面
支持高可用,無論是redis數據存儲還是代理節點
自動進行數據的均衡分配
最大支持1024個redis實例,存儲容量海量
高性能

(2)缺點

采用自有的redis分支,不能與原版的redis保持同步
如果codis的proxy只有一個的情況下, redis的性能會下降20%左右
某些命令不支持,比如事務命令muti
國內開源產品,活躍度相對弱一些

數據遷移(migrate)

安全和透明的數據遷移是 Codis 提供的一個重要的功能,也是 Codis 區別於 Twemproxy 等靜態的分布式 Redis 解決方案的地方。

數據遷移的最小單位是 key,我們在 codis redis 中添加了一些指令,實現基於key的遷移,如 SLOTSMGRT等 (命令列表),每次會將特定 slot 一個隨機的 key 發送給另外一個 codis redis 實例,這個命令會確認對方已經接收,同時刪除本地的這個 k-v 鍵值,返回這個 slot 的剩余 key 的數量,整個操作是原子的。

在 codis-config 管理工具中,每次遷移任務的最小單位是 slot。如: 將slot id 為 [0-511] 的slot的數據,遷移到 server group 2上,--delay 參數表示每遷移一個 key 后 sleep 的毫秒數,默認是 0,用於限速。

bin/codis-config slot migrate 0 511 2 --delay=10

遷移的過程對於上層業務來說是安全且透明的,數據不會丟失,上層不會中止服務。

注意,遷移的過程中打斷是可以的,但是如果中斷了一個正在遷移某個slot的任務,下次需要先遷移掉正處於遷移狀態的 slot,否則無法繼續 (即遷移程序會檢查同一時刻只能有一個 slot 處於遷移狀態)。

自動再平衡(auto rebalance)

Codis 支持動態的根據實例內存,自動對slot進行遷移,以均衡數據分布

bin/codis-config slot rebalance

要求:

  • 所有的codis-server都必須設置了maxmemory參數;
  • 所有的 slots 都應該處於 online 狀態, 即沒有遷移任務正在執行;
  • 所有 server group 都必須有 Master;

高可用(HA)

因為codis的proxy是無狀態的,可以比較容易的搭多個proxy來實現高可用性並橫向擴容。

應用層的HA

對Java用戶來說,可以使用經過一些開源的組件,來實現客戶端的在多個proxy直接進行HA。

也可以通過監控zk上的注冊信息來實時獲得當前可用的proxy列表,既可以保證高可用性,

也可以通過輪流請求所有的proxy實現負載均衡。

后端redis 主備ha

對后端的redis實例來說,當一個group的master掛掉的時候,應該讓管理員清楚,並手動的操作,因為這涉及到了數據一致性等問題(redis的主從同步是最終一致性的)。

因此codis不會自動的將某個slave升級成master。 不過也提供一種解決方案:codis-ha。這是一個通過codis開放的api實現自動切換主從的工具。該工具會在檢測到master掛掉的時候將其下線並選擇其中一個slave提升為master繼續提供服務。

需要注意,codis將其中一個slave升級為master時,該組內其他slave實例是不會自動改變狀態的,這些slave仍將試圖從舊的master上同步數據,因而會導致組內新的master和其他slave之間的數據不一致。

因為redis的slave of命令切換master時會丟棄slave上的全部數據,從新master完整同步,會消耗master資源。

因此建議在知情的情況下手動操作。

使用 codis-config server add <group_id> <redis_addr> slave 命令刷新這些節點的狀態即可。codis-ha不會自動刷新其他slave的狀態。

大廠使用什么樣的redis集群:

redis 集群方案主要有3類,一是使用類 codis 的代理模式架構,按組划分,實例之間互相獨立;另一套是基於官方的 redis cluster 的服務端分片方案;代理模式和服務端分片相結合的模式

  • 基於官方 redis cluster 的服務端分片方案
  • 類 codis 的代理模式架構
  • 代理模式和服務端分片相結合的模式

類 codis 的代理模式架構

img

這套架構的特點:

  • 分片算法:基於 slot hash桶;
  • 分片實例之間相互獨立,每組 一個master 實例和多個slave;
  • 路由信息存放到第三方存儲組件,如 zookeeper 或etcd
  • 旁路組件探活

使用這套方案的公司:
阿里雲: ApsaraCache, RedisLabs、京東、百度等

阿里雲

AparaCache 的單機版已開源(開源版本中不包含slot等實現),集群方案細節未知;ApsaraCache

百度 BDRP 2.0

主要組件:
proxy,基於twemproxy 改造,實現了動態路由表;
redis內核: 基於2.x 實現的slots 方案;
metaserver:基於redis實現,包含的功能:拓撲信息的存儲 & 探活;
最多支持1000個節點;

slot 方案:
redis 內核中對db划分,做了16384個db; 每個請求到來,首先做db選擇;

數據遷移實現:
數據遷移的時候,最小遷移單位是slot,遷移中整個slot 處於阻塞狀態,只支持讀請求,不支持寫請求;
對比 官方 redis cluster/ codis 的按key粒度進行遷移的方案:按key遷移對用戶請求更為友好,但遷移速度較慢;這個按slot進行遷移的方案速度更快;

京東proxy

主要組件:
proxy: 自主實現,基於 golang 開發;
redis內核:基於 redis 2.8
configServer(cfs)組件:配置信息存放;
scala組件:用於觸發部署、新建、擴容等請求;
mysql:最終所有的元信息及配置的存儲;
sentinal(golang實現):哨兵,用於監控proxy和redis實例,redis實例失敗后觸發切換;

slot 方案實現:
在內存中維護了slots的map映射表;

數據遷移:
基於 slots 粒度進行遷移;
scala組件向dst實例發送命令告知會接受某個slot;
dst 向 src 發送命令請求遷移,src開啟一個線程來做數據的dump,將這個slot的數據整塊dump發送到dst(未加鎖,只讀操作)
寫請求會開辟一塊緩沖區,所有的寫請求除了寫原有數據區域,同時雙寫到緩沖區中。
當一個slot遷移完成后,把這個緩沖區的數據都傳到dst,當緩沖區為空時,更改本分片slot規則,不再擁有該slot,后續再請求這個slot的key返回moved;
上層proxy會保存兩份路由表,當該slot 請求目標實例得到 move 結果后,更新拓撲;

跨機房:跨機房使用主從部署結構;沒有多活,異地機房作為slave;

基於官方 redis cluster 的服務端分片方案

img

和上一套方案比,所有功能都集成在 redis cluster 中,路由分片、拓撲信息的存儲、探活都在redis cluster中實現;各實例間通過 gossip 通信;這樣的好處是簡單,依賴的組件少,應對400個節點以內的場景沒有問題(按單實例8w read qps來計算,能夠支持 200 * 8 = 1600w 的讀多寫少的場景);但當需要支持更大的規模時,由於使用 gossip協議導致協議之間的通信消耗太大,redis cluster 不再合適;

使用這套方案的有:AWS, 百度貼吧

官方 redis cluster

數據遷移過程:
基於 key粒度的數據遷移;
遷移過程的讀寫沖突處理:
從A 遷移到 B;

  • 訪問的 key 所屬slot 不在節點 A 上時,返回 MOVED 轉向,client 再次請求B;
  • 訪問的 key 所屬 slot 在節點 A 上,但 key 不在 A上, 返回 ASK 轉向,client再次請求B;
  • 訪問的 key 所屬slot 在A上,且key在 A上,直接處理;(同步遷移場景:該 key正在遷移,則阻塞)

AWS ElasticCache

ElasticCache 支持主從和集群版、支持讀寫分離;
集群版用的是開源的Redis Cluster,未做深度定制;

代理模式和服務端分片相結合的模式

p2p和代理的混合模式: 基於redis cluster + twemproxy混合模式

百度貼吧的ksarch-saas:

基於redis cluster + twemproxy 實現;后被 BDRP 吞並;
twemproxy 實現了 smart client 功能;

使用 redis cluster后還加一層 proxy的好處:

  1. 對client友好,不需要client都升級為smart client;(否則,所有語言client 都需要支持一遍)
  2. 加一層proxy可以做更多平台策略;比如在proxy可做 大key、熱key的監控、慢查詢的請求監控、以及接入控制、請求過濾等;

即將發布的 redis 5.0 中有個 feature,作者計划給 redis cluster加一個proxy。

ksarch-saas 對 twemproxy的改造已開源:
https://github.com/ksarch-saas/r3proxy

面試題

請介紹一下你使用過的Redis集群的架構和原理?

看完本文,涉及到Redis集群的架構類面試題目,按照本文的思路去回答,一定是120分。

如果還有疑問,請來瘋狂創客圈社群交流。

Redis集群的架構和原理問題交流:

高並發Java發燒友社群 - 瘋狂創客圈 總入口 點擊了解詳情

參考文檔:

https://blog.csdn.net/ranrancc_/article/details/104283802

https://www.ipcpu.com/2019/01/redis-sentinel/

http://t.zoukankan.com/ilifeilong-p-12632231.html

https://zhuanlan.zhihu.com/p/60632927

https://blog.csdn.net/clapAlong/article/details/115493537

https://blog.csdn.net/justlpf/article/details/115908335

https://www.cnblogs.com/jebysun/p/9698554.html

https://www.cnblogs.com/Anker/p/7076537.html

https://www.cnblogs.com/rjzheng/p/11430592.html

https://blog.csdn.net/qq_24365213/article/details/73504091

https://www.cnblogs.com/me115/p/9043420.html

https://www.cnblogs.com/kismetv/p/9236731.html

https://www.cnblogs.com/kismetv/p/9236731.html

https://zhuanlan.zhihu.com/p/101172874

https://www.jianshu.com/p/b46cb093a083

https://zhuanlan.zhihu.com/p/89248752


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM