Redis實現消息隊列
消息隊列(Message Queue)
1. 什么是消息隊列?
消息隊列(Message Queue)是一種應用間的通信方式,消息發送后可以立即返回,由消息系統來確保消息的可靠傳遞。消息發布者只管把消息發布到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。
這個其實跟設計模式中的觀察者模式有點像,參考文章《設計模式-觀察者模式(observer)》。被觀察者可以稱之為事件對象,有新的事件發布,會被觀察者監聽到。
2. 消息隊列的特點
1)三個角色:生產者、消費者、消息處理中心
把數據放到消息隊列的叫做生產者
從消息隊列里邊取數據的叫做消費者
消息處理中心指的就是消息隊列
2)異步處理模式
生產者將消息發送到一條虛擬的通道(消息隊列)上,而無須等待響應。消費者則訂閱或是監聽該通道,取出消息。兩者互不干擾,甚至都不需要同時在線,也就是我們說的松耦合。
3)可靠性
消息要可以保證不丟失、至少被消費一次、有時可能還需要順序性的保證
3. 為什么要使用消息隊列?
消息隊列是分布式系統中重要的組件,使用消息隊列主要是為了通過異步處理提高系統性能和流量削峰、降低系統耦合性。
比如訂單系統,與其關聯的可能就有商品系統、庫存系統、收件地址系統,用戶系統、短信(郵件/微信)通知系統,日志記錄系統等等。如果用戶下單后,要把這些業務以同步的方式執行,那么調用鏈可能會很長,不利於高並發的處理。就需要把短信通知、日志記錄這種與訂單系統關聯並不是那么緊密,實時性要求那么高的業務,放到消息隊列中去異步實現。
當然,還需要注意的是,比如商品秒殺,用戶下單后,雖然某些業務可以通過投入消息隊列,立即返回。仍需考慮實際情況,不能直接告訴用戶秒殺成功,還需要消費者處理完消息后,通過異步回調通知服務器,判斷處理成功后再通知用戶,避免帶來不必要的糾紛。
Redis實現消息隊列
redis的設計是用來做緩存的,它是一個內存數據庫,不過因為其某些特性適合用來充當隊列(Redis的List數據結構比較適合做MQ),所以也多被用於做簡單的mq。它有幾個阻塞式的API可以使用,正是這些阻塞式的API讓他有做消息隊列的能力。
redis實現消息隊列有三種方式:List、發布訂閱(pub/sub)、Stream
一、生產消費模式 - List實現消息隊列(PUSH/POP)
生產消費模式會讓一個或多個客戶端監聽消息隊列,一旦消息到達,消費者馬上消費,誰先搶到算誰的。如果隊列中沒有消息,消費者會繼續監聽。
Redis 的List(列表)是簡單的字符串列表,按照插入順序排序。可以在頭部left和尾部right添加新的元素。List提供了push和pop命令,遵循着先入先出FIFO的原則。
實現方案一:LPUSH + RPOP或者 RPUSH+LPOP
LPUSH在頭部(List的左邊)添加一個新的元素並返回List長度,充當消息隊列中的生產者。RPOP在尾部(List的右邊)刪除一個元素並返回該元素的值,充當消息隊列中的消費者。
基於List類型的插入刪除元素操作實現,就是一個典型的先進先出隊列的解決方案。
優點:
1)List類型是基於鏈表實現,插入刪除元素時間復雜度僅為常量級,有先進先出(FIFO)的特點來保證數據的順序
2)Redis支持消息持久化,在服務端數據是安全的
缺點:
1)點對點的模式- Point-to-Point(P2P)
因為 Redis 單線程的特點,所以在消費數據時,一條消息只能被一個消費者接收,消費者完全靠手速來獲取,不支持分組消費,是一種比較簡陋的消息隊列。
2)性能風險點
第一點:消費者如果想要及時的處理數據,就要在程序中寫個類似 while(true) 這樣的邏輯,不停地去調用 RPOP 或 LPOP 命令,查看列表中是否有待處理的消息。這就會給消費者程序帶來些不必要的性能損失。
1 <?php 2 3 while(true){ 4 $result = $redis->lpop("queue"); 5 if($result){ 6 $data = json_decode($result, true); 7 } 8 }
第二點:如果隊列空了,消費者會陷入pop死循環,即使沒有數據也不會停止。空輪詢不但消耗消費者的CPU資源還會增加Redis的訪問壓力,影響Redis的性能。
3)沒有數據權重的概念,只能先進先出
4)客戶端數據不安全
消費確認機制(ACK)實現麻煩加之不能重復消費,一旦消費,數據就會刪除,這意味着該元素只存在於客戶端的上下文中,導致客戶端數據是不安全的,當客戶端宕機、網絡斷開會出現數據丟失,也不能實現廣播模式。
實現方案二:LPUSH/BRPOP
此方案是在方案一上針對隊列沒有元素時造成服務器資源浪費進行的優化方案,使用了BRPOP做消費者,BRPOP是阻塞的。消費者可以設置數據不存在時的阻塞時間,來減少不必要的輪詢。
關於brpop/blpop有幾點需要說明:
BRPOP LIST1 LIST2 .. LISTN TIMEOUT
第一點:使用brpop和blpop實現阻塞讀取
由於需要一直調用rpop/lpop才可以實現不停的監聽且消費消息,為解決這個問題,Redis提供了阻塞命令brpop/blpop。使用brpop會阻塞隊列,而且每次只會彈出一個消息,如果沒有消息則會阻塞。
第二點:減小Redis的壓力
生產者從列表左側lpush加入消息到隊列,消費者使用brpop命令從列表右側彈出消息並設置超時(阻塞)時間,如果列表中沒有消息則一直阻塞直到超時。當超時時間設置為0時刻,則無限等待,一直阻塞直到彈出消息。這樣做的目的在於減小Redis的壓力。
第三點:注意異常的處理
對於Redis來說提供了blpop/brpop阻塞讀,阻塞讀在隊列沒有數據時會立即進入休眠狀態,一旦數據到來則立即被喚醒,消息的延遲幾乎為零。需要注意的是如果線程一直阻塞在那里,連接就會被服務器主動斷開來減少資源占用,這時blpop/brpop會拋出異常,所以編寫消費端時需要注意異常的處理。
實現方案三:LPUSH+LRANGE+RPOP
此方案是在方案一上針對客戶端數據安全進行的優化方案,使用LRANGE首先對隊列元素只做讀取不做消費,在客戶端消費完成后,再使用RPOP對服務端進行消費。
由於LRANGE不是阻塞的就又回到了方案二解決的資源浪費問題上了,無法減少不必要的輪詢。
還存在重復執行的問題,由於先讀再消費,在消費者宕機重啟后會再次讀到沒有確認消費的但是已經在消費者處理過的元素,就有了重復消費的風險。真的是繞啊。
方案四:LPUSH+BRPOPLPUSH+LREM
該方案也是對客戶端數據安全進行的優化方案,是一種安全的隊列,雖然也會存在重復消費的風險,但是元素隊列的操作都是在服務端進行的,問題發生的概率會大大降低。
說明:使用 RPOPLPUSH 獲取消息時,RPOPLPUSH 會把消息返給客戶端,同時把該消息放入一個備份消息列表,並且這個過程是原子的,可以保證消息的安全,當客戶端成功的處理了消息后,就可以把此消息從備份列表中移除了。
這個方案當然也不是完美的,還是存在客戶端宕機的情況,正在處理中的隊列存在長期不消費的消息怎么辦?
可以再添加一台客戶端來監控長期不消費的消息,重新將消息打回待消費的隊列,這個可以使用循環隊列的模式來實現。
總結:
1)Redis中實現生產者和消費者模型,可使用LPUSH和RPOP來實現該功能。不過當列表為空時消費者就需要輪詢來獲取消息,這樣會增加Redis的訪問壓力和消費者的CPU時間,另外很多訪問也是無用的。為此Redis提供了阻塞式訪問BRPOP和BLPOP命令。同時Redis會為所有阻塞的消費者以先后順序排序。
2)使用Redis的列表來實現一個任務隊列,開啟兩個程序,一個作為生產者使用LPUSH寫隊列,一個作為消費者使用RPOP讀隊列。由於消費者並不知道什么時候會有消息過來,所以消費者需要一直循環讀取數據。使用BRPOP改進后,消費者不會一直循環讀取,而是一直阻塞直到等待超時或者有消息過來時才讀取。
二、發布訂閱模式(PUB/SUB)- 消息多播
Redis 通過 PUBLISH 、 SUBSCRIBE 、PSUBSCRIBE 等命令實現了訂閱與發布模式, 這個功能提供兩種信息機制, 分別是訂閱/發布到頻道和訂閱/發布到模式。
發布訂閱模式是一個或多個客戶端訂閱消息頻道,只要發布者發布消息,所有訂閱者都能收到消息,訂閱者都是平等的。
此模式中生產者producer和消費者consumer之間的關系是一對多的,也就是一條消息會被多個消費者所消費,當只有一個消費者時可視為一對一的消息隊列。
訂閱:
發布:
1. 訂閱/發布到頻道
2. 訂閱/發布到模式
可以理解為是一個類似正則匹配的 Key,只是個可以匹配給定模式的頻道。這樣就不需要顯式地去訂閱多個名稱了,可以通過模式訂閱這種方式,一次性關注多個頻道
我們可以看到訂閱的客戶端每次可以收到一個 3 個參數的消息,分別為:消息的種類、始發頻道的名稱、實際的消息
1 127.0.0.1:6379> psubscribe news.* 2 Reading messages... (press Ctrl-C to quit) 3 1) "psubscribe" # 返回值的類型:顯示訂閱成功 4 2) "news.*" # 訂閱的模式 5 3) (integer) 1 # 目前已訂閱的模式的數量 6 1) "pmessage" # 返回值的類型:信息 7 2) "news.*" # 信息匹配的模式 8 3) "news.55" # 信息本身的目標頻道 9 4) "php" # 信息的內容
Redis 發布訂閱 (pub/sub) 的缺點:
消息無法持久化,如果出現網絡斷開、Redis 宕機等,消息就會被丟棄。而且也沒有 Ack 機制來保證數據的可靠性,假設一個消費者都沒有,那消息就直接被丟棄了。
三、 Stream
Redis 5.0 版本新增了一個更強大的數據結構—Stream。
1) 它提供了消息的持久化和主備復制功能,可以讓任何客戶端訪問任何時刻的數據,並且能記住每一個客戶端的訪問位置,還能保證消息不丟失。
2) 它就像是個僅追加內容的消息鏈表,把所有加入的消息都串起來,每個消息都有一個唯一的 ID 和對應的內容。而且消息是持久化的。
XADD key ID field value [field value ...] - 添加消息到末尾
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
1、獨立消費
$ 這個特殊的 ID 意思是 XREAD 應該使用流 mystream 已經存儲的最大 ID 作為最后一個 ID。以便我們僅接收從我們開始監聽時間以后的新消息。
2、分組消費
xread 雖然可以扇形分發到 N 個客戶端,然而,在某些問題中,我們想要做的不是向許多客戶端提供相同的消息流,而是從同一流向許多客戶端提供不同的消息子集。比如下圖這樣,三個消費者按輪訓的方式去消費一個 Stream。
1)Redis Stream 借鑒了很多 Kafka 的設計。
Consumer Group:有了消費組的概念,每個消費組狀態獨立,互不影響,一個消費組可以有多個消費者
last_delivered_id :每個消費組會有個游標 last_delivered_id 在數組之上往前移動,表示當前消費組已經消費到哪條消息了
pending_ids :消費者的狀態變量,作用是維護消費者的未確認的 id。pending_ids 記錄了當前已經被客戶端讀取的消息,但是還沒有 ack。如果客戶端沒有 ack,這個變量里面的消息 ID 會越來越多,一旦某個消息被 ack,它就開始減少。這個 pending_ids 變量在 Redis 官方被稱之為 PEL,也就是 Pending Entries List,這是一個很核心的數據結構,它用來確保客戶端至少消費了消息一次,而不會在網絡傳輸的中途丟失了沒處理。
2) 分區
Stream 不像 Kafka 那樣有分區的概念,如果想實現類似分區的功能,就要在客戶端使用一定的策略將消息寫到不同的 Stream。
xgroup create:創建消費者組
xgreadgroup:讀取消費組中的消息
xack:ack 掉指定消息
3、按消費組消費
1)創建消費組:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
說明:
key :隊列名稱,如果不存在就創建
groupname :組名。
$ : 表示從尾部開始消費,只接受新消息,當前 Stream 消息會全部忽略。
2)讀取消費組中的消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
比如:XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
總結:
1)Redis作為消息隊列使用,redis支持的數據結構是可以支撐這類業務,主要是利用了list這種數據結構的特性。
2)Redis的列表相當於編程語言里面的 LinkedList,是一個雙向的列表結構,這意味着列表新增和刪除元素是非常快的,時間復雜度為O(1),但是查找一個元素的時候需要遍歷列表,時間復雜度為O(n)。由於列表的元素操作和消息隊列操作類似,所以redis可以適用於消息隊列的場景,當然,在適用於的棧的場景下也可以勝任。
3)需要提醒一下,生產環境中如果對消息的可靠性有十分高的要求(比如訂單支付的消費消息),請使用專業的消息隊列(例如:rmq,amq等),對消息的丟失有一定容忍度的程序完全可以使用redis,例如我們的日志收集程序。
參考鏈接:
https://cloud.tencent.com/developer/article/1665419
https://www.jianshu.com/p/02a923fea175
https://www.jianshu.com/p/f807f33a90c2