發布訂閱(pub/sub)是一種消息通信模式,主要的目的是解耦消息發布者和消息訂閱者之間的耦合,這點和設計模式中的觀察者模式比較相似。pub /sub不僅僅解決發布者和訂閱者直接代碼級別耦合也解決兩者在物理部署上的耦合。redis作為一個pub/sub server,在訂閱者和發布者之間起到了消息路由的功能。訂閱者可以通過subscribe和psubscribe命令向redis server訂閱自己感興趣的消息類型,redis將消息類型稱為通道(channel)。當發布者通過publish命令向redis server發送特定類型的消息時。訂閱該消息類型的全部client都會收到此消息。這里消息的傳遞是多對多的。一個client可以訂閱多個 channel,也可以向多個channel發送消息。
下面做個實驗。這里使用兩個不同的client一個是redis自帶的redis-cli另一個是用java版客戶端jedis寫的。java代碼如下:
package com.jd.redis.client;
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub;
publicclass TestPubSubextends JedisPubSub{
@Override publicvoid onMessage(String channel, String message) { System.out.println("onMessage: channel["+channel+"], message["+message+"]"); }
@Override publicvoid onPMessage(String pattern, String channel, String message) { System.out.println("onPMessage: channel["+channel+"], message["+message+"]"); }
@Override publicvoid onSubscribe(String channel,int subscribedChannels) { System.out.println("onSubscribe: channel["+channel+"],"+ "subscribedChannels["+subscribedChannels+"]"); }
@Override publicvoid onUnsubscribe(String channel,int subscribedChannels) { System.out.println("onUnsubscribe: channel["+channel+"], "+ "subscribedChannels["+subscribedChannels+"]"); }
@Override publicvoid onPUnsubscribe(String pattern,int subscribedChannels) { System.out.println("onPUnsubscribe: pattern["+pattern+"],"+ "subscribedChannels["+subscribedChannels+"]"); }
@Override publicvoid onPSubscribe(String pattern,int subscribedChannels) { System.out.println("onPSubscribe: pattern["+pattern+"], "+ "subscribedChannels["+subscribedChannels+"]"); }
publicstaticvoid main(String[] args) { Jedis jr = null; try { jr = new Jedis("192.168.157.128", 6379, 0);//redis服務地址和端口號 TestPubSub sp = new TestPubSub(); sp.proceed(jr.getClient(),"news.share", "news.blog"); //sp.proceedWithPatterns(jr.getClient(), "news.*"); } catch (Exception e) { e.printStackTrace(); } finally{ if(jr!=null){ jr.disconnect(); } } } } |
代碼就是用TestPubSub對象來訂閱,對象中的那此onXXX方法監聽到相應事件
1 首先運行此java程序;
onSubscribe: channel[news.share], subscribedChannels[1] onSubscribe: channel[news.blog], subscribedChannels[2] |
2 啟動redis-cli
redis 127.0.0.1:6379> psubscribe news.* Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "news.*" 3) (integer) 1 |
3 再啟動一個redis-cli用來發布兩條消息:
redis 127.0.0.1:6379> publish news.share "share a link http://www.google.com" (integer) 2 redis 127.0.0.1:6379> publish news.blog "I post a blog" (integer) 2 |
4.查看兩個訂閱client的輸出 此時java client打印如下內容:
onMessage: channel[news.share], message[share a link http://www.google.com] onMessage: channel[news.blog], message[I post a blog] |
另一個redis-cli輸出如下:
1) "pmessage" 2) "news.*" 3) "news.share" 4) "share a link http://www.google.com" 1) "pmessage" 2) "news.*" 3) "news.blog" 4) "I post a blog" |
redis client使用psubscribe訂閱了一個使用通配符的通道(*表示任意字符串),此訂閱會收到所有與news.*匹配的通道消息。redis- cli打印到控制台的訂閱成功消息表示使用psubscribe命令訂閱news.*成功后,連接訂閱通道總數為1。
當我們在一個client使用publish向news.share和news.blog通道發出兩個消息后。redis返回的(integer) 2表示有兩個連接收到了此消息。
看完一個小例子后應該對pub/sub功能有了一個感性的認識。需要注意的是當一個連接通過subscribe或者psubscribe訂閱通道后就進入訂閱模式。在這種模式除了再訂閱額外的通道或者用unsubscribe或者punsubscribe命令退出訂閱模式,就不能再發送其他命令。另外使用 psubscribe命令訂閱多個通配符通道,如果一個消息匹配上了多個通道模式的話,會多次收到同一個消息。 redis的pub/sub還是有點太單薄(實現才用150行代碼)。在安全,認證,可靠性這方便都沒有太多支持