rocketmq-批量發送消息


參考:

https://blog.csdn.net/u010277958/article/details/88647281

https://blog.csdn.net/u010634288/article/details/56049305

https://blog.csdn.net/u014004279/article/details/99644995

 

 

 

 

 

 

RocketMQ-批量發送消息

批量發送消息可提高傳遞小消息的性能。同時也需要滿足以下特征:

  • 批量消息要求必要具有同一topic、相同消息配置
  • 不支持延時消息
  • 建議一個批量消息最好不要超過1MB大小

示例:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "Order1", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "Order2", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "Order3", "Hello world 2".getBytes()));
try {
    producer.send(messages);
} catch (Exception e) {
    e.printStackTrace();
}

 

在發送批量消息時先構建一個消息對象集合,然后調用send(Collection msg)方法即可。由於批量消息的1M限制,所以一般情況下在集合中添加消息需要先計算當前集合中消息對象的大小是否超過限制,如果超過限制也可以使用分割消息的方式進行多次批量發送。

 

 

 

 

 

 

 

RocketMQ之批量消息發送源碼解析

DefaultProducer.send

RocketMQ提供了批量發送消息的API,同樣在DefaultProducer.java中

    @Override
    public SendResult send(
        Collection<Message> msgs) throws MQClientException, RemotingException, 
            MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(batch(msgs));
    }

 

 

它的參數為Message集合,也就是一批消息。它的另外一個重載方法提供了發送超時時間參數

    @Override
    public SendResult send(Collection<Message> msgs,
        long timeout) throws MQClientException, RemotingException,
             MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(batch(msgs), timeout);
    }

 

可以看到是將消息通過batch()方法打包為單條消息,我們看一下batch方法的邏輯

 

DefaultProducer.batch

 

    private MessageBatch batch(Collection<Message> msgs) throws MQClientException {

// 聲明批量消息體
MessageBatch msgBatch;
try {


// 從Message的list生成批量消息體MessageBatch msgBatch = MessageBatch.generateFromList(msgs); for (Message message : msgBatch) { Validators.checkMessage(message, this); MessageClientIDSetter.setUniqID(message); message.setTopic(withNamespace(message.getTopic())); } // 設置消息體,此時的消息體已經是處理過后的批量消息體 msgBatch.setBody(msgBatch.encode()); } catch (Exception e) { throw new MQClientException("Failed to initiate the MessageBatch", e); } // 設置topic msgBatch.setTopic(withNamespace(msgBatch.getTopic())); return msgBatch; }

 




從代碼可以看到,核心思想是將一批消息(Collection msgs)打包為MessageBatch對象,我們看下MessageBatch的聲明

    public class MessageBatch extends Message implements Iterable<Message> {
        private final List<Message> messages;

        private MessageBatch(List<Message> messages) {
            this.messages = messages;
        }
可以看到MessageBatch繼承自Message,持有List 引用。

我們接着看一下generateFromList方法

MessageBatch.generateFromList

    public static MessageBatch generateFromList(Collection<Message> messages) {
        assert messages != null;
        assert messages.size() > 0;
        // 首先實例化一個Message的list
        List<Message> messageList = new ArrayList<Message>(messages.size());
        Message first = null;
        // 對messages集合進行遍歷
        for (Message message : messages) {
            // 判斷延時級別,如果大於0拋出異常,原因為:批量消息發送不支持延時
            if (message.getDelayTimeLevel() > 0) {
                throw new UnsupportedOperationException
                    ("TimeDelayLevel in not supported for batching");
            }
            // 判斷topic是否以 **"%RETRY%"** 開頭,如果是,
            // 則拋出異常,原因為:批量發送消息不支持消息重試
            if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                throw new UnsupportedOperationException("Retry Group is not supported for batching");
            }
            // 判斷集合中的每個Message的topic與批量發送topic是否一致,
            // 如果不一致則拋出異常,原因為:
            // 批量消息中的每個消息實體的Topic要和批量消息整體的topic保持一致。
            if (first == null) {
                first = message;
            } else {
                if (!first.getTopic().equals(message.getTopic())) {
                    throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
                }
                // 判斷批量消息的首個Message與其他的每個Message實體的等待消息存儲狀態是否相同,
                // 如果不同則報錯,原因為:批量消息中每個消息的waitStoreMsgOK狀態均應該相同。
                if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
                    throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
                }
            }
            // 校驗通過后,將message實體添加到messageList中
            messageList.add(message);
        }
        // 將處理完成的messageList作為構造方法,
        // 初始化MessageBatch實體,並設置topic以及isWaitStoreMsgOK狀態。
        MessageBatch messageBatch = new MessageBatch(messageList);
        messageBatch.setTopic(first.getTopic());
        messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
        return messageBatch;
    }

 

 

 

 

總結一下,generateFromList方法對調用方設置的Collection 集合進行遍歷,經過前置校驗之后,轉換為MessageBatch對象並返回給DefaultProducer.batch方法中,我們接着看DefaultProducer.batch的邏輯。

到此,通過MessageBatch.generateFromList方法,將發送端傳入的一批消息集合轉換為了MessageBatch實體。

DefaultProducer.batch

 

    private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
        // 聲明批量消息體
        MessageBatch msgBatch;
        try {
            // 從Message的list生成批量消息體MessageBatch
            msgBatch = MessageBatch.generateFromList(msgs);
            for (Message message : msgBatch) {
                Validators.checkMessage(message, this);
                MessageClientIDSetter.setUniqID(message);
                message.setTopic(withNamespace(message.getTopic()));
            }
            // 設置消息體,此時的消息體已經是處理過后的批量消息體
            msgBatch.setBody(msgBatch.encode());
        } catch (Exception e) {
            throw new MQClientException("Failed to initiate the MessageBatch", e);
        }
        // 設置topic
        msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
        return msgBatch;
    }

 



注意下面這行代碼:

        // 設置消息體,此時的消息體已經是處理過后的批量消息體
        msgBatch.setBody(msgBatch.encode());

 

這里對MessageBatch進行消息編碼處理,通過調用MessageBatch的encode方法實現,代碼邏輯如下:

    public byte[] encode() {
        return MessageDecoder.encodeMessages(messages);
    }

 

可以看到是通過靜態方法 encodeMessages(List messages) 實現的。

我們看一下encodeMessages方法的邏輯:

    public static byte[] encodeMessages(List<Message> messages) {
        //TO DO refactor, accumulate in one buffer, avoid copies
        List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
        int allSize = 0;
        for (Message message : messages) {
            // 遍歷messages集合,分別對每個Message實體進行編碼操作,轉換為byte[]
            byte[] tmp = encodeMessage(message);
            // 將轉換后的單個Message的byte[]設置到encodedMessages中
            encodedMessages.add(tmp);
            // 批量消息的二進制數據長度隨實際消息體遞增
            allSize += tmp.length;
        }
        byte[] allBytes = new byte[allSize];
        int pos = 0;
        for (byte[] bytes : encodedMessages) {
            // 遍歷encodedMessages,按序復制每個Message的二進制格式消息體
            System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
            pos += bytes.length;
        }
        // 返回批量消息整體的消息體二進制數組
        return allBytes;
    }

 

encodeMessages的邏輯在注釋中分析的已經比較清楚了,其實就是遍歷messages,並按序拼接每個Message實體的二進制數組格式消息體並返回。

我們可以繼續看一下單個Message是如何進行編碼的,調用了 MessageDecoder.encodeMessage(message) 方法,邏輯如下:

    public static byte[] encodeMessage(Message message) {
        //only need flag, body, properties
        byte[] body = message.getBody();
        int bodyLen = body.length;
        String properties = messageProperties2String(message.getProperties());
        byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
        //note properties length must not more than Short.MAX
        short propertiesLength = (short) propertiesBytes.length;
        int sysFlag = message.getFlag();
        int storeSize = 4 // 1 TOTALSIZE
            + 4 // 2 MAGICCOD
            + 4 // 3 BODYCRC
            + 4 // 4 FLAG
            + 4 + bodyLen // 4 BODY
            + 2 + propertiesLength;
        ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
        // 1 TOTALSIZE
        byteBuffer.putInt(storeSize);
        // 2 MAGICCODE
        byteBuffer.putInt(0);
        // 3 BODYCRC
        byteBuffer.putInt(0);
        // 4 FLAG
        int flag = message.getFlag();
        byteBuffer.putInt(flag);
        // 5 BODY
        byteBuffer.putInt(bodyLen);
        byteBuffer.put(body);
        // 6 properties
        byteBuffer.putShort(propertiesLength);
        byteBuffer.put(propertiesBytes);
        return byteBuffer.array();
    }

 

 

這里其實就是將消息按照RocektMQ的消息協議進行編碼,格式為:

    消息總長度          ---  4字節
    魔數                --- 4字節
    bodyCRC校驗碼       --- 4字節
    flag標識            --- 4字節
    body長度            --- 4字節
    消息體              --- 消息體實際長度N字節
    屬性長度            --- 2字節
    擴展屬性            --- N字節

 

通過encodeMessage方法處理之后,消息便會被編碼為固定格式,最終會被Broker端進行處理並持久化。

其他

到此便是批量消息發送的源碼分析,實際上RocketMQ在處理批量消息的時候是將其解析為單個消息再發送的,這樣就在底層統一了單條消息、批量消息發送的邏輯,讓整個框架的設計更加健壯,也便於我們進行理解學習。

 

 

 

 

 

 

 

 

 

 

 

RocketMQ批量消費、消息重試、消費模式、刷盤方式

一、Consumer 批量消費

可以通過consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10條

這里需要分為2種情況1、Consumer端先啟動  2、Consumer端后啟動.   正常情況下:應該是Consumer需要先啟動

 

1、Consumer端先啟動

Consumer代碼如下

package quickstart;
 
import java.util.List;
 
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
 
/**
 * Consumer,訂閱消息
 */
public class Consumer {
 
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
        consumer.setConsumeMessageBatchMaxSize(10);
        /**
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
         * 如果非第一次啟動,那么按照上次消費的位置繼續消費
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
        consumer.subscribe("TopicTest", "*");
 
        consumer.registerMessageListener(new MessageListenerConcurrently() {
 
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                
                try {
                    System.out.println("msgs的長度" + msgs.size());
                    System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                
               
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
 
        consumer.start();
 
        System.out.println("Consumer Started.");
    }
}

 

由於這里是Consumer先啟動,所以他會去輪詢MQ上是否有訂閱隊列的消息,由於每次producer插入一條,Consumer就拿一條所以測試結果如下(每次size都是1):

 

 

 

2、Consumer端后啟動,也就是Producer先啟動

由於這里是Consumer后啟動,所以MQ上也就堆積了一堆數據,Consumer的

consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10條    

所以這段代碼就生效了測試結果如下(每次size最多是10):

二、消息重試機制:消息重試分為2種1、Producer端重試 2、Consumer端重試

1、Producer端重試 

也就是Producer往MQ上發消息沒有發送成功,我們可以設置發送失敗重試的次數

package quickstart;
 
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
 
/**
 * Producer,發送消息
 * 
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
        producer.setRetryTimesWhenSendFailed(10);//失敗的 情況發送10次
        producer.start();
 
        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest",// topic
                        "TagA",// tag
                        ("Hello RocketMQ " + i).getBytes()// body
                );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
 
        producer.shutdown();
    }
}

 

 

 

 

 

 

2、Consumer端重試

2.1、exception的情況,一般重復16次 10s、30s、1分鍾、2分鍾、3分鍾等等

上面的代碼中消費異常的情況返回

return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重試

正常則返回:

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功

 

package quickstart;
 
 
import java.util.List;
 
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
 
/**
 * Consumer,訂閱消息
 */
public class Consumer {
 
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
        consumer.setConsumeMessageBatchMaxSize(10);
        /**
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
         * 如果非第一次啟動,那么按照上次消費的位置繼續消費
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
        consumer.subscribe("TopicTest", "*");
 
        consumer.registerMessageListener(new MessageListenerConcurrently() {
 
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
 
                try {
                    // System.out.println("msgs的長度" + msgs.size());
                    System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                    for (MessageExt msg : msgs) {
                        String msgbody = new String(msg.getBody(), "utf-8");
                        if (msgbody.equals("Hello RocketMQ 4")) {
                            System.out.println("======錯誤=======");
                            int a = 1 / 0;
                        }
                    }
 
                } catch (Exception e) {
                    e.printStackTrace();
                    if(msgs.get(0).getReconsumeTimes()==3){
                        //記錄日志
                        
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
                    }else{
                        
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試
                    }
                }
 
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
            }
        });
 
        consumer.start();
 
        System.out.println("Consumer Started.");
    }
}

 

 

 

打印結果:



假如超過了多少次之后我們可以讓他不再重試記錄 日志。

if(msgs.get(0).getReconsumeTimes()==3){
//記錄日志
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}

 

2.2超時的情況,這種情況MQ會無限制的發送給消費端。

就是由於網絡的情況,MQ發送數據之后,Consumer端並沒有收到導致超時。也就是消費端沒有給我返回return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;這樣的就認為沒有到達Consumer端。

這里模擬Producer只發送一條數據。consumer端暫停1分鍾並且不發送接收狀態給MQ

package model;
 
import java.util.List;
 
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
 
/**
 * Consumer,訂閱消息
 */
public class Consumer {
 
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
        consumer.setConsumeMessageBatchMaxSize(10);
        /**
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
         * 如果非第一次啟動,那么按照上次消費的位置繼續消費
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
        consumer.subscribe("TopicTest", "*");
 
        consumer.registerMessageListener(new MessageListenerConcurrently() {
 
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
 
                try {
 
                    // 表示業務處理時間
                    System.out.println("=========開始暫停===============");
                    Thread.sleep(60000);
 
                    for (MessageExt msg : msgs) {
                        System.out.println(" Receive New Messages: " + msg);
                    }
 
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試
                }
 
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
            }
        });
 
        consumer.start();
 
        System.out.println("Consumer Started.");
    }
}

 

 

 


 

三、消費模式

廣播消費:rocketMQ默認是集群消費,我們可以通過在Consumer來支持廣播消費

consumer.setMessageModel(MessageModel.BROADCASTING);// 廣播消費

package model;
 
import java.util.List;
 
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
 
/**
 * Consumer,訂閱消息
 */
public class Consumer2 {
 
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
        consumer.setConsumeMessageBatchMaxSize(10);
        consumer.setMessageModel(MessageModel.BROADCASTING);// 廣播消費
    
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
        consumer.subscribe("TopicTest", "*");
 
        consumer.registerMessageListener(new MessageListenerConcurrently() {
 
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
 
                try {
 
                    for (MessageExt msg : msgs) {
                        System.out.println(" Receive New Messages: " + msg);
                    }
 
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試
                }
 
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
            }
        });
 
        consumer.start();
 
        System.out.println("Consumer Started.");
    }
}

 

 

如果我們有2台節點(非主關系),2個節點物理上是分開的,Producer往MQ上寫入20條數據 其中broker1中拉取了12條 。broker2中拉取了8 條,這種情況下,假如broker1宕機,那么我們消費數據的時候,只能消費到broker2中的8條,broker1中的12條已經持久化到中。需要broker1回復之后這12條數據才能繼續被消費。

 

異步復制和同步雙寫主要是主和從的關系。消息需要實時消費的,就需要采用主從模式部署

異步復制:比如這里有一主一從,我們發送一條消息到主節點之后,這樣消息就算從producer端發送成功了,然后通過異步復制的方法將數據復制到從節點

同步雙寫:比如這里有一主一從,我們發送一條消息到主節點之后,這樣消息就並不算從producer端發送成功了,需要通過同步雙寫的方法將數據同步到從節點后, 才算數據發送成功。

 

四、刷盤方式

同步刷盤:在消息到達MQ后,RocketMQ需要將數據持久化,同步刷盤是指數據到達內存之后,必須刷到commitlog日志之后才算成功,然后返回producer數據已經發送成功

異步刷盤:是指數據到達內存之后,返回producer說數據已經發送成功。,然后再寫入commitlog日志

commitlog:

commitlog就是來存儲所有的元信息,包含消息體,類似於Mysql、Oracle的redolog,所以主要有CommitLog在,Consume Queue即使數據丟失,仍然可以恢復出來。

consumequeue:記錄數據的位置,以便Consume快速通過consumequeue找到commitlog中的數據

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM