RocketMQ使用Filter問題!


OK,我們在使用RocketMQ的Filter服務時候,需要注意一些問題,如題我們迅速開始吧!

首先,我們需要啟動我們的MQ服務,順序為 namesrv、broker、filter。我配置的為:2m-2s-async(也就是兩主兩從模式)

我們需要在broker-*.properties文件里添加一句話,如下:

filterServerNums=1

其實我們也可以手工的去啟動filter!

接下來:

馬上進行建立rocketMQAPI工程,然后引入jar,可從github上來下載demo,地址為:https://github.com/alibaba/RocketMQ ,如圖所示:

首先看一下product端代碼,如下:發送100條消息到broker。

package bhz.mq.filter;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
 
 
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        String group_name = "filter_producer";
        DefaultMQProducer producer = new DefaultMQProducer(group_name);
        producer.setNamesrvAddr("192.168.1.111:9876;192.168.1.112:9876;192.168.1.113:9876;192.168.1.114:9876");
        producer.start();
        try {
            for (int i = 0; i < 100; i++) {
                Message msg = new Message("TopicFilter7",// topic
                    "TagA",// tag
                    "OrderID001",// key
                    ("Hello MetaQ" + i).getBytes());// body
                msg.putUserProperty("SequenceId", String.valueOf(i));
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }
}

接下來我們看一下consumer代碼,如下:需要注意的一點是我們所上傳的過濾器類里,一定不允許有中文,否則MixAll.file2String方法返回null,Filter不識別!!!!

package bhz.mq.filter;

import java.io.UnsupportedEncodingException;
import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
 
 
public class Consumer {
 
    public static void main(String[] args) throws InterruptedException, MQClientException {
        String group_name = "filter_consumer";
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
        consumer.setNamesrvAddr("192.168.1.111:9876;192.168.1.112:9876;192.168.1.113:9876;192.168.1.114:9876");
        // 使用Java代碼,在服務器做消息過濾
        String filterCode = MixAll.file2String("D:\\workspace_001\\rocketmqAPI\\src\\bhz\\mq\\filter\\MessageFilterImpl.java");
        System.out.println(filterCode);
        consumer.subscribe("TopicFilter7", "bhz.mq.filter.MessageFilterImpl", filterCode);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                //System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                try {
                    System.out.println(new String(msgs.get(0).getBody(),"utf-8"));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
   }
}

OK 最后我們看一下MessageFilterImpl這個類:再次強調一下,注意這個類是上傳到FilterSrv的,不要有中文,上面說了不然MixAll.file2String方法返回null。

package bhz.mq.filter;
import com.alibaba.rocketmq.common.filter.MessageFilter;
import com.alibaba.rocketmq.common.message.MessageExt;
 
 
public class MessageFilterImpl implements MessageFilter {
 
    @Override
    public boolean match(MessageExt msg) {
        // NO Chinese
        System.out.println("-------------");
        String property = msg.getUserProperty("SequenceId");
        System.out.println("---------" + property);
        if (property != null) {
            int id = Integer.parseInt(property);
            if((id % 2) == 0) {
            //if ((id % 3) == 0 && (id > 10)) {
                return true;
            }
        }
 
        return false;
    }
}

OK。我們進行測試,先運行consumer,然后運行producer發送數據,這樣就好根據MessageFilterImpl的代碼過濾出來 id % 2 == 0 的所有數據給Consumer端消費了!

最后我們總結下:我們一般是一個broker上配置多個filter服務,其實就是用cpu換取寶貴網卡的資源!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM