RocketMQ學習筆記(8)----RocketMQ的Producer API簡介


在RocketMQ中提供了三種發送消息的模式:

  1.NormalProducer(普通)

  2.OrderProducer(順序)

  3.TransactionProducer(事務)

下面來介紹一下producer中的各個API的使用:

  1. producerGroup:Producer組名, 默認值為DEFAULT_PRODUCER,多個Producer如果屬於一個應用,發送同樣的消息,則應該將它們歸為同一組。

  2. createTopicKey: 默認值為TBW102,在發送消息時,自動創建服務器不存在的topic,需要指定Key。

  3.  defaultTopicQueueNums: 默認值為4, 在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數。

  4. sendMsgTimeout: 默認值10000,發送消息超時時間,單位毫秒

  5. compressMsgBodyOverHowmuch: 默認值4096,消息Body超過多大開始壓縮(Consumer收到消息會自動解壓縮),單位字節

  6. retryAnotherBrokerWhenNotStoreOK: 默認值false, 如果發送消息返回sendResult,但是sendStatus!=SEND_OK,是否重試發送

  7. maxMessageSize: 默認值131072,客戶端限制的消息大小,超過報錯,同時服務端也會限制

  8. transactionCheckListener: 事務消息回查監聽器,如果發送事務消息,必須設置,在DefaultMQProducer的子類TransactionMQProducer中。

  9. checkThreadPoolMinSize:  默認值為1,Broker回查Producer事務狀態時,線程池大小,在DefaultMQProducer的子類TransactionMQProducer中。

  10. checkThreadPoolMaxSize: 默認值為1,Broker回查Producer事務狀態時,線程池大小。

  11. checkRequestHoldMax: 默認值為2000, Broker回查Producer事務狀態時,Producer本地緩沖請求隊列大小

  使用設置maxMessageSize,將消息最大值設為1024個字節,運行如下代碼:

package com.wangx.rocketmq.quickstart;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;

/**
 * 創建一個消費者
 */
public class Producer {

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
        //1. 實例化一個producer group
        DefaultMQProducer producer = new DefaultMQProducer("my-producer-group");
        //2. 設置namesrvAddr,集群環境多個nameserver用;分割
        producer.setNamesrvAddr("47.105.149.61:9876;47.105.145.123:9876");
     //設置消息最大值 producer.setMaxMessageSize(
1024); //3. 啟動 producer.start(); String str = ""; // 4. 發送消息 for (int i = 0; i < 1025; i++) { //構建實例,第一個參數為topic,第二個參數為tabs,第三個參數為消息體 str += i; } Message message = new Message("MyQuickStartTopic1","tabA",str.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.println(result); //關閉生產者 producer.shutdown(); } }

  控制台將會報如下異常:

org.apache.rocketmq.client.exception.MQClientException: CODE: 13  DESC: the message body size over max value, MAX: 1024

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM