一、發布和訂閱機制
當一個客戶端通過 PUBLISH 命令向訂閱者發送信息的時候,我們稱這個客戶端為發布者(publisher)。
而當一個客戶端使用 SUBSCRIBE 或者 PSUBSCRIBE 命令接收信息的時候,我們稱這個客戶端為訂閱者(subscriber)。
為了解耦發布者(publisher)和訂閱者(subscriber)之間的關系,Redis 使用了 channel (頻道)作為兩者的中介 —— 發布者將信息直接發布給 channel ,而 channel 負責將信息發送給適當的訂閱者,發布者和訂閱者之間沒有相互關系,也不知道對方的存在
注意:Redis的Pub Sub功能(或許是暫時)不支持持久化,意思就是消息在管道中是即發即失的,Subscriber端一收到消息,消息即從管道中刪除。所以如果是對消息的准確性要求比較高或者是有持久化的需求,Redis就不是那么合適了,期待以后的版本加入持久化功能。
二、Pub/Sub的作用
其實從Pub/Sub的機制來看,它更像是一個廣播系統,多個Subscriber可以訂閱多個Channel,多個Publisher可以往多個Channel中發布消息。可以這么簡單的理解:
Subscriber:收音機(只不過這個收音機可以收到多個頻道,並以隊列方式顯示)
Publisher:電台(電台可以往不同的FM頻道中發消息)
Channel:不同頻率的FM頻道
所以根據這個理解,那么我覺得有幾種用法是比較可取的:
1.一個Publisher,多個Subscriber:
如下圖所示,可以作為消息隊列或者消息管道。
主要應用:通知、公告。
2.多個Publisher,一個Subscriber:
可以將PubSub做成獨立的HTTP接口,各應用程序作為Publisher向Channel中發送消息,Subscriber端收到消息后執行相應的業務邏輯,比如寫數據庫,顯示等等。
主要應用:排行榜、投票、計數。
3.多個Publisher,多個Subscriber
圖就不上了,故名思議,就是可以向不同的Channel中發送消息,由不同的Subscriber接收。
主要應用:群聊、聊天。
可參考Spring data redis主頁的開源項目retwisj。
Github地址:https://github.com/spring-projects/spring-data-keyvalue-examples/tree/master/retwisj
從上述幾種用法來看,根據不同的限制條件,限制Publisher、Subscriber和Channel的數量,可以實現不同的功能,其實完全可以把Pub/Sub理解為Socket編程,用Socket也可以實現上述功能,但是Redis提供了相應的封裝和底層實現,不管是安全性、健壯性的等各方面都有不錯的表現,以及未來的一些拓展,個人覺得Redis是個不錯的選擇。
三、Demo演示:
因為我的上一篇博客Spring Data Redis簡介以及項目Demo,RedisTemplate和 Serializer詳解,已經演示了Spring Data Redis的基本配置和使用,所以這里就只貼上Pub/Sub的重要代碼,讀者可以閱讀上篇博客或者下載源碼。
Pub/Sub配置(XMl):
1 <!-- SDR Pub/Sub配置 --> 2 <!-- SubServiceImpl是實現了MessageListener接口的類,MessageListener接口中定義了onMessage方法,也就是接收消息的方法,每當Channel中有消息,onMessage方法會被自動調用, --> 3 <bean id="messageListener" class="com.chr.service.impl.SubServiceImpl"> 4 </bean> 5 6 <!-- 可以有多個messageListener,每個messageListener必須注冊到RedisMessageListenerContainer中,讀者可參閱API文檔 --> 7 <bean id="messageContainer" 8 class="org.springframework.data.redis.listener.RedisMessageListenerContainer" 9 destroy-method="destroy"> 10 <property name="connectionFactory" ref="connectionFactory" /> 11 <!--<property name="taskExecutor"> <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler"> 12 <property name="poolSize" value="3"></property> </bean> </property> 13 此處可以定義Executor,參閱java.util.concurrent.Executor--> 14 <property name="messageListeners"> 15 <map> 16 <entry key-ref="messageListener"> 17 <ref bean="channelTopic" /> 18 </entry> 19 </map> 20 </property> 21 </bean> 22 23 <!-- Channel設置 --> 24 <bean id="channelTopic" class="org.springframework.data.redis.listener.ChannelTopic"> 25 <constructor-arg value="user:topic" /> 26 </bean>
在代碼中可以看到subServiceImpl實現類被手動注冊到配置文件中,這樣可能會使代碼混亂,並且會帶來一些問題,比如需要使用注解自動注入rankService,但是因為Spring配置中,XML的優先級大於Annotation,所以subServiceImpl中的rankService不能被@Autowired。
那么解決辦法有兩種:
1.在配置文件中(messageLisener bean前)加入:
<!-- 類掃描器 --> <context:component-scan base-package="com.songod.service" />
這樣Spring會先掃描Annotation,創建rankService bean,之后再注入messageLisener。
2.在messageContainer bean中,只注入connectionFactory,不注入messageLisener和channelTopic。 之后在Controller中手動注入,調用addMessageListener(MessageListener listener, Topic topic)方法手動注入,但是注意只能注入一次,可以設置Flag判斷。
PubServiceImpl:
1 @Service 2 public class PubServiceImpl implements PubService { 3 @Resource(name="stringRedisTemplate") 4 private StringRedisTemplate stringRedisTemplate; 5 6 private String channelTopic = "user:topic"; 7 8 /*發布消息到Channel*/ 9 public void Publisher(String message) { 10 stringRedisTemplate.convertAndSend(channelTopic, message); 11 } 12 }
我這里用的是StringRedisTemplate,讀者可以使用RedisTemplate設置其它序列化方式,可以看我的上一篇博客。
SubServiceImpl:
public class SubServiceImpl implements SubService { @Autowired private ChannelTopic channelTopic; private MessageList messageList = new MessageList(); public void onMessage(Message message, byte[] pattern) { System.out.println(message.toString() + " " + channelTopic.getTopic()); messageList.add(message.toString()); } public MessageList getMessageList() { return messageList; } }
主要是onMessage方法,可以在此方法中將message傳入其它業務邏輯中進行處理。
四、Demo運行:
Publish:
Subscrib:
五、項目源碼: