【Broker端進行消息過濾】
在Broker端進行消息過濾,可以減少無效消息發送到Consumer,少占用網絡寬帶從而提高吞吐量。
【過濾方式1——通過Tag過濾】
[ 關於Tag和Key ]
對一個應用來說,盡可能只用一個Topic,不同消息子類型用Tag來標識,每條消息只能有一個Tag,服務端基於Tag進行過濾,並不需要讀取消息體的內容,效率較高。Producer發送消息設置了Tag以后,Consumer在訂閱消息時,才會利用Tag在Broker端做消息過濾。
消息的Key,發送的消息設置好Key,以后可以根據這個Key來查詢消息,這個Key一般用消息在業務層面的唯一標識碼識別,這樣后續查詢消息異常、消息丟失等都很方便。Broker會創建專門的索引文件,來存儲Key到消息的映射,由於Hash索引,應盡量使Key唯一,避免潛在的Hash沖突。
Tag和Key主要差別是試用場景不同,Tag用在Consumer的代碼匯總,用來進行服務端消息過濾,Key主要用於通過命令行查詢消息。
[ 通過Tag進行消息過濾 ]
Tag標簽是一個普通字符串,在創建Message時添加,一個Message只能有一個Tag,使用Tag方式過濾非常高效,Broker端可以在ConsumerQueue中做這種過濾,只從CommitLog里讀取過濾后被命中的消息。
Consumer的存儲格式如下:
ConsumerQueue的第三部分存儲的是Tag對應的HashCode,是一個定長的字符串通過Tag過濾的過程就是對比定長的Hashcode,經過hashcode對比,符合要求的消息從CommitLog中讀取出來,不用擔心Hash沖突問題,消息在被消費前,會對比完整的MessageTag字符串,消除Hash沖突造成的誤讀。
【過濾方式2——用SQL表達式的方式進行過濾】
Tag方式雖然高效,但是支持的邏輯比較簡單。
在Producer端構造Message時,還可以通過putUserProperty方法來增加多個自定義的屬性,基於這些屬性可以做復雜的過濾邏輯。
[ 構建Message的代碼 ]
String tag= "AAA"; Message message = new Message("TopicA",tag,"Hello RocketMQ ".getBytes(RemotingHelper.DEFAULT_CHARSET)); message.putUserProperty("a",String.valueOf(1)); message.putUserProperty("b","hello");
[ 消費端的代碼 ]
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_A"); //過濾屬性a的值在0-3之間 consumer.subscribe("TopicA", MessageSelector.bySql("a between 0 and 3")); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
類似SQL表達式,支持如下語法:
1.數字對比:比如 >、>=、<、<=、BETWEEN、= 2.字符串對比:比如=、<>、IN 3.IS NULL 或 IS NOT NULL 4.邏輯符號 AND、OR、NOT
支持的數據類型:
1.數字型,比如123,456 2.字符型,比如 'abc',注意必須用單引號 3.NULL,這個特殊字符 4.布爾類型,TRUE或FALSE
SQL表達式的過濾需要Broker先讀出消息里的屬性內容,然后做SQL計算,增大磁盤壓力,沒有Tag方式高效。
【過濾方式3——Filter Server 方式過濾】
Filter Server方式是一種比SQL方式更靈活的過濾方式,允許用戶自定義Java方法,根據Java方法的邏輯進行過濾。