python中使用redis發布訂閱者模型
redis發布訂閱者模型:
Redis提供了發布訂閱功能,可以用於消息的傳輸,Redis的發布訂閱機制包括三個部分,發布者,訂閱者和Channel。發布者和訂閱者都是Redis客戶端,Channel則為Redis服務器端,發布者將消息發送到某個的頻道,訂閱了這個頻道的訂閱者就能接收到這條消息。Redis的這種發布訂閱機制與基於主題的發布訂閱類似,Channel相當於主題。
發布者:
pub.py
import redis conn = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True) conn.publish("333", "18")
訂閱者:
sub.py
import redis conn = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True) # 第一步 生成一個訂閱者對象 pubsub = conn.pubsub() # 第二步 訂閱一個消息 pubsub.subscribe("gaoxin333") # 創建一個接收 while True: print("working~~~") msg = pubsub.parse_response() print(msg)
(1)發送消息
Redis采用PUBLISH命令發送消息,其返回值為接收到該消息的訂閱者的數量。
(2)訂閱某個頻道
Redis采用SUBSCRIBE命令訂閱某個頻道,其返回值包括客戶端訂閱的頻道,目前已訂閱的頻道數量,以及接收到的消息,其中subscribe表示已經成功訂閱了某個頻道。
(3)模式匹配
模式匹配功能允許客戶端訂閱符合某個模式的頻道,Redis采用PSUBSCRIBE訂閱符合某個模式所有頻道,用“”表示模式,“”可以被任意值代替。假設客戶端同時訂閱了某種模式和符合該模式的某個頻道,那么發送給這個頻道的消息將被客戶端接收到兩次,只不過這兩條消息的類型不同,一個是message類型,一個是pmessage類型,但其內容相同。
(4)取消訂閱
Redis采用UNSUBSCRIBE和PUNSUBSCRIBE命令取消訂閱,其返回值與訂閱類似。
由於Redis的訂閱操作是阻塞式的,因此一旦客戶端訂閱了某個頻道或模式,就將會一直處於訂閱狀態直到退出。在SUBSCRIBE,PSUBSCRIBE,UNSUBSCRIBE和PUNSUBSCRIBE命令中,其返回值都包含了該客戶端當前訂閱的頻道和模式的數量,當這個數量變為0時,該客戶端會自動退出訂閱狀態。
import redis
conn = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)
conn.publish("gaoxin333", "18")
python開發-實現redis中的發布訂閱功能
Pub/Sub功能(means Publish, Subscribe)即發布及訂閱功能。基於事件的系統中,Pub/Sub是目前廣泛使用的通信模型,它采用事件作為基本的通信機制,提供大規模系統所要求的松散耦合的交互模式:訂閱者(如客戶端)以事件訂閱的方式表達出它有興趣接收的一個事件或一類事件;發布者(如服務器)可將訂閱者感興趣的事件隨時通知相關訂閱者。
通俗來講,就是說我sub端(訂閱者)一直監聽着,一旦pub端(發布者)發布了消息,那么我就接收過來,舉個例子,先是發布者:
#coding:utf-8 import time import redis number_list = ['300033', '300032', '300031', '300030'] signal = ['1', '-1', '1', '-1'] rc = redis.StrictRedis(host='***', port='6379', db=3, password='********') for i in range(len(number_list)): value_new = str(number_list[i]) + ' ' + str(signal[i]) rc.publish("liao", value_new) #發布消息到liao
接着我們來看看訂閱者:
#coding:utf-8 import time import redis rc = redis.StrictRedis(host='****', port='6379', db=3, password='******') ps = rc.pubsub() ps.subscribe('liao') #從liao訂閱消息 for item in ps.listen(): #監聽狀態:有消息發布了就拿過來 if item['type'] == 'message': print item['channel'] print item['data']
關於數據結構,也就是item,是類似於:{'pattern': None, 'type': 'message', 'channel': 'liao', 'data': '300033 1'}這樣的,所以可以通過channel來判斷這個消息是屬於哪一個隊列里的。(運行程序的時候,先運行訂閱者,在運行發布者程序)
總結,要點有兩個:
- 一是連接方式。使用python連接redis有三種方式:①使用庫中的Redis類(或StrictRedis類,其實差不多);②使用ConnectionPool連接池(可保持長連接);③使用Sentinel類(如果有多個redis做集群時,程序會自己選擇一個合適的連接)。
- 二是訂閱方法。這里使用的是StrictRedis類中的pubsub方法。連接好之后,可使用subscribe或psubscribe方法來訂閱redis消息。其中subscribe是訂閱一個頻道,psubscribe可訂閱多個頻道(這樣寫的時候,作為參數的頻道應該是一個列表)。之后就可以開始監聽了。
redis中的發布/訂閱模型是一種消息通信模式,今天聊一下在python中實現簡單的發布訂閱功能。
實現方式一:
redis_helper.py: 封裝發布訂閱方法
import redis class RedisHelper(object): def __init__(self): self.__conn = redis.Redis(host="localhost") # 訂閱頻道 self.chan_sub = "fm104.5" def public(self, msg): """ 在指定頻道上發布消息 :param msg: :return: """ # publish(): 在指定頻道上發布消息,返回訂閱者的數量 self.__conn.publish(self.chan_sub, msg) return True def subscribe(self): # 返回發布訂閱對象,通過這個對象你能1)訂閱頻道 2)監聽頻道中的消息 pub = self.__conn.pubsub() # 訂閱頻道,與publish()中指定的頻道一樣。消息會發布到這個頻道中 pub.subscribe(self.chan_sub) ret = pub.parse_response() # [b'subscribe', b'fm86', 1] print("ret:%s" % ret) return pub
redis_pub.py: 發布者
from redis_helper import RedisHelper obj = RedisHelper() for i in range(5): obj.public("hello_%s" % i)
redis_sub.py: 訂閱者
from redis_helper import RedisHelper obj = RedisHelper() redis_sub = obj.subscribe() while True: msg = redis_sub.parse_response() print(msg)
實現方式二:
redis_helper.py: 封裝發布訂閱方法
import redis class RedisHelper(object): def __init__(self): self.__conn = redis.Redis(host="localhost") # 頻道名稱 self.chan_sub = "orders" def public(self, msg): """ 在指定頻道上發布消息 :param msg: :return: """ # publish(): 在指定頻道上發布消息,返回訂閱者的數量 self.__conn.publish(self.chan_sub, msg) return True def subscribe(self): # 返回發布訂閱對象,通過這個對象你能1)訂閱頻道 2)監聽頻道中的消息 pub = self.__conn.pubsub() # 訂閱某個頻道,與publish()中指定的頻道一樣。消息會發布到這個頻道中 pub.subscribe(self.chan_sub) return pub
redis_pub.py:
from redis_helper import RedisHelper obj = RedisHelper() for i in range(5): obj.public("hello_%s" % i)
redis_sub.py:
from redis_helper import RedisHelper obj = RedisHelper() redis_sub = obj.subscribe() while True: # listen()函數封裝了parse_response()函數 msg = redis_sub.listen() for i in msg: if i["type"] == "message": print(str(i["channel"], encoding="utf-8") + ":" + str(i["data"], encoding="utf-8")) elif i["type"] == "subscrube": print(str(i["chennel"], encoding="utf-8"))
以上兩種方式的不同之處在於,方式一使用發布訂閱對象的parse_response()方法獲取訂閱信息,方式二使用發布訂閱對象的listen()方法獲取訂閱信息。listen()方法是對parse_response()方法的封裝,加入了阻塞,並將parse_response()返回的結果進行了處理,使結果更加簡單。