好久沒寫博客了。
最近公司開了新項目,我負責的內容之一是系統的后端。具體項目內容我就不介紹了,但是用到的技術有些還是很有趣的,值得記錄一下。今天介紹的就是其中一個:利用redis的pubsub訂閱消息功能做消息隊列。
對於這個功能本身,還是比較簡單的。redis本身支持了publish/subscribe的功能,publish是廣播消息,subscribe是訂閱消息。服務端使用
publish [channel] [content]
發布了一條消息,如果客戶端已經提前訂閱了這個頻道,這個時候就可以收到消息了。訂閱的命令也很簡單
subscribe [channel]
之后客戶端就開始進入監聽狀態了。
這個功能用python實現起來也很簡單,直接使用redis庫就可以。至於基本的使用方法,我就不介紹了,這個隨便百度一下就一大片。重點來說說redis里面的pubsub功能——其實也是百度翻到的,寫一個輔助類:
class RedisSubscriber(object): """ Redis頻道訂閱輔助類 """ def __init__(self, channel): self._sentinel = Sentinel(config.RedisConfig.HOST_PORT, password=config.RedisConfig.PASSWORD) self.conn = self._sentinel.master_for(config.RedisConfig.MASTER) self.channel = channel # 定義頻道名稱 def psubscribe(self): """ 訂閱方法 """ pub = self.conn.pubsub() pub.psubscribe(self.channel) # 同時訂閱多個頻道,要用psubscribe pub.listen() return pub
這個類里面需要解釋的有兩個地方:
- 一是連接方式。使用python連接redis有三種方式:①使用庫中的Redis類(或StrictRedis類,其實差不多);②使用ConnectionPool連接池(可保持長連接);③使用Sentinel類(如果有多個redis做集群時,程序會自己選擇一個合適的連接)。我項目中的redis就是個集群,所以使用了第三種方式。
- 二是訂閱方法。這里使用的是StrictRedis類中的pubsub方法。連接好之后,可使用subscribe或psubscribe方法來訂閱redis消息。其中subscribe是訂閱一個頻道,psubscribe可訂閱多個頻道(這樣寫的時候,作為參數的頻道應該是一個列表)。之后就可以開始監聽了。
接收的地方是這樣:
def test(): subscriber = RedisSubscriber([channel1, channel2, ...]) redis_sub = subscriber.psubscribe() # 調用訂閱方法 while True: msg = redis_sub.parse_response(block=False, timeout=60) print("收到訂閱消息 %s" % msg)
注意:
- 剛開始監聽的時候,會收到一條消息,類似於 [b'psubscribe', b'#你訂閱的頻道#', 1] 這樣。出現了這條消息,說明訂閱成功了。
- parse_response像這么使用的話,是非阻塞的,如果收不到消息,60秒收不到消息就會返回None。這倆參數可以不加,變成阻塞的。
這就完了。
這就完了?大多數文章就只是簡單的介紹到這里了。但是我在使用的時候發現一個非常惡心的問題:
訂閱消息過一段時間后就沒動靜了。沒有任何異常,就是簡單的停下了。時間不定,比較常見的是2-4個小時,長的話可能兩三天(python群里有位朋友也出現了一毛一樣的問題,也是找了很多資料無果)。我也找了很多資料,有的說是redis服務器緩存滿了,就斷開了,可以通過修改redis-server的緩存大小來解決。可是,這不科學啊!
再經過幾天的實驗和研究,我猜測了這種情況可能發生的原因:
客戶端只是主動連接了服務器,而服務器是不在意的,過段時間發現這個客戶端沒啥用,就主動斷開了。之后,客戶端也不會有報錯,只是尷尬地訂閱着空氣。。。
這個世界好安靜啊!
於是我又嘗試了各種方法,比如:訂閱返回None的時候把訂閱取消,重新訂閱——不管用;把連接斷掉重新建立連接——不管用;隨便給redis發一條消息——也不管用。
所以我不開心了。我決定采用比較暴力的方式:redis連接建立后,就開一條線程,每分鍾主動給服務器發送一條消息(這就好比你睡覺的時候,有人在你身邊,每分鍾問你一遍,喂,你還活着嗎?)。我在RedisSubscriber這個輔助類里面加了個方法:
def keep_alive(self): """ 保持客戶端長連接 """ ka_thread = threading.Thread(target=self._ping) ka_thread.start() def _ping(self): """ 發個消息,刷存在感 """ while True: time.sleep(60) # 嘗試向redis-server發一條消息 if not self.conn.ping(): print("oops~ redis-server get lost. call him back now!") del self._sentinel self._sentinel = Sentinel(config.RedisConfig.HOST_PORT, password=config.RedisConfig.PASSWORD) self.conn = self._sentinel.master_for(config.RedisConfig.MASTER)
然后,在test()中,創建好RedisSubscriber類對象之后,加一句
subscriber.keep_alive()
就好。
經過了一個禮拜的測試,訂閱消息還活着。我想這差不多可以算是我猜對了。暫時當做這個問題解決了吧。
