一直想記錄工作中遇到的問題和解決的方法,奈何沒有找到一方樂土,最近經常反思,是否需要記錄平時的點滴,后台還是決定下定決心記錄一些,以便以后用到的時候找不着,實現這樣的一個功能主要也是業務所需要的。
需求:要求統計所有會員在線人數,並根據會員在線狀態同步改變人數。
之前用戶登錄使用session去控制,那么可以通過session進行在線用戶人數統計,后來實現無狀態不在依賴session作為用戶在線的標准,使用Redis替換了Session,那么用戶直接退出也好做,但是會存在用戶直接關閉頁面的情況,那么這個時候用戶的緩存憑證沒有主動觸發去主動刪掉,所以思來想去查了一些資料通過緩存的Key監聽事件來處理,但是網上的大都是單機版的,對於集群環境下的就少之又少,由於集群是有多個節點,並且key采用的是分片的方式存儲在不同片區,然而使用Spring的RedisTemplate的又不支持集群環境下的監聽事件,由於每次與Redis服務系保持一個有效連接就可以,那么就有可能某個key所在的片區並沒有被監聽到事件,因此需要在源碼上做一些調整,認為讓它遍歷所有集群節點用來監聽集群中的key。所以通過翻閱資料實現下面的功能,還算圓滿的完成了需求任務,當然如果看官看到某些似曾相識的地方請諒解,我也是從大家的經驗中尋找方法有些地方與大家的相似也屬正常。
第一步:修改Redis的配置文件,這一步可讓《運維》同事協助操作,在配置文件中添加如下內容:
Redis的配置文件:
############################# EVENT NOTIFICATION ############################## # Redis can notify Pub/Sub clients about events happening in the key space. # This feature is documented at http://redis.io/topics/notifications # # For instance if keyspace events notification is enabled, and a client # performs a DEL operation on key "foo" stored in the Database 0, two # messages will be published via Pub/Sub: # # PUBLISH __keyspace@0__:foo del # PUBLISH __keyevent@0__:del foo # # It is possible to select the events that Redis will notify among a set # of classes. Every class is identified by a single character: # # K Keyspace events, published with __keyspace@<db>__ prefix. # E Keyevent events, published with __keyevent@<db>__ prefix. # g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ... # $ String commands # l List commands # s Set commands # h Hash commands # z Sorted set commands # x Expired events (events generated every time a key expires) # e Evicted events (events generated when a key is evicted for maxmemory) # A Alias for g$lshzxe, so that the "AKE" string means all the events. # # The "notify-keyspace-events" takes as argument a string that is composed # of zero or multiple characters. The empty string means that notifications # are disabled. # # Example: to enable list and generic events, from the point of view of the # event name, use: # # notify-keyspace-events Elg # # Example 2: to get the stream of the expired keys subscribing to channel # name __keyevent@0__:expired use: # notify-keyspace-events Ex # # By default all notifications are disabled because most users don't need # this feature and the feature has some overhead. Note that if you don't # specify at least one of K or E, no events will be delivered.
此時我們需要開啟緩存的鍵空間通知事件的配置:notify-keyspace-events Ex
第二步配置Redis信息,我采用的是yml格式文件,同時配置了三套模式分別為單機模式、哨兵模式、集群模式,各位看官在配置文件中可自行開啟或關閉。
#緩存配置 redis: database: 0 #host: 127.0.0.1 #port: 6379 #sentinel: #master: mymaster #nodes: 192.168.0.223:27001 #timeout: 6000ms password: Aa123456 cluster: max-redirects: 3 #獲取失敗 最大重定向次數 nodes: - 192.168.104.7:6379 - 192.168.104.7:6380 - 192.168.104.8:6379 - 192.168.104.8:6380 - 192.168.104.9:6379 - 192.168.104.9:6380 lettuce: pool: max-active: 1000 #連接池最大連接數(使用負值表示沒有限制) max-idle: 10 #連接池中的最大空閑連接 min-idle: 5 #連接池中的最小空閑連接 max-wait: 3000 #連接池最大阻塞等待時間(使用負值表示沒有限制)
第三步重寫緩存的默認配置函數了,並綁定監聽的主題,從程序中我們可以看到"__keyevent@0__:expired" 意思就是訂閱Redis的第一個數據庫的鍵值失效事件,這里需要多說一下,Redis有16個數據庫,系統默認使用第一個苦也就是0,如果你在配置的時候不想使用系統默認數據庫,你可以通過配置文件指定庫,那么你這里就需要根據你指定的庫做鍵值事件。
import org.springframework.beans.factory.BeanFactory; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.jedis.JedisConnection; import org.springframework.data.redis.core.RedisOperations; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import redis.clients.jedis.Jedis; @Configuration @ConditionalOnClass({ JedisConnection.class, RedisOperations.class, Jedis.class, MessageListener.class }) @AutoConfigureAfter({ JacksonAutoConfiguration.class,RedisAutoConfiguration.class }) public class RedisAutoConfiguration { @Configuration @ConditionalOnExpression("!'${spring.redis.host:}'.isEmpty()") public static class RedisStandAloneAutoConfiguration { @Bean public RedisMessageListenerContainer customizeRedisListenerContainer( RedisConnectionFactory redisConnectionFactory,MessageListener messageListener) { RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); redisMessageListenerContainer.addMessageListener(messageListener,new PatternTopic("__keyevent@0__:expired")); return redisMessageListenerContainer; } } @Configuration @ConditionalOnExpression("'${spring.redis.host:}'.isEmpty()") public static class RedisClusterAutoConfiguration { @Bean public RedisMessageListenerFactory redisMessageListenerFactory(BeanFactory beanFactory, RedisConnectionFactory redisConnectionFactory) { RedisMessageListenerFactory beans = new RedisMessageListenerFactory(); beans.setBeanFactory(beanFactory); beans.setRedisConnectionFactory(redisConnectionFactory); return beans; } } }
第四步實現《org.springframework.context.ApplicationListener》的onApplicationEvent方法,主要的目的就是監聽集群中的所有節點,並且給《org.springframework.data.redis.listener.RedisMessageListenerContainer》創建一個鍵空間的主題事件。
import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.DefaultListableBeanFactory; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.RedisClusterConnection; import org.springframework.data.redis.connection.RedisClusterNode; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import redis.clients.jedis.JedisShardInfo; public class RedisMessageListenerFactory implements BeanFactoryAware, ApplicationListener<ContextRefreshedEvent> { @Value("${spring.redis.password}") private String password; private DefaultListableBeanFactory beanFactory; private RedisConnectionFactory redisConnectionFactory; @Autowired private MessageListener messageListener; public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = (DefaultListableBeanFactory) beanFactory; } public void setRedisConnectionFactory(RedisConnectionFactory redisConnectionFactory) { this.redisConnectionFactory = redisConnectionFactory; } @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { RedisClusterConnection redisClusterConnection = redisConnectionFactory.getClusterConnection(); if (redisClusterConnection != null) { Iterable<RedisClusterNode> nodes = redisClusterConnection.clusterGetNodes(); for (RedisClusterNode node : nodes) { if (node.isMaster()) { String containerBeanName = "messageContainer" + node.hashCode(); if (beanFactory.containsBean(containerBeanName)) { return; } JedisShardInfo jedisShardInfo = new JedisShardInfo(node.getHost(), node.getPort()); jedisShardInfo.setPassword(password); JedisConnectionFactory factory = new JedisConnectionFactory(jedisShardInfo); BeanDefinitionBuilder containerBeanDefinitionBuilder = BeanDefinitionBuilder .genericBeanDefinition(RedisMessageListenerContainer.class); containerBeanDefinitionBuilder.addPropertyValue("connectionFactory", factory); containerBeanDefinitionBuilder.setScope(BeanDefinition.SCOPE_SINGLETON); containerBeanDefinitionBuilder.setLazyInit(false); beanFactory.registerBeanDefinition(containerBeanName, containerBeanDefinitionBuilder.getRawBeanDefinition()); RedisMessageListenerContainer container = beanFactory.getBean(containerBeanName, RedisMessageListenerContainer.class); String listenerBeanName = "messageListener" + node.hashCode(); if (beanFactory.containsBean(listenerBeanName)) { return; } container.addMessageListener(messageListener, new PatternTopic("__keyevent@0__:expired")); container.start(); } } } } }
第五步實現監聽事件觸發后的業務代碼
import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; import com.cn.tianxia.api.common.v2.CacheKeyConstants; import com.cn.tianxia.api.project.v2.OnlineUserEntity; import com.cn.tianxia.api.service.v2.OnlineUserService; import com.cn.tianxia.api.utils.SpringContextUtils; /** * @ClassName KeyExpiredEventMessageListener * @Description redis失效事件 * @author Hardy * @Date 2019年5月20日 下午2:53:33 * @version 1.0.0 */ @Component public class KeyExpiredEventMessageListener implements MessageListener { @Override public void onMessage(Message message, byte[] pattern) { String expired = message.toString(); String onlineKey = CacheKeyConstants.ONLINE_USER_KEY_UID; if (expired.contains(onlineKey)) { String uid = expired.replace(CacheKeyConstants.ONLINE_USER_KEY_UID, ""); if (StringUtils.isNoneEmpty(uid)) { OnlineUserService onlineUserService = (OnlineUserService) SpringContextUtils .getBeanByClass(OnlineUserService.class); OnlineUserEntity onlineUser = onlineUserService.getByUid(uid); if (onlineUser != null) { onlineUser.setLogoutTime(System.currentTimeMillis()); onlineUser.setOffStatus((byte) 0); onlineUser.setIsOff((byte) 1); onlineUser.setUid(Long.parseLong(uid)); onlineUserService.insertOrUpdateOnlineUser(onlineUser); } } } } }
整個過程實現這五步就完成了Redis的鍵值空間事件了,其實Redis本身提供訂閱與發布的功能,追其根本就是通過訂閱Redis服務器的發布的一個主題進行消費。