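Sometimes we want to listen for the deletion of a key, or for other key events, and run our own business logic in response. Redis supports this through keyspace notifications, which are delivered over Pub/Sub.
Reference (official documentation): https://redis.io/topics/notifications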
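1. Testing with the Redis server and client
According to the official documentation, event notifications are disabled by default. To enable them, set notify-keyspace-events in redis.conf, or change it with the CONFIG SET command (this only affects the running server process and is lost on restart).
The official documentation lists the following event types (each letter is the abbreviation of one event type):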
K     Keyspace events, published with __keyspace@<db>__ prefix.
E     Keyevent events, published with __keyevent@<db>__ prefix.
g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
$     String commands
l     List commands
s     Set commands
h     Hash commands
z     Sorted set commands
t     Stream commands
d     Module key type events
x     Expired events (events generated every time a key expires)
e     Evicted events (events generated when a key is evicted for maxmemory)
m     Key miss events (events generated when a key that doesn't exist is accessed)
A     Alias for "g$lshztxed", so that the "AKE" string means all the events except "m".
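At least one of K or E must be present, otherwise no events are delivered. To receive every event class (except key miss events, m), set notify-keyspace-events to 'KEA'.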
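The flag rules above can be checked mechanically. The following is a minimal sketch, not Redis code: NotifyFlags and its methods are hypothetical names, and the expansion of the A alias is copied from the table as quoted in this post.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper for reasoning about a notify-keyspace-events value. */
final class NotifyFlags {

    // Expansion of the "A" alias, exactly as quoted in the table above (assumption).
    private static final String ALIAS_A = "g$lshztxed";

    /** Returns the distinct flag characters, with "A" replaced by its expansion. */
    static Set<Character> expand(String flags) {
        Set<Character> out = new LinkedHashSet<>();
        for (char c : flags.toCharArray()) {
            if (c == 'A') {
                for (char a : ALIAS_A.toCharArray()) {
                    out.add(a);
                }
            } else {
                out.add(c);
            }
        }
        return out;
    }

    /** True when K or E is present; without one of them nothing is delivered. */
    static boolean hasChannelFlag(String flags) {
        return flags.indexOf('K') >= 0 || flags.indexOf('E') >= 0;
    }

    /** True when the given event class (for example 'x' or 'm') is enabled. */
    static boolean enables(String flags, char eventClass) {
        return expand(flags).contains(eventClass);
    }

    public static void main(String[] args) {
        System.out.println(hasChannelFlag("Ex"));   // K or E present
        System.out.println(enables("KEA", 'x'));    // expired events are part of A
        System.out.println(enables("KEA", 'm'));    // key miss events are not part of A
    }
}
```

With "KEA", enables(..., 'm') is false, which matches the note that key miss events have to be requested explicitly.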
1. Testing expiration events
(1) Edit redis.conf and set notify-keyspace-events to Ex
# notify-keyspace-events Ex
#
# By default all notifications are disabled because most users don't need
# this feature and the feature has some overhead. Note that if you don't
# specify at least one of K or E, no events will be delivered.
notify-keyspace-events Ex
(2) After the server starts, check the configuration
127.0.0.1:6379> config get notify-keyspace-events
1) "notify-keyspace-events"
2) "xE"
(3) Start a client that listens for expiration events in all databases
127.0.0.1:6379> psubscribe __keyevent@*__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@*__:expired"
3) (integer) 1
1) "pmessage"
2) "__keyevent@*__:expired"
3) "__keyevent@0__:expired"
4) "test"
1) "pmessage"
2) "__keyevent@*__:expired"
3) "__keyevent@0__:expired"
4) "mykey"
The * in __keyevent@*__:expired matches any database. You can name one specific database instead (0-15 with the default of 16 databases) or keep the wildcard.
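To make the channel naming concrete, here is a small sketch that builds the channel names and patterns used in this post. KeyspaceChannels and its methods are hypothetical helpers, not part of any Redis client; the only thing taken from the source is the __keyspace@<db>__ and __keyevent@<db>__ prefix format.

```java
/** Hypothetical helper that builds keyspace notification channel names. */
final class KeyspaceChannels {

    /** Channel for one event in one database, e.g. __keyevent@0__:expired. */
    static String keyevent(int db, String event) {
        return "__keyevent@" + db + "__:" + event;
    }

    /** Pattern for one event in every database, for use with PSUBSCRIBE. */
    static String keyeventAllDbs(String event) {
        return "__keyevent@*__:" + event;
    }

    /** Channel carrying every event that touches one key in one database. */
    static String keyspace(int db, String key) {
        return "__keyspace@" + db + "__:" + key;
    }

    public static void main(String[] args) {
        System.out.println(keyeventAllDbs("expired"));
        System.out.println(keyevent(0, "expired"));
        System.out.println(keyspace(0, "mykey"));
    }
}
```

A name built with keyevent(0, ...) contains no wildcard, so it can be used with a plain subscribe; the all-databases form contains * and therefore needs psubscribe.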
2. Listening for all events
1. Change the configuration so that all events are notified
127.0.0.1:6379> config set notify-keyspace-events KEA
OK
127.0.0.1:6379> config get notify-keyspace-events
1) "notify-keyspace-events"
2) "AKE"
2. Subscribe from a client (psubscribe accepts a variable number of patterns, so a single call can subscribe to several of them)
127.0.0.1:6379> psubscribe '__key*__:*'
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__key*__:*"
3) (integer) 1
3. Start another client and operate on some data
127.0.0.1:6379> set key1 value1
OK
127.0.0.1:6379> expire key1 9000
(integer) 1
127.0.0.1:6379> del key1
(integer) 1
4. Check the console of the subscribing client
127.0.0.1:6379> psubscribe '__key*__:*'
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__key*__:*"
3) (integer) 1
1) "pmessage"
2) "__key*__:*"
3) "__keyspace@0__:key1"
4) "set"
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:set"
4) "key1"
1) "pmessage"
2) "__key*__:*"
3) "__keyspace@0__:key1"
4) "expire"
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:expire"
4) "key1"
1) "pmessage"
2) "__key*__:*"
3) "__keyspace@0__:key1"
4) "del"
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:del"
4) "key1"
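After each operation the subscriber receives a message with three pieces of information: the pattern it subscribed with, the channel the event was published on, and the payload. On a keyspace channel the key is part of the channel name and the payload is the event name; on a keyevent channel the event is part of the channel name and the payload is the key name.
Note that delivery is fire-and-forget. If the server goes down, or the subscribing client goes offline for any reason, events that occur in the meantime are not recorded, so the client does not receive them when it comes back online.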
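The keyspace and keyevent messages shown above carry the same information in a different order. As an illustration, the sketch below normalizes both forms into (db, event, key). KeyNotification is a hypothetical class written for this post; it assumes the channel is a concrete name such as __keyspace@0__:key1, as in the third field of each pmessage.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical value object for one keyspace or keyevent notification. */
final class KeyNotification {

    // __keyspace@<db>__:<key>  or  __keyevent@<db>__:<event>
    private static final Pattern CHANNEL =
            Pattern.compile("__(keyspace|keyevent)@(\\d+)__:(.*)", Pattern.DOTALL);

    final int db;
    final String event;
    final String key;

    private KeyNotification(int db, String event, String key) {
        this.db = db;
        this.event = event;
        this.key = key;
    }

    /** Parses the channel and payload fields of a pmessage. */
    static KeyNotification parse(String channel, String payload) {
        Matcher m = CHANNEL.matcher(channel);
        if (!m.matches()) {
            throw new IllegalArgumentException("not a notification channel: " + channel);
        }
        int db = Integer.parseInt(m.group(2));
        String suffix = m.group(3);
        if (m.group(1).equals("keyspace")) {
            // keyspace: the channel names the key, the payload names the event
            return new KeyNotification(db, payload, suffix);
        }
        // keyevent: the channel names the event, the payload names the key
        return new KeyNotification(db, suffix, payload);
    }

    public static void main(String[] args) {
        KeyNotification n = parse("__keyevent@0__:del", "key1");
        System.out.println(n.db + " " + n.event + " " + n.key);
    }
}
```

Because both channel types describe the same operation, a subscriber that uses the '__key*__:*' pattern sees each operation twice, once per channel type.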
2. Listening for events in a Spring Boot project
1. Write the listener
package com.xm.ggn.config.redis;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // The message body of an expired event is the name of the expired key.
        String expireKey = message.toString();
        System.out.println("Key finally expired");
        System.out.println("key is:" + expireKey);
    }
}
2. Reading the source
(1) org.springframework.data.redis.listener.KeyExpirationEventMessageListener
package org.springframework.data.redis.listener;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisKeyExpiredEvent;
import org.springframework.lang.Nullable;

public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware {
    private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");
    @Nullable
    private ApplicationEventPublisher publisher;

    public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    protected void doRegister(RedisMessageListenerContainer listenerContainer) {
        listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
    }

    protected void doHandleMessage(Message message) {
        this.publishEvent(new RedisKeyExpiredEvent(message.getBody()));
    }

    protected void publishEvent(RedisKeyExpiredEvent event) {
        if (this.publisher != null) {
            this.publisher.publishEvent(event);
        }
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }
}
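As the code shows, it relies on Spring's event mechanism: each expired-key message is republished as a RedisKeyExpiredEvent through the ApplicationEventPublisher.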
(2) org.springframework.data.redis.listener.KeyspaceEventMessageListener
package org.springframework.data.redis.listener;

import java.util.Properties;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

public abstract class KeyspaceEventMessageListener implements MessageListener, InitializingBean, DisposableBean {
    private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*");
    private final RedisMessageListenerContainer listenerContainer;
    private String keyspaceNotificationsConfigParameter = "EA";

    public KeyspaceEventMessageListener(RedisMessageListenerContainer listenerContainer) {
        Assert.notNull(listenerContainer, "RedisMessageListenerContainer to run in must not be null!");
        this.listenerContainer = listenerContainer;
    }

    public void onMessage(Message message, @Nullable byte[] pattern) {
        if (message != null && !ObjectUtils.isEmpty(message.getChannel()) && !ObjectUtils.isEmpty(message.getBody())) {
            this.doHandleMessage(message);
        }
    }

    protected abstract void doHandleMessage(Message var1);

    public void init() {
        if (StringUtils.hasText(this.keyspaceNotificationsConfigParameter)) {
            RedisConnection connection = this.listenerContainer.getConnectionFactory().getConnection();

            try {
                Properties config = connection.getConfig("notify-keyspace-events");
                if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) {
                    connection.setConfig("notify-keyspace-events", this.keyspaceNotificationsConfigParameter);
                }
            } finally {
                connection.close();
            }
        }

        this.doRegister(this.listenerContainer);
    }

    protected void doRegister(RedisMessageListenerContainer container) {
        this.listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);
    }

    public void destroy() throws Exception {
        this.listenerContainer.removeMessageListener(this);
    }

    public void setKeyspaceNotificationsConfigParameter(String keyspaceNotificationsConfigParameter) {
        this.keyspaceNotificationsConfigParameter = keyspaceNotificationsConfigParameter;
    }

    public void afterPropertiesSet() throws Exception {
        this.init();
    }
}
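As the code shows, on startup the listener sets notify-keyspace-events on the Redis server (to "EA" by default, and only when the server's current value is empty) and then registers itself. KeyExpirationEventMessageListener overrides doRegister so that it listens only for expired events.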
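One consequence of the "only when empty" check in init() is that a server which already has a non-empty value keeps it, even if that value does not include expired events. The sketch below restates that rule in plain Java so it can be reasoned about without a Redis server. ListenerConfigCheck and its methods are hypothetical names, and the "x is included in A" rule is taken from the flag table quoted earlier in this post.

```java
/** Hypothetical restatement of the configuration rule in init(). */
final class ListenerConfigCheck {

    /**
     * The value the server ends up with: the listener's parameter is applied
     * only when the server's current value is null or blank.
     */
    static String effectiveFlags(String serverValue, String listenerParameter) {
        boolean blank = serverValue == null || serverValue.trim().isEmpty();
        return blank ? listenerParameter : serverValue;
    }

    /** Expired events reach __keyevent@*__:expired only with E plus x (or A). */
    static boolean deliversExpiredKeyevents(String flags) {
        boolean keyevent = flags.indexOf('E') >= 0;
        boolean expired = flags.indexOf('x') >= 0 || flags.indexOf('A') >= 0;
        return keyevent && expired;
    }

    public static void main(String[] args) {
        // Fresh server: the listener's default "EA" is applied.
        System.out.println(effectiveFlags("", "EA"));
        // Server already configured with "K$": the value is left untouched,
        // so the expiration listener would receive nothing.
        String flags = effectiveFlags("K$", "EA");
        System.out.println(flags + " delivers expired: " + deliversExpiredKeyevents(flags));
    }
}
```

If the listener in a project never fires, checking the result of config get notify-keyspace-events against a rule like this is a reasonable first step.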
3. Implementing a similar mechanism ourselves
This version is built on Netty and Spring's event mechanism.
1. com.xm.ggn.test.springevent.RedisConnection
On startup it uses Netty to open a connection and sends the subscribe command.
package com.xm.ggn.test.springevent;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@Slf4j
public class RedisConnection {

    private static final String HOST = System.getProperty("host", "192.168.145.139");
    private static final int PORT = Integer.parseInt(System.getProperty("port", "6379"));

    @PostConstruct
    public void init() {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new RedisClientHandler());
                        }
                    });
            // Start the connection attempt.
            Channel ch = b.connect(HOST, PORT).sync().channel();
            // RedisClientHandler.write() converts this plain-text command into RESP.
            String sendCmd = "psubscribe __key*__:*";
            ch.writeAndFlush(sendCmd);
            log.info("Connected to Redis and sent the subscribe command");
        } catch (Exception e) {
            log.error("redis handler error", e);
            // Release the event loop threads if the connection could not be set up.
            group.shutdownGracefully();
        }
    }
}
2. RedisClientHandler
package com.xm.ggn.test.springevent;

import com.xm.ggn.utils.system.SpringBootUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class RedisClientHandler extends ChannelDuplexHandler {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        // Convert the outgoing command into the RESP wire format.
        String request = rehandleRequest(msg);
        // Pass the promise along so the caller's future reflects the real write result.
        ctx.writeAndFlush(Unpooled.copiedBuffer(request, CharsetUtil.UTF_8), promise);
    }

    /**
     * Re-encodes a plain-text command as RESP, for example
     * set foo bar
     * becomes
     * *3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
     */
    private String rehandleRequest(Object msg) {
        String[] params = msg.toString().trim().split("\\s+");
        StringBuilder stringBuilder = new StringBuilder();
        // Array header: *<number of arguments>\r\n
        stringBuilder.append("*").append(params.length).append("\r\n");
        for (String s : params) {
            // Each argument: $<length in bytes>\r\n<argument>\r\n
            int byteLength = s.getBytes(CharsetUtil.UTF_8).length;
            stringBuilder.append("$").append(byteLength).append("\r\n").append(s).append("\r\n");
        }
        return stringBuilder.toString();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        String result;
        try {
            ByteBuf byteBuf = (ByteBuf) msg;
            byte[] bytes = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(bytes);
            result = new String(bytes, CharsetUtil.UTF_8);
        } finally {
            // This handler consumes the buffer, so it must release it.
            ReferenceCountUtil.release(msg);
        }
        // Convert the received data into a readable form.
        result = rehandleResponse(result).toString();
        try {
            // Publish the received message through Spring's event mechanism.
            SpringBootUtils.applicationContext.publishEvent(new RedisEvent(this, result));
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

    /**
     * Re-processes a response message. Assumes one complete reply per read.
     */
    private Object rehandleResponse(String result) {
        // Status reply: first byte is "+", e.g. "+OK\r\n"
        if (result.startsWith("+")) {
            return result.substring(1, result.length() - 2);
        }
        // Error reply: first byte is "-", e.g. `flushallE` returns `-ERR unknown command 'flushallE'\r\n`
        if (result.startsWith("-")) {
            return result.substring(1, result.length() - 2);
        }
        // Integer reply: first byte is ":", e.g. `llen mylist` returns `:3\r\n`
        if (result.startsWith(":")) {
            return result.substring(1, result.length() - 2);
        }
        // Bulk reply: first byte is "$", e.g. `get foo` returns `$3\r\nbar\r\n`
        if (result.startsWith("$")) {
            result = StringUtils.substringAfter(result, "\r\n");
            return StringUtils.substringBeforeLast(result, "\r\n");
        }
        // Multi bulk reply: first byte is "*", e.g. *2\r\n$3\r\nfoo\r\n$4\r\nname\r\n
        if (result.startsWith("*")) {
            result = StringUtils.substringAfter(result, "\r\n");
            // NOTE: \\d matches a single digit, so length headers of 10 or more
            // (such as $10) are not split on and stay in the output.
            String[] split = result.split("\\$\\d\r\n");
            List<String> collect = Arrays.stream(split).filter(tmpStr -> StringUtils.isNotBlank(tmpStr)).collect(Collectors.toList());
            List<String> resultList = new ArrayList<>(collect.size());
            collect.forEach(str1 -> {
                resultList.add(StringUtils.substringBeforeLast(str1, "\r\n"));
            });
            return resultList;
        }
        return "unknown result";
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.print("exceptionCaught: ");
        cause.printStackTrace(System.err);
        ctx.close();
    }
}
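To see exactly what goes on the wire for the subscribe command, the request encoding can be tried without Netty. This is a minimal standalone sketch of the same idea; RespEncoder is a hypothetical class, and like the handler it assumes arguments are separated by whitespace and contain no quoted spaces.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical standalone encoder for whitespace-separated commands. */
final class RespEncoder {

    /** Encodes a plain-text command line as a RESP array of bulk strings. */
    static String encode(String commandLine) {
        String[] args = commandLine.trim().split("\\s+");
        StringBuilder sb = new StringBuilder();
        // Array header: *<number of arguments>\r\n
        sb.append('*').append(args.length).append("\r\n");
        for (String arg : args) {
            // Bulk string: $<length in bytes>\r\n<data>\r\n
            int byteLength = arg.getBytes(StandardCharsets.UTF_8).length;
            sb.append('$').append(byteLength).append("\r\n").append(arg).append("\r\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Show the control characters explicitly instead of printing line breaks.
        String wire = encode("psubscribe __key*__:*");
        System.out.println(wire.replace("\r", "\\r").replace("\n", "\\n"));
    }
}
```

The length prefix is counted in bytes rather than characters, which only makes a difference for non-ASCII arguments.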
3. RedisEvent
package com.xm.ggn.test.springevent;

import org.springframework.context.ApplicationEvent;

public class RedisEvent extends ApplicationEvent {

    private static final long serialVersionUID = -9184671635725233773L;

    private Object msg;

    public RedisEvent(Object source, final String msg) {
        super(source);
        this.msg = msg;
    }

    public Object getMsg() {
        return msg;
    }

    public void setMsg(Object msg) {
        this.msg = msg;
    }
}
4. RedisEventListener: an event listener that simply prints the message
package com.xm.ggn.test.springevent;

import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@Component
public class RedisEventListener implements ApplicationListener<RedisEvent> {

    @Override
    public void onApplicationEvent(RedisEvent applicationEvent) {
        // handle event
        System.out.println("Received event, message: " + applicationEvent.getMsg());
    }
}
5. Test results:
(1) Run a few operations in the Redis client
127.0.0.1:6379> set mykey myvalue
OK
127.0.0.1:6379> del mykey
(integer) 1
(2) The console prints the following:
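Received event, message: [pmessage $10 __key*__:* $18 __keyevent@0__:set, mykey]
Received event, message: [pmessage $10 __key*__:* $18 __keyevent@0__:del, mykey]
The $10 and $18 in the output are RESP length headers that were not stripped: the split pattern \\$\\d\r\n in rehandleResponse only matches a single-digit length, so the first three elements end up joined into one.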
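A more robust way to decode these replies is to read each length header and then take exactly that many bytes, instead of splitting on a regular expression. The following is a sketch under stated assumptions, not a full RESP implementation: RespArrayParser is a hypothetical class, it handles only a single complete array whose elements are bulk strings or integers, and it does not deal with replies that arrive split across several reads.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical length-aware parser for one complete RESP array reply. */
final class RespArrayParser {

    /** Parses e.g. a pmessage push into its elements. */
    static List<String> parseArray(String reply) {
        byte[] data = reply.getBytes(StandardCharsets.UTF_8);
        int[] pos = {0};
        String header = readLine(data, pos);
        if (!header.startsWith("*")) {
            throw new IllegalArgumentException("not an array reply");
        }
        int count = Integer.parseInt(header.substring(1));
        List<String> items = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String line = readLine(data, pos);
            if (line.startsWith(":")) {
                // Integer element, as in the third field of a psubscribe confirmation.
                items.add(line.substring(1));
            } else if (line.startsWith("$")) {
                int length = Integer.parseInt(line.substring(1));
                if (length < 0) {
                    items.add(null); // null bulk string ($-1)
                    continue;
                }
                // Take exactly <length> bytes, then skip the trailing \r\n.
                items.add(new String(data, pos[0], length, StandardCharsets.UTF_8));
                pos[0] += length + 2;
            } else {
                throw new IllegalArgumentException("unsupported element: " + line);
            }
        }
        return items;
    }

    /** Reads up to the next \r\n and advances the position past it. */
    private static String readLine(byte[] data, int[] pos) {
        int start = pos[0];
        int i = start;
        while (i + 1 < data.length && !(data[i] == '\r' && data[i + 1] == '\n')) {
            i++;
        }
        if (i + 1 >= data.length) {
            throw new IllegalArgumentException("incomplete reply");
        }
        pos[0] = i + 2;
        return new String(data, start, i - start, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String push = "*4\r\n$8\r\npmessage\r\n$10\r\n__key*__:*\r\n"
                + "$18\r\n__keyevent@0__:set\r\n$5\r\nmykey\r\n";
        System.out.println(parseArray(push));
    }
}
```

Because the parser trusts the length header, a key that itself contains \r\n or a $ sign is still returned intact, which a split on a pattern cannot guarantee.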