Spring Data Redis—Pub/Sub(附Web項目源碼)


一、發布和訂閱機制

  當一個客戶端通過 PUBLISH 命令向訂閱者發送信息的時候,我們稱這個客戶端為發布者(publisher)。

  而當一個客戶端使用 SUBSCRIBE 或者 PSUBSCRIBE 命令接收信息的時候,我們稱這個客戶端為訂閱者(subscriber)。

為了解耦發布者(publisher)和訂閱者(subscriber)之間的關系,Redis 使用了 channel (頻道)作為兩者的中介 —— 發布者將信息直接發布給 channel ,而 channel 負責將信息發送給適當的訂閱者,發布者和訂閱者之間沒有相互關系,也不知道對方的存在

注意:Redis的Pub Sub功能(或許是暫時)不支持持久化,意思就是消息在管道中是即發即失的,Subscriber端一收到消息,消息即從管道中刪除。所以如果是對消息的准確性要求比較高或者是有持久化的需求,Redis就不是那么合適了,期待以后的版本加入持久化功能。

 

二、Pub/Sub的作用

         其實從Pub/Sub的機制來看,它更像是一個廣播系統,多個Subscriber可以訂閱多個Channel,多個Publisher可以往多個Channel中發布消息。可以這么簡單的理解:

Subscriber:收音機(只不過這個收音機可以收到多個頻道,並以隊列方式顯示)

Publisher:電台(電台可以往不同的FM頻道中發消息)

Channel:不同頻率的FM頻道

 

所以根據這個理解,那么我覺得有幾種用法是比較可取的:

  1.一個Publisher,多個Subscriber:

  如下圖所示,可以作為消息隊列或者消息管道。

  主要應用:通知、公告。

  2.多個Publisher,一個Subscriber:

  可以將PubSub做成獨立的HTTP接口,各應用程序作為Publisher向Channel中發送消息,Subscriber端收到消息后執行相應的業務邏輯,比如寫數據庫,顯示等等。

  主要應用:排行榜、投票、計數。

 

3.多個Publisher,多個Subscriber

圖就不上了,故名思議,就是可以向不同的Channel中發送消息,由不同的Subscriber接收。

主要應用:群聊、聊天。

可參考Spring data redis主頁的開源項目retwisj。

Github地址:https://github.com/spring-projects/spring-data-keyvalue-examples/tree/master/retwisj

 

從上述幾種用法來看,根據不同的限制條件,限制Publisher、Subscriber和Channel的數量,可以實現不同的功能,其實完全可以把Pub/Sub理解為Socket編程,用Socket也可以實現上述功能,但是Redis提供了相應的封裝和底層實現,不管是安全性、健壯性的等各方面都有不錯的表現,以及未來的一些拓展,個人覺得Redis是個不錯的選擇。

 

三、Demo演示:

因為我的上一篇博客Spring Data Redis簡介以及項目Demo,RedisTemplate和 Serializer詳解,已經演示了Spring Data Redis的基本配置和使用,所以這里就只貼上Pub/Sub的重要代碼,讀者可以閱讀上篇博客或者下載源碼。

Pub/Sub配置(XMl):

 

 1 <!-- SDR Pub/Sub配置 -->
 2     <!-- SubServiceImpl是實現了MessageListener接口的類,MessageListener接口中定義了onMessage方法,也就是接收消息的方法,每當Channel中有消息,onMessage方法會被自動調用, -->
 3     <bean id="messageListener" class="com.chr.service.impl.SubServiceImpl">
 4     </bean>
 5     
 6     <!-- 可以有多個messageListener,每個messageListener必須注冊到RedisMessageListenerContainer中,讀者可參閱API文檔 -->
 7     <bean id="messageContainer"
 8         class="org.springframework.data.redis.listener.RedisMessageListenerContainer"
 9         destroy-method="destroy">
10         <property name="connectionFactory" ref="connectionFactory" />
11         <!--<property name="taskExecutor"> <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler"> 
12 <property name="poolSize" value="3"></property> </bean> </property>
13 此處可以定義Executor,參閱java.util.concurrent.Executor-->
14         <property name="messageListeners">
15             <map>
16                 <entry key-ref="messageListener">
17                     <ref bean="channelTopic" />
18                 </entry>
19             </map>
20         </property>
21     </bean>
22 
23     <!-- Channel設置 -->
24     <bean id="channelTopic" class="org.springframework.data.redis.listener.ChannelTopic">
25         <constructor-arg value="user:topic" />
26     </bean>

 

在代碼中可以看到subServiceImpl實現類被手動注冊到配置文件中,這樣可能會使代碼混亂,並且會帶來一些問題,比如需要使用注解自動注入rankService,但是因為Spring配置中,XML的優先級大於Annotation,所以subServiceImpl中的rankService不能被@Autowired。

那么解決辦法有兩種:

  1.在配置文件中(messageLisener bean前)加入:

<!-- 類掃描器 -->

    <context:component-scan base-package="com.songod.service" />

 

這樣Spring會先掃描Annotation,創建rankService bean,之后再注入messageLisener。

 

  2.在messageContainer bean中,只注入connectionFactory,不注入messageLisener和channelTopic。 之后在Controller中手動注入,調用addMessageListener(MessageListener listener, Topic topic)方法手動注入,但是注意只能注入一次,可以設置Flag判斷。

 

PubServiceImpl:

 1 @Service
 2 public class PubServiceImpl implements PubService {
 3     @Resource(name="stringRedisTemplate")
 4     private  StringRedisTemplate stringRedisTemplate;
 5     
 6     private String channelTopic = "user:topic";
 7     
 8     /*發布消息到Channel*/
 9     public void Publisher(String message) {
10         stringRedisTemplate.convertAndSend(channelTopic, message);
11     }
12 }

我這里用的是StringRedisTemplate,讀者可以使用RedisTemplate設置其它序列化方式,可以看我的上一篇博客。

 

SubServiceImpl:

public class SubServiceImpl implements SubService {
    @Autowired
    private ChannelTopic channelTopic;

    private MessageList messageList = new MessageList();

    public void onMessage(Message message, byte[] pattern) {
        System.out.println(message.toString() + "  " + channelTopic.getTopic());
        messageList.add(message.toString());
    }

    public MessageList getMessageList() {
        return messageList;
    }
}

主要是onMessage方法,可以在此方法中將message傳入其它業務邏輯中進行處理。

 

四、Demo運行:

Publish:

Subscrib:

 

五、項目源碼:

 redis-web-pubsub

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM