Redis實戰——Redis的pub/Sub(訂閱與發布)在java中的實現


借鑒:https://blog.csdn.net/canot/article/details/51938955

 

1.什么是pub/sub

Pub/Sub功能(means Publish, Subscribe)即發布及訂閱功能。基於事件的系統中,Pub/Sub是目前廣泛使用的通信模型,它采用事件作為基本的通信機制,提供大規模系統所要求的松散耦合的交互模式:訂閱者(如客戶端)以事件訂閱的方式表達出它有興趣接收的一個事件或一類事件;發布者(如服務器)可將訂閱者感興趣的事件隨時通知相關訂閱者。熟悉設計模式的朋友應該了解這與23種設計模式中的觀察者模式極為相似。 
同樣,Redis的pub/sub是一種消息通信模式,主要的目的是解除消息發布者和消息訂閱者之間的耦合,Redis作為一個pub/sub的server,在訂閱者和發布者之間起到了消息路由的功能。

2.Redis pub/sub的實現

Redis通過publish和subscribe命令實現訂閱和發布的功能。訂閱者可以通過subscribe向redis server訂閱自己感興趣的消息類型。redis將信息類型稱為通道(channel)。當發布者通過publish命令向redis server發送特定類型的信息時,訂閱該消息類型的全部訂閱者都會收到此消息。

客戶端1訂閱CCTV1:

127.0.0.1:6379> subscribe CCTV1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "CCTV1"
3) (integer) 1

 

 

客戶端2訂閱CCTV1和CCTV2:

127.0.0.1:6379> subscribe CCTV1 CCTV2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "CCTV1"
3) (integer) 1
1) "subscribe"
2) "CCTV2"
3) (integer) 2

 

此時這兩個客戶端分別監聽這指定的頻道。現在另一個客戶端向服務器推送了關於這兩個頻道的信息。

127.0.0.1:6379> publish CCTV1 "cctv1 is good"
(integer) 2
//返回2表示兩個客戶端接收了次消息。被接收到消息的客戶端如下所示。
1) "message"
2) "CCTV1"
3) "cctv1 is good"
----
1) "message"
2) "CCTV1"
3) "cctv1 is good"

如上的訂閱/發布也稱訂閱發布到頻道(使用publish與subscribe命令),此外還有訂閱發布到模式(使用psubscribe來訂閱一個模式)

訂閱CCTV的全部頻道

127.0.0.1:6379> psubscribe CCTV*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "CCTV*"
3) (integer) 1

當依然先如上推送一個CCTV1的消息時,該客戶端正常接收。

3.Pub/Sub在java中的實現

導入Redis驅動:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

 

Redis驅動包提供了一個抽象類:JedisPubSub…繼承這個類就完成了對客戶端對訂閱的監聽。示例代碼:

/**
 * redis發布訂閱消息監聽器
 * @ClassName: RedisMsgPubSubListener 
 * @Description: TODO
 * @author OnlyMate
 * @Date 2018年8月22日 上午10:05:35  
 *
 */
public class RedisMsgPubSubListener extends JedisPubSub {
    private Logger logger = LoggerFactory.getLogger(RedisMsgPubSubListener.class);
    
    @Override
    public void unsubscribe() {
        super.unsubscribe();
    }
 
    @Override
    public void unsubscribe(String... channels) {
        super.unsubscribe(channels);
    }
 
    @Override
    public void subscribe(String... channels) {
        super.subscribe(channels);
    }
 
    @Override
    public void psubscribe(String... patterns) {
        super.psubscribe(patterns);
    }
 
    @Override
    public void punsubscribe() {
        super.punsubscribe();
    }
 
    @Override
    public void punsubscribe(String... patterns) {
        super.punsubscribe(patterns);
    }
 
    @Override
    public void onMessage(String channel, String message) {
        logger.info("onMessage: channel[{}], message[{}]",channel, message);
    }
 
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        logger.info("onPMessage: pattern[{}], channel[{}], message[{}]", pattern, channel, message);
    }
 
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        logger.info("onSubscribe: channel[{}], subscribedChannels[{}]", channel, subscribedChannels);
    }
 
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        logger.info("onPUnsubscribe: pattern[{}], subscribedChannels[{}]", pattern, subscribedChannels);
    }
 
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        logger.info("onPSubscribe: pattern[{}], subscribedChannels[{}]", pattern, subscribedChannels);
    }
 
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        logger.info("channel:{} is been subscribed:{}", channel, subscribedChannels);
    }
}

 

如上所示,抽象類中存在的方法。分別表示

  • 監聽到訂閱模式接受到消息時的回調 (onPMessage)
  • 監聽到訂閱頻道接受到消息時的回調 (onMessage )
  • 訂閱頻道時的回調( onSubscribe )
  • 取消訂閱頻道時的回調( onUnsubscribe )
  • 訂閱頻道模式時的回調 ( onPSubscribe )
  • 取消訂閱模式時的回調( onPUnsubscribe )

運行我們剛剛編寫的類:

訂閱者

/**
 * 訂閱者
 * @ClassName: RedisSubTest 
 * @Description: TODO
 * @author OnlyMate
 * @Date 2018年8月23日 下午2:59:42  
 *
 */
public class RedisSubTest {
    @Test
    public void subjava() {
        System.out.println("訂閱者 ");
        Jedis jr = null;
        try {
            jr = new Jedis("127.0.0.1", 6379, 0);// redis服務地址和端口號
            RedisMsgPubSubListener sp = new RedisMsgPubSubListener();
            // jr客戶端配置監聽兩個channel
            jr.subscribe(sp, "news.share", "news.blog");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (jr != null) {
                jr.disconnect();
            }
        }
    }
}

發布者

/**
 * 發布者
 * @ClassName: RedisPubTest 
 * @Description: TODO
 * @author OnlyMate
 * @Date 2018年8月23日 下午2:59:25  
 *
 */
public class RedisPubTest {
    @Test
    public void pubjava() {
        System.out.println("發布者 ");
        Jedis jr = null;
        try {
            jr = new Jedis("127.0.0.1", 6379, 0);// redis服務地址和端口號
            // jr客戶端配置監聽兩個channel
            jr.publish( "news.share", "新聞分享");
            jr.publish( "news.blog", "新聞博客");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (jr != null) {
                jr.disconnect();
            }
        }
    }
}

 

從代碼中我們不難看出,我們聲明的一個redis鏈接在設置監聽后就可以執行一些操作,例如發布消息,訂閱消息等。。。 
當運行上述代碼后會在控制台輸出:

此時當在有客戶端向new.share或者new.blog通道publish消息時,onMessage方法即可被相應。(jedis.publish(channel, message))。

 

4.Pub/Sub在Spring中的實踐 
導入依賴jar

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-redis</artifactId>
    <version>2.0.8.RELEASE</version>
</dependency>

 核心消息監聽器

/**
 * redis發布訂閱消息監聽器
 * @ClassName: RedisMsgPubSubListener 
 * @Description: TODO
 * @author OnlyMate
 * @Date 2018年8月22日 上午10:05:35  
 *
 */
public class RedisMsgPubSubListener implements MessageListener {
    private Logger logger = LoggerFactory.getLogger(RedisMsgPubSubListener.class);
    
    @Override
    public void onMessage( final Message message, final byte[] pattern ) {
        RedisSerializer<?> serializer = redisTemplate.getValueSerializer();
        // message.getBody()是Redis的值,需要用redis的valueSerializer反序列化
        logger.info("Message receive-->pattern:{},message: {},{}", new String(pattern),
                serializer.deserialize(message.getBody()),
                redisTemplate.getStringSerializer().deserialize(message.getChannel()));
        logger.info(message.toString());
        JSONObject json = JSONObject.parseObject(serializer.deserialize(message.getBody()).toString());
        String cutomerId = json.getString("cutomerId");
        
        //可以與WebSocket結合使用,解決分布式服務中,共享Session
        if(StringUtils.isNotEmpty(cutomerId)) {
            logger.info("cutomerId: {},消息:{}", cutomerId, message.toString());
        }else {
            logger.info("cutomerId 為空,無法推送給對應的客戶端,消息:{}", message.toString());
        }
    }
}

 

 

現在我們在獲取RedisTemplate,並給WEB_SOCKET:LOTTERY這個channel publish數據。

/**
 * 發布者
 * @ClassName: RedisMsgPubClient 
 * @Description: TODO
 * @author OnlyMate
 * @Date 2018年8月23日 下午3:59:33  
 *
 */
@Controller
@RequestMapping(value="/redisMsgPubClientBySpring")
public class RedisMsgPubClient {
    private Logger logger = LoggerFactory.getLogger(RedisMsgPubClient.class);
    
    @Autowired
    private RedisTemplate<Object,Object> redisTemplate;
    
    @RequestMapping
    @ResponseBody
    public String pubMsg(HttpServletRequest request, HttpServletResponse response) {
        String cutomerId = request.getParameter("cutomerId").toString();
        String msg = request.getParameter("msg").toString();
        logger.info("發布消息:{}", request.getParameter("msg").toString());
        JSONObject json = new JSONObject();
        json.put("cutomerId", cutomerId);
        json.put("msg", msg);
        redisTemplate.convertAndSend("WEB_SOCKET:LOTTERY", json);
        return "成功";
    }
}

 

最后一步reids的配置

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:cache="http://www.springframework.org/schema/cache"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/context
                        http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.springframework.org/schema/aop
                        http://www.springframework.org/schema/aop/spring-aop.xsd
                        http://www.springframework.org/schema/tx
                        http://www.springframework.org/schema/tx/spring-tx.xsd
                        http://www.springframework.org/schema/mvc 
                        http://www.springframework.org/schema/mvc/spring-mvc.xsd
                        http://www.springframework.org/schema/cache   
                        http://www.springframework.org/schema/cache/spring-cache.xsd"
    default-autowire="byName">
    
    <description>redis 相關類 Spring 托管</description>
    
    <!-- 開啟緩存 -->
    <cache:annotation-driven />
    <bean name="springCacheAnnotationParser" class="org.springframework.cache.annotation.SpringCacheAnnotationParser"></bean>
    <bean name="annotationCacheOperationSource" class="org.springframework.cache.annotation.AnnotationCacheOperationSource">
        <constructor-arg>
            <array>
                <ref bean="springCacheAnnotationParser"/>
            </array>
        </constructor-arg>
    </bean>
    <bean name="cacheInterceptor" class="org.springframework.cache.interceptor.CacheInterceptor">
        <property name="cacheOperationSources" ref="annotationCacheOperationSource" />
    </bean>
    <bean class="org.springframework.cache.interceptor.BeanFactoryCacheOperationSourceAdvisor">
        <property name="cacheOperationSource" ref="annotationCacheOperationSource" />
        <property name="advice" ref="cacheInterceptor" />
        <property name="order" value="2147483647" />
    </bean>
    
    <!--載入 redis 配置文件-->
    <context:property-placeholder location="classpath:redis.properties" ignore-unresolvable="true"/>

    
    <!-- 配置JedisConnectionFactory -->
    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
        <property name="hostName" value="${redis.host}"/>
        <property name="port" value="${redis.port}"/>
        <property name="password" value="${redis.pass}"/>
        <property name="database" value="${redis.dbIndex}"/>
        <property name="poolConfig" ref="jedisPoolConfig"/>
        <!-- <constructor-arg name="sentinelConfig" ref="redisSentinelConfiguration" /> -->
        <constructor-arg name="poolConfig" ref="jedisPoolConfig" />
    </bean>
    <!-- 配置 JedisPoolConfig 實例 -->
    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <!-- 最大連接數 -->
        <property name="maxTotal" value="${redis.pool.maxActive}"/>
        <!-- 最大空閑時間 -->
        <property name="maxIdle" value="${redis.pool.maxIdle}"/>
        <!-- 最小空閑時間 -->
        <property name="minIdle" value="${redis.pool.minIdle}"/>
        <!-- 獲得鏈接時的最大等待毫秒數,小於0:阻塞不確定時間,默認-1 -->
        <property name="maxWaitMillis" value="${redis.pool.maxWaitMillis}"/>
        <!-- 在borrow一個jedis實例時,是否提前進行validate操作;如果為true,則得到的jedis實例均是可用的 -->
        <property name="testOnBorrow" value="${redis.pool.testOnBorrow}"/>
        <!-- 在空閑時檢查有效性,默認false -->
        <property name="testWhileIdle" value="${redis.pool.testOnBorrow}"/>
        <!-- 表示idle object evitor兩次掃描之間要sleep的毫秒數 -->
        <property name="timeBetweenEvictionRunsMillis" value="${redis.pool.timeBetweenEvictionRunsMillis}" />
        <!-- 表示一個對象至少停留在idle狀態的最短時間,然后才能被idle object evitor掃描並驅逐;這一項只有在timeBetweenEvictionRunsMillis大於0時才有意義 -->
        <property name="minEvictableIdleTimeMillis" value="${redis.pool.minEvictableIdleTimeMillis}" />
        <!-- 表示idle object evitor每次掃描的最多的對象數 -->
        <property name="numTestsPerEvictionRun" value="${redis.pool.numTestsPerEvictionRun}" />
    </bean>
    <!-- 配置哨兵 -->
    <!-- <bean id="redisSentinelConfiguration" class="org.springframework.data.redis.connection.RedisSentinelConfiguration">
        <property name="master">
            <bean class="org.springframework.data.redis.connection.RedisNode">
                <property name="name" value="mymaster" />
            </bean>
        </property>
        <property name="sentinels">
            <set>
                <bean class="org.springframework.data.redis.connection.RedisNode">
                    <constructor-arg name="host" value="10.252.2.137" />
                    <constructor-arg name="port" value="26391" />
                </bean>
                <bean class="org.springframework.data.redis.connection.RedisNode">
                    <constructor-arg name="host" value="10.252.2.137" />
                    <constructor-arg name="port" value="26392" />
                </bean>
                <bean class="org.springframework.data.redis.connection.RedisNode">
                    <constructor-arg name="host" value="10.252.2.137" />
                    <constructor-arg name="port" value="26393" />
                </bean>
            </set>
        </property>
    </bean> -->
    
    <!-- SDR默認采用的序列化策略有兩種,一種是String的序列化策略,一種是JDK的序列化策略。
        StringRedisTemplate默認采用的是String的序列化策略,保存的key和value都是采用此策略序列化保存的。 
        RedisTemplate默認采用的是JDK的序列化策略,保存的key和value都是采用此策略序列化保存的。 
        就是因為序列化策略的不同,即使是同一個key用不同的Template去序列化,結果是不同的。所以根據key去刪除數據的時候就出現了刪除失敗的問題。
    -->
    <!-- redis 序列化策略 ,通常情況下key值采用String序列化策略, -->
    <!-- 如果不指定序列化策略,StringRedisTemplate的key和value都將采用String序列化策略; -->
    <!-- 但是RedisTemplate的key和value都將采用JDK序列化 這樣就會出現采用不同template保存的數據不能用同一個template刪除的問題 -->
    <!-- 配置RedisTemplate -->
    <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
        <property name="connectionFactory" ref="jedisConnectionFactory" />
        <property name="keySerializer">
            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
        </property>
        <!-- <property name="valueSerializer" ref="stringRedisSerializer" /> value值如果是對象,這不能用stringRedisSerializer,報類型轉換錯誤-->
        <!-- <property name="valueSerializer">
            hex(十六進制)的格式
            <bean class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
        </property> -->
        <property name="valueSerializer" >
            <!-- json的格式,要注意實體屬性名有沒有‘_’,如user_name,有的話要加注解 ,@JsonNaming會將userName處理為user_name
                   @JsonSerialize
                @JsonNaming(PropertyNamingStrategy.LowerCaseWithUnderscoresStrategy.class) 
               -->
            <bean class="org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer" />
        </property>
    </bean>

    <!-- spring自己的緩存管理器,這里定義了緩存位置名稱 ,即注解中的value -->
    <bean id="cacheManager" class="org.springframework.cache.support.SimpleCacheManager">
        <property name="caches">
            <set>
                <!-- 這里可以配置多個redis -->
                <bean
                    class="org.springframework.cache.concurrent.ConcurrentMapCacheFactoryBean">
                    <property name="name" value="localDefault" /><!-- 缺省本地緩存 -->
                </bean>
                <bean
                    class="org.springframework.cache.concurrent.ConcurrentMapCacheFactoryBean">
                    <property name="name" value="WSLocalTableCache" /><!-- 單表配置 -->
                </bean>
                <!-- 本地緩存2:管理緩存失效 -->
                <bean class="com.only.mate.utils.RedisCache">
                    <property name="name" value="localTest" /><!-- 本地緩存名 -->
                    <property name="timeout" value="10" />  <!-- seconds -->
                    <property name="removeTimeout" value="true" /> <!-- 超時移除 -->
                </bean>
            </set>
        </property>
    </bean>
    
    <!-- 配置redis發布訂閱模式 -->
    <bean id="redisMessageListenerContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
        <property name="connectionFactory" ref="jedisConnectionFactory" />
        <property name="messageListeners">
            <map>
                <entry key-ref="messageListenerAdapter">
                    <bean class="org.springframework.data.redis.listener.ChannelTopic">
                        <constructor-arg value="WEB_SOCKET:LOTTERY"></constructor-arg>
                    </bean>
                </entry>
            </map>
        </property>
    </bean>

    <bean id="messageListenerAdapter" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="redisMsgPubSubListener"></constructor-arg>
    </bean>

    <bean id="redisMsgPubSubListener" class="com.redis.pubsub.spring.RedisMsgPubSubListener"></bean>
</beans>

如上的配置即配置了對Redis的鏈接。在配置類中的將ChannelTopic加入IOC容器。則在Spring啟動時會在一個RedisTemplate(一個對Redis的鏈接)中設置的一個channel,即WEB_SOCKET:LOTTERY。 
在上述配置中,RedisMsgPubSubListener是我們生成的,這個類即為核心監聽類,RedisTemplate接受到數據如何處理就是在該類中處理的。

附加上Java配置

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.interceptor.KeyGenerator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.gfss.common.listener.CustomRedisMsgPubSubListener;

@Configuration
@EnableCaching
public class RedisConfiguration extends CachingConfigurerSupport {

    @Override
    @Bean
    public KeyGenerator keyGenerator() {
        return new KeyGenerator() {
            @Override
            public Object generate(Object target, Method method, Object... params) {
                StringBuilder sb = new StringBuilder();
                sb.append(target.getClass().getName());
                sb.append(method.getName());
                for (Object obj : params) {
                    sb.append(obj.toString());
                }
                return sb.toString();
            }
        };

    }

    @Bean
    public CacheManager cacheManager(RedisTemplate<?, ?> redisTemplate) {
        return new RedisCacheManager(redisTemplate);
    }

    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate template = new StringRedisTemplate(factory);
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(
                Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public RedisTemplate<String, Object> objectRedisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        return template;
    }

    /************** 配置redis發布訂閱模式 *******************************/
    @Bean
    public CustomRedisMsgPubSubListener customRedisMsgPubSubListener() {
        return new CustomRedisMsgPubSubListener();
    }

    @Bean
    public MessageListenerAdapter messageListenerAdapter(MessageListener messageListener) {
        return new MessageListenerAdapter(messageListener);
    }

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,
            MessageListenerAdapter messageListenerAdapter) {

        List<Topic> collection = new ArrayList<Topic>();
        // 普通訂閱,訂閱具體的頻道
        ChannelTopic channelTopic = new ChannelTopic("WEB_SOCKET:LOTTERY");
        collection.add(channelTopic);

        /*// 模式訂閱,支持模式匹配訂閱,*為模糊匹配符
        PatternTopic PatternTopic = new PatternTopic("WEB_SOCKET:*");
        collection.add(PatternTopic);
        // 匹配所有頻道
        PatternTopic PatternTopicAll = new PatternTopic("*");
        collection.add(PatternTopicAll);*/

        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        redisMessageListenerContainer.addMessageListener(messageListenerAdapter, collection);
        return redisMessageListenerContainer;
    }
}

 

訪問頁面去調用發布者

http://localhost:8088/redis/redisMsgPubClientBySpring?cutomerId=all&msg=你們好

訂閱收到的消息

 

5.拓展開發

  在分布式服務中,可以結合WebSocket與Redis的發布訂閱模式相結合,解決session不能共享的問題。

  當業務處理完成之后,通過Redis的發布訂閱模式,發布消息到每個訂閱該頻道的服務節點,然后由每個服務節點通過key尋找自己內存緩存中的session,然后找到了就向客戶端推消息,否則不處理。

Dubbo只能傳輸可序列化的對象,Redis只能緩存可序列化的對象,Dubbo基於網絡流(TCP),Redis緩存的數據要存儲在硬盤上,而WebSocketSession是沒有實現序列化的,所以不能跨服務傳遞WebSocketSession,也不能使用Redis存儲WebSocketSession,只能自定義一塊緩存區。

6.動態訂閱頻道

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.RedisSerializer;

import com.alibaba.fastjson.JSONObject;
import com.gfss.common.websocket.CustomWebSocketHandler;

/**
 * redis發布訂閱消息監聽器
 * @ClassName: RedisMsgPubSubListener
 * @Description: TODO
 * @author OnlyMate
 * @Date 2018年8月22日 上午10:05:35
 *
 */
public class CustomRedisMsgPubSubListener implements MessageListener {
    private Logger logger = LoggerFactory.getLogger(CustomRedisMsgPubSubListener.class);

    @Autowired
    private CustomWebSocketHandler customWebSocketHandler;
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    /**
     * 實例:
     *    JSONObject json = new JSONObject();
     *    json.put("cutomerId", notifyResult.getResult());
     *    json.put("resultCode", map.get("resultCode"));
     *    //向redis發布消息
     *    redisTemplate.convertAndSend(channelName, json);
     * @param message
     * @param pattern
     * @Throws
     * @Author: chetao
     * @Date: 2019年1月8日 下午10:40:21
     * @see org.springframework.data.redis.connection.MessageListener#onMessage(org.springframework.data.redis.connection.Message, byte[])
     */
    @Override
    public void onMessage( final Message message, final byte[] pattern ) {
        RedisSerializer<?> serializer = redisTemplate.getKeySerializer();
        logger.info("Message receive-->pattern:{},message: {},{}", serializer.deserialize(pattern),
                serializer.deserialize(message.getBody()), serializer.deserialize(message.getChannel()));
        if ("WEB_SOCKET:PAY_NOTIFY".equals(serializer.deserialize(message.getChannel()))) {
            RedisMessageListenerContainer redisMessageListenerContainer = applicationContext
                    .getBean("redisMessageListenerContainer", RedisMessageListenerContainer.class);
            MessageListenerAdapter messageListenerAdapter = applicationContext.getBean("messageListenerAdapter",
                    MessageListenerAdapter.class);
            /*List<Topic> collection = new ArrayList<Topic>();
            // 動態添加訂閱主題
            ChannelTopic channelTopic = new ChannelTopic("WEB_SOCKET1:PAY_NOTIFY");
            collection.add(channelTopic);
            PatternTopic PatternTopic = new PatternTopic("WEB_SOCKET:*");
            collection.add(PatternTopic);
            redisMessageListenerContainer.addMessageListener(messageListenerAdapter, collection);*/
            // 動態添加訂閱主題
            ChannelTopic channelTopic = new ChannelTopic("WEB_SOCKET1:PAY_NOTIFY");
            redisMessageListenerContainer.addMessageListener(messageListenerAdapter, channelTopic);
            PatternTopic PatternTopic = new PatternTopic("WEB_SOCKET:*");
            redisMessageListenerContainer.addMessageListener(messageListenerAdapter, PatternTopic);
        }

        JSONObject json = JSONObject.parseObject(message.toString());
        customWebSocketHandler.sendMessage(json.toJSONString());
    }
}

上面兩種動態訂閱頻道的方式都可以,本人已測試是可行的,可以結合自己的業務去拓展,如:臨時訂閱頻道后退訂頻道

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM