RocketMQ的順序消費和事務消費


一、三種消費 :1.普通消費 2. 順序消費 3.事務消費

1.1  順序消費:在網購的時候,我們需要下單,那么下單需要假如有三個順序,第一、創建訂單 ,第二:訂單付款,第三:訂單完成。也就是這個三個環節要有順序,這個訂單才有意義。RocketMQ可以保證順序消費,他的實現是生產者(一個生產者可以對多個主題去發送消息)將這個三個消息放在topic(一個topic默認有4個隊列)的一個隊列里面,單機支持上萬個持久化隊列,消費端去消費的時候也是只能有一個Consumer去取得這個隊列里面的數據,然后順序消費。

單個節點(Producer端1個、Consumer端1個)

Producer端

    package order;
     
    import java.util.List;
     
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.common.message.MessageQueue;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
     
    /**
     * Producer,發送順序消息
     */
    public class Producer {
        public static void main(String[] args) {
            try {
                DefaultMQProducer producer = new DefaultMQProducer("order_Producer");
                producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
     
                producer.start();
     
                // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",
                // "TagE" };
     
                for (int i = 1; i <= 5; i++) {
     
                    Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());
     
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, 0);
     
                    System.out.println(sendResult);
                }
     
                producer.shutdown();
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

Consumer端代碼

    package order;
     
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
     
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
     
    /**
     * 順序消息消費,帶事務方式(應用可控制Offset什么時候提交)
     */
    public class Consumer1 {
     
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
            consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
     
            /**
             * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
             * 如果非第一次啟動,那么按照上次消費的位置繼續消費
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
     
            consumer.subscribe("TopicOrderTest", "*");
     
            consumer.registerMessageListener(new MessageListenerOrderly() {
                AtomicLong consumeTimes = new AtomicLong(0);
     
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    // 設置自動提交
                    context.setAutoCommit(true);
                    for (MessageExt msg : msgs) {
                        System.out.println(msg + ",內容:" + new String(msg.getBody()));
                    }
     
                    try {
                        TimeUnit.SECONDS.sleep(5L);
                    } catch (InterruptedException e) {
     
                        e.printStackTrace();
                    }
                    ;
     
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
     
            consumer.start();
     
            System.out.println("Consumer1 Started.");
        }
     
    }

結果如下圖所示:

 

這個五條數據被順序消費了


多個節點(Producer端1個、Consumer端2個)

Producer端代碼:

    package order;
     
    import java.util.List;
     
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.common.message.MessageQueue;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
     
    /**
     * Producer,發送順序消息
     */
    public class Producer {
        public static void main(String[] args) {
            try {
                DefaultMQProducer producer = new DefaultMQProducer("order_Producer");
                producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
     
                producer.start();
     
                // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",
                // "TagE" };
     
                for (int i = 1; i <= 5; i++) {
     
                    Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());
     
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, 0);
     
                    System.out.println(sendResult);
                }
                for (int i = 1; i <= 5; i++) {
     
                    Message msg = new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " + i).getBytes());
     
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, 1);
     
                    System.out.println(sendResult);
                }
                for (int i = 1; i <= 5; i++) {
     
                    Message msg = new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " + i).getBytes());
     
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, 2);
     
                    System.out.println(sendResult);
                }
     
                producer.shutdown();
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

Consumer1

    /**
     * 順序消息消費,帶事務方式(應用可控制Offset什么時候提交)
     */
    public class Consumer1 {
     
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
            consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
     
            /**
             * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
             * 如果非第一次啟動,那么按照上次消費的位置繼續消費
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
     
            consumer.subscribe("TopicOrderTest", "*");
            
            /**
             * 實現了MessageListenerOrderly表示一個隊列只會被一個線程取到    
             *,第二個線程無法訪問這個隊列
             */
            consumer.registerMessageListener(new MessageListenerOrderly() {
                AtomicLong consumeTimes = new AtomicLong(0);
     
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    // 設置自動提交
                    context.setAutoCommit(true);
                    for (MessageExt msg : msgs) {
                        System.out.println(msg + ",內容:" + new String(msg.getBody()));
                    }
     
                    try {
                        TimeUnit.SECONDS.sleep(5L);
                    } catch (InterruptedException e) {
     
                        e.printStackTrace();
                    }
                    ;
     
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
     
            consumer.start();
     
            System.out.println("Consumer1 Started.");
        }
     
    }

Consumer2

    /**
     * 順序消息消費,帶事務方式(應用可控制Offset什么時候提交)
     */
    public class Consumer2 {
     
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
            consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
     
            /**
             * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
             * 如果非第一次啟動,那么按照上次消費的位置繼續消費
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
     
            consumer.subscribe("TopicOrderTest", "*");
            
            /**
             * 實現了MessageListenerOrderly表示一個隊列只會被一個線程取到    
             *,第二個線程無法訪問這個隊列
             */
            consumer.registerMessageListener(new MessageListenerOrderly() {
                AtomicLong consumeTimes = new AtomicLong(0);
     
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    // 設置自動提交
                    context.setAutoCommit(true);
                    for (MessageExt msg : msgs) {
                        System.out.println(msg + ",內容:" + new String(msg.getBody()));
                    }
     
                    try {
                        TimeUnit.SECONDS.sleep(5L);
                    } catch (InterruptedException e) {
     
                        e.printStackTrace();
                    }
                    ;
     
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
     
            consumer.start();
     
            System.out.println("Consumer2 Started.");
        }
     
    }

先啟動Consumer1和Consumer2,然后啟動Producer,Producer會發送15條消息
Consumer1消費情況如圖,都按照順序執行了

 


Consumer2消費情況如圖,都按照順序執行了

 

 
二、事務消費

這里說的主要是分布式事物。下面的例子的數據庫分別安裝在不同的節點上。

事物消費需要先說說什么是事物。比如說:我們跨行轉賬,從工商銀行轉到建設銀行,也就是我從工商銀行扣除1000元之后,我的建設銀行也必須加1000元。這樣才能保證數據的一致性。假如工商銀行轉1000元之后,建設銀行的服務器突然宕機,那么我扣除了1000,但是並沒有在建設銀行給我加1000,就出現了數據的不一致。因此加1000和減1000才行,減1000和減1000必須一起成功,一起失敗。

再比如,我們進行網購的時候,我們下單之后,訂單提交成功,倉庫商品的數量必須減一。但是訂單可能是一個數據庫,倉庫數量可能又是在另個數據庫里面。有可能訂單提交成功之后,倉庫數量服務器突然宕機。這樣也出現了數據不一致的問題。

使用消息隊列來解決分布式事物:

現在我們去外面飯店吃飯,很多時候都不會直接給了錢之后直接在付款的窗口遞飯菜,而是付款之后他會給你一張小票,你拿着這個小票去出飯的窗口取飯。這里和我們的系統類似,提高了吞吐量。即使你到第二個窗口,師傅告訴你已經沒飯了,你可以拿着這個憑證去退款,即使中途由於出了意外你無法到達窗口進行取飯,但是只要憑證還在,可以將錢退給你。這樣就保證了數據的一致性。

如何保證憑證(消息)有2種方法:

1、在工商銀行扣款的時候,余額表扣除1000,同時記錄日志,而且這2個表是在同一個數據庫實例中,可以使用本地事物解決。然后我們通知建設銀行需要加1000給該用戶,建設銀行收到之后給我返回已經加了1000給用戶的確認信息之后,我再標記日志表里面的日志為已經完成。

2、通過消息中間件


RocketMQ第一階段發送Prepared消息時,會拿到消息的地址,第二階段執行本地事物,第三階段通過第一階段拿到的地址去訪問消息,並修改消息的狀態。
細心的你可能又發現問題了,如果確認消息發送失敗了怎么辦?RocketMQ會定期掃描消息集群中的事物消息,如果發現了Prepared消息,它會向消息發送端(生產者)確認,Bob的錢到底是減了還是沒減呢?如果減了是回滾還是繼續發送確認消息呢?RocketMQ會根據發送端設置的策略來決定是回滾還是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。

例子:
Consumer 端

    package transaction;
     
    import java.util.List;
     
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
     
    /**
     * Consumer,訂閱消息
     */
    public class Consumer {
     
        public static void main(String[] args) throws InterruptedException, MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_Consumer");
            consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
            consumer.setConsumeMessageBatchMaxSize(10);
            /**
             * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
             * 如果非第一次啟動,那么按照上次消費的位置繼續消費
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
     
            consumer.subscribe("TopicTransactionTest", "*");
     
            consumer.registerMessageListener(new MessageListenerConcurrently() {
     
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
     
                    try {
     
                        for (MessageExt msg : msgs) {
                            System.out.println(msg + ",內容:" + new String(msg.getBody()));
                        }
     
                    } catch (Exception e) {
                        e.printStackTrace();
     
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試
     
                    }
     
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
                }
            });
     
            consumer.start();
     
            System.out.println("transaction_Consumer Started.");
        }
    }

Producer端

    package transaction;
     
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
    import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
    import com.alibaba.rocketmq.common.message.Message;
     
    /**
     * 發送事務消息例子
     *
     */
    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
     
            TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
            TransactionMQProducer producer = new TransactionMQProducer("transaction_Producer");
            producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
            // 事務回查最小並發數
            producer.setCheckThreadPoolMinSize(2);
            // 事務回查最大並發數
            producer.setCheckThreadPoolMaxSize(2);
            // 隊列數
            producer.setCheckRequestHoldMax(2000);
            producer.setTransactionCheckListener(transactionCheckListener);
            producer.start();
     
            // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE"
            // };
            TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
            for (int i = 1; i <= 2; i++) {
                try {
                    Message msg = new Message("TopicTransactionTest", "transaction" + i, "KEY" + i,
                            ("Hello RocketMQ " + i).getBytes());
                    SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
                    System.out.println(sendResult);
     
                    Thread.sleep(10);
                } catch (MQClientException e) {
                    e.printStackTrace();
                }
            }
     
            for (int i = 0; i < 100000; i++) {
                Thread.sleep(1000);
            }
     
            producer.shutdown();
     
        }
    }

TransactionExecuterImpl  --執行本地事務

    package transaction;
     
    import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
    import com.alibaba.rocketmq.client.producer.LocalTransactionState;
    import com.alibaba.rocketmq.common.message.Message;
     
    /**
     * 執行本地事務
     */
    public class TransactionExecuterImpl implements LocalTransactionExecuter {
        // private AtomicInteger transactionIndex = new AtomicInteger(1);
     
        public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
     
            System.out.println("執行本地事務msg = " + new String(msg.getBody()));
     
            System.out.println("執行本地事務arg = " + arg);
     
            String tags = msg.getTags();
     
            if (tags.equals("transaction2")) {
                System.out.println("======我的操作============,失敗了  -進行ROLLBACK");
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            return LocalTransactionState.COMMIT_MESSAGE;
            // return LocalTransactionState.UNKNOW;
        }
    }

TransactionCheckListenerImpl--未決事務,服務器回查客戶端(目前已經被閹割啦)

    package transaction;
     
    import com.alibaba.rocketmq.client.producer.LocalTransactionState;
    import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
    import com.alibaba.rocketmq.common.message.MessageExt;
     
    /**
     * 未決事務,服務器回查客戶端
     */
    public class TransactionCheckListenerImpl implements TransactionCheckListener {
        // private AtomicInteger transactionIndex = new AtomicInteger(0);
     
        //在這里,我們可以根據由MQ回傳的key去數據庫查詢,這條數據到底是成功了還是失敗了。
        public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
            System.out.println("未決事務,服務器回查客戶端msg =" + new String(msg.getBody().toString()));
            // return LocalTransactionState.ROLLBACK_MESSAGE;
     
            return LocalTransactionState.COMMIT_MESSAGE;
     
            // return LocalTransactionState.UNKNOW;
        }
    }

producer端:發送數據到MQ,並且處理本地事物。這里模擬了一個成功一個失敗。Consumer只會接收到本地事物成功的數據。第二個數據失敗了,不會被消費。

 

 


Consumer只會接收到一個,第二個數據不會被接收到

 

 


 
原文鏈接:https://blog.csdn.net/u010634288/article/details/57158374


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM