RocketMq單個部署啟動及用例


一: 安裝rocketmq與啟動

虛擬機環境分配4G內存,linux centos7

下載連接 http://rocketmq.apache.org/dowloading/releases/

解壓后進入bin目錄,修改內存配置,否則啟動失敗(默認配置內存8G,內存不夠會啟動失敗)

# /home/admin/rocketmqall4.7.0/bin

vi
runbroker.sh
# 設置小些
# JAVA_OPT
="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn4g"

# 關閉防火牆或者自行開放端口,避免網絡不通
systemctl disable firewalld

# 后台啟動 nameserver 默認端口9876
nohup sh mqnamesrv &

# 后台啟動 mqbroker 默認端口 10911
nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true &

#踩坑記錄
#客戶端生產者使用時問題
#報錯說 no router info of this topic
#則通過手動創建 topic, 注意 參數格式 ip:port
sh mqadmin updateTopic -b localhost:10911 -t DemoTopic -n localhost:9876

#手動創建topic時 報錯 簽名算法問題(擴展包沒找到)
#rocketMQ:unable to calculate a request signature. error=Algorithm HmacSHA1 not available

cd ~/rocketmqall4.7.0/bin
vi tools.sh
#在 ${JAVA_HOME}/jre/lib/ext 后加上ext文件夾的絕對路徑,
#如
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:/usr/java/jdk1.8.0_65/jre/lib/ext"
#再次創建 topic.
#這樣就可以基本使用了

#rocketmq web控制台擴展在文末描述

 

二: 運行樣例測試下

引入客戶端依賴包

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.0</version>
        </dependency>

1、Producer端發送同步消息

這種可靠性同步地發送方式使用的比較廣泛,比如:重要的消息通知,短信通知。

public class MySyncProducer {
    public static void main(String[] args) throws Exception {
        // 實例化消息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer("my_SyncProducer");
        // 設置NameServer的地址
        producer.setNamesrvAddr("192.168.1.114:9876");
        // 啟動Producer實例
        producer.start();
        for (int i = 0; i < 10000; i++) {
            // 創建消息,並指定Topic,Tag和消息體
            Message msg = new Message("my_TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 發送消息到一個Broker
            SendResult sendResult = producer.send(msg);
            // 通過sendResult返回消息是否成功送達
            System.out.printf("%s%n", sendResult);
        }
        // 如果不再發送消息,關閉Producer實例。
        Thread.sleep(500000);
        producer.shutdown();
    }
}

 

 觀察產生的消息數據 跑了2次產生2w條消息

 

 

 2w條分布在4個消息隊列中

 

消費者消費

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // 實例化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_SyncProducer");
        // 設置NameServer的地址
        consumer.setNamesrvAddr("192.168.1.114:9876");
        // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
        consumer.subscribe("my_TopicTest", "*");
        // 注冊回調實現類來處理從broker拉取回來的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 標記該消息已經被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者實例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

 

 消費詳情

 

 

 

2、Producer端發送異步消息

 異步消息通常用在對響應時間敏感的業務場景,即發送端不能容忍長時間地等待Broker的響應

public class AsyncProducer {
        public static void main(String[] args) throws Exception {
            // 實例化消息生產者Producer
            DefaultMQProducer producer = new DefaultMQProducer("my_AsyncProducer");
            // 設置NameServer的地址
            producer.setNamesrvAddr("192.168.1.114:9876");
            // 啟動Producer實例
            producer.start();
            producer.setRetryTimesWhenSendAsyncFailed(0);

            int messageCount = 1000;
            // 根據消息數量實例化倒計時計算器
            final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
            for (int i = 0; i < messageCount; i++) {
                final int index = i;
                // 創建消息,並指定Topic,Tag和消息體
                Message msg = new Message("my_asyncTopic",
                        "TagA",
                        "AUTO_CREATE_TOPIC_KEY",
                        "Hello world2".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // SendCallback接收異步返回結果的回調
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%-10d OK %s %n", index,
                                sendResult.getMsgId());
                    }
                    @Override
                    public void onException(Throwable e) {
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            }
            // 等待5s
            countDownLatch.await(5, TimeUnit.SECONDS);
            // 如果不再發送消息,關閉Producer實例。
            producer.shutdown();
        }
    }

3、Producer端單向發送消息

這種方式主要用在不特別關心發送結果的場景,例如日志發送。

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        // 實例化消息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer("my_OnewayProducer");
        // 設置NameServer的地址
        producer.setNamesrvAddr("192.168.1.114:9876");
        // 啟動Producer實例
        producer.start();
        for (int i = 0; i < 10000; i++) {
            // 創建消息,並指定Topic,Tag和消息體
            Message msg = new Message("my_OnewayTopic" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 發送單向消息,沒有任何返回結果
            producer.sendOneway(msg);

        }
        // 如果不再發送消息,關閉Producer實例。
        producer.shutdown();
    }
}

4、 消費消息

 消費指定生產組producerGroup及指定topic的消息

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // 實例化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_SyncProducer");
        // 設置NameServer的地址
        consumer.setNamesrvAddr("192.168.1.114:9876");
        // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
        consumer.subscribe("my_TopicTest", "*");
        // 注冊回調實現類來處理從broker拉取回來的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 標記該消息已經被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者實例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

5、 順序消息樣例

消息生產到一個隊列那么FIFO,全局中都是有序的消費.如果是分片到多個隊列,每個隊列中都是有序的,分區有序.

順序消息的產生

public class OrderProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("my_OrderProducer");

        producer.setNamesrvAddr("192.168.1.114:9876");

        producer.start();

        String[] tags = new String[]{"TagA", "TagC", "TagD"};

        // 訂單列表
        List<OrderStep> orderList = new OrderProducer().buildOrders();

        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
        for (int i = 0; i < 10; i++) {
            // 加個時間前綴
            String body = dateStr + " Hello RocketMQ " + orderList.get(i);
            Message msg = new Message("my_OrderProducer_Topic", tags[i % tags.length], "KEY" + i, body.getBytes());
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = (Long) arg;  //根據訂單id選擇發送queue
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderList.get(i).getOrderId());//訂單id

            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        }

        producer.shutdown();
    }

    /**
     * 訂單的步驟
     */
    private static class OrderStep {
        private long orderId;
        private String desc;

        public long getOrderId() {
            return orderId;
        }

        public void setOrderId(long orderId) {
            this.orderId = orderId;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "OrderStep{" +
                    "orderId=" + orderId +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }

    /**
     * 生成模擬訂單數據
     */
    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("創建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("創建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("創建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }
}
OrderProducer

 

 

 順序消費消息

public class ConsumerInOrder {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_OrderProducer");
        consumer.setNamesrvAddr("192.168.1.114:9876");
        /**
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
         * 如果非第一次啟動,那么按照上次消費的位置繼續消費
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("my_OrderProducer_Topic", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {

            Random random = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                int anInt = random.nextInt();
                for (MessageExt msg : msgs) {
                    // 可以看到每個queue有唯一的consume線程來消費, 訂單對每個queue(分區)有序
                    System.out.println(anInt+" consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                }
                try {
                    //模擬業務邏輯處理中...
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }

}
ConsumerInOrder

分區順序消費

 

 

 

6、延時消息樣例

先啟動消費者等待延時消息

public class ScheduledMessageConsumer {
    public static void main(String[] args) throws Exception {
        // 實例化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_ScheduledMessageConsumer");
        consumer.setNamesrvAddr("192.168.1.114:9876");
        // 訂閱Topics
        consumer.subscribe("my_Scheduled_Topic", "*");
        // 注冊消息監聽者
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者
        consumer.start();
    }
}
ScheduledMessageConsumer

發送延時消息

public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // 實例化一個生產者來產生延時消息
        DefaultMQProducer producer = new DefaultMQProducer("my_ScheduledMessage");
        producer.setNamesrvAddr("192.168.1.114:9876");
        // 啟動生產者
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("my_Scheduled_Topic", ("Hello scheduled message " + i).getBytes());
            // 設置延時等級3,這個消息將在10s之后發送(現在只支持固定的幾個時間,詳看delayTimeLevel)
            message.setDelayTimeLevel(3);
            // 發送消息
            producer.send(message);
        }
        // 關閉生產者
        producer.shutdown();
    }
}
ScheduledMessageProducer

延遲時間結束后消息才放入隊列被消費者消費,消費時間比發送時間晚,

使用場景:如電商里,提交了一個訂單就可以發送一個延時消息,1h后去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。(redis中有個key失效時間,失效事件類似)

使用限制: 時間不是任意的,private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

7、消息事務樣例

事務消息共有三種狀態,提交狀態、回滾狀態、中間狀態:

  • TransactionStatus.CommitTransaction: 提交事務,它允許消費者消費此消息。
  • TransactionStatus.RollbackTransaction: 回滾事務,它代表該消息將被刪除,不允許被消費。
  • TransactionStatus.Unknown: 中間狀態,它代表需要檢查消息隊列來確定狀態。

創建事務性生產者

使用 TransactionMQProducer類創建生產者,並指定唯一的 ProducerGroup,就可以設置自定義線程池來處理這些檢查請求。執行本地事務后、需要根據執行結果對消息隊列進行回復。回傳的事務狀態是以上三種。

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("my_TransactionMQProducer");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        // 設置NameServer的地址
        producer.setNamesrvAddr("192.168.1.114:9876");
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                        new Message("my_transaction_msg_topic", tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
TransactionProducer

事務性消息的監聽

    static class TransactionListenerImpl implements TransactionListener{

        private AtomicInteger transactionIndex = new AtomicInteger(0);

        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            return LocalTransactionState.UNKNOW;
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
TransactionListenerImpl
事務消息使用上的限制
  1. 事務消息不支持延時消息和批量消息。
  2. 為了避免單個消息被檢查太多次而導致半隊列消息累積,我們默認將單個消息的檢查次數限制為 15 次,但是用戶可以通過 Broker 配置文件的 transactionCheckMax參數來修改此限制。如果已經檢查某條消息超過 N 次的話( N = transactionCheckMax ) 則 Broker 將丟棄此消息,並在默認情況下同時打印錯誤日志。用戶可以通過重寫 AbstractTransactionCheckListener 類來修改這個行為。
  3. 事務消息將在 Broker 配置文件中的參數 transactionMsgTimeout 這樣的特定時間長度之后被檢查。當發送事務消息時,用戶還可以通過設置用戶屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個限制,該參數優先於 transactionMsgTimeout 參數。
  4. 事務性消息可能不止一次被檢查或消費。
  5. 提交給用戶的目標主題消息可能會失敗,目前這依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機制來保證,如果希望確保事務消息不丟失、並且事務完整性得到保證,建議使用同步的雙重寫入機制。
  6. 事務消息的生產者 ID 不能與其他類型消息的生產者 ID 共享。與其他類型的消息不同,事務消息允許反向查詢、MQ服務器能通過它們的生產者 ID 查詢到消費者

 

8、批量消息樣例

9、過濾消息樣例

0、OpenMessaging樣例

 

官方鏈接 https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md

 

三:RocketMQ-Console-Ng擴展控制台

下載 :  https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

修改配置文件 namesrvAddr地址后打包運行

mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.1.jar

訪問: http://localhost:8080/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM