介紹
Redis是內存中的數據結構存儲,用於緩存、高速數據攝取、處理消息隊列、分布式鎖定等等。
與其他內存存儲相比,使用Redis的優勢在於它提供了持久性和數據結構,比如列表、集合、排序集合和散列。
在這篇文章中,介紹一個Redis keyspace通知的簡短概述。並演示如何配置Redis來接收它們。並展示如何在python中訂閱Redis通知
在開始之前,請安裝並啟動Redis服務器,如下所述:https://redis.io/topics/quickstart
啟用通知
默認情況下,redis的通知事件是關閉的,在終端執行以下命令開啟:
$ redis-cli config set notify-keyspace-events KEA
OK
KEA字符串表示啟用了所有可能的事件。要查看每個字符的含義,請查看文檔
CLI 可以在特殊模式下工作,允許您訂閱一個通道以接收消息。
現在檢查事件是否起作用:
# 用於檢查事件 # psubscribe '*'表示我們想訂閱所有帶有模式*的事件 $ redis-cli --csv psubscribe '*' Reading messages... (press Ctrl-C to quit) "psubscribe","*",1
開啟一個新的終端,設置一個值
127.0.0.1:6379> set key1 value1
OK
在上一個終端,會看到:
$ redis-cli --csv psubscribe '*' Reading messages... (press Ctrl-C to quit) "psubscribe","*",1 "pmessage","*","__keyspace@0__:key1","set" "pmessage","*","__keyevent@0__:set","key1
發現通知是工作中的
復述
Redis的鍵盤空間通知從2.8.0版起就可以使用了。對於更改任何Redis鍵的每個操作,可以配置Redis將消息發布到發布/訂閱。然后可以訂閱這些通知。值得一提的是,事件僅在確實修改了鍵的情況下才生成。例如,刪除不存在的鍵將不會生成事件。
以上收到三個事件:
"psubscribe","*",1 "pmessage","*","__keyspace@0__:key1","set" "pmessage","*","__keyevent@0__:set","key1
- 第一個事件意味着已經成功訂閱了reply中作為第二個元素給出的通道。1 表示目前訂閱的頻道數量。
- 第二個事件是鍵空間通知。在keyspace通道中,接收事件集的名稱作為消息。
- 第三個事件是鍵-事件通知。在keyevent通道中,接收到key key1的名稱作為消息。
Redis Pub/Sub
事件是通過Redis的Pub/Sub層交付的。
為了訂閱channel channel1和channel2,客戶端發出帶有通道名稱的subscribe命令
在Python中訂閱通知
第一步,需要python操作redis的包
$ pip install redis
事件循環,請看以下代碼:
import time from redis import StrictRedis redis = StrictRedis(host='localhost', port=6379) # redis 發布訂閱 pubsub = redis.pubsub() # 監聽通知 pubsub.psubscribe('__keyspace@0__:*') # 開始消息循環 print('Starting message loop') while True: # 獲取消息 message = pubsub.get_message() if message: print(message) else: time.sleep(0.01)
分析:
這是創建 `redis` 連接的代碼
redis = StrictRedis(host='localhost', port=6379)
默認情況下,所有響應都以字節的形式返回。用戶負責解碼。如果客戶機的所有字符串響應都應該被解碼,那么用戶可以指定decode_responses=True to StrictRedis。在這種情況下,任何返回字符串類型的Redis命令都將用指定的編碼進行解碼。
接下來,創建一個訂閱頻道並偵聽新消息的pubsub對象:
pubsub = redis.pubsub() pubsub.psubscribe('__keyspace@0__:*')
繼而通過一個無限循環來等待事件
while True: message = pubsub.get_message() ...
如果有數據,get_message()將讀取並返回它。如果沒有數據,該方法將不返回任何數據。
從pubsub實例中讀取的每條消息都是一個字典,其中包含以下鍵:
- type 以下之一:訂閱,取消訂閱,psubscribe, punsubscribe, message, pmessage
- channel 訂閱消息的通道或消息發布到的通道
- pattern 與已發布消息的通道匹配的模式(除pmessage類型外,在所有情況下都不匹配)
- data 消息數據
現在啟動 `python` 腳本,在終端中設置一個key值
127.0.0.1:6379> set mykey myvalue
OK
將看到腳本以下輸出
$ python subscribe.py Starting message loop {'type': 'psubscribe', 'data': 1, 'channel': b'__keyspace@0__:*', 'pattern': None} {'type': 'pmessage', 'data': b'set', 'channel': b'__keyspace@0__:mykey', 'pattern': b'__keyspace@0__:*'}
回調注冊
注冊回調函數來處理已發布的消息。消息處理程序接受一個參數,即消息。要使用消息處理程序訂閱通道或模式,請將通道或模式名稱作為關鍵字參數傳遞,其值為回調函數。當使用消息處理程序在通道或模式上讀取消息時,將創建消息字典並將其傳遞給消息處理程序。在這種情況下,get_message()返回一個None值,因為消息已經被處理

import time from redis import StrictRedis redis = StrictRedis(host='localhost', port=6379) pubsub = redis.pubsub() def event_handler(msg): print('Handler', msg) pubsub.psubscribe(**{'__keyspace@0__:*': event_handler}) print('Starting message loop') while True: message = pubsub.get_message() if message: print(message) else: time.sleep(0.01)
事件循環在單獨的線程中
選擇是在單獨的線程中運行事件循環

import time from redis import StrictRedis redis = StrictRedis(host='localhost', port=6379) def event_handler(msg): print(msg) thread.stop() pubsub = redis.pubsub() pubsub.psubscribe(**{'__keyevent@0__:expired': event_handler}) thread = pubsub.run_in_thread(sleep_time=0.01)
總結
Redis的一個常見用例是,應用程序需要能夠響應存儲在特定鍵或鍵中的值可能發生的更改。由於有了鍵盤空間通知和發布/訂閱,可以對Redis數據中的變化做出響應。通知很容易使用,而事件處理器可能在地理位置上分布。
最大的缺點是,Pub/Sub的實現要求發布者和訂閱方始終處於啟動狀態。用戶在停止或連接丟失時丟失數據。
參考文獻
- https://redis.io/topics/notifications - Redis keyspace notifications documentation
- https://redis.io/topics/pubsub - Redis Pub/Sub documentation
- https://github.com/andymccurdy/redis-py - Python client for Redis
- https://www.infoworld.com/article/3212768/database/how-to-use-redis-for-real-time-stream-processing.html - How to use Redis for real-time stream processing
- https://matt.sh/advanced-redis-pubsub-scripts - Subscribe script to Pub/Sub channel