2-rocketmq-消息發送和接收


quick start

添加依賴

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.7.1</version>
</dependency>

生產者

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        /**
         * 生產者組,簡單來說就是多個發送同一類消息的生產者稱之為一個生產者組rocketmq支持事務消息,在發送事務消息時,如果事務消息異常(producer掛了),broker端會來回查事務的狀態,這個時候會根據group名稱來查找對應的producer來執行相應的回查邏輯。相當於實現了producer的高可用
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // namesrv地址 多個地址用 ; 隔開  從namesrv上拉取broker信息
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 1000; i++) {
            try {
              	/**
              	 * 創建消息實例,指定topic,tag,消息內容。tag
              	 */
                Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // 發送消息並獲取發送結果   同步發送
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

SendResult中,有一個sendStatus狀態,表示消息的發送狀態。一共有四種狀態

  1. FLUSH_DISK_TIMEOUT : 表示沒有在規定時間內完成刷盤(需要Broker 的刷盤策Ill創立設置成
    SYNC_FLUSH 才會報這個錯誤) 。
  2. FLUSH_SLAVE_TIMEOUT :表示在主備方式下,並且Broker 被設置成SYNC_MASTER 方式,沒有
    在設定時間內完成主從同步。
  3. SLAVE_NOT_AVAILABLE : 這個狀態產生的場景和FLUSH_SLAVE_TIMEOUT 類似, 表示在主備方
    式下,並且Broker 被設置成SYNC_MASTER ,但是沒有找到被配置成Slave 的Broker 。
  4. SEND OK :表示發送成功,發送成功的具體含義,比如消息是否已經被存儲到磁盤?消息是否被
    同步到了Slave 上?消息在Slave 上是否被寫入磁盤?需要結合所配置的刷盤策略、主從策略來
    定。這個狀態還可以簡單理解為,沒有發生上面列出的三個問題狀態就是SEND OK

消費者

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        //groupName 將多個consumer分組,提高並發處理能力。需要和MessageModel配合
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        // 多個地址 ;分開 獲取broker地址 並定時向broker發送心跳 可以從master/slave獲取訂閱
        consumer.setNamesrvAddr("localhost:9876");
        // 兩種消息模式  BROADCASTING   CLUSTERING
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //設置consumer第一次啟動從隊列頭部還是尾部開始消費
      	//如果非第一次啟動,那么按上一次消費的位置繼續消費(取決於本地的offeset數據)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // topic 可通過tag過濾消息  * 或 null 代表全部
        consumer.subscribe("TopicTest", "*");
        /**注冊消息處理回調
         * MessageListenerConcurrently 普通監聽
         * MessageListenerOrderly 順序監聽
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // todo 消息處理邏輯
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
              	// 返回消費狀態
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動consumer
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

消費狀態

ConsumeConcurrentlyStatus {
  	// 消費成功
    CONSUME_SUCCESS,
  // 使用失敗,稍后嘗試使用
    RECONSUME_LATER;

消息發送及消費的基本原理

集群部署,一個master可以有多個slave,一個slave只能有一個master.consumer可以從master獲者slave中訂閱消息

2m-2s示例:

image-20200709175842041

rocketMQ 沒有實現master選舉(通過配置文件來指定主從)

當master掛了后 消費者依然能正常消費消息(slave提供讀服務)

通過groupName實現分區,提高消費者的處理能力

消費者

兩種消費者類型

  • DefaultMQPushConsumer 由系統控制讀取操作

DefaultMQPushConsumer

自動保存offset,自動做負載均衡

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        //groupName 將多個consumer分組,提高並發處理能力。需要和MessageModel配合
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        // 多個地址 ;分開
        consumer.setNamesrvAddr("localhost:9876");
        // 兩種消息模式  BROADCASTING   CLUSTERING
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //第一次啟動從 offset頭開始
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // topic 可通過tag過濾消息  * 或 null 代表全部
        consumer.subscribe("TopicTest", "*");
        //注冊消息處理回調
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // todo 消息處理邏輯
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動consumer
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

兩種消息模式 BROADCASTING CLUSTERING:

1、在 Clustering 模式下,同一個 ConsumerGroup ( GroupName 相同 ) 里的每個 Consumer 只消費所訂閱消息 的一部分 內 容, 同一個 ConsumerGroup里所有的 Consumer 消 費 的內 容合起來才是所訂閱 Topic 內 容 的 整體 ,從而達到負載均衡的目的 (也就是集群消費)

2、在 Broadcasting 模式下,同一個 ConsumerGroup 里的每個 Consumer 都能消費到所訂閱 Topic 的全部消息,也就是一個消息會被多次分發,被多個Consumer 消費 。(也就是廣播模式

image-20200709181438549

通過長輪詢的方式獲取消息

Broker端HOLD住客戶端過來的請求一小段時間,在這個時間內有新消息到達就利用現有的連接立刻返回消息給Consumer。主動權在Consumer

好處是客戶端能充分利用資源,不至於處理不過來

流量控制

DefaultMQPullConsumer

需要自己維護offset,需要通過遍歷MessageQueue獲取消息

 public class PullConsumer {
    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        // 獲取分片
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        // 獲取到消息
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        // 沒有新消息
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }

}

Consumer的啟動、關閉

DefaultMQPushConsumer啟動時不會檢查nameServer地址的正確或者可用性

// 從指定topic中拉取所有消息隊列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");

可以通過上面的方法主動拉取消息隊列來判斷nameServer的可用性

關閉時調用shutdown()即可

DefaultMQPullConsumer關閉或者異常退出時需要將offset保存起來

才能保證下次啟動時拉取消息的正確性

consumerGroup:位於同一個consumerGroup中的consumer實例和producerGroup中的各個produer實例承擔的角色類似;同一個group中可以配置多個consumer,可以提高消費端的並發消費能力以及容災
和kafka一樣,多個consumer會對消息做負載均衡,意味着同一個topic下的不messageQueue會分發給同一個group中的不同consumer

消費端的負載均衡

和kafka一樣,消費端也會針對Message Queue做負載均衡,使得每個消費者能夠合理的消費多個分區的消息。

消費端會通過RebalanceService線程,10秒鍾做一次基於topic下的所有隊列負載
  • 消費端遍歷自己的所有topic,依次調rebalanceByTopic

  • 根據topic獲取此topic下的所有queue

  • 選擇一台broker獲取基於group的所有消費端(有心跳向所有broker注冊客戶端信息)

  • 選擇隊列分配策略實例AllocateMessageQueueStrategy執行分配算法

什么時候觸發負載均衡
  • 消費者啟動之后
  • 消費者數量發生變更
  • 每10秒會觸發檢查一次rebalance
分配算法

RocketMQ提供了6中分區的分配算法

  • (AllocateMessageQueueAveragely)平均分配算法(默認)
  • (AllocateMessageQueueAveragelyByCircle)環狀分配消息隊列
  • (AllocateMessageQueueByConfig)按照配置來分配隊列: 根據用戶指定的配置來進行負載
  • (AllocateMessageQueueByMachineRoom)按照指定機房來配置隊列
  • (AllocateMachineRoomNearby)按照就近機房來配置隊列:
  • (AllocateMessageQueueConsistentHash)一致性hash,根據消費者的cid進行

生產者

DefaultMQProducer 默認生產者

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // producerGroupName
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // namesrv地址 多個地址用 ; 隔開
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // 返回 
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

消息返回狀態:SendResult.sendStatus

1、FLUSH DISK TIMEOUT : 表示沒有在規定時間內完成刷盤(需要Broker 的刷盤策略設置成 SYNC FLUSH 才會報這個錯誤) 。
2、 FLUSH SLAVE TIMEOUT :表示在主備方式下,並且 Broker 被設置成 SYNC MASTER 方式,沒有在設定時間內完成主從同步 。
3、SLAVE NOT AVAILABLE : 這個狀態產生的場景和 FLUSH SLAVETIMEOUT 類似, 表示在主備 方式下,並且 Broker 被設置成 SYNCMASTER ,但是沒有找到被配置成 S lave 的 Broker 。
4、SEND OK :表示發送成功,發送成功的具體含義,比如消息是否已經被存儲到融盤?消息是否被同步到了 S lave 上?消息在 S lave 上是否被寫人磁盤?需要結合所配置的刷盤策略、主從策略來定 。 這個狀態還可以簡單理解為,沒有發生上面列出的三個問題狀態就是 SEND OK

延遲消息

通過Message.setDelayTimeLevel ( int level ) 方法設置延遲時間,只支持預設值(1s/5s/1Os/30s/Im/2m/3m/4m/5m/6m/7m/8m/9m/1 Om/20m/30m/1 h/2h )。 比如setDelayTimeLevel(3)表示延遲 10s 。

自定義消息發送規則

實現MessageQueueSelector接口
三種默認實現:
SelectMessageQueueByHash
SelectMessageQueueByMachineRoom
SelectMessageQueueByRandom

自定義消息發送可以將消息發送到指定的MessageQueue里

對事物的支持

new TransactionMQProducer("groupName");

設置生產者group,當一個producer掛掉了,消息會分發到其它producer保證消息一定會被回查確定

消息的可靠性原則

只有消費者返回CONSUME_SUCCESS消費成功的才會認為消費成功

返回ConsumeConcurrentlyStatus.RECONSUME_LATER消費失敗會被重試

消息衰減重試

為了保證消息肯定至少被消費一次,RocketMQ會把這批消息重新發回到broker,在延遲的某個時間點
(默認是10秒,業務可設置)后,再次投遞到這個ConsumerGroup。而如果一直這樣重復消費都持續
失敗到一定次數(默認16次),就會投遞到DLQ死信隊列。應用可以監控死信隊列來做人工干預
可以修改broker-a.conf文件
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

重試消息的處理機制

一般情況下我們在實際生產中是不需要重試16次,這樣既浪費時間又浪費性能,理論上當嘗試重復次數
達到我們想要的結果時如果還是消費失敗,那么我們需要將對應的消息進行記錄,並且結束重復嘗試

consumer.registerMessageListener((MessageListenerConcurrently) (list,
  consumeOrderlyContext) -> {
                for (MessageExt messageExt : list) {
                    if(messageExt.getReconsumeTimes()==3) {
                     //可以將對應的數據保存到數據庫,以便人工干預	
                       System.out.println(messageExt.getMsgId()+","+messageExt.getBody());
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                } r
                eturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });

死信隊列

RocketMQ會為每個消費組都設置一個Topic命名為“%DLQ%+consumerGroup"的死信隊列


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM