win10 RocketMQ的簡單運用


1、在官網下載RocketMQ,下載完解壓即可,下面也主要是根據官網來進行操作的,同時也適當地借鑒了其他blog。

2、添加環境變量:

ROCKETMQ_HOME="D:\rocketmq"
NAMESRV_ADDR="localhost:9876"

3、啟動Name Server

mqnamesrv.cmd

4、啟動Broker,如果啟動失敗,刪除C:\Users\"當前系統用戶名"\store下的所有文件,注意要添加autoCreateTopicEnable=true 否則在創建消息組時會報錯:MQClientException: No route info of this topic

mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

5、創建Maven工程,添加RocketMQ依賴,注意版本要和自己的服務器(下載的版本一致),否則可能會出現MQClientException: No route info of this topic這個錯誤

  <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.6.0</version>
        </dependency>

 

6、provider:

package provider;

import com.sun.xml.internal.bind.api.impl.NameConverter;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Pro {
    public static void main(String [] args){
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Demo");
        defaultMQProducer.setNamesrvAddr("localhost:9876");
        try {
            defaultMQProducer.start();
            for(int i = 0;i<100;i++){
                try {
                    Message message = new Message("Topic1","Tag1",
                            ("Hello World"+i).getBytes("UTF-8"));
                    SendResult sendResult = defaultMQProducer.send(message);
                    //defaultMQProducer.sendOneway(message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            defaultMQProducer.shutdown();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}

7、consumer

package consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public  static  void main(String [] args){
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerDemo");
        consumer.setNamesrvAddr("localhost:9876");
        try {
            consumer.subscribe("Topic1","*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus
                consumeMessage(List<MessageExt> list,
                               ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for(MessageExt messageExt :list){
                        String str  =  new String(messageExt.getBody());
                        System.out.println(str);
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
//            consumer.registerMessageListener(new MessageListenerOrderly() {
//                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
//                    for(MessageExt messageExt :list){
//                        String str  =  new String(messageExt.getBody());
//                        System.out.println(str);
//                    }
//                    return  ConsumeOrderlyStatus.SUCCESS;
//                }
//            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

 

7、輸出結果,可以看到輸出的結果並不是很一致。

先看看RocketMQ架構圖

 

 

1,啟動Namesrv,Namesrv起來后監聽端口,等待Broker、Produer、Consumer連上來,相當於一個路由控制中心。
2,Broker啟動,跟所有的Namesrv保持長連接,定時發送心跳包。心跳包中包含當前Broker信息(IP+端口等)以及存儲所有topic信息。注冊成功后,namesrv集群中就有Topic跟Broker的映射關系。
3,收發消息前,先創建topic,創建topic時需要指定該topic要存儲在哪些Broker上。也可以在發送消息時自動創建Topic。
4,Producer發送消息,啟動時先跟Namesrv集群中的其中一台建立長連接,並從Namesrv中獲取當前發送的Topic存在哪些Broker上,然后跟對應的Broker建立長連接,直接向Broker發消息。
5,Consumer跟Producer類似。跟其中一台Namesrv建立長連接,獲取當前訂閱Topic存在哪些Broker上,然后直接跟Broker建立連接通道,開始消費消息。


 

 再看看消息框架圖:

 

 可以看到在同一個Topic下的消息在被放在了不同的broker,並且在同一個broker下,存放在不同的隊列當中,雖然本地運行時只有一個broker,但是還是有很多隊列,所以消息獲取者獲取到消息順序是不一定的。


進行改進:

provide:在這里將同一業務流程放在一同一隊列

package provider;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
import java.util.Scanner;

public class Pro {
    public static void main(String [] args){
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Demo");
        defaultMQProducer.setNamesrvAddr("localhost:9876");
        String[] tags = new String[]{"創建訂單", "支付", "發貨", "收貨", "五星好評"};
        try {
            defaultMQProducer.start();
            for(int i = 0;i<100;i++){
                int ordId = i /5 +1;
                try {
                    Message message = new Message("Topic1",tags[i%5],"ID"+i,
                            ("order"+ordId+":"+tags[i%5]).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    //SendResult sendResult = defaultMQProducer.send(message);
                    //defaultMQProducer.sendOneway(message);
                    SendResult sendResult = defaultMQProducer.send(message,new MessageQueueSelector(){
                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                            System.out.println(list.size()+"  message:"+message.toString()+"    o:"+o);
                            //o即為ordId
                            Integer id = (Integer) o;
                            int index = id % list.size();
                            return list.get(index);
                        }
                    },ordId);
                    System.out.printf("%s%n",sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            defaultMQProducer.shutdown();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}

 

 

consumer:在這里消息接收換了個函數,在這里同一隊列里面的消息是按順序消費的

package consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public  static  void main(String [] args){
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerDemo");
        consumer.setNamesrvAddr("localhost:9876");
        try {
            consumer.subscribe("Topic1","*");
//            consumer.registerMessageListener(new MessageListenerConcurrently() {
//                public ConsumeConcurrentlyStatus
//                consumeMessage(List<MessageExt> list,
//                               ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//                    for(MessageExt messageExt :list){
//                        String str  =  new String(messageExt.getBody());
//                        System.out.println(str);
//                    }
//                    try {
//                        Thread.sleep(1000);
//                    } catch (InterruptedException e) {
//                        e.printStackTrace();
//                    }
//                    return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//                }
//            });
            consumer.registerMessageListener(new MessageListenerOrderly() {
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                    for(MessageExt messageExt :list){
                        String str  =  new String(messageExt.getBody());
                        System.out.println(str);
                    }
                    return  ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

 結果如下:可以看到同一訂單的順序沒有打亂

 那如何把所有的消息都按順序執行呢,放在一個隊列就可以了,就是把上面的return語句改為 return list.get(0),但是這樣做肯定會犧牲效率的,下面是演示結果

 


producer發送消息的三種方式:

可靠同步消息:同步發送是指消息發送方發出數據后,會在收到接收方發回響應之后才發下一個數據包的通訊方式。在代碼中也可以看到發送完每一個message后都會有一個SendResult返回,這個當消息存在順序問題時可以使用

 

package provider;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
import java.util.Scanner;

public class Pro {
    public static void main(String [] args){
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Demo");
        defaultMQProducer.setNamesrvAddr("localhost:9876");
        String[] tags = new String[]{"創建訂單", "支付", "發貨", "收貨", "五星好評"};
        try {
            defaultMQProducer.start();
            for(int i = 0;i<100;i++){
                int ordId = i /5 +1;
                try {
                    Message message = new Message("Topic1",tags[i%5],"ID"+i,
                            ("order"+ordId+":"+tags[i%5]).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = defaultMQProducer.send(message);
//                    //defaultMQProducer.sendOneway(message);
//                    SendResult sendResult = defaultMQProducer.send(message,new MessageQueueSelector(){
//                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//                            System.out.println(list.size()+"  message:"+message.toString()+"    o:"+o);
//                            //o即為ordId
//                            Integer id = (Integer) o;
//                            int index = id % list.size();
//                            return list.get(index);
//                        }
//                    },ordId);
                    System.out.printf("%s%n",sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            Scanner scanner = new Scanner(System.in);
            defaultMQProducer.shutdown();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}

 2、可靠異步發送,異步發送是指發送方發出數據后,不等接收方發回響應,接着發送下個數據包的通訊方式。由於是發了消息繼續發第二個消息,可能消息響應沒那么及時,因此這里用了countdownLatch,在5s內如果能接收到所有響應,結束程序,如果5s內不能收到,5s后結束程序,當然也可以使用countdownLatch.awite()得到所以的響應后停止程序。這個當消息順序不重要時可以使用,如搶購,拼網速。

 

 

 

      int messageCount = 100;
            defaultMQProducer.setRetryTimesWhenSendAsyncFailed(0);
            final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
            for (int i = 0; i < messageCount; i++) {
                try {
                    final int index = i;
                    Message msg = new Message("Jodie_topic_1023",
                            "TagA",
                            "OrderID188",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    defaultMQProducer.send(msg, new SendCallback() {

                        public void onSuccess(SendResult sendResult) {
                            countDownLatch.countDown();
                            System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                        }

                        public void onException(Throwable e) {
                            countDownLatch.countDown();
                            System.out.printf("%-10d Exception %s %n", index, e);
                            e.printStackTrace();
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            countDownLatch.await(5, TimeUnit.SECONDS);
            defaultMQProducer.shutdown();

  

 3、單向(Oneway)發送,單向(Oneway)發送特點為發送方只負責發送消息,不等待服務器回應且沒有回調函數觸發,即只發送請求不等待應答。但是可能存在消息沒有完全交付。可以看到producer.sendOneway(msg)並沒有返回結果。

 

 

 

 producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);
        }

 


消息廣播:broadcasting ,官網描述很清晰:Broadcasting is sending a message to all subscribers of a topic. If you want all subscribers receive messages about a topic, broadcasting is a good choice.就是發送方發消息時,所有訂閱該topic的消費者都會收到這個消息。

這個主要是消息接收者做出改變:

package consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;

public class ConsumerBroadcast {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

        //一個新的訂閱組第一次啟動從隊列的最后位置開始消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //set to broadcast mode
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //訂閱主題
        consumer.subscribe("Topic1", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}

  


 

 定時發送與接收

     目前RocketMQ只支持固定精度級別的定時消息,服務器按照1-N定義了如下級別: “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;;若要發送定時消息,在應用層初始化Message消息對象之后,調用setDelayTimeLevel(int level)方法來設置延遲級別,按照序列取相應的延遲級別,例如level=2,則延遲為5s:

通過運行 下面的代碼中可以看到發送消息並沒有延遲,因為我的程序只睡了5s,但是消息時全部發送了的,通過SendResult可以看到,但是當我們提前打開消息接收方時,然后在打開消息發送方,可以看到消息發送10s后消息接收方才收到。

package provider;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class Pro {
    public static void main(String [] args){
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Demo");
        defaultMQProducer.setNamesrvAddr("localhost:9876");
        String[] tags = new String[]{"創建訂單", "支付", "發貨", "收貨", "五星好評"};
        try {
            defaultMQProducer.start();
            for(int i = 0;i<100;i++){
                int ordId = i /5 +1;
                try {
                    Message message = new Message("Topic1",tags[i%5],"ID"+i,
                            ("order"+ordId+":"+tags[i%5]).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    message.setDelayTimeLevel(3);
                    SendResult sendResult = defaultMQProducer.send(message);
//                    //defaultMQProducer.sendOneway(message);
//                    SendResult sendResult = defaultMQProducer.send(message,new MessageQueueSelector(){
//                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//                            System.out.println(list.size()+"  message:"+message.toString()+"    o:"+o);
//                            //o即為ordId
//                            Integer id = (Integer) o;
//                            int index = id % list.size();
//                            return list.get(index);
//                        }
//                    },ordId);
                    System.out.printf("%s%n",sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            Thread.sleep(5000);
            defaultMQProducer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  


batch :一次可以發送多個消息,但是整個一次發送消息的大小不能超過1M,List<Message> messages = new ArrayList<>();

 


 

Filter過濾:SQL feature could do some calculation through the properties you put in when sending messages.可以在發送消息添加屬性,然后通過SQL做些計算來過濾掉不相干的消息。

provider:

package provider;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class Pro {
    public static void main(String [] args){
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Demo");
        defaultMQProducer.setNamesrvAddr("localhost:9876");
        String[] tags = new String[]{"創建訂單", "支付", "發貨", "收貨", "五星好評"};
        try {
            defaultMQProducer.start();
            for(int i = 0;i<100;i++){
                int ordId = i /5 +1;
                try {
                    Message message = new Message("Topic1",tags[i%5],"ID"+i,
                            ("order"+ordId+":"+tags[i%5]).getBytes(RemotingHelper.DEFAULT_CHARSET));
                   // message.setDelayTimeLevel(3);
                    message.putUserProperty("ordId",String.valueOf(ordId));
                    SendResult sendResult = defaultMQProducer.send(message);
//                    //defaultMQProducer.sendOneway(message);
//                    SendResult sendResult = defaultMQProducer.send(message,new MessageQueueSelector(){
//                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//                            System.out.println(list.size()+"  message:"+message.toString()+"    o:"+o);
//                            //o即為ordId
//                            Integer id = (Integer) o;
//                            int index = id % list.size();
//                            return list.get(index);
//                        }
//                    },ordId);
                    System.out.printf("%s%n",sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            Thread.sleep(5000);
            defaultMQProducer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 consumer:只接收訂單單號為1的訂單消息

package consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public  static  void main(String [] args){
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerDemo");
        consumer.setNamesrvAddr("localhost:9876");
        try {
            consumer.subscribe("Topic1",MessageSelector.bySql("ordId = 1"));
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus
                consumeMessage(List<MessageExt> list,
                               ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for(MessageExt messageExt :list){
                        String str  =  new String(messageExt.getBody());
                        System.out.println(str);
                    }
                    return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
//            consumer.registerMessageListener(new MessageListenerOrderly() {
//                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
//                    for(MessageExt messageExt :list){
//                        String str  =  new String(messageExt.getBody());
//                        System.out.println(str);
//                    }
//                    return  ConsumeOrderlyStatus.SUCCESS;
//                }
//            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

  運行后出現如下面的錯誤:

 

網上的解決辦法是:/conf/broker.conf文件下面添加:enablePropertyFilter=true

然后用 start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf來啟動,用這個方法也確實成功了

 

 但是每次啟動broker時都要使用start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf這個來啟動,暫時還沒想到其他方法,如有其他方法也請告訴我,我找到了也會更新blog

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM