1、下載rocketmq二進制包 rocketmq-all-4.3.2-bin-release.zip 並解壓到指定目錄
2、配置rocketmq環境變量
ROCKETMQ_HOME=rocketmq解壓目錄
3、啟動nameserver
在ROCKETMQ_HOME/bin目錄下雙擊運行mqnamesrv.cmd,出現如下信息表示啟動成功,保持命令窗口開啟(若窗口一閃而過,說明沒有配置環境變量,請先配置環境變量)
4、啟動broker
方法一:開啟另一個windows終端cmd,進入ROCKETMQ_HOME/bin目錄,先輸入set NAMESRV_ADDR=127.0.0.1:9876
設置環境變量,輸入mqbroker回車
啟動broker,保持mqbroker運行,不要關閉這個終端。
方法二:開啟另一個windows終端cmd,進入ROCKETMQ_HOME/bin目錄,也可一步輸入mqbroker -n 127.0.0.1:9876
啟動broker,保持mqbroker運行,不要關閉這個終端
5、編寫producer和consumer代碼
pom依賴
<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> </plugins> </build>
RocketMQConsumer.java
import java.util.UUID; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.MessageListener; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; public class RocketMQConsumer { private DefaultMQPushConsumer consumer; private MessageListener listener; protected String nameServer; protected String groupName; protected String topics; public RocketMQConsumer(MessageListener listener, String nameServer, String groupName, String topics) { this.listener = listener; this.nameServer = nameServer; this.groupName = groupName; this.topics = topics; } public void init() { consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(nameServer); try { consumer.subscribe(topics, "*"); } catch (MQClientException e) { e.printStackTrace(); } consumer.setInstanceName(UUID.randomUUID().toString()); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.registerMessageListener((MessageListenerConcurrently) this.listener); try { consumer.start(); } catch (MQClientException e) { e.printStackTrace(); } System.out.println("RocketMQConsumer Started! group=" + consumer.getConsumerGroup() + " instance=" + consumer.getInstanceName() ); } }
RocketMQListener.java
import java.util.List; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; public class RocketMQListener implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // System.out.println("get data from rocketMQ:" + msgs); for (MessageExt message : msgs) { String msg = new String(message.getBody()); System.out.println("msg data from rocketMQ:" + msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
RocketMQProducer.java
import java.util.UUID; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.Message; public class RocketMQProducer { private DefaultMQProducer sender; protected String nameServer; protected String groupName; protected String topics; public void init() { sender = new DefaultMQProducer(groupName); sender.setNamesrvAddr(nameServer); sender.setInstanceName(UUID.randomUUID().toString()); try { sender.start(); } catch (MQClientException e) { e.printStackTrace(); } } public RocketMQProducer(String nameServer, String groupName, String topics) { this.nameServer = nameServer; this.groupName = groupName; this.topics = topics; } public void send(Message message) { message.setTopic(topics); try { SendResult result = sender.send(message); SendStatus status = result.getSendStatus(); System.out.println("messageId=" + result.getMsgId() + ", status=" + status); } catch (Exception e) { e.printStackTrace(); } } }
RocketMQConsumerTest.java
public class RocketMQConsumerTest { public static void main(String[] args) { String mqNameServer = "127.0.0.1:9876"; String mqTopics = "MQ-MSG-TOPICS-TEST"; String consumerMqGroupName = "CONSUMER-MQ-GROUP"; RocketMQListener mqListener = new RocketMQListener(); RocketMQConsumer mqConsumer = new RocketMQConsumer(mqListener, mqNameServer, consumerMqGroupName, mqTopics); mqConsumer.init(); try { Thread.sleep(1000 * 60L); } catch (InterruptedException e) { e.printStackTrace(); } } }
RocketMQProducerTest.java
import org.apache.rocketmq.common.message.Message; public class RocketMQProducerTest { public static void main(String[] args) { String mqNameServer = "127.0.0.1:9876"; String mqTopics = "MQ-MSG-TOPICS-TEST"; String producerMqGroupName = "PRODUCER-MQ-GROUP"; RocketMQProducer mqProducer = new RocketMQProducer(mqNameServer, producerMqGroupName, mqTopics); mqProducer.init(); for (int i = 0; i < 5; i++) { Message message = new Message(); message.setBody(("I send message to RocketMQ " + i).getBytes()); mqProducer.send(message); } } }
運行RocketMQConsumerTest,輸出如下
運行RocketMQProducerTest,輸出如下
2、Rocketmq事務消息
Half(Prepare) Message
指的是暫不能投遞的消息,發送方已經將消息成功發送到了 MQ 服務端,但是服務端未收到生產者對該消息的二次確認,此時該消息被標記成“暫不能投遞”狀態,處於該種狀態下的消息即半消息。
消息回查
由於網絡閃斷、生產者應用重啟等原因,導致某條事務消息的二次確認丟失,MQ 服務端通過掃描發現某條消息長期處於“半消息”時,需要主動向消息生產者詢問該消息的最終狀態(Commit 或是 Rollback),該過程即消息回查。
執行流程圖
1.發送方向 MQ 服務端發送消息。
2.MQ Server 將消息持久化成功之后,向發送方 ACK 確認消息已經發送成功,此時消息為半消息。
3.發送方開始執行本地事務邏輯。
4.發送方根據本地事務執行結果向 MQ Server 提交二次確認(Commit 或是 Rollback),MQ Server 收到 Commit 狀態則將半消息標記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態則刪除半消息,訂閱方將不會接受該消息。
5.在斷網或者是應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達 MQ Server,經過固定時間后 MQ Server 將對該消息發起消息回查。
6.發送方收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。
7.發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,MQ Server 仍按照步驟4對半消息進行操作。
事務消息發送對應步驟1、2、3、4,事務消息回查對應步驟5、6、7。
使用示例代碼
事務消息的生產者為TransactionMQProducer實例,我們需要編寫事務監聽器TransactionListener的實現類,並實現
executeLocalTransaction和checkLocalTransaction方法,並為事務消息的生產者綁定事務監聽器,發送消息時調用
sendMessageInTransaction方法,此時會向rocketmq服務器發送一條消息,
本地事務執行邏輯寫在executeLocalTransaction方法里,該方法返回本地事務執行的狀態,rocketmq服務器根據返回
的狀態值決定該事務消息如何處理:
1、如果return LocalTransactionState.COMMIT_MESSAGE ,rocketmq服務器則會將該事務消息標記為可投遞
2、如果return LocalTransactionState.ROLLBACK_MESSAGE ,rocketmq服務器則會將該事務消息丟棄
3、如果return LocalTransactionState.UNKNOW ,rocketmq服務器對該事務消息不做任何操作,該事務消息依然為半消息
事務監聽器 TransactionListenerImpl.java
import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; //事務監聽器實現類 public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //執行本地事務 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { System.out.println(sdf.format(new Date()) + "============== 執行本地事務 ============"); // int value = transactionIndex.getAndIncrement(); // int status = value % 3; // localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.ROLLBACK_MESSAGE; // return LocalTransactionState.UNKNOW; // return LocalTransactionState.COMMIT_MESSAGE; } //檢查本地事務 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println(sdf.format(new Date()) + "============== 消息回查,檢查本地事務 ============"); Integer status = localTrans.get(msg.getTransactionId()); if(null != status) { switch(status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; default: return LocalTransactionState.COMMIT_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }
TransactionProducer.java
import java.util.UUID; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; public class TransactionProducer { private static final String mqNameServer = "127.0.0.1:9876"; private static final String mqTopics = "MQ-MSG-TOPICS-TEST"; private static final String producerMqGroupName = "PRODUCER-MQ-GROUP"; private static final TransactionListener transactionListener = new TransactionListenerImpl(); public static void main(String[] args) throws MQClientException { //創建消息生產者實例 TransactionMQProducer mqProducer = new TransactionMQProducer(producerMqGroupName); mqProducer.setNamesrvAddr(mqNameServer); mqProducer.setInstanceName(UUID.randomUUID().toString()); mqProducer.setTransactionListener(transactionListener); //啟動生產者 mqProducer.start(); //發送消息 Message msg = new Message(mqTopics, "Msg456".getBytes()); SendResult result = mqProducer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", result); } }
拉模式消息消費
PullConsumer.java
import java.util.List; import java.util.Set; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; public class PullConsumer { private static final String consumerMqGroupName = "CONSUMER-MQ-GROUP"; private static final String mqTopics = "MQ-MSG-TOPICS-TEST"; private static final String mqNameServer = "127.0.0.1:9876"; public static void main(String[] args) throws MQClientException { //創建拉模式的消息消費者 DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerMqGroupName); consumer.setNamesrvAddr(mqNameServer); //啟動消息消費者 consumer.start(); //獲取指定topic下的所有消息隊列(發送消息到指定topic時消息會被隨機發送到不同的消息隊列) Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(mqTopics); for (MessageQueue mq : mqs) { if(mq.getQueueId() == 0) { System.out.printf("Consume from the queue: %s%n", mq); try { /* * pullBlockIfNotFound方法從指定消息隊列中拉取消息 * 第一個參數 : 指定消息隊列 * 第二個參數 : * 第三個參數 : 指定從消息隊列中讀取消息的起始位置offset * 第四個參數 : 指定從消息隊列中讀取消息的個數 */ PullResult result = consumer.pullBlockIfNotFound(mq, null, 0, 10); //從消息拉取結果中獲取找到的消息集 List<MessageExt> msgFoundList = result.getMsgFoundList(); for(MessageExt msg : msgFoundList) { System.out.println("收到消息 " + new String(msg.getBody())); } } catch (Exception e) { e.printStackTrace(); } } } } }