Redis 是一個開源的內存數據庫,它以鍵值對的形式存儲數據。由於數據存儲在內存中,因此Redis的速度很快,但是每次重啟Redis服務時,其中的數據也會丟失,因此,Redis 也提供了持久化存儲機制,將數據以某種形式保存在文件中,每次重啟時,可以自動從文件加載數據到內存當中。
Redis 的架構包括兩個部分:Redis Client 和 Redis Server。Redis 客戶端負責向服務器端發送請求並接受來自服務器端的響應。服務器端負責處理客戶端請求,例如,存儲數據,修改數據等。 Redis通常用作數據庫,緩存以及消息系統。
一、Redis 發布訂閱機制
1、發布訂閱架構
Redis 提供了發布訂閱功能,可以用於消息的傳輸,Redis 的發布訂閱機制包括三個部分:發布者,訂閱者和 Channel。
- PUBLISH 命令向通道發送信息,此客戶端稱為publisher 發布者;
- SUBSCRIBE 向命令通道訂閱信息,此客戶端稱為subscriber 訂閱者;
- redis 中 發布訂閱模塊的名字叫着 PubSub,也就是 PublisherSubscriber;
- 一個發布者向一個通道發送消息,訂閱者可以向多個通道訂閱消息;當發布者向通道發布消息后,如果有訂閱者訂閱該通道,訂閱者就會收到消息;這有點像電台,我收聽了一個電台的頻道,當頻道發送消息后,我就能收到消息;
發布者和訂閱者都是 Redis 客戶端,Channel 則為Redis服務器端,發布者將消息發送到某個的頻道,訂閱了這個頻道的訂閱者就能接收到這條消息。Redis的這種發布訂閱機制與基於主題的發布訂閱類似,Channel 相當於主題。
2、PUBSub模塊命令
- subscribe: 訂閱一個或者多個頻道;
- unsubscribe: 退訂一個或者多個頻道;
- publish: 向通道發送消息;
- psubscribe: 訂閱給定模式相匹配的所有頻道;
- punsubscribe: 退訂 給定模式所有的頻道,若未指定模式,退訂所有頻道;
具體的命令使用方式 可以使用 help 命令,示例如下:help subscribe
3、發布訂閱功能
(1)發送消息 :Redis 采用PUBLISH命令發送消息,其返回值為接收到該消息的訂閱者的數量。
(2)訂閱某個頻道:Redis采用SUBSCRIBE命令訂閱某個頻道,其返回值包括客戶端訂閱的頻道,目前已訂閱的頻道數量,以及接收到的消息,其中subscribe表示已經成功訂閱了某個頻道。
(3)模式匹配 :模式匹配功能允許客戶端訂閱符合某個模式的頻道,Redis采用 PSUBSCRIBE 訂閱符合某個模式所有頻道,用“”表示模式,“”可以被任意值代替。
127.0.0.1:6379> publish news.1 first (integer) 1
127.0.0.1:6379> publish news.2 second (integer) 1
假設客戶端同時訂閱了某種模式和符合該模式的某個頻道,那么發送給這個頻道的消息將被客戶端接收到兩次,只不過這兩條消息的類型不同,一個是message類型,一個是pmessage類型,但其內容相同。
(4)取消訂閱 :Redis采用UNSUBSCRIBE和PUNSUBSCRIBE命令取消訂閱,其返回值與訂閱類似。
由於Redis的訂閱操作是阻塞式的,因此一旦客戶端訂閱了某個頻道或模式,就將會一直處於訂閱狀態直到退出。在SUBSCRIBE,PSUBSCRIBE,UNSUBSCRIBE和PUNSUBSCRIBE命令中,其返回值都包含了該客戶端當前訂閱的頻道和模式的數量,當這個數量變為 0 時,該客戶端會自動退出訂閱狀態。
4、發布訂閱實現
由於Redis是一個開源的系統,因此我們可以通過其源代碼查看內部的實現細節。
(1)SUBSCRIBE
當客戶端訂閱某個頻道時,Redis需要將該頻道和該客戶端綁定。
首先,在客戶端結構體client中,有一個屬性為pubsub_channels,該屬性表明了該客戶端訂閱的所有頻道,它是一個字典類型,通過哈希表實現,其中的每個元素都包含了一個鍵值對以及指向下一個元素的指針,每次訂閱都要向其中插入一個結點,鍵表示訂閱的頻道,值為空。
然后,在表示服務器端的結構體redisServer中,也有一個屬性為pubsub_channels,但此處它表示的是該服務器端中的所有頻道以及訂閱了這個頻道的客戶端,它也是一個字典類型,插入結點時,鍵表示頻道,值則是訂閱了這個頻道的所有客戶端組成的鏈表。
最后Redis通知客戶端其訂閱成功。
(2)PSUBSCRIBE
當客戶端訂閱某個模式時,Redis同樣需要將該模式和該客戶端綁定。
首先,在結構體client中,有一個屬性為pubsub_patterns,該屬性表示該客戶端訂閱的所有模式,它是一個鏈表類型,每個結點包括了訂閱的模式和指向下一個結點的指針,每次訂閱某個模式時,都要向其中插入一個結點。
然后,在結構體redisServer中,有一個屬性也叫pubsub_patterns,它表示了該服務器端中的所有模式和訂閱了這些模式的客戶端,它也是一個鏈表類型,插入結點時,每個結點都要包含訂閱的模式,以及訂閱這個模式的客戶端,和指向下一個結點的指針。
(3)PUBLISH
當客戶端向某個頻道發送消息時,Redis首先在結構體redisServer中的pubsub_channels中找出鍵為該頻道的結點,遍歷該結點的值,即遍歷訂閱了該頻道的所有客戶端,將消息發送給這些客戶端。
然后,遍歷結構體redisServer中的pubsub_patterns,找出包含該頻道的模式的結點,將消息發送給訂閱了該模式的客戶端。
5、發布訂閱在 Redis 中的應用
Redis的發布訂閱功能與Redis中的數據存儲是無關的,它不會影響Redis的key space,即不會影響Redis中存儲的數據,但通過發布訂閱機制,Redis還提供了另一個功能,即Keyspace Notification,允許客戶端通過訂閱特定的頻道,從而得知是否有改變Redis中的數據的事件。
例如,有一個客戶端刪除了Redis中鍵為mykey的數據,該操作會觸發兩條消息,mykey del和del mykey,前者屬於頻道keysapce,表示keyspace發生的變化,后者屬於頻道keyevent,表示執行的操作。
6、Redis發布訂閱與ActiveMQ的比較
(1)ActiveMQ支持多種消息協議,包括AMQP,MQTT,Stomp等,並且支持JMS規范,但Redis沒有提供對這些協議的支持;
(2)ActiveMQ提供持久化功能,但Redis無法對消息持久化存儲,一旦消息被發送,如果沒有訂閱者接收,那么消息就會丟失;
(3)ActiveMQ提供了消息傳輸保障,當客戶端連接超時或事務回滾等情況發生時,消息會被重新發送給客戶端,Redis沒有提供消息傳輸保障。
總之,ActiveMQ所提供的功能遠比Redis發布訂閱要復雜,畢竟Redis不是專門做發布訂閱的,但是如果系統中已經有了Redis,並且需要基本的發布訂閱功能,就沒有必要再安裝ActiveMQ了,因為可能ActiveMQ提供的功能大部分都用不到,而Redis的發布訂閱機制就能滿足需求。
二、Java 實現
定義 2 個訂閱者用於訂閱頻道的消息,在使用 Jedis 時需要繼承 JedisPubSub 類, 並重寫 onMessage 方法; 訂閱者可以在該方法里面進行消息的業務邏輯處理;
注意 redis 的 發布訂閱模式 是阻塞模式 ,一個訂閱者需要 重新起一個線程;
缺點:
(1)PubSub 的生產者來一個消息會直接傳遞給消費者。如果沒有消費者,消息會直接丟棄。如果有多個消費者,一個消費者突然掛掉,生產者會繼續發送消息,另外的消費者可以持續收到消息。但是掛掉的消費者重新連上后,斷連期間的消息會徹底丟失;
(2)如果 Redis 停機重啟,PubSub 的消息是不會持久化的。
首先我們創建兩個客戶端執行體:第 2 個是一樣的創建即可
package com.example.redisdemo.service; import redis.clients.jedis.JedisPubSub; // 訂閱消息消費體
public class OneJedisPubSub extends JedisPubSub { //接收到消息時執行
@Override public void onMessage(String channel, String message){ System.out.println("oneJedisPubSub message is" + message); } //接收到模式消息時執行
@Override public void onPMessage(String pattern, String channel, String message){ System.out.println("oneJedisPubSub pattern是"+pattern+"channel是"+channel + "message是" + message); } //訂閱時執行
@Override public void onSubscribe(String channel, int subscribedChannels) { System.out.println("oneJedisPubSub訂閱成功"); } //取消訂閱時執行
@Override public void onUnsubscribe(String channel, int subscribedChannels){ System.out.println("oneJedisPubSub取消訂閱"+channel); } //取消模式訂閱時執行
@Override public void onPUnsubscribe(String pattern, int subscribedChannels) { System.out.println("oneJedisPubSub取消多訂閱"+pattern); } }
然后我們開始給這兩個客戶端訂閱消息
@RestController @RequestMapping("test") @Slf4j public class TestController { @Autowired private RedisClient redisClient; private final OneJedisPubSub oneJedisPubSub = new OneJedisPubSub(); private final SecondJedisPubSub secondJedisPubSub = new SecondJedisPubSub(); @PostMapping("subscribe") public void subscribe(@RequestBody QueueTest queueTest){ new Thread(new Runnable() { @Override public void run() { try { if("1".equals(queueTest.getTopic())){ redisClient.subscribe(oneJedisPubSub,"topic1","topic2"); } if("2".equals(queueTest.getTopic())){ redisClient.subscribe(secondJedisPubSub,"topic2"); } } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
請求情況如下圖:
請求結果如圖:
可以看出我們將兩個客戶端都訂閱了一定channel,此時OneJedisPubSub訂閱了topic1和topic2,SecondJedisPubSub訂閱了topic2,我們嘗試推送消息,demo如下:
@PostMapping("push") public void push(@RequestBody QueueTest queueTest){ log.info("發布一條消息"); Long publish = redisClient.publish(queueTest.getTopic(), queueTest.getName()); System.out.println("消費者數量"+publish); }
可以看到我們往topic1發布了消息只有OneJedisPubSub接收到了消息,接下來我們往topic2發布消息
可以看到此時兩個客戶端都接收到了消息。
在測試完畢客戶端接收消息的能力,我們這時取消SecondJedisPubSub訂閱topic2,demo如下:
@PostMapping("unno") public void unno(@RequestBody QueueTest queueTest){ log.info("取消訂閱消息"); try { secondJedisPubSub.unsubscribe(queueTest.getTopic()); } catch (Exception e) { e.printStackTrace(); } }
在取消后我們再往topic2推送消息,可以看到只有一個客戶端接收消息。
至此我們實驗了大部分場景,至於模式訂閱由於貼圖太麻煩,我就將代碼提供出來,大家可以自己實驗:
@PostMapping("subscribe") public void subscribe(@RequestBody QueueTest queueTest){ new Thread(new Runnable() { @Override public void run() { try { if("1".equals(queueTest.getTopic())){ redisClient.pubsubPattern(oneJedisPubSub,"topic*"); } if("2".equals(queueTest.getTopic())){ redisClient.subscribe(secondJedisPubSub,"topic2"); } } catch (Exception e) { e.printStackTrace(); } } }).start(); } @PostMapping("unno") public void unno(@RequestBody QueueTest queueTest){ log.info("取消模式訂閱消息"); try { secondJedisPubSub.punsubscribe(queueTest.getTopic()); } catch (Exception e) { e.printStackTrace(); } }
最后將redisclient的代碼提供給大家
@Component("redisClient") @Slf4j public class RedisClient { @Resource private JedisPool jedisPool; /** * 發布消息 * @param topic * @param message */
public Long publish(String topic,String message){ Jedis jedis = null; try { jedis = jedisPool.getResource(); return jedis.publish(topic, message); } catch (Exception e) { throw e; } finally { if(jedis != null){ jedis.close(); } } } /** * 訂閱消息 * @param jedisPubSub * @param topics */
public void subscribe(JedisPubSub jedisPubSub, String... topics) throws Exception { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.subscribe(jedisPubSub,topics); } catch (Exception e) { throw e; } finally { if(jedis != null){ jedis.close(); } } } /** * 模式匹配訂閱消息 * @param topic */
public void pubsubPattern(JedisPubSub jedisPubSub,String topic){ Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.psubscribe(jedisPubSub,topic); } catch (Exception e) { throw e; } finally { if(jedis != null){ jedis.close(); } } } }
以上就是reids的發布訂閱功能,代碼部分來自文章:https://zhuanlan.zhihu.com/p/136484218
三、主要命令及其原理
首先介紹一下實現功能的主要幾個命令:
- SUBSCRIBE 命令,這個命令可以讓我們訂閱任意數量的頻道
- PUBLISH 命令,此命令是用來發布消息
- PSUBSCRIBE命令,此命令用來支持模糊訂閱的功能
在展示具體的demo之前,我們先簡單了解下這其中的原理:
在redisServer結構中的其中一個屬性pubsub_channels是用來記錄channel和客戶端之間的關系,是使用key-->List的數據格式。如圖:
在我們使用SUBSCRIBE 命令在客戶端client10086訂閱了channel1 channel2,channel3
1、訂閱:SUBSCRIBE channel1 channel2,channel3
這時pubsub_channels的數據將會變為,如圖:
這就可以看出來執行SUBSCRIBE 命令就是將客戶端信息添加到對應的channel對應列表的尾部。
2、模式訂閱: 模式訂閱設計到redisServer的另一個屬性pubsub_patterns,也是一個鏈表,里面存儲着客戶端訂閱的所有模式。結構如下圖:
當客戶端訂閱了一個模式,此時結構變為:
3、發布:PUBLISH 命令發布消息將消息推送到對應的客戶端
在執行PUBLISH 命令發布消息的時候,首先會在pubsub_channels上找到對應的channel,遍歷其中所有的client信息,將消息發送到所有client;同時也會在pubsub_patterns上遍歷找到匹配的模式,發給對應的客戶端
4、取消訂閱:UNSUBSCRIBE命令取消對應客戶端的訂閱
當執行UNSUBSCRIBE命令時則將對應的client從channel列表中移除。