一、三種消費 :1.普通消費 2. 順序消費 3.事務消費
1.1 順序消費:在網購的時候,我們需要下單,那么下單需要假如有三個順序,第一、創建訂單 ,第二:訂單付款,第三:訂單完成。也就是這個三個環節要有順序,這個訂單才有意義。RocketMQ可以保證順序消費,他的實現是生產者(一個生產者可以對多個主題去發送消息)將這個三個消息放在topic(一個topic默認有4個隊列)的一個隊列里面,單機支持上萬個持久化隊列,消費端去消費的時候也是只能有一個Consumer去取得這個隊列里面的數據,然后順序消費。
單個節點(Producer端1個、Consumer端1個)
Producer端
package order; import java.util.List; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.MessageQueueSelector; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * Producer,發送順序消息 */ public class Producer { public static void main(String[] args) { try { DefaultMQProducer producer = new DefaultMQProducer("order_Producer"); producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); producer.start(); // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", // "TagE" }; for (int i = 1; i <= 5; i++) { Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 0); System.out.println(sendResult); } producer.shutdown(); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
Consumer端代碼
package order; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; /** * 順序消息消費,帶事務方式(應用可控制Offset什么時候提交) */ public class Consumer1 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); /** * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> * 如果非第一次啟動,那么按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicOrderTest", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // 設置自動提交 context.setAutoCommit(true); for (MessageExt msg : msgs) { System.out.println(msg + ",內容:" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { e.printStackTrace(); } ; return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer1 Started."); } }
結果如下圖所示:
這個五條數據被順序消費了
多個節點(Producer端1個、Consumer端2個)
Producer端代碼:
package order; import java.util.List; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.MessageQueueSelector; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * Producer,發送順序消息 */ public class Producer { public static void main(String[] args) { try { DefaultMQProducer producer = new DefaultMQProducer("order_Producer"); producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); producer.start(); // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", // "TagE" }; for (int i = 1; i <= 5; i++) { Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 0); System.out.println(sendResult); } for (int i = 1; i <= 5; i++) { Message msg = new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " + i).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 1); System.out.println(sendResult); } for (int i = 1; i <= 5; i++) { Message msg = new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " + i).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 2); System.out.println(sendResult); } producer.shutdown(); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
Consumer1
/** * 順序消息消費,帶事務方式(應用可控制Offset什么時候提交) */ public class Consumer1 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); /** * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> * 如果非第一次啟動,那么按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicOrderTest", "*"); /** * 實現了MessageListenerOrderly表示一個隊列只會被一個線程取到 *,第二個線程無法訪問這個隊列 */ consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // 設置自動提交 context.setAutoCommit(true); for (MessageExt msg : msgs) { System.out.println(msg + ",內容:" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { e.printStackTrace(); } ; return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer1 Started."); } }
Consumer2
/** * 順序消息消費,帶事務方式(應用可控制Offset什么時候提交) */ public class Consumer2 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); /** * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> * 如果非第一次啟動,那么按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicOrderTest", "*"); /** * 實現了MessageListenerOrderly表示一個隊列只會被一個線程取到 *,第二個線程無法訪問這個隊列 */ consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // 設置自動提交 context.setAutoCommit(true); for (MessageExt msg : msgs) { System.out.println(msg + ",內容:" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { e.printStackTrace(); } ; return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer2 Started."); } }
先啟動Consumer1和Consumer2,然后啟動Producer,Producer會發送15條消息
Consumer1消費情況如圖,都按照順序執行了
Consumer2消費情況如圖,都按照順序執行了
二、事務消費
這里說的主要是分布式事物。下面的例子的數據庫分別安裝在不同的節點上。
事物消費需要先說說什么是事物。比如說:我們跨行轉賬,從工商銀行轉到建設銀行,也就是我從工商銀行扣除1000元之后,我的建設銀行也必須加1000元。這樣才能保證數據的一致性。假如工商銀行轉1000元之后,建設銀行的服務器突然宕機,那么我扣除了1000,但是並沒有在建設銀行給我加1000,就出現了數據的不一致。因此加1000和減1000才行,減1000和減1000必須一起成功,一起失敗。
再比如,我們進行網購的時候,我們下單之后,訂單提交成功,倉庫商品的數量必須減一。但是訂單可能是一個數據庫,倉庫數量可能又是在另個數據庫里面。有可能訂單提交成功之后,倉庫數量服務器突然宕機。這樣也出現了數據不一致的問題。
使用消息隊列來解決分布式事物:
現在我們去外面飯店吃飯,很多時候都不會直接給了錢之后直接在付款的窗口遞飯菜,而是付款之后他會給你一張小票,你拿着這個小票去出飯的窗口取飯。這里和我們的系統類似,提高了吞吐量。即使你到第二個窗口,師傅告訴你已經沒飯了,你可以拿着這個憑證去退款,即使中途由於出了意外你無法到達窗口進行取飯,但是只要憑證還在,可以將錢退給你。這樣就保證了數據的一致性。
如何保證憑證(消息)有2種方法:
1、在工商銀行扣款的時候,余額表扣除1000,同時記錄日志,而且這2個表是在同一個數據庫實例中,可以使用本地事物解決。然后我們通知建設銀行需要加1000給該用戶,建設銀行收到之后給我返回已經加了1000給用戶的確認信息之后,我再標記日志表里面的日志為已經完成。
2、通過消息中間件
RocketMQ第一階段發送Prepared消息時,會拿到消息的地址,第二階段執行本地事物,第三階段通過第一階段拿到的地址去訪問消息,並修改消息的狀態。
細心的你可能又發現問題了,如果確認消息發送失敗了怎么辦?RocketMQ會定期掃描消息集群中的事物消息,如果發現了Prepared消息,它會向消息發送端(生產者)確認,Bob的錢到底是減了還是沒減呢?如果減了是回滾還是繼續發送確認消息呢?RocketMQ會根據發送端設置的策略來決定是回滾還是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。
例子:
Consumer 端
package transaction; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; /** * Consumer,訂閱消息 */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_Consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); consumer.setConsumeMessageBatchMaxSize(10); /** * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> * 如果非第一次啟動,那么按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTransactionTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { System.out.println(msg + ",內容:" + new String(msg.getBody())); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功 } }); consumer.start(); System.out.println("transaction_Consumer Started."); } }
Producer端
package transaction; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.client.producer.TransactionMQProducer; import com.alibaba.rocketmq.common.message.Message; /** * 發送事務消息例子 * */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("transaction_Producer"); producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); // 事務回查最小並發數 producer.setCheckThreadPoolMinSize(2); // 事務回查最大並發數 producer.setCheckThreadPoolMaxSize(2); // 隊列數 producer.setCheckRequestHoldMax(2000); producer.setTransactionCheckListener(transactionCheckListener); producer.start(); // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" // }; TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); for (int i = 1; i <= 2; i++) { try { Message msg = new Message("TopicTransactionTest", "transaction" + i, "KEY" + i, ("Hello RocketMQ " + i).getBytes()); SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); System.out.println(sendResult); Thread.sleep(10); } catch (MQClientException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); } }
TransactionExecuterImpl --執行本地事務
package transaction; import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.common.message.Message; /** * 執行本地事務 */ public class TransactionExecuterImpl implements LocalTransactionExecuter { // private AtomicInteger transactionIndex = new AtomicInteger(1); public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { System.out.println("執行本地事務msg = " + new String(msg.getBody())); System.out.println("執行本地事務arg = " + arg); String tags = msg.getTags(); if (tags.equals("transaction2")) { System.out.println("======我的操作============,失敗了 -進行ROLLBACK"); return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; // return LocalTransactionState.UNKNOW; } }
TransactionCheckListenerImpl--未決事務,服務器回查客戶端(目前已經被閹割啦)
package transaction; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.common.message.MessageExt; /** * 未決事務,服務器回查客戶端 */ public class TransactionCheckListenerImpl implements TransactionCheckListener { // private AtomicInteger transactionIndex = new AtomicInteger(0); //在這里,我們可以根據由MQ回傳的key去數據庫查詢,這條數據到底是成功了還是失敗了。 public LocalTransactionState checkLocalTransactionState(MessageExt msg) { System.out.println("未決事務,服務器回查客戶端msg =" + new String(msg.getBody().toString())); // return LocalTransactionState.ROLLBACK_MESSAGE; return LocalTransactionState.COMMIT_MESSAGE; // return LocalTransactionState.UNKNOW; } }
producer端:發送數據到MQ,並且處理本地事物。這里模擬了一個成功一個失敗。Consumer只會接收到本地事物成功的數據。第二個數據失敗了,不會被消費。
Consumer只會接收到一個,第二個數據不會被接收到
原文鏈接:https://blog.csdn.net/u010634288/article/details/57158374