RocketMQ:(5) 消息過濾機制


  RocketMQ支持表達式過濾與類過濾兩種模式,其中表達式又分為TAG和SQL92。類過濾模式允許提交一個過濾類到FilterServer,消息消費者從FilterServer拉取消息,消息經過FilterServer時會執行過濾邏輯。

基於表達式的消息過濾

  消息發送者在消息發送時如果設置了消息的tags屬性,存儲在消息屬性中,先存儲在CommitLog文件中,然后轉發到消息消費隊列,消息消費隊列會用8個字節存儲消息tag的hashcode,之所以不直接存儲tag字符串,是因為將ConumeQueue設計為定長結構,加快消息消費的加載性能。
  RocketMQ基於表達式的消息過濾是在訂閱時做過濾。在Broker端拉取消息時,遍歷ConsumeQueue,只對比消息tag的hashcode,如果匹配則返回,否則忽略該消息。Consume在收到消息后,同樣需要先對消息進行過濾,只是此時比較的是消息tag的值而不再是hashcode

Step1:消費者訂閱消息主題與消息過濾表達式。構建訂閱信息subscriptionData並加入到RebalanceImpl進行消息隊列負載。
  subscriptionData的核心屬性:
  1)String SUB_ALL:過濾模式,默認為全匹配。
  2)boolean classFilterMode:是否是類過濾模式,默認為false。
  3)String topic:消息主題名稱。
  4)String subString:消息過濾表達式,多個用雙豎線隔開,例如“TAGA||TAGB”。
  5)Set<String> tagsSet:消息過濾tag集合,消費端過濾時進行消息過濾的依據。
  6)Set<String> codeSet:消息過濾tag hashcode集合。
  7)String expressionType:過濾類型,TAG或SQL92。
Step2:根據訂閱消息構建消息拉取標記。根據主題、消息過濾表達式構建訂閱消息實體。構建消息過濾對象。
Step3:根據偏移量拉取消息后,首先根據ConsumeQueue條目進行消息過濾,如果不匹配則直接跳過該條消息,繼續拉取下一條消息。
Step4:如果消息根據ConsumeQueue條目通過過濾,則需要從CommitLog文件中加載整個消息體,然后根據屬性進行過濾。基於TAG模式,根據ConsumeQueue進行消息過濾時只對比tag的hashcode,所以基於TAG模式消息過濾,還需要在消息消費端對消息tag進行精確匹配。

  從消息拉取流程知道,消息拉取線程PullMessageService默認會使用異步方式從服務器拉取消息,如果消息過濾模式為TAG模式,並且訂閱TAG集合不為空,則對消息的tag進行判斷,如果集合中包含消息的TAG則返回給消費者消費,否則跳過。

 

消息過濾FilterServer

ClassFilter運行機制

  基於類模式過濾是指在 Broker 端運行1個或多個消息過濾服務器(FilterServer), RocketMQ 允許消息消費者自定義消息過濾實現類並將其代碼上傳到 FilterServer 上,消息消費者向 FilterServer 拉取消息,FilterServer將消息消費者的拉取命令轉發到 Broker,然后對返回的消息執行消息過濾邏輯,最終將消息返回給消費端。

1)Broker 進程所在的服務器會啟動多個 FilterServer 進程。
2)消費者在訂閱消息主題時會上傳一個自定義的消息過濾實現類,FilterServer 加載並實例化。
3)消息消費者(Consume)向 FilterServer 發送消息拉取請求,FilterServer 接收到消息消費者消息拉取請求后,FilterServer 將消息拉取請求轉發給 Broker,Broker 返回消息后在 FilterServer 端執行消息過濾邏輯,然后返回符合訂閱信息的消息給消息消費者進行消費。

FilterServer 注冊

  FilterServer在啟動時會創建一個定時調度任務,每隔10s向Broker注冊自己。
  Step1:FilterServer從配置文件中獲取Broker地址,然后將FilterServer所在機器的IP與監聽端口發送到Broker服務器。
  Step2:FilterServer與Broker通過心跳維持FilterServer在Broker端的注冊,同樣在Broker每隔10s掃描一下該注冊表,如果30s內未收到FilterServer的注冊信息,將關閉Broker與FilterServer的連接。Broker為了避免Broker端FilterServer的異常退出導致FilterServer進程越來越少,同樣提供一個定時任務每30s檢測一下當前存活的FilterServer進程個數。
  經過上面的步驟,Broker上已經保存了FilterServer的信息。那么NameServer中關於Broker的filterServer信息是如何從消息服務器(Broker)傳輸到NameServer的呢?Broker通過與所有NameServer的心跳包向NameServer注冊Broker上存儲的FilterServer列表,指引消息消費者正確從FilterServer上拉取消息。Brokers每30s向所有NameServer發送心跳包,心跳包中包含了集群名稱、Broker名稱、Broker地址、BrokerId、haServer地址、topic配置、過濾服務器列表等。

類過濾模式訂閱機制

  RocketMQ 通過DefaultMQPushConsumerimpl#subscribe方法來實現基於類模式的消息過濾,其參數分別代表消費組訂閱的消息主題、類過濾全路徑名、類過濾源代碼字符串 。
  Step1:構建訂閱信息,然后將該訂閱信息添加到 Rebalancelmpl 中,其主要目標是Rebalancelmpl會對訂閱信息表中的主題進行消息隊列的負載,創建消息拉取任務,以便PullMessageService 線程拉取消息 。
  Step2:定時將消息端訂閱信息中的類過濾模式的過濾類源碼上傳到 FilterServer。
  Step3:根據訂閱的主題獲取該主題的路由信息,如果該主題路由信息中的FilterServer緩存表不為空,則需要將過濾類發送到FilterServer上
  Step4:遍歷主題路由表中的 fiIterServerTable,向緩存中所有的 FilterServer 上傳消息過濾代碼 。
  Step5:FilterServer 端處理 FilterClass上傳並將其源碼編譯的實現為FilterClassManager 。
  Step6:根據消息消費組與主題名稱構建 filterClasTable 緩存 key,從緩存表中嘗試獲取過濾類型信息FilterClasslnfo。如果緩存表中不包含 FilterClasslnfo 則表示第一次注冊,設置 registerNew 為true;如果 FilterClasslnfo不為空,說明該消息消費組不是第一次注冊。如果服務端開啟允許消息消費者上傳FilterClass,比較兩個的 classCRC,如果不相同,說明FilterClass 的源碼發生了變化,設置 registerNew 為 true 。
  Step7:如果是第一次注冊,則創建 FilterClasslnfo,如果 FilterServer 允許消息消費者上傳過濾類源碼,則使用 JDK 提供的方法將源代碼編譯並加裝,然后創建其實例,並強制類型轉換為 MessageFilter,也就是自定義的消息過濾類必須實現 MessageFilter 接口 。
  上述整個過程就完成了消息消費端向 FilterServer 上傳過濾類的過程,但如果FilterServer 不允許消息消費者上傳 FilterC!ass ,則 filterServerTable 中存在的過濾類信息只包含className、classCRC、消息過濾類 MessageFilter 屬性都為空,也就是說會忽略消息消費者上傳的過濾類源代碼,那過濾類的源碼從哪獲取呢?FilterServer 會開啟一個定時任務,每隔1分鍾從遠程服務器下載過濾類源碼,再將其編譯與實例化。

消息拉取

  RocketMQ 消息的過濾發生在消息消費的時候,PullMessageService 線程默認從Broker上拉取消息,執行相關的過濾邏輯。
  Step1:在FilterServer過濾模式下,在消息拉取時,如果發現消息過濾模式為classFilter,將拉取消息服務器地址由原來的Broker地址轉換成該Broker服務器所對應的FilterServer。
  Step2:獲取該消息主題的路由信息,從路由信息中獲取 Broker 對應的FilterServer列表,如果不為空則隨機從FilterServer列表中選擇一個FilterServer,發送拉取消息請求至相應的FilterServer上,FilterServer將拉取請求轉發給Broker,然后對返回的消息執行消息過濾邏輯,在過濾服務器將消息過濾后再返回給消息消費者

類過濾模式相比TAG模式過濾的優勢

1 )基於TAG模式消息過濾,由於在消息服務端進行消息過濾是匹配消息TAG的hashcode,導致服務端過濾並不十分准確,從服務端返回的消息最終並不一定是消息消費者訂閱的消息,造成網絡帶寬的浪費,而基於類模式的消息過濾所有的過濾操作全部在FilterServer端進行。
2 )由於FilterServer 與 Broker 運行在同一台 機器上,消息的傳輸是通過本地回環通信,不會浪費Broker端的網絡資源 。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM