RocketMQ 生產者核心配置和核心知識


一、RocketMQ 4.X 生產者常見核心配置

  • compressMsgBodyOverHowmuch :消息超過默認字節4096后進行壓縮
  • retryTimesWhenSendFailed : 失敗重發次數
  • maxMessageSize : 最大消息配置,默認128k
  • topicQueueNums : 主題下面的隊列數量,默認是4
  • autoCreateTopicEnable : 是否自動創建主題 Topic, 開發建議為 true,生產要為 false
  • defaultTopicQueueNums : 自動創建服務器不存在的 Topic,默認創建的隊列數
  • autoCreateSubscriptionGroup: 是否允許 Broker 自動創建訂閱組,建議線下開發開啟,線上關閉
  • brokerClusterName : 集群名稱
  • brokerId : 0表示Master主節點,大於0表示從節點
  • brokerIP1 : Broker 服務地址
  • brokerRole : Broker 角色 ASYNC_MASTER/ SYNC_MASTER/ SLAVE
  • deleteWhen : 每天執行刪除過期文件的時間,默認每天凌晨4點
  • flushDiskType :刷盤策略, 默認為 ASYNC_FLUSH (異步刷盤),另外是 SYNC_FLUSH (同步刷盤)
  • listenPort : Broker 監聽的端口號
  • mapedFileSizeCommitLog : 單個 conmmitlog 文件大小,默認是 1GB
  • mapedFileSizeConsumeQueue:ConsumeQueue 每個文件默認存30W條,可以根據項目調整
  • storePathRootDir : 存儲消息以及一些配置信息的根目錄 默認為用戶的 ${HOME}/store
  • storePathCommitLog:commitlog 存儲目錄默認為 ${storePathRootDir}/commitlog
  • storePathIndex: 消息索引存儲路徑
  • syncFlushTimeout : 同步刷盤超時時間
  • diskMaxUsedSpaceRatio : 檢測可用的磁盤空間大小,超過后會寫入報錯

二、Broker 消息投遞狀態有四種

  • FLUSH_DISK_TIMEOUT :沒有在規定時間內完成刷盤 (刷盤策略需要為 SYNC_FLUSH 才會出這個錯誤)
  • FLUSH_SLAVE_TIMEOUT :主從模式下,Broker 是 SYNC_MASTER,沒有在規定時間內完成主從同步
  • SLAVE_NOT_AVAILABLE : 主從模式下,Broker 是 SYNC_MASTER,沒有找到被配置成 Slave 的 Broker
  • SEND_OK :發送成功,沒有發生上面的三種問題

三、RocketMQ 消息生產和消費異常重試和閾值設定

生產者 Producer 重試異步和 SendOneWay下配置無效

  • 消息重投(保證數據的高可靠性),本身內部支持重試,默認次數是2,如果網絡情況比較差,或者跨集群則建改多幾次

消費端重試

  • 原因:消息處理異常、broker 端到 consumer 端各種問題,如網絡原因閃斷,消費處理失敗,ACK 返回失敗等等問題。
  • 注意:
    • 重試間隔時間配置 ,默認每條消息最多重試 16 次
    • messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h   超過重試次數人工補償
    • 消費端去重
    • 一條消息無論重試多少次,這些重試消息的 Message ID,key 不會改變。
    • 消費重試只針對集群消費方式生效;廣播方式不提供失敗重試特性,即消費失敗后,失敗消息不再重試,繼續消費新的消息

 

PayController 類代碼如下:

package net.xdclass.xdclassmq.controller;

import net.xdclass.xdclassmq.jms.JmsConfig;
import net.xdclass.xdclassmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;

@RestController
public class PayController {

    @Autowired
    private PayProducer payProducer;


    @RequestMapping("/api/v1/pay_cb")
    public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

        Message message = new Message(JmsConfig.TOPIC,"taga" ,"6688" , ("hello xdclass rocketmq = "+text).getBytes() );

        SendResult sendResult = payProducer.getProducer().send(message);
        System.out.println(sendResult);

        return new HashMap<>();
    }

}
PayController 類

JmsConfig 類代碼如下:

package net.xdclass.xdclassmq.jms;

public class JmsConfig {
    public static final String NAME_SERVER = "192.168.159.129:9876;192.168.159.130:9876";

    public static final String TOPIC = "xdclass_pay_test_topic_666";

}
JmsConfig 類

PayProducer 類代碼如下:

package net.xdclass.xdclassmq.jms;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

@Component
public class PayProducer {

    private String producerGroup = "pay_producer_group";

    private DefaultMQProducer producer;

    public  PayProducer(){
        producer = new DefaultMQProducer(producerGroup);

        //生產者投遞消息重試次數
        producer.setRetryTimesWhenSendFailed(3); //指定NameServer地址,多個地址以 ; 隔開
        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);

        start();
    }

    public DefaultMQProducer getProducer(){
        return this.producer;
    }

    /**
     * 對象在使用之前必須要調用一次,只能初始化一次
     */
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }


    /**
     * 一般在應用上下文,使用上下文監聽器,進行關閉
     */
    public void shutdown(){
        this.producer.shutdown();
    }
}

PayConsumer 類代碼如下:

package net.xdclass.xdclassmq.jms;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;

@Component
public class PayConsumer {


    private DefaultMQPushConsumer consumer;

    private String consumerGroup = "pay_consumer_group";

    public  PayConsumer() throws MQClientException {

        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //默認是集群方式,可以更改為廣播,但是廣播方式不支持重試
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe(JmsConfig.TOPIC, "*");


        consumer.registerMessageListener( new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt msg = msgs.get(0);
                int times = msg.getReconsumeTimes();
                System.out.println("重試次數="+times);

                try {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));

                String topic = msg.getTopic();
                String body = new String(msg.getBody(), "utf-8");
                String tags = msg.getTags();
                String keys = msg.getKeys();

                // 模擬報錯
                if(keys.equalsIgnoreCase("6688")){
                    throw new Exception();
                }

                System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {

                System.out.println("消費異常");
                //如果重試2次不成功,則記錄,人工介入
                if(times >= 2){
                    System.out.println("重試次數大於2,記錄數據庫,發短信通知開發人員或者運營人員");
                    //TODO 記錄數據庫,發短信通知開發人員或者運營人員
                    //告訴broker,消息成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            }
        });

        consumer.start();
        System.out.println("consumer start ...");
    }

}

四、RocketMQ 異步發送消息

官方文檔:https://rocketmq.apache.org/docs/simple-example/

@RequestMapping("/api/v1/pay_cb")
public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

  Message message = new Message(JmsConfig.TOPIC,"taga" ,"66881" , ("hello xdclass rocketmq = "+text).getBytes() );

  payProducer.getProducer().send(message, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
      System.out.printf("發送結果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString());
    }

    @Override
    public void onException(Throwable e) {
      e.printStackTrace();
      //補償機制,根據業務情況進行使用,看是否進行重試
    }
  });

  return new HashMap<>();
}

注意:官方例子:如果異步發送消息,調用 producer.shutdown() 后會失敗。異步發送:不會重試,發送總次數等於1。

五、RocketMQ OneWay 發送消息及多種場景對比

  • SYNC:應用場景:重要通知郵件、報名短信通知、營銷短信系統等
  • ASYNC :應用場景:對RT時間敏感,可以支持更高的並發,回調成功觸發相對應的業務,比如注冊成功后通知積分系統發放優惠券
  • ONEWAY:一次發送,無需要等待響應。使用場景:主要是日志收集,適用於某些耗時非常短,但對可靠性要求並不高的場景,也就是 LogServer,只負責發送消息,不等待服務器回應且沒有回調函數觸發,即只發送請求不等待應答
@RequestMapping("/api/v1/pay_cb")
public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
    Message message = new Message(JmsConfig.TOPIC,"taga" ,"66881" , ("hello xdclass rocketmq = "+text).getBytes() );
    payProducer.getProducer().sendOneway(message);
    return new HashMap<>();
}

匯總對比

發送方式 發送 TPS 發送結果反饋 可靠性
同步發送 不丟失
異步發送 不丟失
單向發送 最快 可能丟失

 


 

 

 

六、RocketMQ 延遲消息

什么是延遲消息?

Producer 將消息發送到消息隊列 RocketMQ 服務端,但並不期望這條消息立馬投遞,而是推遲到在當前時間點之后的某一個時間投遞到 Consumer 進行消費,該消息即延遲消息,目前支持固定延遲精度的消息。

固定精度為:"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"

使用 message.setDelayTimeLevel(xxx); xxx是級別,1表示配置里面的第一個級別,2表示第二個級別。

@RequestMapping("/api/v1/pay_cb")
public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
    Message message = new Message(JmsConfig.TOPIC,"taga" ,"66881" , ("hello xdclass rocketmq = "+text).getBytes() );
    message.setDelayTimeLevel(2);// 表示5s后進行投遞
    payProducer.getProducer().sendOneway(message);
    return new HashMap<>();
}

使用場景

  • 通過消息觸發一些定時任務,比如在某一固定時間點向用戶發送提醒消息。
  • 消息生產和消費有時間窗口要求:比如在天貓電商交易中超時未支付關閉訂單的場景,在訂單創建時會發送一條 延時消息。這條消息將會在 30 分鍾以后投遞給消費者,消費者收到此消息后需要判斷對應的訂單是否已完成支付。 如支付未完成,則關閉訂單。如已完成支付則忽略。

RocketMQ 還有定時消息功能,目前開源版本還不支持,商業版本則有,兩者使用場景類似。

七、RocketMQ 生產消息使用 MessageQueueSelector 投遞到 Topic 下指定的 queue

應用場景:順序消息,分攤負載。

默認 Topic 下的 queue 數量是4,可以配置。

import net.xdclass.xdclassmq.jms.JmsConfig;
import net.xdclass.xdclassmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.List;

@RestController
public class PayController {

    @Autowired
    private PayProducer payProducer;


    @RequestMapping("/api/v1/pay_cb")
    public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

        Message message = new Message(JmsConfig.TOPIC,"taga" ,"66881" , ("hello xdclass rocketmq = "+text).getBytes() );

        //同步發送
//      SendResult sendResult =  payProducer.getProducer().send(message, new MessageQueueSelector() {
//           @Override
//            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//               int queueNum = Integer.parseInt(arg.toString());
//               return mqs.get(queueNum);
//            }
//
//      }, 3);
//      System.out.printf("發送結果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString());

        //異步發送到指定queue, SendCallback不能用lambda表達式,有兩個函數需要被實現
        payProducer.getProducer().send(message, (mqs, msg, arg) -> {
            int queueNum = Integer.parseInt(arg.toString());
            return mqs.get(queueNum);

        }, 0, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("發送結果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString());
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });

        return new HashMap<>();
    }
}

支持同步,異步發送指定的 MessageQueue。

注意:選擇的 queue 數量必須小於配置的,否則會出錯。

八、順序消息在電商和證券系統中的應用場景

  • 什么是順序消息:消息的生產和消費順序一致
    • 全局順序:topic下面全部消息都要有序(少用)
      • 性能要求不高,所有的消息嚴格按照 FIFO 原則進行消息發布和消費的場景,並行度成為消息系統的瓶頸, 吞吐量不夠.
      • 在證券處理中,以人民幣兌換美元為例子,在價格相同的情況下,先出價者優先處理,則可以通過全局順序的方式按照 FIFO 的方式進行發布和消費
    • 局部順序:只要保證一組消息被順序消費即可(RocketMQ 使用)
      • 性能要求高
      • 電商的訂單創建,同一個訂單相關的創建訂單消息、訂單支付消息、訂單退款消息、訂單物流消息、訂單交易成功消息 都會按照先后順序來發布和消費(阿里巴巴集團內部電商系統均使用局部順序消息,既保證業務的順序,同時又能保證業務的高性能)
  • 順序發布:對於指定的一個 Topic,客戶端將按照一定的先后順序發送消息
  • 順序消費:對於指定的一個 Topic,按照一定的先后順序接收消息,即先發送的消息一定會先被客戶端接收到。
  • 注意:
    • 順序消息暫不支持廣播模式
    • 順序消息不支持異步發送方式,否則將無法嚴格保證順序

九、順序消息的使用

生產端保證發送消息有序,且發送到同一個 Topic 的同個 queue 里面,RocketMQ 的確是能保證 FIFO 的。

例子:訂單的順序流程是:創建、付款、物流、完成,訂單號相同的消息會被先后發送到同一個隊列中,根據 MessageQueueSelector 里面自定義策略,根據同個業務 id 放置到同個 queue 里面,如訂單號取模運算再放到 selector 中,同一個模的值都會投遞到同一條 queue。

public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    //如果是訂單號是字符串,則進行hash,得到一個hash值
    Long id = (Long) arg;
    long index = id % mqs.size();
    return mqs.get((int)index);
}

消費端要在保證消費同個 Topic 里的同個隊列,不應該用 MessageListenerConcurrently,應該使用 MessageListenerOrderly,自帶單線程消費消息,不能再 Consumer 端再使用多線程去消費,消費端分配到的 queue 數量是固定的,集群消會鎖住當前正在消費的隊列集合的消息,所以會保證順序消費。

下面代碼來演示生產者投遞消息和消費者消費消息,項目目錄如下:

 

ProductOrder 類

package net.xdclass.xdclassmq.domain;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ProductOrder implements Serializable {

    //訂單id
    private long orderId;

    //操作類型
    private String type;


    public long getOrderId() {
        return orderId;
    }

    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public ProductOrder() {
    }

    public ProductOrder(long orderId, String type) {
        this.orderId = orderId;
        this.type = type;

    }


    public static List<ProductOrder> getOrderList() {

        List<ProductOrder> list = new ArrayList<>();
        list.add(new ProductOrder(111L, "創建訂單"));
        list.add(new ProductOrder(222L, "創建訂單"));
        list.add(new ProductOrder(111L, "支付訂單"));
        list.add(new ProductOrder(222L, "支付訂單"));
        list.add(new ProductOrder(111L, "完成訂單"));
        list.add(new ProductOrder(333L, "創建訂單"));
        list.add(new ProductOrder(222L, "完成訂單"));
        list.add(new ProductOrder(333L, "支付訂單"));
        list.add(new ProductOrder(333L, "完成訂單"));

        return list;

    }

    @Override
    public String toString() {
        return "ProductOrder{" +
                "orderId=" + orderId +
                ", type='" + type + '\'' +
                '}';
    }
}
ProductOrder 類

PayController 類

package net.xdclass.xdclassmq.controller;

import net.xdclass.xdclassmq.domain.ProductOrder;
import net.xdclass.xdclassmq.jms.JmsConfig;
import net.xdclass.xdclassmq.jms.PayProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.List;

@RestController
public class PayController {

    @Autowired
    private PayProducer payProducer;

    @RequestMapping("/api/v2/pay_cb")
    public Object callback() throws Exception {

        List<ProductOrder> list = ProductOrder.getOrderList();

        for (int i = 0; i < list.size(); i++) {
            ProductOrder order = list.get(i);
            Message message = new Message(JmsConfig.ORDERLY_TOPIC, "",
                    order.getOrderId() + "", order.toString().getBytes());

            SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = (Long) arg;
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, order.getOrderId());


            System.out.printf("發送結果=%s, sendResult=%s ,orderid=%s, type=%s\n", sendResult.getSendStatus(), sendResult.toString(), order.getOrderId(), order.getType());

        }
        return new HashMap<>();
    }
}

JmsConfig 類

package net.xdclass.xdclassmq.jms;

public class JmsConfig {
    public static final String NAME_SERVER = "192.168.159.129:9876;192.168.159.130:9876";

    public static final String TOPIC = "xdclass_pay_test_topic_888";

    public static final String ORDERLY_TOPIC = "xdclass_pay_order_topic_orderly";

}

PayConsumer 類

package net.xdclass.xdclassmq.jms;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class PayConsumer {


    private DefaultMQPushConsumer consumer;

    private String consumerGroup = "pay_consumer_group";

    public  PayConsumer() throws MQClientException {

        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //默認是集群方式,可以更改為廣播,但是廣播方式不支持重試
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe(JmsConfig.TOPIC, "*");

        consumer.registerMessageListener( new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt msg = msgs.get(0);
                int times = msg.getReconsumeTimes();
                System.out.println("重試次數="+times);

                try {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));

                String topic = msg.getTopic();
                String body = new String(msg.getBody(), "utf-8");
                String tags = msg.getTags();
                String keys = msg.getKeys();

                System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

            } catch (Exception e) {
                System.out.println("消費異常");
                //如果重試2次不成功,則記錄,人工介入
                if(times >= 2){
                    System.out.println("重試次數大於2,記錄數據庫,發短信通知開發人員或者運營人員");
                    //TODO 記錄數據庫,發短信通知開發人員或者運營人員
                    //告訴broker,消息成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            }
        });

        consumer.start();
        System.out.println("consumer start ...");
    }

}
PayConsumer 類

PayOrderlyConsumer 類

package net.xdclass.xdclassmq.jms;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class PayOrderlyConsumer {


    private DefaultMQPushConsumer consumer;

    private String consumerGroup = "pay_orderly_consumer_group";

    public PayOrderlyConsumer() throws MQClientException {

        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //默認是集群方式,可以更改為廣播,但是廣播方式不支持重試
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe(JmsConfig.ORDERLY_TOPIC, "*");


        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                MessageExt msg = msgs.get(0);
                try {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));

                    //做業務邏輯操作 TODO

                    return ConsumeOrderlyStatus.SUCCESS;

                } catch (Exception e) {

                    e.printStackTrace();
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }

        });

        consumer.start();
        System.out.println("consumer start ...");
    }

}

PayProducer 類

package net.xdclass.xdclassmq.jms;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

@Component
public class PayProducer {

    private String producerGroup = "pay_producer_group";

    private DefaultMQProducer producer;


    public  PayProducer(){
        producer = new DefaultMQProducer(producerGroup);

        //生產者投遞消息重試次數
        producer.setRetryTimesWhenSendFailed(3);

        //指定NameServer地址,多個地址以 ; 隔開
        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);

        start();
    }

    public DefaultMQProducer getProducer(){
        return this.producer;
    }

    /**
     * 對象在使用之前必須要調用一次,只能初始化一次
     */
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }


    /**
     * 一般在應用上下文,使用上下文監聽器,進行關閉
     */
    public void shutdown(){
        this.producer.shutdown();
    }


}
PayProducer 類

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM