在RocketMQ中提供了三種發送消息的模式:
1.NormalProducer(普通)
2.OrderProducer(順序)
3.TransactionProducer(事務)
下面來介紹一下producer中的各個API的使用:
1. producerGroup:Producer組名, 默認值為DEFAULT_PRODUCER,多個Producer如果屬於一個應用,發送同樣的消息,則應該將它們歸為同一組。
2. createTopicKey: 默認值為TBW102,在發送消息時,自動創建服務器不存在的topic,需要指定Key。
3. defaultTopicQueueNums: 默認值為4, 在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數。
4. sendMsgTimeout: 默認值10000,發送消息超時時間,單位毫秒
5. compressMsgBodyOverHowmuch: 默認值4096,消息Body超過多大開始壓縮(Consumer收到消息會自動解壓縮),單位字節
6. retryAnotherBrokerWhenNotStoreOK: 默認值false, 如果發送消息返回sendResult,但是sendStatus!=SEND_OK,是否重試發送
7. maxMessageSize: 默認值131072,客戶端限制的消息大小,超過報錯,同時服務端也會限制
8. transactionCheckListener: 事務消息回查監聽器,如果發送事務消息,必須設置,在DefaultMQProducer的子類TransactionMQProducer中。
9. checkThreadPoolMinSize: 默認值為1,Broker回查Producer事務狀態時,線程池大小,在DefaultMQProducer的子類TransactionMQProducer中。
10. checkThreadPoolMaxSize: 默認值為1,Broker回查Producer事務狀態時,線程池大小。
11. checkRequestHoldMax: 默認值為2000, Broker回查Producer事務狀態時,Producer本地緩沖請求隊列大小
使用設置maxMessageSize,將消息最大值設為1024個字節,運行如下代碼:
package com.wangx.rocketmq.quickstart; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; /** * 創建一個消費者 */ public class Producer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException { //1. 實例化一個producer group DefaultMQProducer producer = new DefaultMQProducer("my-producer-group"); //2. 設置namesrvAddr,集群環境多個nameserver用;分割 producer.setNamesrvAddr("47.105.149.61:9876;47.105.145.123:9876");
//設置消息最大值 producer.setMaxMessageSize(1024); //3. 啟動 producer.start(); String str = ""; // 4. 發送消息 for (int i = 0; i < 1025; i++) { //構建實例,第一個參數為topic,第二個參數為tabs,第三個參數為消息體 str += i; } Message message = new Message("MyQuickStartTopic1","tabA",str.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.println(result); //關閉生產者 producer.shutdown(); } }
控制台將會報如下異常:
org.apache.rocketmq.client.exception.MQClientException: CODE: 13 DESC: the message body size over max value, MAX: 1024