在工業生產設計中,我們往往需要實現一個基於消息訂閱的模式,用來對非定時的的消息進行監聽訂閱。
這種設計模式在 總線設計模式中得到體現。微軟以前的WCF中實現了服務總線 ServiceBus的設計模式。然並卵。WCF已經好像是上個世紀的產物................
基於事件訂閱的模式,比如 EventBus類的組件產品。但是往往設計比較復雜。
如果依賴於 Redis做事件消息推送。那就大大簡化了這種設計模式,而且性能也比較客觀。
Redis在 2.0之后的版本中 實現了 事件推送的 pub/sub命令
PSUBSCRIBE訂閱一個或多個符合給定模式的頻道
PUBLISH將信息message 發送到指定的頻道channel
PUBSUB是一個查看訂閱與發布系統狀態的內省命令
PUBSUB CHANNELS pattern 列出當前的活躍頻道
PUBSUB NUMSUB channel-1 channel-N 返回給定頻道的訂閱者數量
PUBSUB NUMPAT 返回訂閱模式的數量
PUNSUBSCRIBE 指示客戶端退訂所有給定模式
SUBSCRIBE 訂閱給定的一個或多個頻道的信息
UNSUBSCRIBE 指示客戶端退訂給定的頻道
P 開頭的 (pattern)支持通配符模式。
簡單例子(來自:http://www.yiibai.com/redis/redis_pub_sub.html)
以下舉例說明如何發布用戶的概念工作。在下面的例子給出一個客戶端訂閱一個通道名為redisChat
redis 127.0.0.1:6379> SUBSCRIBE redisChat
Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "redisChat" 3) (integer) 1
現在,兩個客戶端都發布在同一個通道名redisChat消息及以上的訂閱客戶端接收消息。
redis 127.0.0.1:6379> PUBLISH redisChat "Redis is a great caching technique"
(integer) 1 redis 127.0.0.1:6379> PUBLISH redisChat "Learn redis by tutorials point" (integer) 1 1) "message" 2) "redisChat" 3) "Redis is a great caching technique" 1) "message" 2) "redisChat" 3) "Learn redis by tutorials point"
上面的代碼簡單的演示了,訂閱信道,向指定的信道發布消息。然后消息推送到訂閱者。
在 redis-cli客戶端中 推送消息的時候,返回成功發送到訂閱者的數目。
如:(integer) 1
原理:
RedisServer包含兩個重要的結構:
1. channels:實際上就是一個key-value的Map結構,key為訂閱地頻道,value為Client的List
2. patterns:存放模式+client地址的列表
流程:從pubsub_channels中找出跟publish中channel相符的clients-list,然后再去pubsub_patterns中找出每一個相符的pattern和client。向這些客戶端發送publish的消息。
訂閱信道
消息推送
在C#中的實現
基於
ServiceStack.Redis
1 Task.Factory.StartNew(() => 2 { 3 var client = new RedisClient("192.168.1.100", 6379, "你的密碼"); 4 try 5 { 6 var isOpen = client.Ping(); 7 if (isOpen == false) 8 { 9 return; 10 } 11 var sub1 = client.CreateSubscription(); 12 13 //接受消息的委托 14 sub1.OnMessage = (chanel, message) => 15 { 16 Console.WriteLine("chanel is :{0}", chanel); 17 Console.WriteLine("message is :{0}", message); 18 }; 19 sub1.SubscribeToChannels(new string[] { "testchat" });//注意:訂閱信道的時候 會開啟阻塞模式,所以,需要將監聽放到單獨的線程里 20 } 21 catch (Exception) 22 { 23 24 throw; 25 } 26 27 28 29 30 });
注意:在程序終止或者類的實例被銷毀的時候,請將訂閱者實例注銷掉,否則,在redis中一直存在這個訂閱者。
1 使用idispose 顯示釋放
2 使用析構函數 CLR回收的時候 釋放
sub1.Dispose();
例如:
1 public void Dispose() 2 { 3 Dispose(true); 4 GC.SuppressFinalize(this); 5 } 6 7 protected virtual void Dispose(bool disposing) 8 { 9 if (disposing) 10 { 11 if (redisClient != null) 12 redisClient.Dispose(); 13 if (subscription != null) 14 subscription.Dispose(); 15 if (log != null) 16 log.Dispose(); 17 } 18 } 19 20 ~RedisEx() 21 { 22 Dispose(false); 23 }
官方推薦這種寫法
var clientsManager = new PooledRedisClientManager(new string[] { "密碼@192.168.1.200:6379" }); var redisPubSub = new RedisPubSubServer(clientsManager, new string[] { "testchat" }) { OnMessage = (channel, msg) => { Console.WriteLine("方式2訂閱演示............."); Console.WriteLine("Received '{0}' from '{1}'", msg, channel); } }.Start();
To use RedisPubSubServer
, initialize it with the channels you want to subscribe to and assign handlers for each of the events you want to handle.
At a minimum you'll want to handle OnMessage
:
Calling Start()
after it's initialized will get it to start listening and processing any messages published to the subscribed channels.
官方文檔:https://github.com/ServiceStack/ServiceStack.Redis
附加文檔
PSUBSCRIBE(訂閱一個或多個符合給定模式的頻道)
PSUBSCRIBE pattern [pattern …]
訂閱一個或多個符合給定模式的頻道。
每個模式以* 作為匹配符,比如it* 匹配所有以it 開頭的頻道( it.news 、it.blog 、it.tweets 等等),
news.* 匹配所有以news. 開頭的頻道( news.it 、news.global.today 等等),諸如此類。
可用版本: >= 2.0.0
時間復雜度: O(N),N 是訂閱的模式的數量。
返回值: 接收到的信息(請參見下面的代碼說明)。
redis> psubscribe news.*
Reading messages... (press Ctrl-C to quit) 1) "psubscribe" # 返回值的類型:顯示訂閱成功 2) "news.*" # 訂閱的模式 3) (integer) 1 # 目前已訂閱的模式的數量 1) "pmessage" # 返回值的類型:信息 2) "news.*" # 信息匹配的模式 3) "news.it" # 信息本身的目標頻道 4) "Google buy Motorola" # 信息的內容
PUBLISH(將信息message 發送到指定的頻道channel )
PUBLISH channel message
將信息message 發送到指定的頻道channel 。
可用版本: >= 2.0.0
時間復雜度: O(N+M),其中N 是頻道channel 的訂閱者數量,而M 則是使用模式訂閱(subscribed
patterns) 的客戶端的數量。
返回值: 接收到信息message 的訂閱者數量。
# 對沒有訂閱者的頻道發送信息 redis> publish bad_channel "can any body hear me?" (integer) 0 # 向有一個訂閱者的頻道發送信息 redis> publish msg "good morning" (integer) 1 # 向有多個訂閱者的頻道發送信息 redis> publish chat_room "hello~ everyone" (integer) 3
PUBSUB(是一個查看訂閱與發布系統狀態的內省命令)
PUBSUB [argument [argument …]]
PUBSUB 是一個查看訂閱與發布系統狀態的內省命令,它由數個不同格式的子命令組成,以下將分別對這
些子命令進行介紹。
可用版本: >= 2.8.0
PUBSUB CHANNELS [pattern] (列出當前的活躍頻道)
列出當前的活躍頻道。
活躍頻道指的是那些至少有一個訂閱者的頻道,訂閱模式的客戶端不計算在內。
pattern 參數是可選的:
• 如果不給出pattern 參數,那么列出訂閱與發布系統中的所有活躍頻道。
• 如果給出pattern 參數,那么只列出和給定模式pattern 相匹配的那些活躍頻道。
復雜度: O(N) ,N 為活躍頻道的數量(對於長度較短的頻道和模式來說,將進行模式匹配的復雜度視為常
數)。
返回值: 一個由活躍頻道組成的列表。
# client-1 訂閱news.it 和news.sport 兩個頻道 client-1> SUBSCRIBE news.it news.sport Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "news.it" 3) (integer) 1 1) "subscribe" 2) "news.sport" 3) (integer) 2 # client-2 訂閱news.it 和news.internet 兩個頻道 client-2> SUBSCRIBE news.it news.internet Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "news.it" 3) (integer) 1 1) "subscribe" 2) "news.internet" 3) (integer) 2 # 首先, client-3 打印所有活躍頻道 # 注意,即使一個頻道有多個訂閱者,它也只輸出一次,比如news.it client-3> PUBSUB CHANNELS 1) "news.sport" 2) "news.internet" 3) "news.it" # 接下來, client-3 打印那些與模式news.i* 相匹配的活躍頻道 # 因為news.sport 不匹配news.i* ,所以它沒有被打印 redis> PUBSUB CHANNELS news.i* 1) "news.internet" 2) "news.it"
PUBSUB NUMSUB [channel-1 … channel-N] (返回給定頻道的訂閱者數量)
返回給定頻道的訂閱者數量,訂閱模式的客戶端不計算在內。
復雜度: O(N) ,N 為給定頻道的數量。
返回值: 一個多條批量回復( Multi-bulk reply),回復中包含給定的頻道,以及頻道的訂閱者數量。格式為:頻道 channel-1 ,channel-1 的訂閱者數量,頻道 channel-2 ,channel-2 的訂閱者數量,諸如此類。
回復中頻道的排列順序和執行命令時給定頻道的排列順序一致。不給定任何頻道而直接調用這個命令也是可以的,在這種情況下,命令只返回一個空列表。
# client-1 訂閱 news.it 和 news.sport 兩個頻道 client-1> SUBSCRIBE news.it news.sport Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "news.it" 3) (integer) 1 1) "subscribe" 2) "news.sport" 3) (integer) 2 # client-2 訂閱 news.it 和 news.internet 兩個頻道 client-2> SUBSCRIBE news.it news.internet Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "news.it" 3) (integer) 1 1) "subscribe" 2) "news.internet" 3) (integer) 2 # client-3 打印各個頻道的訂閱者數量 client-3> PUBSUB NUMSUB news.it news.internet news.sport news.music 1) "news.it" # 頻道 2) "2" # 訂閱該頻道的客戶端數量 3) "news.internet" 4) "1" 5) "news.sport" 6) "1" 7) "news.music" # 沒有任何訂閱者 8) "0"
PUBSUB NUMPAT (返回訂閱模式的數量)
注意,這個命令返回的不是訂閱模式的客戶端的數量,而是客戶端訂閱的所有模式的數量總和。
復雜度: O(1) 。
返回值: 一個整數回復( Integer reply)。
# client-1 訂閱 news.* 和 discount.* 兩個模式 client-1> PSUBSCRIBE news.* discount.* Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "news.*" 3) (integer) 1 1) "psubscribe" 2) "discount.*" 3) (integer) 2 # client-2 訂閱 tweet.* 一個模式 client-2> PSUBSCRIBE tweet.* Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "tweet.*" 3) (integer) 1 # client-3 返回當前訂閱模式的數量為 3 client-3> PUBSUB NUMPAT (integer) 3 # 注意,當有多個客戶端訂閱相同的模式時,相同的訂閱也被計算在 PUBSUB NUMPAT 之內 # 比如說,再新建一個客戶端 client-4 ,讓它也訂閱 news.* 頻道 client-4> PSUBSCRIBE news.* Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "news.*" 3) (integer) 1 # 這時再計算被訂閱模式的數量,就會得到數量為 4 client-3> PUBSUB NUMPAT (integer) 4
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
PUNSUBSCRIBE (指示客戶端退訂所有給定模式)
PUNSUBSCRIBE [pattern [pattern …]]
指示客戶端退訂所有給定模式。
如果沒有模式被指定,也即是,一個無參數的 PUNSUBSCRIBE 調用被執行,那么客戶端使用PSUBSCRIBE
命令訂閱的所有模式都會被退訂。在這種情況下,命令會返回一個信息,告知客戶端所有被退訂的模式。
可用版本: >= 2.0.0
時間復雜度: O(N+M) ,其中 N 是客戶端已訂閱的模式的數量,M 則是系統中所有客戶端訂閱的模式的數量。
返回值: 這個命令在不同的客戶端中有不同的表現。
SUBSCRIBE (訂閱給定的一個或多個頻道的信息)
訂閱給定的一個或多個頻道的信息。
可用版本: >= 2.0.0
時間復雜度: O(N),其中 N 是訂閱的頻道的數量。
返回值: 接收到的信息 (請參見下面的代碼說明)。
# 訂閱 msg 和 chat_room 兩個頻道 # 1 - 6 行是執行 subscribe 之后的反饋信息 # 第 7 - 9 行才是接收到的第一條信息 # 第 10 - 12 行是第二條 redis> subscribe msg chat_room Reading messages... (press Ctrl-C to quit) 1) "subscribe" # 返回值的類型:顯示訂閱成功 2) "msg" # 訂閱的頻道名字 3) (integer) 1 # 目前已訂閱的頻道數量 1) "subscribe" 2) "chat_room" 3) (integer) 2 1) "message" # 返回值的類型:信息 2) "msg" # 來源 (從那個頻道發送過來) 3) "hello moto" # 信息內容 1) "message" 2) "chat_room" 3) "testing...haha"
UNSUBSCRIBE (指示客戶端退訂給定的頻道)
UNSUBSCRIBE [channel [channel …]]
指示客戶端退訂給定的頻道。
如果沒有頻道被指定,也即是,一個無參數的 UNSUBSCRIBE 調用被執行,那么客戶端使用SUBSCRIBE 命令訂閱的所有頻道都會被退訂。在這種情況下,命令會返回一個信息,告知客戶端所有被退訂的頻道。
可用版本: >= 2.0.0
時間復雜度: O(N) ,N 是客戶端已訂閱的頻道的數量。
返回值: 這個命令在不同的客戶端中有不同的表現。
參考文檔:
http://blog.csdn.net/u011506468/article/details/47337839
http://my.oschina.net/itblog/blog/601284
http://www.oschina.net/code/snippet_584165_52231
http://redis.io/topics/pubsub