Spring Data Redis實現消息隊列——發布/訂閱模式


一般來說,消息隊列有兩種場景,一種是發布者訂閱者模式,一種是生產者消費者模式。利用redis這兩種場景的消息隊列都能夠實現。

定義:
生產者消費者模式:生產者生產消息放到隊列里,多個消費者同時監聽隊列,誰先搶到消息誰就會從隊列中取走消息;即對於每個消息只能被最多一個消費者擁有。
發布者訂閱者模式:發布者生產消息放到隊列里,多個監聽隊列的消費者都會收到同一份消息;即正常情況下每個消費者收到的消息應該都是一樣的。


下面就以Spring Data Redis實現簡單的消息“發布/訂閱”服務。

spring-redis使用RedisMessageListenerContainer進行消息監聽,客戶程序需要自己實現MessageListener,並以指定的topic注冊到RedisMessageListenerContainer,這樣,在指定的topic上如果有消息,RedisMessageListenerContainer便會通知該MessageListener。

下面是在spring配置文件中配置spring-redis:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd ">

    <!-- 引入jedis配置文件 -->
    <context:property-placeholder location="classpath*:resource/redis.properties" />
   
    <context:component-scan base-package="com.ljq.durian" />

    <!-- jedis pool配置 --> 
    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <property name="maxIdle" value="${redis.maxIdle}" />
        <property name="maxTotal" value="${redis.maxActive}" />
        <property name="maxWaitMillis" value="${redis.maxWait}" />
        <property name="testOnBorrow" value="${redis.testOnBorrow}" />
        <property name="testOnReturn" value="${redis.testOnReturn}" />
    </bean>
    
    <!-- spring data redis -->  
    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">  
        <property name="hostName" value="${redis.host}"/>  
        <property name="port" value="${redis.port}"/>  
        <property name="poolConfig" ref="jedisPoolConfig"></property>  
        <property name="timeout" value="${redis.timeout}"></property>  
        <property name="usePool" value="true"></property>  
    </bean>  
    
    <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">  
        <property name="connectionFactory" ref="jedisConnectionFactory"/>  
        <property name="defaultSerializer">  
            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>  
        </property>  
    </bean>  
    
    <bean id="redisMessageListener" class="com.ljq.durian.common.listener.RedisMessageListener">  
        <property name="redisTemplate" ref="redisTemplate"/>  
    </bean>  

    <bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
        <property name="connectionFactory" ref="jedisConnectionFactory" />
        <property name="messageListeners">
            <map>
                <entry key-ref="redisMessageListener">
                    <list>
                        <!-- 普通訂閱,訂閱具體的頻道 -->
                        <bean class="org.springframework.data.redis.listener.ChannelTopic">
                            <constructor-arg value="topic.channel" />
                        </bean>
                        <!-- 模式訂閱,支持模式匹配訂閱,*為模糊匹配符 -->
                        <bean class="org.springframework.data.redis.listener.PatternTopic">
                            <constructor-arg value="topic.*" />
                        </bean>
                        <!-- 匹配所有頻道 -->
                        <bean class="org.springframework.data.redis.listener.PatternTopic">
                            <constructor-arg value="*" />
                        </bean>
                    </list>
                </entry>
            </map>
        </property>
    </bean>
     
</beans>

上面例子中,最后三個bean的配置是實現發布/訂閱服務的關鍵,RedisMessageListener是自己寫的實現了org.springframework.data.redis.connection.MessageListener的業務類,並以“topic.channel” 這個topic注冊到RedisMessageListenerContainer。RedisMessageListenerContainer在消息到達后負責通知MessageListener。
下面是RedisMessageListener的代碼:

package com.ljq.durian.common.listener;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;


public class RedisMessageListener implements MessageListener {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        RedisSerializer<?> serializer = redisTemplate.getValueSerializer();
        Object channel = serializer.deserialize(message.getChannel());
        Object body = serializer.deserialize(message.getBody());  
        System.out.println("主題: " + channel);
        System.out.println("消息內容: " + String.valueOf(body));
    }

    public RedisTemplate<String, String> getRedisTemplate() {
        return redisTemplate;
    }

    public void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
}

 

這樣,應用啟動時,消息的訂閱方(subscriber)就注冊好了。這時候只要使用一個簡單的程序,模擬publisher,向指定topic發布消息,RedisMessageListener就可以接收到消息,spring-redis的寫法是這樣:

redisTemplate.convertAndSend("topic.channel", "hello world!");


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM