Redis集群環境下的鍵值空間監聽事件實現方案


一直想記錄工作中遇到的問題和解決的方法,奈何沒有找到一方樂土,最近經常反思,是否需要記錄平時的點滴,后台還是決定下定決心記錄一些,以便以后用到的時候找不着,實現這樣的一個功能主要也是業務所需要的。

需求:要求統計所有會員在線人數,並根據會員在線狀態同步改變人數。

之前用戶登錄使用session去控制,那么可以通過session進行在線用戶人數統計,后來實現無狀態不在依賴session作為用戶在線的標准,使用Redis替換了Session,那么用戶直接退出也好做,但是會存在用戶直接關閉頁面的情況,那么這個時候用戶的緩存憑證沒有主動觸發去主動刪掉,所以思來想去查了一些資料通過緩存的Key監聽事件來處理,但是網上的大都是單機版的,對於集群環境下的就少之又少,由於集群是有多個節點,並且key采用的是分片的方式存儲在不同片區,然而使用Spring的RedisTemplate的又不支持集群環境下的監聽事件,由於每次與Redis服務系保持一個有效連接就可以,那么就有可能某個key所在的片區並沒有被監聽到事件,因此需要在源碼上做一些調整,認為讓它遍歷所有集群節點用來監聽集群中的key。所以通過翻閱資料實現下面的功能,還算圓滿的完成了需求任務,當然如果看官看到某些似曾相識的地方請諒解,我也是從大家的經驗中尋找方法有些地方與大家的相似也屬正常。

第一步:修改Redis的配置文件,這一步可讓《運維》同事協助操作,在配置文件中添加如下內容:

Redis的配置文件:

############################# EVENT NOTIFICATION ##############################

# Redis can notify Pub/Sub clients about events happening in the key space.
# This feature is documented at http://redis.io/topics/notifications
#
# For instance if keyspace events notification is enabled, and a client
# performs a DEL operation on key "foo" stored in the Database 0, two
# messages will be published via Pub/Sub:
#
# PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo
#
# It is possible to select the events that Redis will notify among a set
# of classes. Every class is identified by a single character:
#
# K Keyspace events, published with __keyspace@<db>__ prefix.
# E Keyevent events, published with __keyevent@<db>__ prefix.
# g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
# $ String commands
# l List commands
# s Set commands
# h Hash commands
# z Sorted set commands
# x Expired events (events generated every time a key expires)
# e Evicted events (events generated when a key is evicted for maxmemory)
# A Alias for g$lshzxe, so that the "AKE" string means all the events.
#
# The "notify-keyspace-events" takes as argument a string that is composed
# of zero or multiple characters. The empty string means that notifications
# are disabled.
#
# Example: to enable list and generic events, from the point of view of the
# event name, use:
#
# notify-keyspace-events Elg
#
# Example 2: to get the stream of the expired keys subscribing to channel
# name __keyevent@0__:expired use:
#
notify-keyspace-events Ex
#
# By default all notifications are disabled because most users don't need
# this feature and the feature has some overhead. Note that if you don't
# specify at least one of K or E, no events will be delivered.

此時我們需要開啟緩存的鍵空間通知事件的配置:notify-keyspace-events Ex

 

第二步配置Redis信息,我采用的是yml格式文件,同時配置了三套模式分別為單機模式、哨兵模式、集群模式,各位看官在配置文件中可自行開啟或關閉。

#緩存配置
  redis:
    database: 0
    #host: 127.0.0.1
    #port: 6379
    #sentinel:
      #master: mymaster
      #nodes: 192.168.0.223:27001
    #timeout: 6000ms
    password: Aa123456    
    cluster:
      max-redirects: 3   #獲取失敗 最大重定向次數
      nodes:
        - 192.168.104.7:6379
        - 192.168.104.7:6380
        - 192.168.104.8:6379
        - 192.168.104.8:6380
        - 192.168.104.9:6379
        - 192.168.104.9:6380
    lettuce:
      pool:
        max-active: 1000  #連接池最大連接數(使用負值表示沒有限制)
        max-idle: 10      #連接池中的最大空閑連接
        min-idle: 5       #連接池中的最小空閑連接
        max-wait: 3000      #連接池最大阻塞等待時間(使用負值表示沒有限制)

第三步重寫緩存的默認配置函數了,並綁定監聽的主題,從程序中我們可以看到"__keyevent@0__:expired" 意思就是訂閱Redis的第一個數據庫的鍵值失效事件,這里需要多說一下,Redis有16個數據庫,系統默認使用第一個苦也就是0,如果你在配置的時候不想使用系統默認數據庫,你可以通過配置文件指定庫,那么你這里就需要根據你指定的庫做鍵值事件。 

import org.springframework.beans.factory.BeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnection;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import redis.clients.jedis.Jedis;

@Configuration
@ConditionalOnClass({ JedisConnection.class, RedisOperations.class, Jedis.class, MessageListener.class })
@AutoConfigureAfter({ JacksonAutoConfiguration.class,RedisAutoConfiguration.class })
public class RedisAutoConfiguration {

    @Configuration
    @ConditionalOnExpression("!'${spring.redis.host:}'.isEmpty()")
    public static class RedisStandAloneAutoConfiguration {
        @Bean
        public RedisMessageListenerContainer customizeRedisListenerContainer(
                RedisConnectionFactory redisConnectionFactory,MessageListener messageListener) {
            RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
            redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
            redisMessageListenerContainer.addMessageListener(messageListener,new PatternTopic("__keyevent@0__:expired"));
            return redisMessageListenerContainer;
        }
    }


    @Configuration
    @ConditionalOnExpression("'${spring.redis.host:}'.isEmpty()")
    public static class RedisClusterAutoConfiguration {
        @Bean
        public RedisMessageListenerFactory redisMessageListenerFactory(BeanFactory beanFactory,
                RedisConnectionFactory redisConnectionFactory) {
            RedisMessageListenerFactory beans = new RedisMessageListenerFactory();
            beans.setBeanFactory(beanFactory);
            beans.setRedisConnectionFactory(redisConnectionFactory);
            return beans;
        }
    }
}

第四步實現《org.springframework.context.ApplicationListener》的onApplicationEvent方法,主要的目的就是監聽集群中的所有節點,並且給《org.springframework.data.redis.listener.RedisMessageListenerContainer》創建一個鍵空間的主題事件。

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import redis.clients.jedis.JedisShardInfo;

public class RedisMessageListenerFactory implements BeanFactoryAware, ApplicationListener<ContextRefreshedEvent> {

    @Value("${spring.redis.password}")
    private String password;
    
    private DefaultListableBeanFactory beanFactory;

    private RedisConnectionFactory redisConnectionFactory;

    @Autowired
    private MessageListener messageListener;

    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = (DefaultListableBeanFactory) beanFactory;
    }

    public void setRedisConnectionFactory(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        RedisClusterConnection redisClusterConnection = redisConnectionFactory.getClusterConnection();
        if (redisClusterConnection != null) {
            Iterable<RedisClusterNode> nodes = redisClusterConnection.clusterGetNodes();
            for (RedisClusterNode node : nodes) {
                if (node.isMaster()) {
                    String containerBeanName = "messageContainer" + node.hashCode();
                    if (beanFactory.containsBean(containerBeanName)) {
                        return;
                    }
                    JedisShardInfo jedisShardInfo = new JedisShardInfo(node.getHost(), node.getPort());
                    jedisShardInfo.setPassword(password);
                    JedisConnectionFactory factory = new JedisConnectionFactory(jedisShardInfo);
                    BeanDefinitionBuilder containerBeanDefinitionBuilder = BeanDefinitionBuilder
                            .genericBeanDefinition(RedisMessageListenerContainer.class);
                    containerBeanDefinitionBuilder.addPropertyValue("connectionFactory", factory);
                    containerBeanDefinitionBuilder.setScope(BeanDefinition.SCOPE_SINGLETON);
                    containerBeanDefinitionBuilder.setLazyInit(false);
                    beanFactory.registerBeanDefinition(containerBeanName,
                            containerBeanDefinitionBuilder.getRawBeanDefinition());

                    RedisMessageListenerContainer container = beanFactory.getBean(containerBeanName,
                            RedisMessageListenerContainer.class);
                    String listenerBeanName = "messageListener" + node.hashCode();
                    if (beanFactory.containsBean(listenerBeanName)) {
                        return;
                    }
                    container.addMessageListener(messageListener, new PatternTopic("__keyevent@0__:expired"));
                    container.start();
                }
            }
        }
    }

}

第五步實現監聽事件觸發后的業務代碼

import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

import com.cn.tianxia.api.common.v2.CacheKeyConstants;
import com.cn.tianxia.api.project.v2.OnlineUserEntity;
import com.cn.tianxia.api.service.v2.OnlineUserService;
import com.cn.tianxia.api.utils.SpringContextUtils;

/**
 * @ClassName KeyExpiredEventMessageListener
 * @Description redis失效事件
 * @author Hardy
 * @Date 2019年5月20日 下午2:53:33
 * @version 1.0.0
 */
@Component
public class KeyExpiredEventMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expired = message.toString();
        String onlineKey = CacheKeyConstants.ONLINE_USER_KEY_UID;
        if (expired.contains(onlineKey)) {
            String uid = expired.replace(CacheKeyConstants.ONLINE_USER_KEY_UID, "");
            if (StringUtils.isNoneEmpty(uid)) {
                OnlineUserService onlineUserService = (OnlineUserService) SpringContextUtils
                        .getBeanByClass(OnlineUserService.class);
                OnlineUserEntity onlineUser = onlineUserService.getByUid(uid);
                if (onlineUser != null) {
                    onlineUser.setLogoutTime(System.currentTimeMillis());
                    onlineUser.setOffStatus((byte) 0);
                    onlineUser.setIsOff((byte) 1);
                    onlineUser.setUid(Long.parseLong(uid));
                    onlineUserService.insertOrUpdateOnlineUser(onlineUser);
                }
            }
        }
    }

}

整個過程實現這五步就完成了Redis的鍵值空間事件了,其實Redis本身提供訂閱與發布的功能,追其根本就是通過訂閱Redis服務器的發布的一個主題進行消費。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM