RocketMQ Message Queue: Single-Node Deployment and Usage


 

When reposting, please credit the source: http://blog.csdn.net/loongshawn/article/details/51086876


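0 A Brief Introduction to RocketMQ

0.1 Introduction

RocketMQ is message middleware.

Message middleware involves two roles: message producers and message consumers. RocketMQ has the same two concepts. A producer creates messages and sends them to the RocketMQ server. The server persists the messages to disk, and a consumer pulls messages from the server and hands them to the application for consumption.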
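The flow described above (a producer sends, the server stores, a consumer pulls) can be sketched with a toy in-memory model. This is only an illustration under stated assumptions: `ToyBroker`, `append`, and `pull` are hypothetical names, a plain list stands in for the on-disk store, and none of this is RocketMQ's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for a message server. Hypothetical names; not RocketMQ's API.
final class ToyBroker {
    // topic -> append-only message log (an in-memory stand-in for the disk store)
    private final Map<String, List<String>> logs = new HashMap<>();

    // Producer side: store the message and return its offset in the topic log.
    synchronized long append(String topic, String body) {
        List<String> log = logs.computeIfAbsent(topic, t -> new ArrayList<>());
        log.add(body);
        return log.size() - 1;
    }

    // Consumer side: pull up to maxCount messages starting at fromOffset.
    synchronized List<String> pull(String topic, long fromOffset, int maxCount) {
        List<String> log = logs.getOrDefault(topic, Collections.<String>emptyList());
        if (fromOffset < 0 || fromOffset >= log.size() || maxCount <= 0) {
            return new ArrayList<>();
        }
        long to = Math.min((long) log.size(), fromOffset + maxCount);
        return new ArrayList<>(log.subList((int) fromOffset, (int) to));
    }
}

final class ToyBrokerDemo {
    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.append("TopicTest1", "Hello MetaQ0");
        broker.append("TopicTest1", "Hello MetaQ1");
        // The consumer tracks its own offset and asks for whatever comes next.
        System.out.println(broker.pull("TopicTest1", 0, 10));
    }
}
```

The consumer, not the server, decides where to read from in this sketch, which is the essence of a pull model.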

 

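0.2 Features

RocketMQ is a distributed message middleware built on a queue model. Its main features are:

  • Supports strict message ordering
  • Supports both the Topic and the Queue model
  • Can accumulate messages on the scale of hundreds of millions
  • Reasonably friendly distributed characteristics
  • Supports both Push and Pull consumption
  • Proven under the massive message volume of several Tmall Double 11 shopping events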
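As a rough illustration of how ordering is usually approached with a queue model: order holds within a single queue, so messages that must stay in sequence (for example, all messages for one order ID) are typically routed to the same queue. The sketch below shows only that routing idea. `KeyedQueueSelector` and `selectQueue` are hypothetical names and are not part of the RocketMQ client.

```java
// Routes every message with the same key to the same queue index.
// Hypothetical helper for illustration; not part of the RocketMQ client.
final class KeyedQueueSelector {
    private KeyedQueueSelector() { }

    static int selectQueue(String orderKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the result in [0, queueCount) even for negative hash codes.
        return Math.floorMod(orderKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int first = selectQueue("OrderID001", 4);
        int second = selectQueue("OrderID001", 4);
        // The same key always maps to the same queue.
        System.out.println(first == second);
    }
}
```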

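0.3 Deployment Architecture

[Figure: RocketMQ deployment architecture (image not preserved)]

The figure above shows the RocketMQ deployment architecture. The "Meta" label in the figure is RocketMQ's early code name.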

1 RocketMQ Single-Node Deployment

1.1 System Environment

Host: Linux
Memory: 8 GB
Disk: 250 GB
CPU: 4 cores

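1.2 Required Packages and Documents

At the time of writing, the latest installation package available on GitHub is alibaba-rocketmq-3.2.6.tar

Download: https://github.com/alibaba/RocketMQ

Documentation for the older version: Metaq原理與應用.docx

Note: RocketMQ was originally called Metaq inside Taobao and was renamed RocketMQ last year. The document above describes the older Metaq version, so treat it only as a reference for getting familiar with the concepts.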

1.3 Java Environment on the Server

$java -version
java version "1.8.0_45"
Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)

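1.4 Installing the RocketMQ Server

Extract alibaba-rocketmq-3.2.6.tar (adjust the file name if your download carries a different extension, such as .tar.gz):

tar xvf alibaba-rocketmq-3.2.6.tar -C /opt/

Configure the RocketMQ environment variables by appending the following to the end of /etc/profile: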

export ROCKETMQ_HOME=/opt/alibaba-rocketmq
export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH

Apply the RocketMQ environment variables:

source /etc/profile

Give the following commands execute permission:

cd /opt/alibaba-rocketmq/bin/
chmod +x mqadmin mqbroker mqfiltersrv mqshutdown mqnamesrv

Create a log directory:

cd /opt/alibaba-rocketmq
mkdir log

Start the name server:

nohup mqnamesrv 1>/opt/alibaba-rocketmq/log/ng.log 2>/opt/alibaba-rocketmq/log/ng-err.log &

Check the startup status:

$ps aux|grep java
125233 12248 21.1 0.9 7151512 75844 pts/1 Sl 11:37 0:01 /opt/java/jdk1.8.0_45/bin/java -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC -verbose:gc -Xloggc:/home/xiaolong.xiao/rmq_srv_gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -Djava.ext.dirs=/opt/alibaba-rocketmq/bin/../lib -cp .:/opt/alibaba-rocketmq/bin/../conf:.:/opt/java/jdk1.8.0_45/lib/dt.jar:/opt/java/jdk1.8.0_45/lib/tools.jar com.alibaba.rocketmq.namesrv.NamesrvStartup

Verify that the name server has started:

$tail -f /opt/alibaba-rocketmq/log/ng.log
The Name Server boot success.

Start the broker. The name server address must be set before the broker starts; 10.125.1.186 is the IP address of the server used here.

export NAMESRV_ADDR=10.125.1.186:9876
nohup mqbroker >/opt/alibaba-rocketmq/log/mq.log &
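The `host:port` value exported in this step can be sanity-checked before anything relies on it. A minimal sketch, assuming the value is a single `host:port` pair or several pairs separated by `;` (the multi-address form is an assumption here, as are the names `NamesrvAddrParser` and `parse`):

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Parses a name server address string such as "10.125.1.186:9876".
// Hypothetical helper for illustration; not part of RocketMQ.
final class NamesrvAddrParser {
    private NamesrvAddrParser() { }

    static List<InetSocketAddress> parse(String value) {
        List<InetSocketAddress> result = new ArrayList<>();
        if (value == null) {
            return result;
        }
        for (String part : value.split(";")) {
            String addr = part.trim();
            if (addr.isEmpty()) {
                continue;
            }
            int colon = addr.lastIndexOf(':');
            if (colon <= 0 || colon == addr.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got: " + addr);
            }
            int port = Integer.parseInt(addr.substring(colon + 1));
            // createUnresolved avoids a DNS lookup; it only records host and port.
            result.add(InetSocketAddress.createUnresolved(addr.substring(0, colon), port));
        }
        return result;
    }

    public static void main(String[] args) {
        // Reads the same environment variable that the broker start-up step exports.
        System.out.println(parse(System.getenv("NAMESRV_ADDR")));
    }
}
```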

Verify that the broker has started:

tail -f /opt/alibaba-rocketmq/log/mq.log

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=320m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The broker[e010125001186.bja, 10.125.1.186:10911] boot success. and name server is 10.125.1.186:9876

Finally, configure the firewall.
The name server port is 9876.
The broker port is 10911.

lokkit -p 9876:tcp -p 10911:tcp
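Once the firewall rule is in place, reachability of the two ports can be probed from a client machine. A minimal sketch using only the JDK: `PortCheck` and `isReachable` are hypothetical names, and the host and ports in `main` are the ones used in this article.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Checks whether a TCP connection to host:port can be opened within a timeout.
final class PortCheck {
    private PortCheck() { }

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unroutable: all count as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        String host = "10.125.1.186"; // server IP from this article
        System.out.println("namesrv 9876 reachable: " + isReachable(host, 9876, 2000));
        System.out.println("broker 10911 reachable: " + isReachable(host, 10911, 2000));
    }
}
```

A successful connection only shows that the port is open through the firewall; it says nothing about whether the service behind it is healthy.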

Commands for shutting down the name server and the broker:

mqshutdown namesrv
mqshutdown broker

Shutting down the name server:

$mqshutdown namesrv
The mqnamesrv(12248) is running...
Send shutdown request to mqnamesrv(12248) OK

Shutting down the broker:

$mqshutdown broker
The mqbroker(13634) is running...
Send shutdown request to mqbroker(13634) OK

Result displayed after a successful installation:
[Figure: screenshot of the result (image not preserved)]

2 Using the RocketMQ Message Queue from a Java Client

2.1 Dependency Configuration

<!-- RocketMQ Java SDK -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.2.6</version>
</dependency>

2.2 Creating the Producer

This class is used to obtain a singleton producer.

package com.autonavi.rocketmq.producer;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

/**
 * @author dddd
 * @description Message producer
 * @date 2016-04-07
 */
public class Producer {

    /*
     * Constructs a client instance with your account for accessing DefaultMQProducer
     */
    private static DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    private static int initialState = 0;

    private Producer() {
    }

    // synchronized so that concurrent callers cannot start the producer twice
    public static synchronized DefaultMQProducer getDefaultMQProducer() {
        if (producer == null) {
            producer = new DefaultMQProducer("ProducerGroupName");
        }

        if (initialState == 0) {
            // Name server address: the server IP from section 1.4 plus port 9876
            producer.setNamesrvAddr("10.125.1.186:9876");
            try {
                producer.start();
            } catch (MQClientException e) {
                // Start-up failed; callers must be prepared for a null return
                e.printStackTrace();
                return null;
            }
            initialState = 1;
        }

        return producer;
    }
}
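A getter like `getDefaultMQProducer()` may be called from several threads at once, while the client should be started only once. One way to sketch that one-time start, assuming nothing about RocketMQ itself, is a small generic holder. `OnceStarter` and its `Supplier`-based factory are hypothetical names introduced for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Creates and starts a resource at most once, even under concurrent calls.
// Hypothetical helper for illustration; the factory would build and start the client.
final class OnceStarter<T> {
    private final Supplier<T> factory;
    private T instance; // guarded by "this"

    OnceStarter(Supplier<T> factory) {
        this.factory = factory;
    }

    synchronized T get() {
        if (instance == null) {
            // If the factory throws, instance stays null and a later call retries.
            instance = factory.get();
        }
        return instance;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger starts = new AtomicInteger();
        OnceStarter<String> starter = new OnceStarter<>(() -> {
            starts.incrementAndGet(); // stands in for producer.start()
            return "started-client";
        });
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(starter::get);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("start calls: " + starts.get());
    }
}
```

Throwing from the factory instead of returning null would also spare callers a null check, at the cost of changing the getter's contract.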

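2.3 Creating the Consumer

This class is used to obtain a singleton consumer.

A consumer plays a role similar to the object that operates directly on the database. For example, suppose the producer places an order for a train ticket. The consumer keeps listening, and when an order message arrives it carries out the order operation.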

 

package com.autonavi.rocketmq.consumer;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

/**
 * @author dddd
 * @description Message consumer
 * @date 2016-04-07
 */
public class Consumer {

    /*
     * Constructs a client instance with your account for accessing DefaultMQConsumer
     */
    private static DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
    private static int initialState = 0;

    private Consumer() {
    }

    // synchronized so that concurrent callers cannot configure the consumer twice
    public static synchronized DefaultMQPushConsumer getDefaultMQPushConsumer() {
        if (consumer == null) {
            consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        }

        if (initialState == 0) {
            // Name server address: the server IP from section 1.4 plus port 9876
            consumer.setNamesrvAddr("10.125.1.186:9876");
            // Start consuming from the first offset
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            initialState = 1;
        }

        // The consumer is only configured here; the caller subscribes and calls start()
        return consumer;
    }
}

2.4 Creating the Produce and Consume Service

package com.autonavi.rocketmq.service;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.autonavi.rocketmq.consumer.Consumer;
import com.autonavi.rocketmq.producer.Producer;

public class Test {

    private static final Logger logger = LoggerFactory.getLogger(Test.class);

    public static void main(String[] args) {
        // Produces messages; call receiveMsg() instead to run the consumer
        sendMsg();
    }

    public static void sendMsg() {
        // Get the message producer (null if start-up failed)
        DefaultMQProducer producer = Producer.getDefaultMQProducer();
        if (producer == null) {
            logger.error("Producer failed to start.");
            return;
        }

        try {
            // 100 messages, matching the test run in section 2.5
            for (int i = 0; i < 100; i++) {
                Message msg = new Message(
                        "TopicTest1",                    // topic
                        "TagA",                          // tag
                        "OrderID00" + i,                 // key
                        ("Hello MetaQ" + i).getBytes()); // body
                SendResult sendResult = producer.send(msg);
                logger.info("sendResult:{}", sendResult);
            }
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }

    public static void receiveMsg() {
        // Get the message consumer
        DefaultMQPushConsumer consumer = Consumer.getDefaultMQPushConsumer();

        // Subscribe to the topic
        try {
            consumer.subscribe("TopicTest1", "*");

            consumer.registerMessageListener(new MessageListenerConcurrently() {

                /**
                 * By default msgs contains only one message; set the
                 * consumeMessageBatchMaxSize parameter to receive messages in batches.
                 */
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                    logger.info("currentThreadName:{} and Receive New Messages:{}",
                            Thread.currentThread().getName(), msgs);

                    MessageExt msg = msgs.get(0);
                    if (msg.getTopic().equals("TopicTest1")) {
                        // Consumption logic for TopicTest1
                        if (msg.getTags() != null && msg.getTags().equals("TagA")) {
                            // Handle TagA
                            logger.info("MsgBody:{}", new String(msg.getBody()));
                        } else if (msg.getTags() != null && msg.getTags().equals("TagC")) {
                            // Handle TagC
                        } else if (msg.getTags() != null && msg.getTags().equals("TagD")) {
                            // Handle TagD
                        }
                    } else if (msg.getTopic().equals("TopicTest2")) {
                        // Consumption logic for TopicTest2
                    }

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            /**
             * The consumer must be initialized with start() before use; one call is enough.
             */
            consumer.start();

            logger.info("Consumer Started.");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}
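The listener above reads only `msgs.get(0)`, which is fine while each callback carries a single message. If batch delivery is enabled, every element of the list needs handling. The sketch below shows that loop together with a tag-to-handler table in place of the `if`/`else` chain. It is deliberately independent of the RocketMQ classes: `TaggedMessage` and `TagDispatcher` are hypothetical stand-ins.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for a received message: just a tag and a text body.
final class TaggedMessage {
    final String tag;
    final String body;

    TaggedMessage(String tag, String body) {
        this.tag = tag;
        this.body = body;
    }
}

// Dispatches each message in a batch to the handler registered for its tag.
final class TagDispatcher {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    void register(String tag, Consumer<String> handler) {
        handlers.put(tag, handler);
    }

    // Returns how many messages had a matching handler.
    int dispatchAll(List<TaggedMessage> batch) {
        int handled = 0;
        for (TaggedMessage msg : batch) { // every message, not only batch.get(0)
            Consumer<String> handler = handlers.get(msg.tag);
            if (handler != null) {
                handler.accept(msg.body);
                handled++;
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        TagDispatcher dispatcher = new TagDispatcher();
        dispatcher.register("TagA", body -> System.out.println("TagA: " + body));
        int handled = dispatcher.dispatchAll(Arrays.asList(
                new TaggedMessage("TagA", "Hello MetaQ0"),
                new TaggedMessage("TagB", "no handler registered")));
        System.out.println("handled: " + handled);
    }
}
```

Messages whose tag has no handler are skipped silently here; a real listener would need to decide whether that counts as success or should trigger a retry.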

2.5 Test Results

2.5.1 Producing 100 Messages

2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C286, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=0], queueOffset=617]
2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C31B, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=1], queueOffset=616]
2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C3B0, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=2], queueOffset=614]
2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C445, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=3], queueOffset=614]
2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C4DA, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=0], queueOffset=618]
2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C56F, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=1], queueOffset=617]
2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C604, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=2], queueOffset=615]
2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C699, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=3], queueOffset=615]
2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C72E, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=0], queueOffset=619]
2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C7C3, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=1], queueOffset=618]
2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C858, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=2], queueOffset=616]
2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C8EF, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=3], queueOffset=616]
2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C986, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=0], queueOffset=620]
2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005CA1D, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=1], queueOffset=619]
...
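In the log above the `queueId` values cycle through 0, 1, 2, 3, which is what round-robin selection over four queues would produce. The following is a minimal sketch of that pattern, not RocketMQ's actual selection code; `RoundRobinSelector` and `next` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hands out queue indexes 0, 1, ..., queueCount - 1 in rotation.
final class RoundRobinSelector {
    private final AtomicInteger counter = new AtomicInteger();
    private final int queueCount;

    RoundRobinSelector(int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        this.queueCount = queueCount;
    }

    int next() {
        // floorMod keeps the index non-negative after the counter overflows.
        return Math.floorMod(counter.getAndIncrement(), queueCount);
    }

    public static void main(String[] args) {
        RoundRobinSelector selector = new RoundRobinSelector(4);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 8; i++) {
            sb.append(selector.next()).append(' ');
        }
        System.out.println(sb.toString().trim());
    }
}
```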


2.5.2 Consuming 100 Messages

2016-04-07-16-04 [main] [com.autonavi.rocketmq.service.Test] [INFO] - Consumer Started.
2016-04-07-16-04 [ConsumeMessageThread_11] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_11 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=618, sysFlag=0, bornTimestamp=1460016115897, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115856, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005CDA7, commitLogOffset=380327, bodyCRC=901334138, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0019, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]
2016-04-07-16-04 [ConsumeMessageThread_8] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_8 and Receive New Messages:[MessageExt [queueId=3, storeSize=149, queueOffset=615, sysFlag=0, bornTimestamp=1460016115722, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115680, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C699, commitLogOffset=378521, bodyCRC=260218519, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID007, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=12]]]
2016-04-07-16-04 [ConsumeMessageThread_9] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_9 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=616, sysFlag=0, bornTimestamp=1460016115773, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115734, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C8EF, commitLogOffset=379119, bodyCRC=996330568, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0011, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]
2016-04-07-16-04 [ConsumeMessageThread_3] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_3 and Receive New Messages:[MessageExt [queueId=3, storeSize=149, queueOffset=614, sysFlag=0, bornTimestamp=1460016115669, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115629, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C445, commitLogOffset=377925, bodyCRC=149904014, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID003, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=12]]]
2016-04-07-16-04 [ConsumeMessageThread_12] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_12 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=619, sysFlag=0, bornTimestamp=1460016115951, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115911, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005D003, commitLogOffset=380931, bodyCRC=2118254247, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0023, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]
2016-04-07-16-04 [ConsumeMessageThread_11] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ19
2016-04-07-16-04 [ConsumeMessageThread_1] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_1 and Receive New Messages:[MessageExt [queueId=1, storeSize=149, queueOffset=616, sysFlag=0, bornTimestamp=1460016115635, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115594, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C31B, commitLogOffset=377627, bodyCRC=1726036898, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID001, WAIT=true, MAX_OFFSET=641, MIN_OFFSET=0}, body=12]]]
2016-04-07-16-04 [ConsumeMessageThread_1] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ1
2016-04-07-16-04 [ConsumeMessageThread_18] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_18 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=625, sysFlag=0, bornTimestamp=1460016116319, bornHost=/30.85.231.35:58198, storeTimestamp=1460016116278, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005DE2B, commitLogOffset=384555, bodyCRC=796302648, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0047, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]
2016-04-07-16-04 [ConsumeMessageThread_18] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ47
2016-04-07-16-04 [ConsumeMessageThread_4] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_4 and Receive New Messages:[MessageExt [queueId=2, storeSize=149, queueOffset=614, sysFlag=0, bornTimestamp=1460016115648, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115608, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C3B0, commitLogOffset=377776, bodyCRC=2145937944, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID002, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=12]]]
2016-04-07-16-04 [ConsumeMessageThread_4] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ2
2016-04-07-16-04 [ConsumeMessageThread_20] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_20 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=627, sysFlag=0, bornTimestamp=1460016116436, bornHost=/30.85.231.35:58198, storeTimestamp=1460016116393, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005E2E3, commitLogOffset=385763, bodyCRC=1482935637, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0055, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]
2016-04-07-16-04 [ConsumeMessageThread_20] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ55
2016-04-07-16-04 [ConsumeMessageThread_2] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_2 and Receive New Messages:[MessageExt [queueId=0, storeSize=149, queueOffset=617, sysFlag=0, bornTimestamp=1460016115587, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115577, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C286, commitLogOffset=377478, bodyCRC=300288820, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID000, WAIT=true, MAX_OFFSET=642, MIN_OFFSET=0}, body=12]]]
2016-04-07-16-04 [ConsumeMessageThread_2] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ0
...


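3 Summary

This article is meant only to help beginners learn how to use RocketMQ. So far it covers a single-node setup and does not touch cluster configuration; I will keep learning and recording what I find. If anything here is wrong, corrections are welcome.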
