Redis隊列Stream、Redis多線程詳解


Redis隊列Stream、Redis多線程詳解

Redis目前最新版本為Redis-6.2.6 ,考慮到實際的情況,本篇文章會以CentOS7下Redis-6.2.4版本進行講解。

下載地址:https://redis.io/download

安裝運行Redis很簡單,在Linux下執行上面的4條命令即可 ,同時前面的課程已經有完整的視頻講解,請到網盤中下載觀看,並自行安裝。如安裝過程出錯,請保證安裝包完整無誤,依賴包無誤,並仔細閱讀安裝錯誤日志和檢查操作系統層面的用戶、用戶組、文件和目錄是否存在,各種權限是否正確等!

同時Redis的官方文檔相當豐富和齊全,WEB地址如下:

https://redis.io/documentation

Redis隊列與Stream

Redis5.0 最大的新特性就是多出了一個數據結構 Stream,它是一個新的強大的支持多播的可持久化的消息隊列,作者聲明Redis Stream地借鑒了 Kafka 的設計。

Stream總述

Redis Stream 的結構如上圖所示,每一個Stream都有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的 ID 和對應的內容。消息是持久化的,Redis 重啟后,內容還在。

每個 Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用xadd指令追加消息時自動創建。

每個 Stream 都可以掛多個消費組,每個消費組會有個游標last_delivered_id在 Stream 數組之上往前移動,表示當前消費組已經消費到哪條消息了。每個消費組都有一個 Stream 內唯一的名稱,消費組不會自動創建,它需要單獨的指令xgroup create進行創建,需要指定從 Stream 的某個消息 ID 開始消費,這個 ID 用來初始化last_delivered_id變量。

每個消費組 (Consumer Group) 的狀態都是獨立的,相互不受影響。也就是說同一份 Stream 內部的消息會被每個消費組都消費到。

同一個消費組 (Consumer Group) 可以掛接多個消費者 (Consumer),這些消費者之間是競爭關系,任意一個消費者讀取了消息都會使游標last_delivered_id往前移動。每個消費者有一個組內唯一名稱。

消費者 (Consumer) 內部會有個狀態變量pending_ids,它記錄了當前已經被客戶端讀取,但是還沒有 ack的消息。如果客戶端沒有 ack,這個變量里面的消息 ID 會越來越多,一旦某個消息被 ack,它就開始減少。這個 pending_ids 變量在 Redis 官方被稱之為PEL,也就是Pending Entries List,這是一個很核心的數據結構,它用來確保客戶端至少消費了消息一次,而不會在網絡傳輸的中途丟失了沒處理。

消息 ID 的形式是timestampInMillis-sequence,例如1527846880572-5,它表示當前的消息在毫米時間戳1527846880572時產生,並且是該毫秒內產生的第 5 條消息。消息 ID 可以由服務器自動生成,也可以由客戶端自己指定,但是形式必須是整數-整數,而且必須是后面加入的消息的 ID 要大於前面的消息 ID。

消息內容就是鍵值對,形如 hash 結構的鍵值對,這沒什么特別之處。

常用操作命令

生產端

xadd 追加消息

xdel 刪除消息,這里的刪除僅僅是設置了標志位,不會實際刪除消息。

xrange 獲取消息列表,會自動過濾已經刪除的消息

xlen 消息長度

del 刪除 Stream

xadd streamtest * name dsl age 18

streamtest 表示當前這個隊列的名字,也就是我們一般意義上Redis中的key,* 號表示服務器自動生成 ID,后面順序跟着“name mark age 18”,是我們存入當前streamtest 這個隊列的消息,采用的也是 key/value的存儲形式

返回值1626705954593-0 則是生成的消息 ID,由兩部分組成:時間戳-序號。時間戳時毫秒級單位,是生成消息的Redis服務器時間,它是個64位整型。序號是在這個毫秒時間點內的消息序號。它也是個64位整型。

為了保證消息是有序的,因此Redis生成的ID是單調遞增有序的。由於ID中包含時間戳部分,為了避免服務器時間錯誤而帶來的問題(例如服務器時間延后了),Redis的每個Stream類型數據都維護一個latest_generated_id屬性,用於記錄最后一個消息的ID。若發現當前時間戳退后(小於latest_generated_id所記錄的),則采用時間戳不變而序號遞增的方案來作為新消息ID(這也是序號為什么使用int64的原因,保證有足夠多的的序號),從而保證ID的單調遞增性質。

如果不是非常特別的需求,強烈建議使用Redis的方案生成消息ID,因為這種時間戳+序號的單調遞增的ID方案,幾乎可以滿足全部的需求,但ID是支持自定義的。

xrange streamtest - +

其中-表示最小值 , + 表示最大值

或者我們可以指定消息 ID 的列表:

xdel streamtest 1626706380924-0

xlen streamtest

del streamtest 刪除整個 Stream

消費端

單消費者

雖然Stream中有消費者組的概念,但是可以在不定義消費組的情況下進行 Stream 消息的獨立消費,當 Stream 沒有新消息時,甚至可以阻塞等待。Redis 設計了一個單獨的消費指令xread,可以將 Stream 當成普通的消息隊列 (list) 來使用。使用 xread 時,我們可以完全忽略消費組 (Consumer Group) 的存在,就好比 Stream 就是一個普通的列表 (list)。

xread count 1 streams stream2 0-0

“count 1”表示從 Stream 讀取1條消息,缺省當然是頭部,“streams”可以理解為Redis關鍵字,“stream2”指明了要讀取的隊列名稱,“0-0”指從頭開始

xread count 2 streams stream2 1647911711261-0

也可以指定從streams的消息Id開始(不包括命令中的消息id)

xread count 1 streams stream2 $

$代表從尾部讀取,上面的意思就是從尾部讀取最新的一條消息,此時默認不返回任何消息

所以最好以阻塞的方式讀取尾部最新的一條消息,直到新的消息的到來

xread block 0 count 1 streams stream2 $

block后面的數字代表阻塞時間,單位毫秒

此時我們新開一個客戶端,往stream2中寫入一條消息

可以看到阻塞解除了,返回了新的消息內容,而且還顯示了一個等待時間,這里我們等待了127.87s

一般來說客戶端如果想要使用 xread 進行順序消費,一定要記住當前消費到哪里了,也就是返回的消息 ID。下次繼續調用 xread 時,將上次返回的最后一個消息 ID 作為參數傳遞進去,就可以繼續消費后續的消息。

消費組

創建消費組

Stream 通過xgroup create指令創建消費組 (Consumer Group),需要傳遞起始消息 ID 參數用來初始化last_delivered_id變量。

xgroup create stream2 cg1 0-0

“stream2”指明了要讀取的隊列名稱,“cg1”表示消費組的名稱,“0-0”表示從頭開始消費

xgroup create stream2 cg2 $

$ 表示從尾部開始消費,只接受新消息,當前 Stream 消息會全部忽略

現在我們可以用xinfo命令來看看stream2的情況:

xinfo stream stream2

xinfo groups stream2

消息消費

有了消費組,自然還需要消費者,Stream 提供了 xreadgroup 指令可以進行消費組的組內消費,需要提供消費組名稱、消費者名稱和起始消息 ID。

它同 xread 一樣,也可以阻塞等待新消息。讀到新消息后,對應的消息 ID 就會進入消費者的PEL(正在處理的消息) 結構里,客戶端處理完畢后使用 xack 指令通知服務器,本條消息已經處理完畢,該消息 ID 就會從 PEL 中移除。

xreadgroup GROUP cg1 c1 count 1 streams stream2 >

“GROUP”屬於關鍵字,“cg1”是消費組名稱,“c1”是消費者名稱,“count 1”指明了消費數量,> 號表示從當前消費組的 last_delivered_id 后面開始讀,每當消費者讀取一條消息,last_delivered_id 變量就會前進

前面我們定義cg1的時候是從頭開始消費的,自然就獲得Stream2中第一條消息

再執行一次上面的命令

自然就讀取到了下條消息。

我們將Stream2中的消息讀取完

xreadgroup GROUP cg1 c1 count 2 streams stream2 >

很自然就沒有消息可讀了, xreadgroup GROUP cg1 c1 count 1 streams stream2 >

然后設置阻塞等待

xreadgroup GROUP cg1 c1 block 0 count 1 streams stream2 >

我們新開一個客戶端,發送消息到stream2

xadd stream2 * name lixiaolong222 age 1000

回到原來的客戶端發現阻塞解除,收到新消息

我們來觀察一下觀察消費組狀態

如果同一個消費組有多個消費者,我們還可以通過 xinfo consumers 指令觀察每個消費者的狀態

xinfo consumers stream2 cg1

可以看到目前c1這個消費者有 2條待ACK的消息,空閑了371025 ms 沒有讀取消息。

如果我們確認一條消息

xack stream2 cg1 1647916485038-0

就可以看到待確認消息變成了1條

xack允許帶多個消息id,比如

同時Stream還提供了命令XPENDIING 用來獲消費組或消費內消費者的未處理完畢的消息,每個Pending的消息有4個屬性:

消息ID

所屬消費者

IDLE,已讀取時長

delivery counter,消息被讀取次數

命令XCLAIM用以進行消息轉移的操作,將某個消息轉移到自己的Pending列表中。需要設置組、轉移的目標消費者和消息ID,同時需要提供IDLE(已被讀取時長),只有超過這個時長,才能被轉移。

更多的Redis的Stream命令請大家參考Redis官方文檔:

https://redis.io/topics/streams-intro

https://redis.io/commands

同時Redis文檔中,在每個命令的詳情頁右邊會顯示“Related commands”,可以通過這個列表快速了解相關的命令和進入具體命令的詳情頁。

Redis隊列幾種實現的總結

基於List的 LPUSH+BRPOP 的實現

足夠簡單,消費消息延遲幾乎為零,但是需要處理空閑連接的問題。

如果線程一直阻塞在那里,Redis客戶端的連接就成了閑置連接,閑置過久,服務器一般會主動斷開連接,減少閑置資源占用,這個時候blpop和brpop或拋出異常,所以在編寫客戶端消費者的時候要小心,如果捕獲到異常需要重試。

其他缺點包括:

做消費者確認ACK麻煩,不能保證消費者消費消息后是否成功處理的問題(宕機或處理異常等),通常需要維護一個Pending列表,保證消息處理確認;不能做廣播模式,如pub/sub,消息發布/訂閱模型;不能重復消費,一旦消費就會被刪除;不支持分組消費。

基於Sorted-Set的實現

多用來實現延遲隊列,當然也可以實現有序的普通的消息隊列,但是消費者無法阻塞的獲取消息,只能輪詢,不允許重復消息。

PUB/SUB,訂閱/發布模式

優點:

典型的廣播模式,一個消息可以發布到多個消費者;多信道訂閱,消費者可以同時訂閱多個信道,從而接收多類消息;消息即時發送,消息不用等待消費者讀取,消費者會自動接收到信道發布的消息。

缺點:

消息一旦發布,不能接收。換句話就是發布時若客戶端不在線,則消息丟失,不能尋回;不能保證每個消費者接收的時間是一致的;若消費者客戶端出現消息積壓,到一定程度,會被強制斷開,導致消息意外丟失。通常發生在消息的生產遠大於消費速度時;可見,Pub/Sub 模式不適合做消息存儲,消息積壓類的業務,而是擅長處理廣播,即時通訊,即時反饋的業務。

基於Stream類型的實現

基本上已經有了一個消息中間件的雛形,可以考慮在生產過程中使用,當然真正要在生產中應用,要做的事情還很多,比如消息隊列的管理和監控就需要花大力氣去實現,而專業消息隊列都已經自帶或者存在着很好的第三方方案和插件。

與Java的集成

/**
 * 實現消費組消費,不考慮單消費者模式
 */
@Component
public class StreamVer {
    public final static String RS_STREAM_MQ_NS = "rsm:";

    @Autowired
    private JedisPool jedisPool;

    /**
     * 發布消息到Stream
     * @param key
     * @param message
     * @return
     */
    public StreamEntryID produce(String key,Map<String,String> message){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            StreamEntryID id = jedis.xadd(RS_STREAM_MQ_NS+key, StreamEntryID.NEW_ENTRY, message);
            System.out.println("發布消息到"+RS_STREAM_MQ_NS+key+" 返回消息id="+id.toString());
            return id;
        } catch (Exception e) {
            throw new RuntimeException("發布消息失敗!");
        } finally {
            jedis.close();
        }
    }


    /**
     * 創建消費群組,消費群組不可重復創建
     * @param key
     * @param groupName
     * @param lastDeliveredId
     */
    public void createCustomGroup(String key, String groupName, String lastDeliveredId){
        Jedis jedis = null;
        try {
            StreamEntryID id = null;
            if (lastDeliveredId==null){
                lastDeliveredId = "0-0";
            }
            id = new StreamEntryID(lastDeliveredId);
            jedis = jedisPool.getResource();
            /*makeStream表示沒有時是否自動創建stream,但是如果有,再自動創建會異常*/
            jedis.xgroupCreate(RS_STREAM_MQ_NS+key,groupName,id,false);
            System.out.println("創建消費群組成功:"+groupName);
        } catch (Exception e) {
            throw new RuntimeException("創建消費群組失敗!",e);
        } finally {
            jedis.close();
        }
    }


    /**
     * 消息消費
     * @param key
     * @param customerName
     * @param groupName
     * @return
     */
    public List<Map.Entry<String, List<StreamEntry>>> consume(String key, String customerName,String groupName){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            /*消息消費時的參數*/
            XReadGroupParams xReadGroupParams = new XReadGroupParams().block(0).count(1);
            Map<String, StreamEntryID> streams = new HashMap<>();
            streams.put(RS_STREAM_MQ_NS+key,StreamEntryID.UNRECEIVED_ENTRY);
            List<Map.Entry<String, List<StreamEntry>>> result
                    = jedis.xreadGroup(groupName, customerName, xReadGroupParams, streams);
            System.out.println(groupName+"從"+RS_STREAM_MQ_NS+key+"接受消息, 返回消息:"+result);
            return result;
        } catch (Exception e) {
            throw new RuntimeException("消息消費失敗!",e);
        } finally {
            jedis.close();
        }
    }

    /**
     * 消息確認
     * @param key
     * @param groupName
     * @param msgId
     */
    public void ackMsg(String key, String groupName,StreamEntryID msgId){
        if (msgId==null) throw new RuntimeException("msgId為空!");
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            System.out.println(jedis.xack(key,groupName,msgId));
            System.out.println(RS_STREAM_MQ_NS+key+",消費群組"+groupName+" 消息已確認");
        } catch (Exception e) {
            throw new RuntimeException("消息確認失敗!",e);
        } finally {
            jedis.close();
        }
    }

    /*
    檢查消費者群組是否存在,輔助方法
    * */
    public boolean checkGroup(String key, String groupName){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            List<StreamGroupInfo> xinfoGroupResult = jedis.xinfoGroup(RS_STREAM_MQ_NS+key);
            for(StreamGroupInfo groupinfo : xinfoGroupResult) {
                if(groupName.equals(groupinfo.getName())) return true;
            }
            return false;
        } catch (Exception e) {
            throw new RuntimeException("檢查消費群組失敗!",e);
        } finally {
            jedis.close();
        }
    }

    public final static int MQ_INFO_CONSUMER = 1;
    public final static int MQ_INFO_GROUP = 2;
    public final static int MQ_INFO_STREAM = 0;
    /**
     * 消息隊列信息查看
     * @param type
     */
    public void MqInfo(int type,String key, String groupName){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            if(type==MQ_INFO_CONSUMER){
                List<StreamConsumersInfo> xinfoConsumersResult = jedis.xinfoConsumers(RS_STREAM_MQ_NS+key, groupName);
                System.out.println(RS_STREAM_MQ_NS+key+" 消費者信息:" + xinfoConsumersResult);
                for( StreamConsumersInfo consumersinfo : xinfoConsumersResult) {
                    System.out.println("-ConsumerInfo:" + consumersinfo.getConsumerInfo());
                    System.out.println("--Name:" + consumersinfo.getName());
                    System.out.println("--Pending:" + consumersinfo.getPending());
                    System.out.println("--Idle:" + consumersinfo.getIdle());
                }
            }else if (type==MQ_INFO_GROUP){
                List<StreamGroupInfo> xinfoGroupResult = jedis.xinfoGroup(RS_STREAM_MQ_NS+key);
                System.out.println(RS_STREAM_MQ_NS+key+"消費者群組信息:" + xinfoGroupResult);
                for(StreamGroupInfo groupinfo : xinfoGroupResult) {
                    System.out.println("-GroupInfo:" + groupinfo.getGroupInfo());
                    System.out.println("--Name:" + groupinfo.getName());
                    System.out.println("--Consumers:" + groupinfo.getConsumers());
                    System.out.println("--Pending:" + groupinfo.getPending());
                    System.out.println("--LastDeliveredId:" + groupinfo.getLastDeliveredId());
                }
            }else{
                StreamInfo xinfoStreamResult = jedis.xinfoStream(RS_STREAM_MQ_NS+key);
                System.out.println(RS_STREAM_MQ_NS+key+"隊列信息:" + xinfoStreamResult);
                System.out.println("-StreamInfo:" + xinfoStreamResult.getStreamInfo());
                System.out.println("--Length:" + xinfoStreamResult.getLength());
                System.out.println("--RadixTreeKeys:" + xinfoStreamResult.getRadixTreeKeys());
                System.out.println("--RadixTreeNodes():" + xinfoStreamResult.getRadixTreeNodes());
                System.out.println("--Groups:" + xinfoStreamResult.getGroups());
                System.out.println("--LastGeneratedId:" + xinfoStreamResult.getLastGeneratedId());
                System.out.println("--FirstEntry:" + xinfoStreamResult.getFirstEntry());
                System.out.println("--LastEntry:" + xinfoStreamResult.getLastEntry());
            }
        } catch (Exception e) {
            throw new RuntimeException("消息隊列信息檢索失敗!",e);
        } finally {
            jedis.close();
        }
    }

}

消息隊列問題

從我們上面對Stream的使用表明,Stream已經具備了一個消息隊列的基本要素,生產者API、消費者API,消息Broker,消息的確認機制等等,所以在使用消息中間件中產生的問題,這里一樣也會遇到。

Stream 消息太多怎么辦?

要是消息積累太多,Stream 的鏈表豈不是很長,內容會不會爆掉?xdel 指令又不會刪除消息,它只是給消息做了個標志位。

Redis 自然考慮到了這一點,所以它提供了一個定長 Stream 功能。在 xadd 的指令提供一個定長長度 maxlen,就可以將老的消息干掉,確保最多不超過指定長度。

消息如果忘記 ACK 會怎樣?

Stream 在每個消費者結構中保存了正在處理中的消息 ID 列表 PEL,如果消費者收到了消息處理完了但是沒有回復 ack,就會導致 PEL 列表不斷增長,如果有很多消費組的話,那么這個 PEL 占用的內存就會放大。進而引發redis 的淘汰策略進行淘汰消息,所以消息要盡可能的快速消費並確認。

PEL 如何避免消息丟失?

在客戶端消費者讀取 Stream 消息時,Redis 服務器將消息回復給客戶端的過程中,客戶端突然斷開了連接,消息就丟失了。但是 PEL 里已經保存了發出去的消息 ID。待客戶端重新連上之后,可以再次收到 PEL 中的消息 ID 列表。不過此時 xreadgroup 的起始消息 ID 不能為參數>,而必須是任意有效的消息 ID,一般將參數設為 0-0,表示讀取所有的 PEL 消息以及自last_delivered_id之后的新消息。

死信問題

如果某個消息,不能被消費者處理,也就是不能被XACK,這是要長時間處於Pending列表中,即使被反復的轉移給各個消費者也是如此。此時該消息的delivery counter(通過XPENDING可以查詢到)就會累加,當累加到某個我們預設的臨界值時,我們就認為是壞消息(也叫死信,DeadLetter,無法投遞的消息),由於有了判定條件,我們將壞消息處理掉即可,刪除即可。刪除一個消息,使用XDEL語法,注意,這個命令並沒有刪除Pending中的消息,因此查看Pending,消息還會在,可以在執行執行XDEL之后,XACK這個消息標識其處理完畢。

Stream 的高可用

Stream 的高可用是建立主從復制基礎上的,它和其它數據結構的復制機制沒有區別,也就是說在 Sentinel 和 Cluster 集群環境下 Stream 是可以支持高可用的。不過鑒於 Redis 的指令復制是異步的,在 failover 發生時,Redis 可能會丟失極小部分數據,這點 Redis 的其它數據結構也是一樣的。

分區 Partition

Redis 的服務器沒有原生支持分區能力,如果想要使用分區,那就需要分配多個 Stream,然后在客戶端使用一定的策略來生產消息到不同的 Stream。

Stream小結

Stream 的消費模型借鑒了 Kafka 的消費分組的概念,它彌補了 Redis Pub/Sub 不能持久化消息的缺陷。但是它又不同於 kafka,Kafka 的消息可以分 partition,而 Stream 不行。如果非要分 parition 的話,得在客戶端做,提供不同的 Stream 名稱,對消息進行 hash 取模來選擇往哪個 Stream 里塞。

總的來說,如果是中小項目和企業,在工作中已經使用了Redis,在業務量不是很大,而又需要消息中間件功能的情況下,可以考慮使用Redis的Stream功能。但是如果並發量很高,資源足夠支持下,還是以專業的消息中間件,比如RocketMQ、Kafka等來支持業務更好。

和kafka rabbitmq等相比

已經具備了一個消息隊列的基本要素,生產者API、消費者API,消息Broker,消息的確認機制等等,但是和專業的mq相比還是存在一些問題,比如說可視化界面、消息的持久化,怎么保證消息的不丟失等等,因為畢竟redis 是基於內存存儲的,內存的大小是有限制的。另外就是stream的高可用和redis架構的高可用是一樣的,在主從備份的情況下因為是異步復制的原因可能會丟失數據,在考慮架構的同時要考慮到數據丟失的問題。還需要考慮到redis stream不支持原生分區能力。

Redis中的線程和IO模型

為什么redis會那么快?

1.基於內存的操作

2.單線程的使用(在進行網絡io的讀寫和客戶端的連接單線程的情況下,不需要加鎖不需要進行cpu協調上下文的切換所以就會快。)

3.非阻塞的io

4.內部通信的resp協議比較簡單

什么是Reactor模式 ?

“反應”器名字中”反應“的由來:

“反應”即“倒置”,“控制逆轉”,具體事件處理程序不調用反應器,而向反應器注冊一個事件處理器,表示自己對某些事件感興趣,有時間來了,具體事件處理程序通過事件處理器對某個指定的事件發生做出反應;這種控制逆轉又稱為“好萊塢法則”(不要調用我,讓我來調用你)

例如,路人甲去做男士SPA,前台的接待小姐接待了路人甲,路人甲現在只對10000技師感興趣,但是路人甲去的比較早,就告訴接待小姐,等10000技師上班了或者是空閑了,通知我。等路人甲接到接待小姐通知,做出了反應,把10000技師占住了。

然后,路人甲想起上一次的那個10000號房間不錯,設備舒適,燈光曖昧,又告訴前台的接待小姐,我對10000號房間很感興趣,房間空出來了就告訴我,我現在先和10000這個小姐聊下人生,10000號房間空出來了,路人甲再次接到接待小姐通知,路人甲再次做出了反應。

路人甲就是具體事件處理程序,前台的接待小姐就是所謂的反應器,“10000技師上班了”和“10000號房間空閑了”就是事件,路人甲只對這兩個事件感興趣,其他,比如10001號技師或者10002號房間空閑了也是事件,但是路人甲不感興趣。

前台的接待小姐不僅僅服務路人甲1人,他還可以同時服務路人乙、丙……..,每個人所感興趣的事件是不一樣的,前台的接待小姐會根據每個人感興趣的事件通知對應的每個人。

單線程Reactor模式流程

服務器端的Reactor是一個線程對象,該線程會啟動事件循環,並使用Acceptor事件處理器關注ACCEPT事件,這樣Reactor會監聽客戶端向服務器端發起的連接請求事件(ACCEPT事件)。

客戶端向服務器端發起一個連接請求,Reactor監聽到了該ACCEPT事件的發生並將該ACCEPT事件派發給相應的Acceptor處理器來進行處理。建立連接后關注的READ事件,這樣一來Reactor就會監聽該連接的READ事件了。

當Reactor監聽到有讀READ事件發生時,將相關的事件派發給對應的處理器進行處理。比如,讀處理器會通過讀取數據,此時read()操作可以直接讀取到數據,而不會堵塞與等待可讀的數據到來。

在目前的單線程Reactor模式中,不僅I/O操作在該Reactor線程上,連非I/O的業務操作也在該線程上進行處理了,這可能會大大延遲I/O請求的響應。所以我們應該將非I/O的業務邏輯操作從Reactor線程上卸載,以此來加速Reactor線程對I/O請求的響應。

單線程Reactor,工作者線程池

與單線程Reactor模式不同的是,添加了一個工作者線程池,並將非I/O操作從Reactor線程中移出轉交給工作者線程池來執行。這樣能夠提高Reactor線程的I/O響應,不至於因為一些耗時的業務邏輯而延遲對后面I/O請求的處理。

但是對於一些小容量應用場景,可以使用單線程模型,對於高負載、大並發或大數據量的應用場景卻不合適,主要原因如下:

① 一個NIO線程同時處理成百上千的鏈路,性能上無法支撐,即便NIO線程的CPU負荷達到100%,也無法滿足海量消息的讀取和發送;

② 當NIO線程負載過重之后,處理速度將變慢,這會導致大量客戶端連接超時,超時之后往往會進行重發,這更加重了NIO線程的負載,最終會導致大量消息積壓和處理超時,成為系統的性能瓶頸;

多Reactor線程模式

Reactor線程池中的每一Reactor線程都會有自己的Selector、線程和分發的事件循環邏輯。

mainReactor可以只有一個,但subReactor一般會有多個。mainReactor線程主要負責接收客戶端的連接請求,然后將接收到的SocketChannel傳遞給subReactor,由subReactor來完成和客戶端的通信。

多Reactor線程模式將“接受客戶端的連接請求”和“與該客戶端的通信”分在了兩個Reactor線程來完成。mainReactor完成接收客戶端連接請求的操作,它不負責與客戶端的通信,而是將建立好的連接轉交給subReactor線程來完成與客戶端的通信,這樣一來就不會因為read()數據量太大而導致后面的客戶端連接請求得不到即時處理的情況。並且多Reactor線程模式在海量的客戶端並發請求的情況下,還可以通過實現subReactor線程池來將海量的連接分發給多個subReactor線程,在多核的操作系統中這能大大提升應用的負載和吞吐量。

netty就是用這種模式來實現的,這種模式是由NIO的實現者提出的,下面這張圖也是他畫的。

Redis中的線程和IO概述

Redis 基於 Reactor 模式開發了自己的網絡事件處理器 - 文件事件處理器(file event handler,后文簡稱為 FEH),而該處理器又是單線程的,所以redis設計為單線程模型。

采用I/O多路復用同時監聽多個socket,根據socket當前執行的事件來為 socket 選擇對應的事件處理器。

當被監聽的socket准備好執行accept、read、write、close等操作時,和操作對應的文件事件就會產生,這時FEH就會調用socket之前關聯好的事件處理器來處理對應事件。

所以雖然FEH是單線程運行,但通過I/O多路復用監聽多個socket,不僅實現高性能的網絡通信模型,又能和 Redis 服務器中其它同樣單線程運行的模塊交互,保證了Redis內部單線程模型的簡潔設計。

下面來看文件事件處理器的幾個組成部分。

socket

文件事件就是對socket操作的抽象, 每當一個 socket 准備好執行連接accept、read、write、close等操作時, 就會產生一個文件事件。一個服務器通常會連接多個socket, 多個socket可能並發產生不同操作,每個操作對應不同文件事件。

I/O多路復用程序

I/O 多路復用程序會負責監聽多個socket。

盡管文件事件可能並發出現, 但 I/O 多路復用程序會將所有產生事件的socket放入隊列, 通過該隊列以有序、同步且每次一個socket的方式向文件事件分派器傳送socket。

當上一個socket產生的事件被對應事件處理器執行完后, I/O 多路復用程序才會向文件事件分派器傳送下個socket, 如下:

I/O多路復用程序的實現

Redis 的 I/O 多路復用程序的所有功能都是通過包裝常見的 select、epoll、 evport 和 kqueue 這些 I/O 多路復用函數庫實現的。

每個 I/O 多路復用函數庫在 Redis 源碼中都對應一個單獨的文件:

因為 Redis 為每個 I/O 多路復用函數庫都實現了相同的 API , 所以 I/O 多路復用程序的底層實現是可以互換的。Redis 在 I/O 多路復用程序的實現源碼ae.c文件中宏定義了相應規則,使得程序在編譯時自動選擇系統中性能最高的 I/O 多路復用函數庫作為 Redis 的 I/O 多路復用程序的底層實現:性能降序排列。

注:

evport = Solaris 10

epoll = Linux

kqueue = OS X,FreeBSD

select =通常作為fallback安裝在所有平台上

Evport,Epoll和KQueue具有 O(1)描述符選擇算法復雜度,並且它們都使用內部內核空間內存結構.他們還可以提供很多(數十萬個)文件描述符.

除其他外,select最多只能提供 1024個描述符,並且對描述符進行完全掃描(因此每次迭代所有描述符以選擇一個可使用的描述符),因此復雜性是 O(n).

文件事件分派器

文件事件分派器接收 I/O 多路復用程序傳來的socket, 並根據socket產生的事件類型, 調用相應的事件處理器。

文件事件處理器

服務器會為執行不同任務的套接字關聯不同的事件處理器, 這些處理器是一個個函數, 它們定義了某個事件發生時, 服務器應該執行的動作。

Redis 為各種文件事件需求編寫了多個處理器,若客戶端連接Redis,對連接服務器的各個客戶端進行應答,就需要將socket映射到連接應答處理器寫數據到Redis,接收客戶端傳來的命令請求,就需要映射到命令請求處理器從Redis讀數據,向客戶端返回命令的執行結果,就需要映射到命令回復處理器當主服務器和從服務器進行復制操作時, 主從服務器都需要映射到特別為復制功能編寫的復制處理器。

文件事件的類型

I/O 多路復用程序可以監聽多個socket的 ae.h/AE_READABLE 事件和 ae.h/AE_WRITABLE 事件, 這兩類事件和套接字操作之間的對應關系如下:

當socket可讀(比如客戶端對Redis執行write/close操作),或有新的可應答的socket出現時(即客戶端對Redis執行connect操作),socket就會產生一個AE_READABLE事件。

當socket可寫時(比如客戶端對Redis執行read操作),socket會產生一個AE_WRITABLE事件。

I/O多路復用程序可以同時監聽AE_REABLE和AE_WRITABLE兩種事件,要是一個socket同時產生這兩種事件,那么文件事件分派器優先處理AE_REABLE事件。即一個socket又可讀又可寫時, Redis服務器先讀后寫socket。

總結

最后,讓我們梳理一下客戶端和Redis服務器通信的整個過程:

Redis啟動初始化時,將連接應答處理器跟AE_READABLE事件關聯。

若一個客戶端發起連接,會產生一個AE_READABLE事件,然后由連接應答處理器負責和客戶端建立連接,創建客戶端對應的socket,同時將這個socket的AE_READABLE事件和命令請求處理器關聯,使得客戶端可以向主服務器發送命令請求。

當客戶端向Redis發請求時(不管讀還是寫請求),客戶端socket都會產生一個AE_READABLE事件,觸發命令請求處理器。處理器讀取客戶端的命令內容, 然后傳給相關程序執行。

當Redis服務器准備好給客戶端的響應數據后,會將socket的AE_WRITABLE事件和命令回復處理器關聯,當客戶端准備好讀取響應數據時,會在socket產生一個AE_WRITABLE事件,由對應命令回復處理器處理,即將准備好的響應數據寫入socket,供客戶端讀取。

命令回復處理器全部寫完到 socket 后,就會刪除該socket的AE_WRITABLE事件和命令回復處理器的映射。

Redis6中的多線程

1. Redis6.0之前的版本真的是單線程嗎?

Redis在處理客戶端的請求時,包括獲取 (socket 讀)、解析、執行、內容返回 (socket 寫) 等都由一個順序串行的主線程處理,這就是所謂的“單線程”。但如果嚴格來講從Redis4.0之后並不是單線程,除了主線程外,它也有后台線程在處理一些較為緩慢的操作,例如清理臟數據、無用連接的釋放、大 key 的刪除等等。

2. Redis6.0之前為什么一直不使用多線程?

官方曾做過類似問題的回復:使用Redis時,幾乎不存在CPU成為瓶頸的情況, Redis主要受限於內存和網絡。例如在一個普通的Linux系統上,Redis通過使用pipelining每秒可以處理100萬個請求,所以如果應用程序主要使用O(N)或O(log(N))的命令,它幾乎不會占用太多CPU。

使用了單線程后,可維護性高。多線程模型雖然在某些方面表現優異,但是它卻引入了程序執行順序的不確定性,帶來了並發讀寫的一系列問題,增加了系統復雜度、同時可能存在線程切換、甚至加鎖解鎖、死鎖造成的性能損耗。Redis通過AE事件模型以及IO多路復用等技術,處理性能非常高,因此沒有必要使用多線程。單線程機制使得 Redis 內部實現的復雜度大大降低,Hash 的惰性 Rehash、Lpush 等等 “線程不安全” 的命令都可以無鎖進行。

3. Redis6.0為什么要引入多線程呢?

Redis將所有數據放在內存中,內存的響應時長大約為100納秒,對於小數據包,Redis服務器可以處理80,000到100,000 QPS,這也是Redis處理的極限了,對於80%的公司來說,單線程的Redis已經足夠使用了。

但隨着越來越復雜的業務場景,有些公司動不動就上億的交易量,因此需要更大的QPS。常見的解決方案是在分布式架構中對數據進行分區並采用多個服務器,但該方案有非常大的缺點,例如要管理的Redis服務器太多,維護代價大,集群分槽位存儲但是在熱點key的訪問上無法控制和存儲value的大小無法控制等問題;某些適用於單個Redis服務器的命令不適用於數據分區;數據分區無法解決熱點讀/寫問題;數據偏斜,重新分配和放大/縮小變得更加復雜等等。

從Redis自身角度來說,因為讀寫網絡的read/write系統調用占用了Redis執行期間大部分CPU時間,瓶頸主要在於網絡的 IO 消耗, 優化主要有兩個方向:

• 提高網絡 IO 性能,典型的實現比如使用 DPDK (越過操作系統的網絡棧,直接讀取網卡內容,自行來處理)來替代內核網絡棧的方式

• 使用多線程充分利用多核,典型的實現比如 Memcached。

協議棧優化的這種方式跟 Redis 關系不大,支持多線程是一種最有效最便捷的操作方式。所以總結起來,redis支持多線程主要就是兩個原因:

• 可以充分利用服務器 CPU 資源,目前主線程只能利用一個核

• 多線程任務可以分攤 Redis 同步 IO 讀寫負荷

4.Redis6.0默認是否開啟了多線程?

Redis6.0的多線程默認是禁用的,只使用主線程。如需開啟需要修改redis.conf配置文件:io-threads-do-reads yes

開啟多線程后,還需要設置線程數,否則是不生效的。

關於線程數的設置,官方有一個建議:4核的機器建議設置為2或3個線程,8核的建議設置為6個線程,線程數一定要小於機器核數。還需要注意的是,線程數並不是越大越好,官方認為超過了8個基本就沒什么意義了。(專機專用的情況下)

5.Redis6.0采用多線程后,性能的提升效果如何?

Redis 作者 antirez 在 RedisConf 2019分享時曾提到:Redis 6 引入的多線程 IO 特性對性能提升至少是一倍以上。國內也有大牛曾使用unstable版本在阿里雲esc進行過測試,GET/SET 命令在4線程 IO時性能相比單線程是幾乎是翻倍了。如果開啟多線程,至少要4核的機器,且Redis實例已經占用相當大的CPU耗時的時候才建議采用,否則使用多線程沒有意義。

6.Redis6.0多線程的實現機制?

流程簡述如下:

1、主線程負責接收建立連接請求,獲取 socket 放入全局等待讀處理隊列

2、主線程處理完讀事件之后,通過 RR(Round Robin) 將這些連接分配給這些 IO 線程

3、主線程阻塞等待 IO 線程讀取 socket 完畢

4、主線程通過單線程的方式執行請求命令,請求數據讀取並解析完成,但並不執行回寫 socket

5、主線程阻塞等待 IO 線程將數據回寫 socket 完畢

6、解除綁定,清空等待隊列

該設計有如下特點:

1、IO 線程要么同時在讀 socket,要么同時在寫,不會同時讀或寫

2、IO 線程只負責讀寫 socket 解析命令,不負責命令處理

7.開啟多線程后,是否會存在線程並發安全問題?

從上面的實現機制可以看出,Redis的多線程部分只是用來處理網絡數據的讀寫和協議解析,執行命令仍然是單線程順序執行。所以我們不需要去考慮控制 key、lua、事務,LPUSH/LPOP 等等的並發及線程安全問題。

8.Redis6.0的多線程和Memcached多線程模型進行對比

Memcached 服務器采用 master-woker 模式進行工作,服務端采用 socket 與客戶端通訊。主線程、工作線程 采用 pipe管道進行通訊。主線程采用 libevent 監聽 listen、accept 的讀事件,事件響應后將連接信息的數據結構封裝起來,根據算法選擇合適的工作線程,將連接任務攜帶連接信息分發出去,相應的線程利用連接描述符建立與客戶端的socket連接 並進行后續的存取數據操作。

相同點:都采用了 master線程-worker 線程的模型

不同點:Memcached 執行主邏輯也是在 worker 線程里,模型更加簡單,實現了真正的線程隔離,符合我們對線程隔離的常規理解。而 Redis 把處理邏輯交還給 master 線程(woker線程負責網絡數據的讀取和發送,master線程負責讀寫內存,執行命令),雖然一定程度上增加了模型復雜度,但也解決了線程並發安全等問題

解決熱點key的方案

1.將所有的熱點key復制到所有的redis集群上。

2.發現熱點key之后將熱點key提取到web應用服務器上(數據離用戶越近,性能提升越大,效率越高),不在redis服務器做讀取。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM