從 RocketMQ環境搭建_1 我們已經建立了MQ的Server,接下來就是簡單的生產和消費的過程。
1. rocketMQ的源碼中有個示例代碼example ,我們從Apache官網中可以下載源碼source找到example,進行學習。
下載地址:http://rocketmq.apache.org/docs/quick-start/
建立簡單的工程,mvn最主要依賴client
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.1</version> </dependency>
在做transactionProducer時,發現無法消費,問題是需要依賴parent,因此借鑒demo中的mvn依賴:
<parent> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-all</artifactId> <version>4.3.0</version> </parent> <modelVersion>4.0.0</modelVersion> <packaging>jar</packaging> <artifactId>rocketmq-example</artifactId> <name>rocketmq-example ${project.version}</name> <dependencies> <dependency> <groupId>${project.groupId}</groupId> <artifactId>rocketmq-client</artifactId> </dependency> <dependency> <groupId>${project.groupId}</groupId> <artifactId>rocketmq-srvutil</artifactId> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> </dependency> <dependency> <groupId>org.javassist</groupId> <artifactId>javassist</artifactId> </dependency> <dependency> <groupId>io.openmessaging</groupId> <artifactId>openmessaging-api</artifactId> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-openmessaging</artifactId> <version>4.3.0</version> </dependency> </dependencies>
創建Producer類:
/** * 簡單生產者 * * @author DennyZhao * @since 2018/10/29 * @version 1.0 */ public class Producer { /** * main方法 * * @param args * @throws InterruptedException * @throws MQBrokerException * @throws RemotingException * @throws MQClientException * @throws UnsupportedEncodingException */ public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException { //創建生產者實例,並確定生產組 DefaultMQProducer producer = new DefaultMQProducer("fruitProducerGroup"); // 指定服務NameServer服務 producer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;"); // 生產者啟動 producer.start(); String[] fruitArray = { "apple", "strawbarry", "pear", "banana", "orange" }; for (String fruit : fruitArray) { // 創建消息 Message message = new Message("fruit", "common", fruit.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 發送消息 SendResult result = producer.send(message); SendStatus sendStatus = result.getSendStatus(); // 獲取回執 System.out.println(result); if (sendStatus == SendStatus.SEND_OK) { System.out.println("信息發送成功!"); } else { System.out.println("信息發送失敗!"); } } // 關閉生產者 producer.shutdown(); } }
創建Consumer類:
/** * 消費者群體 * @author DennyZhao * @since 2018/10/29 * @version 1.0 */ public class Consumer { public static void main(String[] args) throws MQClientException { //創建消費者實例和組 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("fruitConsumerGroup"); // 指定nameServer服務地址 consumer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;"); // 訂閱消費Topic consumer.subscribe("fruit", "*"); // 訂閱從何地方開始讀(先進先出,還是先進后出) consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 添加監聽 consumer.registerMessageListener(new MessageListenerConcurrently() { //獲取數據,防止一次獲取太多無法消化,可一次取單個條數。 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext arg1) { if(msgList != null && msgList.size() > 0) { MessageExt msg = msgList.get(0); System.out.println(msg); try { System.out.println(new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者 consumer.start(); } }
※注意:啟動最好先啟動消費者然后再啟動服務者。
從RocketMQ-console中可以看到創建的消費者群和生產者群。至此,簡單的生產消費就算大功告成。
配置說明:
1.producer-生產者類型
- 1. NormalProducer (普通生產者)
- 2. OrderProducer (嚴格順序生產者,例如:訂單創建,付款,發貨等)
- 3. TransactionProducer (事務生產者)
msgId和msgKey非常關鍵:msgId是mq自動生成的,可在控制台message中查找數據。
msgKey大多是業務主鍵key,用於跟蹤數據,比如訂單號等。
msg中有個很重要的屬性:在producer端放置:msg.putUserProperty([key], [value]); //是個map,可放內容,在consumer端獲取
主要可選參數:
// 設置超過多大進行compress壓縮 producer.setCompressMsgBodyOverHowmuch(1024 * 10); // 設置發送失敗的嘗試次數。 producer.setRetryTimesWhenSendFailed(3); // 設置如果返回值不是send_ok,是否要重新發送 producer.setRetryAnotherBrokerWhenNotStoreOK(false); // 設置限制最大的文件大小 producer.setMaxMessageSize(1024*50); // 設置默認主題對應的隊列數 producer.setDefaultTopicQueueNums(4); //創建新的topic producer.createTopic("1121", "vegetables", 4); // 設置發送超時時間 ms producer.setSendMsgTimeout(1000);
OrderProducer 采用將有序內容放在單個queue,保證消費的順序進行。可參見示例中的order代碼。
Producer類不同點展示,發送消息:
// 發送消息 SendResult result = producer.send(message, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> msgList, Message message, Object queueId) { return msgList.get(Integer.valueOf(queueId.toString())); }}, 0); //這個0表示將這些msg放入到隊列0中
Consumer類不同點展示,接受消息:
// 添加監聽 consumer.registerMessageListener(new MessageListenerOrderly() { //獲取數據,防止一次獲取太多無法消化,可一次取單個條數。 public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext arg1) { if(msgList != null && msgList.size() > 0) { MessageExt msg = msgList.get(0); System.out.println(msg); System.out.println(msg.getBody()); } return ConsumeOrderlyStatus.SUCCESS; } });
TransactionProducer :
用於解決事務同步,尤其是金融方面,在單體應用中我們可以通過數據庫事務來控制,但大的電商系統表和表可能分屬於不同的數據庫,數據庫事務則失效。
比如微信轉銀行操作:我們從銀行扣款100元到微信,如果微信增加100元,而此時出現問題,銀行扣款沒有成功這樣理論上是要回滾微信的增加100元。
一般分布式采用2pc(2 phase commit)模式(兩階段提交協議:預留和確認),安全性高,但是因為長連接導致長時間等待。
而RocketMQ采用兩階段補償型,TCC(Try-Confirm-Cancel)的簡稱。
應用場景:買2張票(春運回家,從天津->上海->武漢),先買預留,然后在規定時間內付款則commit,否則過期后rollback.(無論哪個)
異常回溯:在3.2.6+版本的非商業版已經取消,
需要手動回查參見別人的文章RocketMQ事務消息回查設計方案
常對應參數:
/** * 做數據反查輪詢用 UN KNOW,時會用到反查目前已經 deprecated */ producer.setCheckThreadPoolMaxSize(5); producer.setCheckThreadPoolMinSize(2);
producer.setCheckRequestHoldMax(200);//回查最大數
生產者修改:添加
sendMessage變化:TransactionSendResult
producer變化: TransactionMQProducer
添加監聽: TransactionListener
因非商業3.2.6取消回查:因此 producer.setExecutorService(executorService);沒有作用,本來是用於開啟多線程進行回查用
/** * 事務生產者 * * @author DennyZhao * @since 2018/10/31 * @version 1.0 */ public class Producer { /** * main方法 * * @param args * @throws InterruptedException * @throws MQBrokerException * @throws RemotingException * @throws MQClientException * @throws UnsupportedEncodingException */ public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException { //創建生產者實例,並確定生產組 TransactionMQProducer producer = new TransactionMQProducer("transProducerGroup"); TransactionListener transListener = new FruitTransactionListener(); // 指定服務NameServer服務 producer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;"); // // 設置超過多大進行compress壓縮 // producer.setCompressMsgBodyOverHowmuch(1024 * 10); // // 設置發送失敗的嘗試次數。 // producer.setRetryTimesWhenSendFailed(3); // // 設置如果返回值不是send_ok,是否要重新發送 // producer.setRetryAnotherBrokerWhenNotStoreOK(true); // // 設置限制最大的文件大小 // producer.setMaxMessageSize(1024*50); // // 設置默認主題對應的隊列數 // producer.setDefaultTopicQueueNums(4); // // 設置發送超時時間 ms // producer.setSendMsgTimeout(1000); /** * 做數據反查輪詢用 */ // producer.setCheckThreadPoolMaxSize(5); // producer.setCheckThreadPoolMinSize(2); // producer.setCheckRequestHoldMax(200);//回查最大數 ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { public Thread newThread(Runnable r) { Thread th = new Thread(r); th.setName("client-transaction-msg-check-thread"); System.out.println("id:" + th.getId()); System.out.println("name:" + th.getName()); return th; } }); producer.setExecutorService(executorService); /** * 添加監聽 */ producer.setTransactionListener(transListener); // 生產者啟動 producer.start(); String[] fruitArray = { "apple-蘋果", "strawbarry-草莓", "pear-梨子", "banana-香蕉", "orange-橘子"}; for (String fruit : fruitArray) { // 創建消息 Message message = new Message("transactionFruit", "common", "key"+fruit, fruit.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 發送消息 TransactionSendResult result = producer.sendMessageInTransaction(message, "abcd"); Thread.sleep(10); SendStatus sendStatus = result.getSendStatus(); // 獲取回執 System.out.println(result); if (sendStatus == SendStatus.SEND_OK) { System.out.println("信息發送成功!"); } else { System.out.println("信息發送失敗!"); } } int j = 0; while(j <500) { Thread.sleep(1000); j++; } // 關閉生產者 producer.shutdown(); }
監聽:
因回查被取消因此:checkLocalTransaction(MessageExt msg)沒有作用了,所以如果
LocalTransactionState.UNKNOW 將無法處理,會使得topic一直處於不顯示狀態。
/** * 事務執行監聽 * @author DennyZhao * */ public class FruitTransactionListener implements TransactionListener { /** * 執行事務,事務成功commit,不成功rollback,未知unknown * msg Message * arg 附加參數,用於處理傳遞內容加以判斷,使用 * */ public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { System.out.println((String) arg); // 執行事務處 // 假設以水果入庫為例:蘋果,香蕉 commit,梨子 rollback, 橘子和草莓不知道怎么處理 System.out.println(msg + "---executeLocal"); System.out.println(msg.getTransactionId()); String fruit = ""; try { fruit = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET); } catch (UnsupportedEncodingException e) { e.printStackTrace(); return LocalTransactionState.ROLLBACK_MESSAGE; } if(StringUtils.contains(fruit, "蘋果") || StringUtils.contains(fruit, "香蕉")) {//提交 return LocalTransactionState.COMMIT_MESSAGE; }else if(StringUtils.contains(fruit, "梨")) {//回滾 return LocalTransactionState.ROLLBACK_MESSAGE; }else { //通過輪詢去處理 return LocalTransactionState.UNKNOW; } } /** * 輪詢反查,對於unknow的內容,進行反查獲取結果 */ public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println(msg + "---checkAgain"); String fruit = ""; try { fruit = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET); } catch (UnsupportedEncodingException e) { e.printStackTrace(); return LocalTransactionState.ROLLBACK_MESSAGE; } //從庫中反查知道庫中 存在葡萄,不存在橘子 if(StringUtils.contains(fruit, "葡萄")) {//提交 return LocalTransactionState.COMMIT_MESSAGE; }else if(StringUtils.contains(fruit, "橘子")) {//回滾 return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.UNKNOW; }
錯誤說明:
1. No route info of this topic
因在啟動broker時,參數中未設置可自動創建topic,因此生產者創建topic被認為不合法,需要在console中先創建topic,或者服務端先創建topic。
2. transactionProducer 生產后無法消費
mvn依賴中缺少 <parent>rocketmq-all</parent>導致。
3. 事務回查無效
版本 3.2.6 后rocketmq取消了事務回查機制,如果丟失需要自己手動通過key值回查.