redis的發布與訂閱機制


Redis 發布/訂閱機制原理分析

Redis 通過 PUBLISH 、 SUBSCRIBE 和 PSUBSCRIBE 等命令實現發布和訂閱功能。

  這些命令被廣泛用於構建即時通信應用,比如網絡聊天室(chatroom)和實時廣播、實時提醒等。

  本文通過分析 Redis 源碼里的 pubsub.c 文件,了解發布和訂閱機制的底層實現,籍此加深對 Redis 的理解。

  訂閱、發布和退訂

  在開始研究源碼之前,不妨先來回顧一下幾個相關命令的使用方式。

  PUBLISH 命令用於向給定的頻道發送信息,返回值為接收到信息的訂閱者數量:

redis> PUBLISH treehole "top secret here ..." 
  (integer) 0 
  redis> PUBLISH chatroom "hi?" 
  (integer) 1

  SUBSCRIBE 命令訂閱給定的一個或多個頻道:

redis> SUBSCRIBE chatroom 
  Reading messages... (press Ctrl-C to quit) 
  1) "subscribe" # 訂閱反饋 
  2) "chatroom" # 訂閱的頻道 
  3) (integer) 1 # 目前客戶端已訂閱頻道/模式的數量 
  1) "message" # 信息 
  2) "chatroom" # 發送信息的頻道 
  3) "hi?" # 信息內容

   SUBSCRIBE 的返回值當中, 1) 為 subscribe 的是訂閱的反饋信息,而 1) 為 message 的則是訂閱的頻道所發送的信息。

  SUBSCRIBE 還可以訂閱多個頻道,這樣一來它接收到的信息就可能來自多個頻道:

redis> SUBSCRIBE chatroom talk-to-jack 
  Reading messages... (press Ctrl-C to quit) 
  1) "subscribe" # 訂閱 chatroom 的反饋 
  2) "chatroom" 
  3) (integer) 1 
  1) "subscribe" # 訂閱 talk-to-jack 的反饋 
  2) "talk-to-jack" 
  3) (integer) 2 
  1) "message" # 來自 chatroom 的消息 
  2) "chatroom" 
  3) "yahoo" 
  1) "message" # 來自 talk-to-peter 的消息 
  2) "talk-to-jack" 
  3) "Goodmorning, peter."

  PSUBSCRIBE 提供了一種訂閱符合給定模式的所有頻道的方法,比如說,使用 it.* 為輸入,就可以訂閱所有以 it. 開頭的頻道,比如 it.news 、 it.blog 、 it.tweets ,諸如此類:

redis> PSUBSCRIBE it.* 
  Reading messages... (press Ctrl-C to quit) 
  1) "psubscribe" 
  2) "it.*" 
  3) (integer) 1 
  1) "pmessage" 
  2) "it.*" # 匹配的模式 
  3) "it.news" # 消息的來源頻道 
  4) "Redis 2.6rc5 release" # 消息內容 
  1) "pmessage" 
  2) "it.*" 
  3) "it.blog" 
  4) "Why NoSQL matters" 
  1) "pmessage" 
  2) "it.*" 
  3) "it.tweet" 
  4) "@redis: when will the 2.6 stable release?"

  當然, PSUBSCRIBE 也可以接受多個參數,從而匹配多種模式。

  最后, UNSUBSCRIBE 命令和 PUNSUBSCRIBE 負責退訂給定的頻道或模式。

  發布和訂閱機制

  當一個客戶端通過 PUBLISH 命令向訂閱者發送信息的時候,我們稱這個客戶端為發布者(publisher)。

  而當一個客戶端使用 SUBSCRIBE 或者 PSUBSCRIBE 命令接收信息的時候,我們稱這個客戶端為訂閱者(subscriber)。

  為了解耦發布者(publisher)和訂閱者(subscriber)之間的關系,Redis 使用了 channel (頻道)作為兩者的中介 —— 發布者將信息直接發布給 channel ,而 channel 負責將信息發送給適當的訂閱者,發布者和訂閱者之間沒有相互關系,也不知道對方的存在:

  

 

  知道了發布和訂閱的機制之后,接下來就可以開始研究具體的實現了,我們從 Redis 的訂閱命令開始說起。

  SUBSCRIBE 命令的實現

  前面說到,Redis 將所有接受和發送信息的任務交給 channel 來進行,而所有 channel 的信息就儲存在 redisServer 這個結構中:

 

struct redisServer { 
  // 省略 ... 
  dict *pubsub_channels; // Map channels to list of subscribed clients 
  // 省略 ... 
  };

 

  pubsub_channels 是一個字典,字典的鍵就是一個個 channel ,而字典的值則是一個鏈表,鏈表中保存了所有訂閱這個 channel 的客戶端。

  舉個例子,如果在一個 redisServer 實例中,有一個叫做 news 的頻道,這個頻道同時被client_123 和 client_456 兩個客戶端訂閱,那么這個 redisServer 結構看起來應該是這樣子:

  

  可以看出,實現 SUBSCRIBE 命令的關鍵,就是將客戶端添加到給定 channel 的訂閱鏈表中。

  函數 pubsubSubscribeChannel 是 SUBSCRIBE 命令的底層實現,它完成了將客戶端添加到訂閱鏈表中的工作:

 

// 訂閱指定頻道 
  // 訂閱成功返回 1 ,如果已經訂閱過,返回 0 
  int pubsubSubscribeChannel(redisClient *c, robj *channel) { 
  struct dictEntry *de; 
  list *clients = NULL; 
  int retval = 0; 
  /* Add the channel to the client -> channels hash table */ 
  // dictAdd 在添加新元素成功時返回 DICT_OK 
  // 因此這個判斷句表示,如果新訂閱 channel 成功,那么 。。。 
  if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { 
  retval = 1; 
  incrRefCount(channel); 
  /* Add the client to the channel -> list of clients hash table */ 
  // 將 client 添加到訂閱給定 channel 的鏈表中 
  // 這個鏈表是一個哈希表的值,哈希表的鍵是給定 channel 
  // 這個哈希表保存在 server.pubsub_channels 里 
  de = dictFind(server.pubsub_channels,channel); 
  if (de == NULL) { 
  // 如果 de 等於 NULL 
  // 表示這個客戶端是首個訂閱這個 channel 的客戶端 
  // 那么創建一個新的列表, 並將它加入到哈希表中 
  clients = listCreate(); 
  dictAdd(server.pubsub_channels,channel,clients); 
  incrRefCount(channel); 
  } else { 
  // 如果 de 不為空,就取出這個 clients 鏈表 
  clients = dictGetVal(de); 
  } 
  // 將客戶端加入到鏈表中 
  listAddNodeTail(clients,c); 
  } 
  /* Notify the client */ 
  addReply(c,shared.mbulkhdr[3]); 
  addReply(c,shared.subscribebulk); 
  // 返回訂閱的頻道 
  addReplyBulk(c,channel); 
  // 返回客戶端當前已訂閱的頻道和模式數量的總和 
  addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); 
  return retval; 
  }

 

  PSUBSCRIBE 命令的實現

  除了直接訂閱給定 channel 外,還可以使用 PSUBSCRIBE 訂閱一個模式(pattern),訂閱一個模式等同於訂閱所有匹配這個模式的 channel 。

  和 redisServer.pubsub_channels 屬性類似, redisServer.pubsub_patterns 屬性用於保存所有被訂閱的模式,和 pubsub_channels 不同的是, pubsub_patterns 是一個鏈表(而不是字典):

 

struct redisServer { 
  // 省略 ... 
  list *pubsub_patterns; // A list of pubsub_patterns 
  // 省略 ... 
  };

  pubsub_patterns 的每一個節點都是一個 pubsubPattern 結構的實例,它保存了被訂閱的模式,以及訂閱這個模式的客戶客戶端:

typedef struct pubsubPattern { 
  redisClient *client; 
  robj *pattern; 
  } pubsubPattern;

  舉個例子,假設在一個 redisServer 實例中,有一個叫做 news.* 的模式同時被客戶端client_789 和 client_999 訂閱,那么這個 redisServer 結構看起來應該是這樣子:

  

  現在可以知道,實現 PSUBSCRIBE 命令的關鍵,就是將客戶端和訂閱的模式添加到redisServer.pubsub_patterns 當中。

  pubsubSubscribePattern 是 PSUBSCRIBE 的底層實現,它將客戶端和所訂閱的模式添加到redisServer.pubsub_patterns 當中:

// 訂閱指定模式 
  // 訂閱成功返回 1 ,如果已經訂閱過,返回 0 
  int pubsubSubscribePattern(redisClient *c, robj *pattern) { 
  int retval = 0; 
  // 向 c->pubsub_patterns 中查找指定 pattern 
  // 如果返回值為 NULL ,說明這個 pattern 還沒被這個客戶端訂閱過 
  if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { 
  retval = 1; 
  // 添加 pattern 到客戶端 pubsub_patterns 
  listAddNodeTail(c->pubsub_patterns,pattern); 
  incrRefCount(pattern); 
  // 將 pattern 添加到服務器 
  pubsubPattern *pat; 
  pat = zmalloc(sizeof(*pat)); 
  pat->pattern = getDecodedObject(pattern); 
  pat->client = c; 
  listAddNodeTail(server.pubsub_patterns,pat); 
  } 
  /* Notify the client */ 
  addReply(c,shared.mbulkhdr[3]); 
  addReply(c,shared.psubscribebulk); 
  // 返回被訂閱的模式 
  addReplyBulk(c,pattern); 
  // 返回客戶端當前已訂閱的頻道和模式數量的總和 
  addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); 
  return retval; 
  }

   PUBLISH 命令的實現

  使用 PUBLISH 命令向訂閱者發送消息,需要執行以下兩個步驟:

  1) 使用給定的頻道作為鍵,在 redisServer.pubsub_channels 字典中查找記錄了訂閱這個頻道的所有客戶端的鏈表,遍歷這個鏈表,將消息發布給所有訂閱者。

  2) 遍歷 redisServer.pubsub_patterns 鏈表,將鏈表中的模式和給定的頻道進行匹配,如果匹配成功,那么將消息發布到相應模式的客戶端當中。

  舉個例子,假設有兩個客戶端分別訂閱 it.news 頻道和 it.* 模式,當執行命令PUBLISH it.news "hello moto" 的時候, it.news 頻道的訂閱者會在步驟 1 收到信息,而當PUBLISH 進行到步驟 2 的時候, it.* 模式的訂閱者也會收到信息。

  PUBLISH 命令的實際實現由 pubsubPublishMessage 函數完成,它的完整定義如下:

// 發送消息 
  int pubsubPublishMessage(robj *channel, robj *message) { 
  int receivers = 0; 
  struct dictEntry *de; 
  listNode *ln; 
  listIter li; 
  /* Send to clients listening for that channel */ 
  // 向所有頻道的訂閱者發送消息 
  de = dictFind(server.pubsub_channels,channel); 
  if (de) { 
  list *list = dictGetVal(de); // 取出所有訂閱者 
  listNode *ln; 
  listIter li; 
  // 遍歷所有訂閱者, 向它們發送消息 
  listRewind(list,&li); 
  while ((ln = listNext(&li)) != NULL) { 
  redisClient *c = ln->value; 
  addReply(c,shared.mbulkhdr[3]); 
  addReply(c,shared.messagebulk); 
  addReplyBulk(c,channel); // 打印頻道名 
  addReplyBulk(c,message); // 打印消息 
  receivers++; // 更新接收者數量 
  } 
  } 
  /* Send to clients listening to matching channels */ 
  // 向所有被匹配模式的訂閱者發送消息 
  if (listLength(server.pubsub_patterns)) { 
  listRewind(server.pubsub_patterns,&li); // 取出所有模式 
  channel = getDecodedObject(channel); 
  while ((ln = listNext(&li)) != NULL) { 
  pubsubPattern *pat = ln->value; // 取出模式 
  // 如果模式和 channel 匹配的話 
  // 向這個模式的訂閱者發送消息 
  if (stringmatchlen((char*)pat->pattern->ptr, 
  sdslen(pat->pattern->ptr), 
  (char*)channel->ptr, 
  sdslen(channel->ptr),0)) { 
  addReply(pat->client,shared.mbulkhdr[4]); 
  addReply(pat->client,shared.pmessagebulk); 
  addReplyBulk(pat->client,pat->pattern); // 打印被匹配的模式 
  addReplyBulk(pat->client,channel); // 打印頻道名 
  addReplyBulk(pat->client,message); // 打印消息 
  receivers++; // 更新接收者數量 
  } 
  } 
  decrRefCount(channel); // 釋放用過的 channel 
  } 
  return receivers; // 返回接收者數量 
  }

   UNSUBSCRIBE 和 PUNSUBSCRIBE 的實現

  UNSUBSCRIBE 和 PUNSUBSCRIBE 分別是 SUBSCRIBE 和 PSUBSCRIBE 的反操作,如果明白了SUBSCRIBE 和 PSUBSCRIBE 的工作機制的話,應該不難理解這兩個反操作的原理,所以這里就省略詳細的分析了,有興趣的可以直接看代碼。

redis訂閱與發布的使用場景

明確了Redis發布訂閱的原理和基本流程后,我們來看一下Redis的發布訂閱到底具體能做什么。

1、異步消息通知

比如渠道在調支付平台的時候,我們可以用回調的方式給支付平台一個我們的回調接口來通知我們支付狀態,還可以利用Redis的發布訂閱來實現。比如我們發起支付的同時訂閱頻道`pay_notice_` + `wk` (假如我們的渠道標識是wk,不能讓其他渠道也訂閱這個頻道),當支付平台處理完成后,支付平台往該頻道發布消息,告訴頻道的訂閱者該訂單的支付信息及狀態。收到消息后,根據消息內容更新訂單信息及后續操作。 

當很多人都調用支付平台時,支付時都去訂閱同一個頻道會有問題。比如用戶A支付完訂閱頻道`pay_notice_wk`,在支付平台未處理完時,用戶B支付完也訂閱了`pay_notice_wk`,當A收到通知后,接着B的支付通知也發布了,這時渠道收不到第二次消息發布。因為同一個頻道收到消息后,訂閱自動取消,也就是訂閱是一次性的。

所以我們訂閱的訂單支付狀態的頻道就得唯一,一個訂單一個頻道,我們可以在頻道上加上訂單號`pay_notice_wk`+orderNo保證頻道唯一。這樣我們可以把頻道號在支付時當做參數一並傳過去,支付平台處理完就可以用此頻道發布消息給我們了。(實際大多接口用回調通知,因為用Redis發布訂閱限制條件苛刻,系統間必須共用一套Redis)

2、任務通知

比如通過跑批系統通知應用系統做一些事(跑批系統無法拿到用戶數據,且應用系統又不能做定時任務的情況下)。

如每天凌晨3點提前加載一些用戶的用戶數據到Redis,應用系統不能做定時任務,可以通過系統公共的Redis來由跑批系統發布任務給應用系統,應用系統收到指令,去做相應的操作。

這里需要注意的是在線上集群部署的情況下,所有服務實例都會收到通知,都要做同樣的操作嗎?完全沒必要。可以用Redis實現鎖機制,其中一台實例拿到鎖后執行任務。另外如果任務比較耗時,可以不用鎖,可以考慮一下任務分片執行。當然這不在本文的討論范疇,這里不在贅述。

 

3、參數刷新加載

 

眾所周知,我們用Redis無非就是將系統中不怎么變的、查詢又比較頻繁的數據緩存起來,例如我們系統首頁的輪播圖啊,頁面的動態鏈接啊,一些系統參數啊,公共數據啊都加載到Redis,然后有個后台管理系統去配置修改這些數據。

 

打個比方我們首頁的輪播圖要再增加一個圖,那我們就在后管系統加上,加上就完事了嗎?當然沒有,因為Redis里還是老數據。那你會說不是有過期時間嗎?是的,但有的過期時間設置的較長如24小時並且我們想立即生效怎么辦?這時候我們就可以利用Redis的發布訂閱機制來實現數據的實時刷新。當我們修改完數據后,點擊刷新按鈕,通過發布訂閱機制,訂閱者接收到消息后調用重新加載的方法即可。

 

ZHUAN:https://my.oschina.net/u/779531/blog/904622


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM