Pub/Sub功能(means Publish, Subscribe)即發布及訂閱功能。基於事件的系統中,Pub/Sub是目前廣泛使用的通信模型,它采用事件作為基本的通信機制,提供大規模系統所要求的松散耦合的交互模式:訂閱者(如客戶端)以事件訂閱的方式表達出它有興趣接收的一個事件或一類事件;發布者(如服務器)可將訂閱者感興趣的事件隨時通知相關訂閱者。
通俗來講,就是說我sub端(訂閱者)一直監聽着,一旦pub端(發布者)發布了消息,那么我就接收過來,舉個例子,先是發布者:
#coding:utf-8 import time import redis number_list = ['300033', '300032', '300031', '300030'] signal = ['1', '-1', '1', '-1'] rc = redis.StrictRedis(host='***', port='6379', db=3, password='********') for i in range(len(number_list)): value_new = str(number_list[i]) + ' ' + str(signal[i]) rc.publish("liao", value_new) #發布消息到liao
接着我們來看看訂閱者:
#coding:utf-8 import time import redis rc = redis.StrictRedis(host='****', port='6379', db=3, password='******') ps = rc.pubsub() ps.subscribe('liao') #從liao訂閱消息 for item in ps.listen(): #監聽狀態:有消息發布了就拿過來 if item['type'] == 'message': print item['channel'] print item['data']
關於數據結構,也就是item,是類似於:{'pattern': None, 'type': 'message', 'channel': 'liao', 'data': '300033 1'}這樣的,所以可以通過channel來判斷這個消息是屬於哪一個隊列里的。(運行程序的時候,先運行訂閱者,在運行發布者程序)
總結,要點有兩個:
- 一是連接方式。使用python連接redis有三種方式:①使用庫中的Redis類(或StrictRedis類,其實差不多);②使用ConnectionPool連接池(可保持長連接);③使用Sentinel類(如果有多個redis做集群時,程序會自己選擇一個合適的連接)。
- 二是訂閱方法。這里使用的是StrictRedis類中的pubsub方法。連接好之后,可使用subscribe或psubscribe方法來訂閱redis消息。其中subscribe是訂閱一個頻道,psubscribe可訂閱多個頻道(這樣寫的時候,作為參數的頻道應該是一個列表)。之后就可以開始監聽了。
python開發-實現封裝redis中的發布訂閱功能
redis中的發布/訂閱模型是一種消息通信模式,今天聊一下在python中實現簡單的發布訂閱功能。
實現方式一:
redis_helper.py: 封裝發布訂閱方法
import redis class RedisHelper(object): def __init__(self): self.__conn = redis.Redis(host="localhost") # 訂閱頻道 self.chan_sub = "fm104.5" def public(self, msg): """ 在指定頻道上發布消息 :param msg: :return: """ # publish(): 在指定頻道上發布消息,返回訂閱者的數量 self.__conn.publish(self.chan_sub, msg) return True def subscribe(self): # 返回發布訂閱對象,通過這個對象你能1)訂閱頻道 2)監聽頻道中的消息 pub = self.__conn.pubsub() # 訂閱頻道,與publish()中指定的頻道一樣。消息會發布到這個頻道中 pub.subscribe(self.chan_sub) ret = pub.parse_response() # [b'subscribe', b'fm86', 1] print("ret:%s" % ret) return pub
redis_pub.py: 發布者
from redis_helper import RedisHelper obj = RedisHelper() for i in range(5): obj.public("hello_%s" % i)
redis_sub.py: 訂閱者
from redis_helper import RedisHelper obj = RedisHelper() redis_sub = obj.subscribe() while True: msg = redis_sub.parse_response() print(msg)
實現方式二:
redis_helper.py: 封裝發布訂閱方法
import redis class RedisHelper(object): def __init__(self): self.__conn = redis.Redis(host="localhost") # 頻道名稱 self.chan_sub = "orders" def public(self, msg): """ 在指定頻道上發布消息 :param msg: :return: """ # publish(): 在指定頻道上發布消息,返回訂閱者的數量 self.__conn.publish(self.chan_sub, msg) return True def subscribe(self): # 返回發布訂閱對象,通過這個對象你能1)訂閱頻道 2)監聽頻道中的消息 pub = self.__conn.pubsub() # 訂閱某個頻道,與publish()中指定的頻道一樣。消息會發布到這個頻道中 pub.subscribe(self.chan_sub) return pub
redis_pub.py:
from redis_helper import RedisHelper obj = RedisHelper() for i in range(5): obj.public("hello_%s" % i)
redis_sub.py:
from redis_helper import RedisHelper obj = RedisHelper() redis_sub = obj.subscribe() while True: # listen()函數封裝了parse_response()函數 msg = redis_sub.listen() for i in msg: if i["type"] == "message": print(str(i["channel"], encoding="utf-8") + ":" + str(i["data"], encoding="utf-8")) elif i["type"] == "subscrube": print(str(i["chennel"], encoding="utf-8"))
以上兩種方式的不同之處在於,方式一使用發布訂閱對象的parse_response()方法獲取訂閱信息,方式二使用發布訂閱對象的listen()方法獲取訂閱信息。listen()方法是對parse_response()方法的封裝,加入了阻塞,並將parse_response()返回的結果進行了處理,使結果更加簡單。