PubSub ——“發布/訂閱”模式


訂閱者(Sub)通過SUBSCRIBE 命令和PSUBSCRIBE命令向redis 服務訂閱頻道(channel),當發布者通過PUBLISH 命令向chinnel發布命令時,訂閱該頻道的客戶端都會受到此消息。

##PUB/SUB 機制
三個客戶端都訂閱channel1頻道

訂閱

當有新消息通過PUBLISH命令發布到channel1時,這個消息會被發送給訂閱這個頻道的客戶端。
這里寫圖片描述

 

redis中的pubsub機制:https://www.cnblogs.com/longjee/p/8668974.html

 一個Redis client發布消息,其他多個redis client訂閱消息,發布的消息“即發即失”,redis不會持久保存發布的消息;消息訂閱者也將只能得到訂閱之后的消息,通道中此前的消息將無從獲得。

   消息發布者,即publish客戶端,無需獨占鏈接,你可以在publish消息的同時,使用同一個redis-client鏈接進行其他操作(例如:INCR等)
      消息訂閱者,即subscribe客戶端,需要獨占鏈接,即進行subscribe期間,redis-client無法穿插其他操作,
      此時client以阻塞的方式等待“publish端”的消息;因此這里subscribe端需要使用單獨的鏈接,甚至需要在額外的線程中使用。
      Tcp默認連接時間固定,如果在這時間內sub端沒有接收到pub端消息,或pub端沒有消息產生,sub端的連接都會被強制回收,
     這里就需要使用特殊手段解決,用定時器來模擬pub和sub之間的保活機制,定時器時間不能超過TCP最大連接時間,具體根據機器環境來定;

    一旦subscribe端斷開鏈接,將會失去部分消息,即鏈接失效期間的消息將會丟失,所以這里就需要考慮到借助redis的list來持久化;

總結:pub發布的消息不會持久化,sub是阻塞等待消息,只能獲取訂閱之后的產生的消息,一段時間內sub沒有收到消息或pub沒有生產消息,sub連接會被回收(因為sub是阻塞的).

 如果你非常關注每個消息,那么你應該基於Redis做一些額外的補充工作,如果你期望訂閱是持久的,那么如下的設計思路可以借鑒:

    1) subscribe端:
        首先向一個Set集合中增加“訂閱者ID”, 此Set集合保存了“活躍訂閱”者,訂閱者ID標記每個唯一的訂閱者,此Set為 "活躍訂閱者集合"

    2) subcribe端開啟訂閱操作,並基於Redis創建一個以 "訂閱者ID" 為KEY的LIST數據結構,此LIST中存儲了所有的尚未消費的消息,此List稱為 "訂閱者消息隊列"
        
    3) publish端:
        每發布一條消息之后,publish端都需要遍歷 "活躍訂閱者集合",並依次向每個 "訂閱者消息隊列" 尾部追加此次發布的消息.
        
    4) 到此為止,我們可以基本保證,發布的每一條消息,都會持久保存在每個 "訂閱者消息隊列" 中.
    
    5) subscribe端,每收到一個訂閱消息,在消費之后,必須刪除自己的 "訂閱者消息隊列" 頭部的一條記錄.
    
    6) subscribe端啟動時,如果發現自己的 "訂閱者消息隊列" 有殘存記錄, 那么將會首先消費這些記錄,然后再去訂閱.

以上方法可以保證成功到達的消息必消費不丟失;

pub/sub中消息發布者不需要獨占一個Redis的鏈接,而消費者則需要單獨占用一個Redis的鏈接,在java中便不得獨立出分出一個線程來處理消費者。這種場景一般對應這多個消費者,此時則有着過高的資源消耗。

對於如上的幾種不足,如果在項目中需要考慮的話可以使用JMS來實現該功能。JMS提供了消息的持久化/耐久性等各種企業級的特性。如果依然想使用Redis來實現並做一些數據的持久化操作,則可以根據JMS的特性來通過Redis模擬出來.

  • subscribe端首先向一個Set集合中增加“訂閱者ID”,此Set集合保存了“活躍訂閱”者,訂閱者ID標記每個唯一的訂閱者,例如:sub:email,sub:web。此SET稱為“活躍訂閱者集合”
  • subcribe端開啟訂閱操作,並基於Redis創建一個以“訂閱者ID”為KEY的LIST數據結構,此LIST中存儲了所有的尚未消費的消息。此LIST稱為“訂閱者消息隊列”
  • publish端:每發布一條消息之后,publish端都需要遍歷“活躍訂閱者集合”,並依次向每個“訂閱者消息隊列”尾部追加此次發布的消息。
  • 到此為止,我們可以基本保證,發布的每一條消息,都會持久保存在每個“訂閱者消息隊列”中。
  • subscribe端,每收到一個訂閱消息,在消費之后,必須刪除自己的“訂閱者消息隊列”頭部的一條記錄。
  • subscribe端啟動時,如果發現自己的自己的“訂閱者消息隊列”有殘存記錄,那么將會首先消費這些記錄,然后再去訂閱。

協程通信機制——Pub/Sub

新的協程框架VLCP。它使用的是一種Pub/Sub模型,即發布者、訂閱者模型,這是常用於消息隊列中的模型,熟悉消息隊列用法就會非常熟悉這一套用法。接下來我們詳細介紹一下這一套方法,並且看一下這種設計如何結合前幾種方案的優點,彌補前幾種方案的缺點。

在Pub/Sub模型中,主要分為三個不同的角色:

  1. 發送方:通過send方法發送一個事件
  2. 接收方:通過參數訂閱並接收一個事件
  3. 管理:調整事件的優先順序等

管理也可以由發送方或接收方中任意一個來兼任,主要目的是在許多事件同時存在時,調整優先級順序,從而影響協程的執行先后次序。

在VLCP當中,發送通過scheduler.send(或者更高層的RoutinerContainer.waitForSend),接收則通過yield語句,可以非常容易的進行。管理則通過調整調度器隊列設置進行。

VLCP中的事件是vlcp.event.Event的子類,它首先根據子類類型進行區分,但與其他框架不同,子類可以進一步攜帶一組索引,用來標識這個事件的不同性質,它與事件類型一起共同起着類似於Pub/Sub中的主題(Topic)的作用。比如說,我們處理OpenFlow協議中的PACKET_IN消息,現在希望定義一種事件來表示有一個PACKET_IN消息到來了,對於接收方來說,可能關心的信息有:消息來自於哪個datapath;來自於哪個連接對象;由哪個table中、cookies為多少的流表生成。我們可以將這些信息作為索引來定義這個事件:

from vlcp.event import Event, withIndices @withIndices('datapath', 'connection', 'table' 'cookies') class OpenFlowPacketInEvent(Event): pass 

如你所見,定義一個事件非常容易,而定義一個事件幾乎就完成了通信需要進行的所有准備工作——不需要創建額外的Future或者Channel對象,甚至,不需要關心要進行通信的雙方究竟是誰、在哪、有多少個。使用注解withIndices來定義一個Event的索引,這是必須的,即使Event沒有可選的索引,也必須用@withIndices()來表明Event沒有索引。

接下來,處理OpenFlow協議的協程會在這個事件發生時通知需要處理事件的協程,它只需要調用發送方的標准方法:

for m in container.waitForSend(OpenFlowPacketInEvent( conn.datapath, conn, message.table, message.cookies, message = message )): yield m 

在這個過程中,我們創建了一個新的Event的實例,並提供了相應的索引的值。除了規定的索引值以外,我們還可以給這個Event對象提供額外的屬性,它可以直接通過keyword-argument在構造函數中初始化,也可以在創建后再進行屬性賦值。所有的索引也會自動被賦給相應的屬性,比如說newevent.datapath就會得到datapath索引的值。將這個對象傳遞給waitForSend過程就完成了發送,waitForSend是個協程過程,使用for來在外層協程中代理這個過程,在Python3當中也可以更簡單寫成yield from container.waitForSend(...)

那么接下來是接收方的問題,接收方不需要關心事件何時由誰發出,當需要等待一個新的事件發生的時候,只需要簡單使用:

packet_in_matcher = OpenFlowPacketInEvent.createMatcher(None, None, my_table, my_cookies) # Or: # packet_in_matcher = OpenFlowPacketInEvent.createMatcher(table = my_table, cookies = my_cookies) yield (packet_in_matcher,) 

在協程中使用yield會暫停協程執行,在VLCP中,yield返回的是一個EventMatcher構成的元組,它可以包含一個或多個EventMatcher。EventMatcher通過Event子類的createMatcher方法創建,它代表一種匹配規則,即匹配這個子類的Event中,相應索引匹配相應值的事件。返回多個EventMatcher時,yield語句會在某個事件匹配任意一個EventMatcher時返回。匹配到的EventMatcher會保存在container.matcher,而發生的事件會保存在container.event。

VLCP內部使用前綴樹的數據結構對Event和EventMatcher進行匹配,這是一個很有效率的數據結構,將Event匹配到相應的EventMatcher只需要O(1)的時間。

對同一個事件,不同的協程可以通過createMatcher時的不同參數,來匹配事件集合的不同的子集,這在處理量非常大的時候可以有效提高處理效率,同時不增加程序復雜度。除了使用索引以外,還可以增加一個自定義的篩選過程:

customized_matcher = OpenFlowPacketInEvent.createMatcher( table = my_table, cookies = my_cookies, _ismatch = lambda x: len(x.message.data) < 100) 

_ismatch的keyword參數用來指定一個函數用於篩選,它接受Event作為唯一的參數,返回True或者False表示是否應當匹配這個Event。_ismatch只有指定索引值已經匹配的情況下才會進行計算。

Event的子類可以進一步派生。進一步派生的Event會繼承父類的類型和索引,但也會有自己的類型和索引。子類的子類遵循一般的繼承派生的規則:父類的EventMatcher可以匹配子類的Event,但子類的EventMatcher不能匹配父類的Event。比如:

@withIndices('a', 'b') class MyEventBase(Event): pass @withIndices('c', 'd') class MyEventChild(MyEventBase): pass MyEventChild(1,2,3,4) # a = 1, b = 2, c = 3, d = 4 MyEventBase.createMatcher(1,2).isMatch(MyEventChild(1,2,3,4)) # True MyEventChild.createMatcher(1,2).isMatch(MyEvent(1,2)) # False 

利用這種特殊的繼承關系可以拓展原有邏輯,在兼容以前代碼的情況下提供新的功能。

VLCP的事件循環結構

VLCP的事件循環結構可以用上圖表示。所有在調度器中運行的協程,都會在暫停運行時將自己注冊到匹配樹中,與一個或多個EventMatcher進行關聯,這個過程通過yield語句完成。在事件循環運行過程中,調度器每次從中心隊列中取出一個事件,在匹配樹中查找與這個事件匹配的EventMatcher和相關聯的協程,然后依次喚醒這些協程,通知它們等待的事件已經發生;協程在運行時,可以將事件通過send過程發送到中心消息隊列。在協程停止運行時,協程重新使用yield語句將自己注冊到匹配樹中,等待下一個循環。

當消息隊列為空或無法出隊時,調度器會調用Poller(在Linux當中由EPoll實現,其他操作系統當中使用Select)等待socket活動。Poller會將socket的活動返回成PollEvent,這同樣是Event的子類,這些事件會由負責處理socket活動的協程進行處理,然后進一步產生后續的事件。當沒有活動的socket時,調度器會開始引導整個框架退出。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM