Redis 發布/訂閱機制原理分析


Redis 通過 PUBLISH 、 SUBSCRIBE 和 PSUBSCRIBE 等命令實現發布和訂閱功能。

  這些命令被廣泛用於構建即時通信應用,比如網絡聊天室(chatroom)和實時廣播、實時提醒等。

  本文通過分析 Redis 源碼里的 pubsub.c 文件,了解發布和訂閱機制的底層實現,籍此加深對 Redis 的理解。

  訂閱、發布和退訂

  在開始研究源碼之前,不妨先來回顧一下幾個相關命令的使用方式。

  PUBLISH 命令用於向給定的頻道發送信息,返回值為接收到信息的訂閱者數量:

redis> PUBLISH treehole "top secret here ..." 
  (integer) 0 
  redis> PUBLISH chatroom "hi?" 
  (integer) 1

  SUBSCRIBE 命令訂閱給定的一個或多個頻道:

redis> SUBSCRIBE chatroom 
  Reading messages... (press Ctrl-C to quit) 
  1) "subscribe" # 訂閱反饋 
  2) "chatroom" # 訂閱的頻道 
  3) (integer) 1 # 目前客戶端已訂閱頻道/模式的數量 
  1) "message" # 信息 
  2) "chatroom" # 發送信息的頻道 
  3) "hi?" # 信息內容

   SUBSCRIBE 的返回值當中, 1) 為 subscribe 的是訂閱的反饋信息,而 1) 為 message 的則是訂閱的頻道所發送的信息。

  SUBSCRIBE 還可以訂閱多個頻道,這樣一來它接收到的信息就可能來自多個頻道:

redis> SUBSCRIBE chatroom talk-to-jack 
  Reading messages... (press Ctrl-C to quit) 
  1) "subscribe" # 訂閱 chatroom 的反饋 
  2) "chatroom" 
  3) (integer) 1 
  1) "subscribe" # 訂閱 talk-to-jack 的反饋 
  2) "talk-to-jack" 
  3) (integer) 2 
  1) "message" # 來自 chatroom 的消息 
  2) "chatroom" 
  3) "yahoo" 
  1) "message" # 來自 talk-to-peter 的消息 
  2) "talk-to-jack" 
  3) "Goodmorning, peter."

  PSUBSCRIBE 提供了一種訂閱符合給定模式的所有頻道的方法,比如說,使用 it.* 為輸入,就可以訂閱所有以 it. 開頭的頻道,比如 it.news 、 it.blog 、 it.tweets ,諸如此類:

redis> PSUBSCRIBE it.* 
  Reading messages... (press Ctrl-C to quit) 
  1) "psubscribe" 
  2) "it.*" 
  3) (integer) 1 
  1) "pmessage" 
  2) "it.*" # 匹配的模式 
  3) "it.news" # 消息的來源頻道 
  4) "Redis 2.6rc5 release" # 消息內容 
  1) "pmessage" 
  2) "it.*" 
  3) "it.blog" 
  4) "Why NoSQL matters" 
  1) "pmessage" 
  2) "it.*" 
  3) "it.tweet" 
  4) "@redis: when will the 2.6 stable release?"

  當然, PSUBSCRIBE 也可以接受多個參數,從而匹配多種模式。

  最后, UNSUBSCRIBE 命令和 PUNSUBSCRIBE 負責退訂給定的頻道或模式。

  發布和訂閱機制

  當一個客戶端通過 PUBLISH 命令向訂閱者發送信息的時候,我們稱這個客戶端為發布者(publisher)。

  而當一個客戶端使用 SUBSCRIBE 或者 PSUBSCRIBE 命令接收信息的時候,我們稱這個客戶端為訂閱者(subscriber)。

  為了解耦發布者(publisher)和訂閱者(subscriber)之間的關系,Redis 使用了 channel (頻道)作為兩者的中介 —— 發布者將信息直接發布給 channel ,而 channel 負責將信息發送給適當的訂閱者,發布者和訂閱者之間沒有相互關系,也不知道對方的存在:

  

 

  知道了發布和訂閱的機制之后,接下來就可以開始研究具體的實現了,我們從 Redis 的訂閱命令開始說起。

  SUBSCRIBE 命令的實現

  前面說到,Redis 將所有接受和發送信息的任務交給 channel 來進行,而所有 channel 的信息就儲存在 redisServer 這個結構中:

 

struct redisServer { 
  // 省略 ... 
  dict *pubsub_channels; // Map channels to list of subscribed clients 
  // 省略 ... 
  };

 

  pubsub_channels 是一個字典,字典的鍵就是一個個 channel ,而字典的值則是一個鏈表,鏈表中保存了所有訂閱這個 channel 的客戶端。

  舉個例子,如果在一個 redisServer 實例中,有一個叫做 news 的頻道,這個頻道同時被client_123 和 client_456 兩個客戶端訂閱,那么這個 redisServer 結構看起來應該是這樣子:

  

  可以看出,實現 SUBSCRIBE 命令的關鍵,就是將客戶端添加到給定 channel 的訂閱鏈表中。

  函數 pubsubSubscribeChannel 是 SUBSCRIBE 命令的底層實現,它完成了將客戶端添加到訂閱鏈表中的工作:

 

// 訂閱指定頻道 
  // 訂閱成功返回 1 ,如果已經訂閱過,返回 0 
  int pubsubSubscribeChannel(redisClient *c, robj *channel) { 
  struct dictEntry *de; 
  list *clients = NULL; 
  int retval = 0; 
  /* Add the channel to the client -> channels hash table */ 
  // dictAdd 在添加新元素成功時返回 DICT_OK 
  // 因此這個判斷句表示,如果新訂閱 channel 成功,那么 。。。 
  if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { 
  retval = 1; 
  incrRefCount(channel); 
  /* Add the client to the channel -> list of clients hash table */ 
  // 將 client 添加到訂閱給定 channel 的鏈表中 
  // 這個鏈表是一個哈希表的值,哈希表的鍵是給定 channel 
  // 這個哈希表保存在 server.pubsub_channels 里 
  de = dictFind(server.pubsub_channels,channel); 
  if (de == NULL) { 
  // 如果 de 等於 NULL 
  // 表示這個客戶端是首個訂閱這個 channel 的客戶端 
  // 那么創建一個新的列表, 並將它加入到哈希表中 
  clients = listCreate(); 
  dictAdd(server.pubsub_channels,channel,clients); 
  incrRefCount(channel); 
  } else { 
  // 如果 de 不為空,就取出這個 clients 鏈表 
  clients = dictGetVal(de); 
  } 
  // 將客戶端加入到鏈表中 
  listAddNodeTail(clients,c); 
  } 
  /* Notify the client */ 
  addReply(c,shared.mbulkhdr[3]); 
  addReply(c,shared.subscribebulk); 
  // 返回訂閱的頻道 
  addReplyBulk(c,channel); 
  // 返回客戶端當前已訂閱的頻道和模式數量的總和 
  addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); 
  return retval; 
  }

 

  PSUBSCRIBE 命令的實現

  除了直接訂閱給定 channel 外,還可以使用 PSUBSCRIBE 訂閱一個模式(pattern),訂閱一個模式等同於訂閱所有匹配這個模式的 channel 。

  和 redisServer.pubsub_channels 屬性類似, redisServer.pubsub_patterns 屬性用於保存所有被訂閱的模式,和 pubsub_channels 不同的是, pubsub_patterns 是一個鏈表(而不是字典):

 

struct redisServer { 
  // 省略 ... 
  list *pubsub_patterns; // A list of pubsub_patterns 
  // 省略 ... 
  };

  pubsub_patterns 的每一個節點都是一個 pubsubPattern 結構的實例,它保存了被訂閱的模式,以及訂閱這個模式的客戶客戶端:

typedef struct pubsubPattern { 
  redisClient *client; 
  robj *pattern; 
  } pubsubPattern;

  舉個例子,假設在一個 redisServer 實例中,有一個叫做 news.* 的模式同時被客戶端client_789 和 client_999 訂閱,那么這個 redisServer 結構看起來應該是這樣子:

  

  現在可以知道,實現 PSUBSCRIBE 命令的關鍵,就是將客戶端和訂閱的模式添加到redisServer.pubsub_patterns 當中。

  pubsubSubscribePattern 是 PSUBSCRIBE 的底層實現,它將客戶端和所訂閱的模式添加到redisServer.pubsub_patterns 當中:

// 訂閱指定模式 
  // 訂閱成功返回 1 ,如果已經訂閱過,返回 0 
  int pubsubSubscribePattern(redisClient *c, robj *pattern) { 
  int retval = 0; 
  // 向 c->pubsub_patterns 中查找指定 pattern 
  // 如果返回值為 NULL ,說明這個 pattern 還沒被這個客戶端訂閱過 
  if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { 
  retval = 1; 
  // 添加 pattern 到客戶端 pubsub_patterns 
  listAddNodeTail(c->pubsub_patterns,pattern); 
  incrRefCount(pattern); 
  // 將 pattern 添加到服務器 
  pubsubPattern *pat; 
  pat = zmalloc(sizeof(*pat)); 
  pat->pattern = getDecodedObject(pattern); 
  pat->client = c; 
  listAddNodeTail(server.pubsub_patterns,pat); 
  } 
  /* Notify the client */ 
  addReply(c,shared.mbulkhdr[3]); 
  addReply(c,shared.psubscribebulk); 
  // 返回被訂閱的模式 
  addReplyBulk(c,pattern); 
  // 返回客戶端當前已訂閱的頻道和模式數量的總和 
  addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); 
  return retval; 
  }

   PUBLISH 命令的實現

  使用 PUBLISH 命令向訂閱者發送消息,需要執行以下兩個步驟:

  1) 使用給定的頻道作為鍵,在 redisServer.pubsub_channels 字典中查找記錄了訂閱這個頻道的所有客戶端的鏈表,遍歷這個鏈表,將消息發布給所有訂閱者。

  2) 遍歷 redisServer.pubsub_patterns 鏈表,將鏈表中的模式和給定的頻道進行匹配,如果匹配成功,那么將消息發布到相應模式的客戶端當中。

  舉個例子,假設有兩個客戶端分別訂閱 it.news 頻道和 it.* 模式,當執行命令PUBLISH it.news "hello moto" 的時候, it.news 頻道的訂閱者會在步驟 1 收到信息,而當PUBLISH 進行到步驟 2 的時候, it.* 模式的訂閱者也會收到信息。

  PUBLISH 命令的實際實現由 pubsubPublishMessage 函數完成,它的完整定義如下:

// 發送消息 
  int pubsubPublishMessage(robj *channel, robj *message) { 
  int receivers = 0; 
  struct dictEntry *de; 
  listNode *ln; 
  listIter li; 
  /* Send to clients listening for that channel */ 
  // 向所有頻道的訂閱者發送消息 
  de = dictFind(server.pubsub_channels,channel); 
  if (de) { 
  list *list = dictGetVal(de); // 取出所有訂閱者 
  listNode *ln; 
  listIter li; 
  // 遍歷所有訂閱者, 向它們發送消息 
  listRewind(list,&li); 
  while ((ln = listNext(&li)) != NULL) { 
  redisClient *c = ln->value; 
  addReply(c,shared.mbulkhdr[3]); 
  addReply(c,shared.messagebulk); 
  addReplyBulk(c,channel); // 打印頻道名 
  addReplyBulk(c,message); // 打印消息 
  receivers++; // 更新接收者數量 
  } 
  } 
  /* Send to clients listening to matching channels */ 
  // 向所有被匹配模式的訂閱者發送消息 
  if (listLength(server.pubsub_patterns)) { 
  listRewind(server.pubsub_patterns,&li); // 取出所有模式 
  channel = getDecodedObject(channel); 
  while ((ln = listNext(&li)) != NULL) { 
  pubsubPattern *pat = ln->value; // 取出模式 
  // 如果模式和 channel 匹配的話 
  // 向這個模式的訂閱者發送消息 
  if (stringmatchlen((char*)pat->pattern->ptr, 
  sdslen(pat->pattern->ptr), 
  (char*)channel->ptr, 
  sdslen(channel->ptr),0)) { 
  addReply(pat->client,shared.mbulkhdr[4]); 
  addReply(pat->client,shared.pmessagebulk); 
  addReplyBulk(pat->client,pat->pattern); // 打印被匹配的模式 
  addReplyBulk(pat->client,channel); // 打印頻道名 
  addReplyBulk(pat->client,message); // 打印消息 
  receivers++; // 更新接收者數量 
  } 
  } 
  decrRefCount(channel); // 釋放用過的 channel 
  } 
  return receivers; // 返回接收者數量 
  }

   UNSUBSCRIBE 和 PUNSUBSCRIBE 的實現

  UNSUBSCRIBE 和 PUNSUBSCRIBE 分別是 SUBSCRIBE 和 PSUBSCRIBE 的反操作,如果明白了SUBSCRIBE 和 PSUBSCRIBE 的工作機制的話,應該不難理解這兩個反操作的原理,所以這里就省略詳細的分析了,有興趣的可以直接看代碼。

  小節

  Redis 的 pubsub 機制的分析就到此結束了,跟往常一樣,帶有注釋的完整 pubsub.c 文件可以到我的 GITHUB 上找: https://github.com/huangz1990/reading_redis_source

ZHUAN:https://my.oschina.net/u/779531/blog/904622


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM