基於Redis的消息訂閱/發布


在工業生產設計中,我們往往需要實現一個基於消息訂閱的模式,用來對非定時的的消息進行監聽訂閱。

這種設計模式在 總線設計模式中得到體現。微軟以前的WCF中實現了服務總線 ServiceBus的設計模式。然並卵。WCF已經好像是上個世紀的產物................

基於事件訂閱的模式,比如 EventBus類的組件產品。但是往往設計比較復雜。

如果依賴於 Redis做事件消息推送。那就大大簡化了這種設計模式,而且性能也比較客觀。

Redis在 2.0之后的版本中 實現了 事件推送的  pub/sub命令

PSUBSCRIBE訂閱一個或多個符合給定模式的頻道
PUBLISH將信息message 發送到指定的頻道channel
PUBSUB是一個查看訂閱與發布系統狀態的內省命令
PUBSUB CHANNELS pattern 列出當前的活躍頻道
PUBSUB NUMSUB channel-1 channel-N 返回給定頻道的訂閱者數量
PUBSUB NUMPAT 返回訂閱模式的數量
PUNSUBSCRIBE 指示客戶端退訂所有給定模式
SUBSCRIBE 訂閱給定的一個或多個頻道的信息
UNSUBSCRIBE 指示客戶端退訂給定的頻道

 

P 開頭的 (pattern)支持通配符模式。

簡單例子(來自:http://www.yiibai.com/redis/redis_pub_sub.html)

以下舉例說明如何發布用戶的概念工作。在下面的例子給出一個客戶端訂閱一個通道名為redisChat

redis 127.0.0.1:6379> SUBSCRIBE redisChat
 Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "redisChat" 3) (integer) 1 

現在,兩個客戶端都發布在同一個通道名redisChat消息及以上的訂閱客戶端接收消息。

redis 127.0.0.1:6379> PUBLISH redisChat "Redis is a great caching technique"
 (integer) 1 redis 127.0.0.1:6379> PUBLISH redisChat "Learn redis by tutorials point"  (integer) 1  1) "message" 2) "redisChat" 3) "Redis is a great caching technique" 1) "message" 2) "redisChat" 3) "Learn redis by tutorials point" 

上面的代碼簡單的演示了,訂閱信道,向指定的信道發布消息。然后消息推送到訂閱者。

在 redis-cli客戶端中 推送消息的時候,返回成功發送到訂閱者的數目。

如:(integer) 1


原理:

RedisServer包含兩個重要的結構:
1. channels:實際上就是一個key-value的Map結構,key為訂閱地頻道,value為Client的List
2. patterns:存放模式+client地址的列表

 

流程:從pubsub_channels中找出跟publish中channel相符的clients-list,然后再去pubsub_patterns中找出每一個相符的pattern和client。向這些客戶端發送publish的消息。

 

訂閱信道

消息推送

 

 

在C#中的實現

基於

ServiceStack.Redis

 1             Task.Factory.StartNew(() =>
 2             {
 3                 var client = new RedisClient("192.168.1.100", 6379, "你的密碼");
 4                 try
 5                 {
 6                     var isOpen = client.Ping();
 7                     if (isOpen == false)
 8                     {
 9                         return;
10                     }
11                     var sub1 = client.CreateSubscription();
12 
13                     //接受消息的委托
14                     sub1.OnMessage = (chanel, message) =>
15                     {
16                         Console.WriteLine("chanel is :{0}", chanel);
17                         Console.WriteLine("message is :{0}", message);
18                     };
19                     sub1.SubscribeToChannels(new string[] { "testchat" });//注意:訂閱信道的時候 會開啟阻塞模式,所以,需要將監聽放到單獨的線程里
20                 }
21                 catch (Exception)
22                 {
23 
24                     throw;
25                 }
26 
27 
28 
29 
30             });

 

注意:在程序終止或者類的實例被銷毀的時候,請將訂閱者實例注銷掉,否則,在redis中一直存在這個訂閱者。

1 使用idispose 顯示釋放

2 使用析構函數 CLR回收的時候 釋放

sub1.Dispose();

例如:

 1  public void Dispose() 
 2         { 
 3             Dispose(true); 
 4             GC.SuppressFinalize(this); 
 5         } 
 6 
 7         protected virtual void Dispose(bool disposing) 
 8         { 
 9             if (disposing) 
10             { 
11                 if (redisClient != null) 
12                     redisClient.Dispose(); 
13                 if (subscription != null) 
14                     subscription.Dispose(); 
15                 if (log != null) 
16                     log.Dispose(); 
17             } 
18         } 
19 
20         ~RedisEx() 
21         { 
22             Dispose(false); 
23         } 

 

官方推薦這種寫法

 var clientsManager = new PooledRedisClientManager(new string[] { "密碼@192.168.1.200:6379" });
            var redisPubSub = new RedisPubSubServer(clientsManager, new string[] { "testchat" })
            {
                OnMessage = (channel, msg) => {
                    Console.WriteLine("方式2訂閱演示.............");
                    Console.WriteLine("Received '{0}' from '{1}'", msg, channel);
                }
            }.Start();

To use RedisPubSubServer, initialize it with the channels you want to subscribe to and assign handlers for each of the events you want to handle.

At a minimum you'll want to handle OnMessage:

Calling Start() after it's initialized will get it to start listening and processing any messages published to the subscribed channels.

官方文檔:https://github.com/ServiceStack/ServiceStack.Redis

 

 

 

 

附加文檔

redis PUB/SUB(發布/訂閱)

 
 
 

 

 

PSUBSCRIBE(訂閱一個或多個符合給定模式的頻道)

PSUBSCRIBE pattern [pattern …] 
訂閱一個或多個符合給定模式的頻道。 
每個模式以* 作為匹配符,比如it* 匹配所有以it 開頭的頻道( it.news 、it.blog 、it.tweets 等等), 
news.* 匹配所有以news. 開頭的頻道( news.it 、news.global.today 等等),諸如此類。 
可用版本: >= 2.0.0 
時間復雜度: O(N),N 是訂閱的模式的數量。 
返回值: 接收到的信息(請參見下面的代碼說明)。

redis> psubscribe news.*
Reading messages... (press Ctrl-C to quit) 1) "psubscribe" # 返回值的類型:顯示訂閱成功 2) "news.*" # 訂閱的模式 3) (integer) 1 # 目前已訂閱的模式的數量 1) "pmessage" # 返回值的類型:信息 2) "news.*" # 信息匹配的模式 3) "news.it" # 信息本身的目標頻道 4) "Google buy Motorola" # 信息的內容

PUBLISH(將信息message 發送到指定的頻道channel )

PUBLISH channel message 
將信息message 發送到指定的頻道channel 。 
可用版本: >= 2.0.0 
時間復雜度: O(N+M),其中N 是頻道channel 的訂閱者數量,而M 則是使用模式訂閱(subscribed 
patterns) 的客戶端的數量。 
返回值: 接收到信息message 的訂閱者數量。

# 對沒有訂閱者的頻道發送信息 redis> publish bad_channel "can any body hear me?" (integer) 0 # 向有一個訂閱者的頻道發送信息 redis> publish msg "good morning" (integer) 1 # 向有多個訂閱者的頻道發送信息 redis> publish chat_room "hello~ everyone" (integer) 3

PUBSUB(是一個查看訂閱與發布系統狀態的內省命令)

PUBSUB [argument [argument …]] 
PUBSUB 是一個查看訂閱與發布系統狀態的內省命令,它由數個不同格式的子命令組成,以下將分別對這 
些子命令進行介紹。 
可用版本: >= 2.8.0

PUBSUB CHANNELS [pattern] (列出當前的活躍頻道)

列出當前的活躍頻道。 
活躍頻道指的是那些至少有一個訂閱者的頻道,訂閱模式的客戶端不計算在內。 
pattern 參數是可選的: 
• 如果不給出pattern 參數,那么列出訂閱與發布系統中的所有活躍頻道。 
• 如果給出pattern 參數,那么只列出和給定模式pattern 相匹配的那些活躍頻道。 
復雜度: O(N) ,N 為活躍頻道的數量(對於長度較短的頻道和模式來說,將進行模式匹配的復雜度視為常 
數)。 
返回值: 一個由活躍頻道組成的列表。

# client-1 訂閱news.it 和news.sport 兩個頻道 client-1> SUBSCRIBE news.it news.sport Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "news.it" 3) (integer) 1 1) "subscribe" 2) "news.sport" 3) (integer) 2 # client-2 訂閱news.it 和news.internet 兩個頻道 client-2> SUBSCRIBE news.it news.internet Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "news.it" 3) (integer) 1 1) "subscribe" 2) "news.internet" 3) (integer) 2 # 首先, client-3 打印所有活躍頻道 # 注意,即使一個頻道有多個訂閱者,它也只輸出一次,比如news.it client-3> PUBSUB CHANNELS 1) "news.sport" 2) "news.internet" 3) "news.it" # 接下來, client-3 打印那些與模式news.i* 相匹配的活躍頻道 # 因為news.sport 不匹配news.i* ,所以它沒有被打印 redis> PUBSUB CHANNELS news.i* 1) "news.internet" 2) "news.it"

PUBSUB NUMSUB [channel-1 … channel-N] (返回給定頻道的訂閱者數量)

返回給定頻道的訂閱者數量,訂閱模式的客戶端不計算在內。 
復雜度: O(N) ,N 為給定頻道的數量。 
返回值: 一個多條批量回復( Multi-bulk reply),回復中包含給定的頻道,以及頻道的訂閱者數量。格式為:頻道 channel-1 ,channel-1 的訂閱者數量,頻道 channel-2 ,channel-2 的訂閱者數量,諸如此類。 
回復中頻道的排列順序和執行命令時給定頻道的排列順序一致。不給定任何頻道而直接調用這個命令也是可以的,在這種情況下,命令只返回一個空列表。

# client-1 訂閱 news.it 和 news.sport 兩個頻道 client-1> SUBSCRIBE news.it news.sport Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "news.it" 3) (integer) 1 1) "subscribe" 2) "news.sport" 3) (integer) 2 # client-2 訂閱 news.it 和 news.internet 兩個頻道 client-2> SUBSCRIBE news.it news.internet Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "news.it" 3) (integer) 1 1) "subscribe" 2) "news.internet" 3) (integer) 2 # client-3 打印各個頻道的訂閱者數量 client-3> PUBSUB NUMSUB news.it news.internet news.sport news.music 1) "news.it" # 頻道 2) "2" # 訂閱該頻道的客戶端數量 3) "news.internet" 4) "1" 5) "news.sport" 6) "1" 7) "news.music" # 沒有任何訂閱者 8) "0"

PUBSUB NUMPAT (返回訂閱模式的數量)

注意,這個命令返回的不是訂閱模式的客戶端的數量,而是客戶端訂閱的所有模式的數量總和。 
復雜度: O(1) 。 
返回值: 一個整數回復( Integer reply)。

# client-1 訂閱 news.* 和 discount.* 兩個模式 client-1> PSUBSCRIBE news.* discount.* Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "news.*" 3) (integer) 1 1) "psubscribe" 2) "discount.*" 3) (integer) 2 # client-2 訂閱 tweet.* 一個模式 client-2> PSUBSCRIBE tweet.* Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "tweet.*" 3) (integer) 1 # client-3 返回當前訂閱模式的數量為 3 client-3> PUBSUB NUMPAT (integer) 3 # 注意,當有多個客戶端訂閱相同的模式時,相同的訂閱也被計算在 PUBSUB NUMPAT 之內 # 比如說,再新建一個客戶端 client-4 ,讓它也訂閱 news.* 頻道 client-4> PSUBSCRIBE news.* Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "news.*" 3) (integer) 1 # 這時再計算被訂閱模式的數量,就會得到數量為 4 client-3> PUBSUB NUMPAT (integer) 4
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

PUNSUBSCRIBE (指示客戶端退訂所有給定模式)

PUNSUBSCRIBE [pattern [pattern …]] 
指示客戶端退訂所有給定模式。 
如果沒有模式被指定,也即是,一個無參數的 PUNSUBSCRIBE 調用被執行,那么客戶端使用PSUBSCRIBE 
命令訂閱的所有模式都會被退訂。在這種情況下,命令會返回一個信息,告知客戶端所有被退訂的模式。 
可用版本: >= 2.0.0 
時間復雜度: O(N+M) ,其中 N 是客戶端已訂閱的模式的數量,M 則是系統中所有客戶端訂閱的模式的數量。 
返回值: 這個命令在不同的客戶端中有不同的表現。

SUBSCRIBE (訂閱給定的一個或多個頻道的信息)

訂閱給定的一個或多個頻道的信息。 
可用版本: >= 2.0.0 
時間復雜度: O(N),其中 N 是訂閱的頻道的數量。 
返回值: 接收到的信息 (請參見下面的代碼說明)。

# 訂閱 msg 和 chat_room 兩個頻道 # 1 - 6 行是執行 subscribe 之后的反饋信息 # 第 7 - 9 行才是接收到的第一條信息 # 第 10 - 12 行是第二條 redis> subscribe msg chat_room Reading messages... (press Ctrl-C to quit) 1) "subscribe" # 返回值的類型:顯示訂閱成功 2) "msg" # 訂閱的頻道名字 3) (integer) 1 # 目前已訂閱的頻道數量 1) "subscribe" 2) "chat_room" 3) (integer) 2 1) "message" # 返回值的類型:信息 2) "msg" # 來源 (從那個頻道發送過來) 3) "hello moto" # 信息內容 1) "message" 2) "chat_room" 3) "testing...haha"

 

UNSUBSCRIBE (指示客戶端退訂給定的頻道)

UNSUBSCRIBE [channel [channel …]] 
指示客戶端退訂給定的頻道。 
如果沒有頻道被指定,也即是,一個無參數的 UNSUBSCRIBE 調用被執行,那么客戶端使用SUBSCRIBE 命令訂閱的所有頻道都會被退訂。在這種情況下,命令會返回一個信息,告知客戶端所有被退訂的頻道。 
可用版本: >= 2.0.0 
時間復雜度: O(N) ,N 是客戶端已訂閱的頻道的數量。 
返回值: 這個命令在不同的客戶端中有不同的表現。

 
 

參考文檔:

http://blog.csdn.net/u011506468/article/details/47337839

http://my.oschina.net/itblog/blog/601284

http://www.oschina.net/code/snippet_584165_52231

http://redis.io/topics/pubsub

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM