RocketMQ簡單使用(二)批處理、過濾、事務消息


簡單研究下rockermq 批量消息、過濾消息、事務消息的使用。

1. 批量消息

  1. 簡介

   批量消息的發送能提升投遞小消息的性能。但是批量消息有一些限制,一批投遞的消息應該有相同的主題、具有相同的刷盤策略、不支持延時消息與事務型消息。

  另外,生產者發送消息的大小有一些限制。默認不超過1MB 的消息。如果超出可以將批量消息進行拆分。或者通過修改配置。

  生產者發送的消息結構如下:

  生產者發送的消息並不是將消息直接序列化后發送到網絡上的,而是通過Message 對象生成了一個字符串發送出去的。這個字符串包含:Topic、消息body、消息日志(20字節)、以及用於描述消息屬性的key-value 結構,這些屬性包括生產者地址、生產時間、以及要發送的queueId 等。最終寫入到broker 中消息單元中的數據都來自於這些屬性。

  1. 批量消費消息

  org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage 方法的第一個參數為消息列表,但是默認情況下每次只能消費一條。其規則簡單理解是:一次性拉取 pullBatchSize(默認32) 參數指定的消息數量,然后按照consumeMessageBatchMaxSize (默認為1)指定的大小分發到多個消費線程。

  1. 代碼

(1)生產者代碼

package com.zd.bx.rocketmq.batch;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class Producer2 {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("batchProducer");
        producer.setNamesrvAddr("192.168.13.111:9876");
        producer.start();

        // 構造消息
        String topic = "batchTest";
        List<Message> messages = new ArrayList<>();
        for (int index = 0; index < 100000; index++) {
            messages.add(new Message(topic, "TagA", "OrderI" + index, ("Hello world " + index).getBytes()));
        }

//        redirectSend(producer, messages);

        // 批量發送
        ListSplitter splitter = new ListSplitter(messages);
        int sendNum = 0;
        while (splitter.hasNext()) {
            System.out.println("sendNum: " + (++sendNum));
            try {
                List<Message> listItem = splitter.next();
                producer.send(listItem);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        producer.shutdown();
    }

    private static void redirectSend(DefaultMQProducer producer, List<Message> messages) {
        try {
            // 直接發送會報錯: org.apache.rocketmq.client.exception.MQClientException: CODE: 13  DESC: the message body size over max value, MAX: 4194304
            SendResult send = producer.send(messages);
            System.out.println(send);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class ListSplitter implements Iterator<List<Message>> {

    private final int SIZE_LIMIT = 1 * 1024 * 1024;

    private final List<Message> messages;

    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > SIZE_LIMIT) {
                //it is unexpected that single message exceeds the SIZE_LIMIT
                //here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                    //if the next sublist has no element, add this one and then break, otherwise just break
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

(2)批量消費者

package com.zd.bx.rocketmq.batch;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        // 定義一個push消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myTestConsumerGroup");
        // 指定nameserver
        consumer.setNamesrvAddr("192.168.13.111:9876");
        // 指定消費的topic與tag
        consumer.subscribe("batchTest", "*");
        // 指定使用 廣播模式進行消費,默認為集群模式
//        consumer.setMessageModel(MessageModel.BROADCASTING);
        // 修改一次拉取的最大值(默認32),可以理解為一次拉取回來之后然后按下面的參數consumeMessageBatchMaxSize,分發到多個線程組中進行消費
        consumer.setPullBatchSize(60);
        // 指定每次可以消費100條消息(可以理解是下面監聽器中集合的大小),默認為1,值范圍是[1, 1024]
        consumer.setConsumeMessageBatchMaxSize(3);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " msgs.size(): " + msgs.size());
                for (MessageExt msg : msgs) {
                    System.out.printf(Thread.currentThread().getName() + " %s Receive New Messages: %s body: %s %n", Thread.currentThread().getName(), msgs, new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

2. 過濾消息

  在接收消息的時候除了可以指定消息的Topic外,還可以對指定Topic的消息進行過濾。可以根據Tag 進行過濾,也可以根據sql 過濾。sql過濾是對消息中屬性進行篩選過濾,只有push模式的消費者才能使用sql過濾。

  sql過濾表達式中支持多種常量類型與運算符。支持的常量類型:

  • 數值:比如1,2,3
  • 字符:必須用單引號包裹,比如'abc'
  • 布爾:TRUE或者FALSE

   支持的運算符有:

  • 數值比較:>,<,>=,<=,BETWEEN,=
  • 字符比較:=,<>,IN
  • 邏輯比較:AND,OR,NOT
  • NULL判斷: IS NULL 或者 IS NOT NULL

  默認情況下Broker沒有開啟sql過濾功能,需要在Broker加載的配置文件中添加如下屬性:(單機在broker.conf 文件中)

enablePropertyFilter=true

  測試代碼:

(1)生產者:自定義屬性

package com.zd.bx.rocketmq.filter;


import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class Producer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("batchProducer");
        producer.setNamesrvAddr("192.168.13.111:9876");
        producer.start();

        String[] tags = new String[]{"TagA", "TagB", "TagC"};
        String[] sexs = new String[]{"男", "女"};
        // 構造消息
        String topic = "filterTopic";
        for (int index = 0; index < 20; index++) {
            byte[] body = ("Hi, " + index).getBytes();
            String tag = tags[index % tags.length];
            Message message = new Message(topic, tag, body);
            // 存一個自定義屬性age, 一個自定義屬性sex
            message.putUserProperty("age", index + "");
            message.putUserProperty("sex", sexs[index % sexs.length]);
            System.out.println(producer.send(message));
        }

        producer.shutdown();
    }
}

(2)消費者

消費者一:按標簽進行過濾

package com.zd.bx.rocketmq.filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer1 {

    public static void main(String[] args) throws MQClientException {
        // 定義一個push消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myTestConsumerGroup");
        // 指定從第一條消息開始消費
        // 指定nameserver
        consumer.setNamesrvAddr("192.168.13.111:9876");
        // 指定消費的topic與tag. "TagA || TagB" 等價於 MessageSelector.byTag("TagA || TagB")
        consumer.subscribe("filterTopic", "TagA || TagB");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive Messages: %s, property: %s %n", Thread.currentThread().getName(), new String(msg.getBody()), msg.getProperties());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

消費者二:按sql條件進行過濾(接收age大於並且sex為男的)

package com.zd.bx.rocketmq.filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer2 {

    public static void main(String[] args) throws MQClientException {
        // 定義一個push消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myTestConsumerGroup");
        // 指定nameserver
        consumer.setNamesrvAddr("192.168.13.111:9876");
        // 指定消費的topic與tag
        consumer.subscribe("filterTopic", MessageSelector.bySql("age > 5 and sex = '男'"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s, property: %s %n", Thread.currentThread().getName(),  new String(msg.getBody()), msg.getProperties());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

3. 事務消息

  事務型消息可以確保執行本地事務和發送消息保證原子性。這里需要注意是執行本地事務和發送消息是在一個事務里面。

  1. 舉一個場景:工行用戶A向建行用戶B轉賬一萬元。同步消息處理該需求的場景:

  這里有個問題:若第三步驟的扣款失敗,消息已經到達Broker。對於MQ來說,只要消息寫入MQ就可以被消費。此時建行系統增加了1萬元。此時就出現了數據不一致的問題。

  1. 解決思路:

  使用MQ的事務消息。讓1、2、3具有原子性,這里就是用到事務消息。但是對於4、5操作,是不在該事務中的,也就是事務消息只能保證本地事務和發送消息在是一致的。解決示意圖如下:

解釋:第3步發送的消息是半事務消息(預提交到Broker),消費者是不能消費該消息的;第6步發送扣款執行結果,實際是向TC匯報本地事務的執行狀態(LocalTransactionState里面的狀態)。

  1. 事務消息的使用限制:

(1)不支持定時和批處理消息

(2)事務消息要做好冪等性檢查,因為事務消息可能不止一次被消費(存在回滾后再提交的情況)

  1. 基本概念

(1) 分布式事務:可以理解為多個本地事務的集合體,多個事務直接構成一個大的分布式事務。要么全失敗,要么全成功。

(2)事務消息:RocketMQ提供了類似於X/OpenXA的分布式事務功能,通過事務消息能達到分布式事務的最終一致性。XA是一種分布式事務解決方案,一種分布式事務處理模式。

(3)半事務消息:暫不能投遞的消息, 發送方已經成功地將消息發送到了Broker,但是Broker未收到最終確認指令,該消息被標記成"暫不能投遞"狀態,即不能為消費者看到。出於該狀態下的消息成為半事務消息。

(4)本地事務狀態:Producer回調執行的結果為本地事務狀態,其會發給TC(事務控制器),而TC再發給TM(事務管理器)。TM根據TC送來的本地事務狀態來決定全局事務確認指令。

package org.apache.rocketmq.client.producer;

public enum LocalTransactionState {
    COMMIT_MESSAGE,    // 提交事務,表示允許消費者消費該消息
    ROLLBACK_MESSAGE,    // 消息回滾,類似於預提交的消息撤回
    UNKNOW,    // 未知,執行回調
}

(5)消息回查:重新查詢本地事務的執行狀態。關於消息回查的設置:

transactionTimeout=20, 指定TM應該在20s內將消息發送給TC,否則引發消息回查。默認60s。

transactionCheckMax=5,指定最多回查次數,超過后將丟棄消息並且記錄日志。默認15次。

transactionCheckInterval=10,指定設置的多次的消息回查的時間間隔為10s。默認為60s。

  1. 測試代碼

(1)監聽器

package com.zd.bx.rocketmq.tx;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class Listener implements TransactionListener {

    // 回調操作方法
    // 消息預提交成功就會觸發該方法的執行,用於完成本地事務
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("預提交消息成功:" + msg);
        // 假設接收到TagA的消息就表示扣款操作成功,TagB的消息表示扣款失敗,
        // TagC表示扣款結果不清楚,需要執行消息回查
        if (StringUtils.equals("TagA", msg.getTags())) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.equals("TagB", msg.getTags())) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else if (StringUtils.equals("TagC", msg.getTags())) {
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.UNKNOW;
    }

    // 消息回查方法
    // 引發消息回查的原因最常見的有兩個:
    // 1)回調操作返回UNKNWON
    // 2)TC沒有接收到TM的最終全局事務確認指令
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("執行消息回查: body" + new String(msg.getBody()) + " tag: " + msg.getTags() + " txId: " + msg.getTransactionId());
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

(2)生產者

package com.zd.bx.rocketmq.tx;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;

public class Producer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        TransactionMQProducer producer = new TransactionMQProducer("transactionProducerGroup");
        producer.setNamesrvAddr("192.168.13.111:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        // 為生產者指定線程池和監聽器
        producer.setExecutorService(executorService);
        producer.setTransactionListener(new Listener());
        producer.start();

        String[] tags = new String[]{"TagA", "TagB", "TagC"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("txTopic", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

日志如下:

預提交消息成功:Message{topic='txTopic', flag=0, properties={KEYS=KEY0, TRAN_MSG=true, UNIQ_KEY=7F0000015BC818B4AAC2793C9CF80000, WAIT=true, PGROUP=transactionProducerGroup, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='7F0000015BC818B4AAC2793C9CF80000'}
SendResult [sendStatus=SEND_OK, msgId=7F0000015BC818B4AAC2793C9CF80000, offsetMsgId=null, messageQueue=MessageQueue [topic=txTopic, brokerName=broker-a, queueId=2], queueOffset=40]
預提交消息成功:Message{topic='txTopic', flag=0, properties={KEYS=KEY1, TRAN_MSG=true, UNIQ_KEY=7F0000015BC818B4AAC2793C9D170001, WAIT=true, PGROUP=transactionProducerGroup, TAGS=TagB}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='7F0000015BC818B4AAC2793C9D170001'}
SendResult [sendStatus=SEND_OK, msgId=7F0000015BC818B4AAC2793C9D170001, offsetMsgId=null, messageQueue=MessageQueue [topic=txTopic, brokerName=broker-a, queueId=3], queueOffset=41]
預提交消息成功:Message{topic='txTopic', flag=0, properties={KEYS=KEY2, TRAN_MSG=true, UNIQ_KEY=7F0000015BC818B4AAC2793C9D280002, WAIT=true, PGROUP=transactionProducerGroup, TAGS=TagC}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='7F0000015BC818B4AAC2793C9D280002'}
SendResult [sendStatus=SEND_OK, msgId=7F0000015BC818B4AAC2793C9D280002, offsetMsgId=null, messageQueue=MessageQueue [topic=txTopic, brokerName=broker-a, queueId=0], queueOffset=42]
預提交消息成功:Message{topic='txTopic', flag=0, properties={KEYS=KEY3, TRAN_MSG=true, UNIQ_KEY=7F0000015BC818B4AAC2793C9D3D0003, WAIT=true, PGROUP=transactionProducerGroup, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51], transactionId='7F0000015BC818B4AAC2793C9D3D0003'}
SendResult [sendStatus=SEND_OK, msgId=7F0000015BC818B4AAC2793C9D3D0003, offsetMsgId=null, messageQueue=MessageQueue [topic=txTopic, brokerName=broker-a, queueId=1], queueOffset=43]
預提交消息成功:Message{topic='txTopic', flag=0, properties={KEYS=KEY4, TRAN_MSG=true, UNIQ_KEY=7F0000015BC818B4AAC2793C9D510004, WAIT=true, PGROUP=transactionProducerGroup, TAGS=TagB}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='7F0000015BC818B4AAC2793C9D510004'}
SendResult [sendStatus=SEND_OK, msgId=7F0000015BC818B4AAC2793C9D510004, offsetMsgId=null, messageQueue=MessageQueue [topic=txTopic, brokerName=broker-a, queueId=2], queueOffset=44]
預提交消息成功:Message{topic='txTopic', flag=0, properties={KEYS=KEY5, TRAN_MSG=true, UNIQ_KEY=7F0000015BC818B4AAC2793C9D650005, WAIT=true, PGROUP=transactionProducerGroup, TAGS=TagC}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53], transactionId='7F0000015BC818B4AAC2793C9D650005'}
SendResult [sendStatus=SEND_OK, msgId=7F0000015BC818B4AAC2793C9D650005, offsetMsgId=null, messageQueue=MessageQueue [topic=txTopic, brokerName=broker-a, queueId=3], queueOffset=45]
執行消息回查: bodyHello RocketMQ 2 tag: TagC txId: 7F0000015BC818B4AAC2793C9D280002
執行消息回查: bodyHello RocketMQ 5 tag: TagC txId: 7F0000015BC818B4AAC2793C9D650005

(3)消費者

package com.zd.bx.rocketmq.tx;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        // 定義一個push消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myTestConsumerGroup");
        // 指定nameserver
        consumer.setNamesrvAddr("192.168.13.111:9876");
        consumer.subscribe("txTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive Messages: %s, property: %s %n", Thread.currentThread().getName(), new String(msg.getBody()), msg.getProperties());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

日志如下:

ConsumeMessageThread_myTestConsumerGroup_5 Receive Messages: Hello RocketMQ 0, property: {MIN_OFFSET=0, REAL_TOPIC=txTopic, MAX_OFFSET=4, KEYS=KEY0, TRAN_MSG=true, CONSUME_START_TIME=1648098015500, UNIQ_KEY=7F0000015BC818B4AAC2793C9CF80000, CLUSTER=DefaultCluster, PGROUP=transactionProducerGroup, WAIT=true, TAGS=TagA, REAL_QID=2} 
ConsumeMessageThread_myTestConsumerGroup_6 Receive Messages: Hello RocketMQ 3, property: {MIN_OFFSET=0, REAL_TOPIC=txTopic, MAX_OFFSET=3, KEYS=KEY3, TRAN_MSG=true, CONSUME_START_TIME=1648098015559, UNIQ_KEY=7F0000015BC818B4AAC2793C9D3D0003, CLUSTER=DefaultCluster, PGROUP=transactionProducerGroup, WAIT=true, TAGS=TagA, REAL_QID=1} 
ConsumeMessageThread_myTestConsumerGroup_7 Receive Messages: Hello RocketMQ 5, property: {TRANSACTION_CHECK_TIMES=1, TRAN_MSG=true, CONSUME_START_TIME=1648098030724, MIN_OFFSET=0, REAL_TOPIC=txTopic, MAX_OFFSET=4, KEYS=KEY5, UNIQ_KEY=7F0000015BC818B4AAC2793C9D650005, CLUSTER=DefaultCluster, PGROUP=transactionProducerGroup, WAIT=true, TAGS=TagC, REAL_QID=3} 
ConsumeMessageThread_myTestConsumerGroup_8 Receive Messages: Hello RocketMQ 2, property: {TRANSACTION_CHECK_TIMES=1, TRAN_MSG=true, CONSUME_START_TIME=1648098030733, MIN_OFFSET=0, REAL_TOPIC=txTopic, MAX_OFFSET=4, KEYS=KEY2, UNIQ_KEY=7F0000015BC818B4AAC2793C9D280002, CLUSTER=DefaultCluster, PGROUP=transactionProducerGroup, WAIT=true, TAGS=TagC, REAL_QID=0} 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM