OK,我們在使用RocketMQ的Filter服務時候,需要注意一些問題,如題我們迅速開始吧!
首先,我們需要啟動我們的MQ服務,順序為 namesrv、broker、filter。我配置的為:2m-2s-async(也就是兩主兩從模式)
我們需要在broker-*.properties文件里添加一句話,如下:
filterServerNums=1
其實我們也可以手工的去啟動filter!
接下來:
馬上進行建立rocketMQAPI工程,然后引入jar,可從github上來下載demo,地址為:https://github.com/alibaba/RocketMQ ,如圖所示:
首先看一下product端代碼,如下:發送100條消息到broker。
package bhz.mq.filter; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { String group_name = "filter_producer"; DefaultMQProducer producer = new DefaultMQProducer(group_name); producer.setNamesrvAddr("192.168.1.111:9876;192.168.1.112:9876;192.168.1.113:9876;192.168.1.114:9876"); producer.start(); try { for (int i = 0; i < 100; i++) { Message msg = new Message("TopicFilter7",// topic "TagA",// tag "OrderID001",// key ("Hello MetaQ" + i).getBytes());// body msg.putUserProperty("SequenceId", String.valueOf(i)); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
接下來我們看一下consumer代碼,如下:需要注意的一點是我們所上傳的過濾器類里,一定不允許有中文,否則MixAll.file2String方法返回null,Filter不識別!!!!
package bhz.mq.filter; import java.io.UnsupportedEncodingException; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.MixAll; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { String group_name = "filter_consumer"; DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name); consumer.setNamesrvAddr("192.168.1.111:9876;192.168.1.112:9876;192.168.1.113:9876;192.168.1.114:9876"); // 使用Java代碼,在服務器做消息過濾 String filterCode = MixAll.file2String("D:\\workspace_001\\rocketmqAPI\\src\\bhz\\mq\\filter\\MessageFilterImpl.java"); System.out.println(filterCode); consumer.subscribe("TopicFilter7", "bhz.mq.filter.MessageFilterImpl", filterCode); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); try { System.out.println(new String(msgs.get(0).getBody(),"utf-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
OK 最后我們看一下MessageFilterImpl這個類:再次強調一下,注意這個類是上傳到FilterSrv的,不要有中文,上面說了不然MixAll.file2String方法返回null。
package bhz.mq.filter; import com.alibaba.rocketmq.common.filter.MessageFilter; import com.alibaba.rocketmq.common.message.MessageExt; public class MessageFilterImpl implements MessageFilter { @Override public boolean match(MessageExt msg) { // NO Chinese System.out.println("-------------"); String property = msg.getUserProperty("SequenceId"); System.out.println("---------" + property); if (property != null) { int id = Integer.parseInt(property); if((id % 2) == 0) { //if ((id % 3) == 0 && (id > 10)) { return true; } } return false; } }
OK。我們進行測試,先運行consumer,然后運行producer發送數據,這樣就好根據MessageFilterImpl的代碼過濾出來 id % 2 == 0 的所有數據給Consumer端消費了!
最后我們總結下:我們一般是一個broker上配置多個filter服務,其實就是用cpu換取寶貴網卡的資源!