RocketMQ的簡單使用


RocketMQ的基礎知識

1-1 RocketMQ的特點

RocketMQ特點:

1、支持事務消息
2、支持延遲消息
3、天然支持集群、負載均衡
4、支持指定次數和時間間隔的失敗消息重發

1-2 RocketMQ的組成

RocketMQ的組成

broker: 經紀人,代理商 ;
1) Producer Cluster: 消息生產者群,負責發送消息,一般由業務系統負責產生消息(從NameServer獲取broker信息)
2) NameServer Cluster: 集群架構中的組織協調員,相當於注冊中心,收集broker的工作情況,不負責消息的處理(從NameServer獲取broker信息)                                                   
3) Broker Cluster(消息服務器): RocketMQ的核心,負責消息的接受,存儲,發送等。
4) Consumer Cluster:  負責消費消息,一般是后台系統負責異步消費。

RocketMQ的配置文件(runserver.sh)

#===========================================================================================
# JVM Configuration,開發環境可以將內存參數設置小一點
堆參數:
  -Xmx  最大堆
  -Xms  最小堆
  -Xmn  新生代大小
#================================默認堆的配置是4G=========================================================

配置命令

nohup sh mqnamesrv &                        # 啟動nameserver
nohup sh mqbroker -n localhost:9876 &       # 啟動broker server並測試

測試消息發送

export NAMESRV_ADDR=127.0.0.1:9876
bash tools.sh org.apache.rocketmq.example.quickstart.Producer

1-3 RocketMQ消息的發送模式與消息的結構(重要)

1-3-1 三種消息發送方式

方式1:同步消息(sync message )

producer向 broker 發送消息,執行 API 時同步等待, 直到broker 服務器返回發送結果 

方式2:異步消息(async message)

producer向 broker 發送消息時指定消息發送成功及發送異常的回調方法,調用 API 后立即返回,producer發送消
息線程不阻塞 ,消息發送成功或失敗的回調任務在一個新的線程中執行 。

方式3:單向消息(oneway message)

producer向 broker 發送消息,執行 API 時直接返回,不等待broker 服務器的結果

1-3-2 消息結構

消息結構
基本屬性 topic (一級分類) 消息體(4M) 消息 Flag (通常業務代碼使用)
擴展屬性 tag (一般為空,用於消息過濾) keys: Message(運維檢索) waitStoreMsgOK (發送是否等待消息存儲)

基本屬性三個組成部分

1)主題:消息的一級分類,具有相同topic的消息將發送至該topic下的消息隊列中 
2)消息體:即消息的內容 ,可以的字符串、對象等類型(可系列化)。消息的最大長度 是4M
3) 消息flag:消息的一個標記,RocketMQ不處理,留給業務系統使用

擴展屬性的三個組成部分

1)tag :相當於消息的二級分類,用於消費消息時進行過濾,可為空(區別於基本屬性flag,擴展屬性的tag可以用於過濾消息)
2)keys: Message 索引鍵,在運維中可以根據這些 key 快速檢索到消息,可為空 。
3)waitStoreMsgOK :消息發送時是否等消息存儲完成后再返回 。

1-3-3 RocketMQ簡單實例

生產者代碼

生產者

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ProducerSimple {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 消息發送的模式1: 同步消息
     * 應用場景:
     * 向topic隊列發送同步消息
     * @param topic
     * @param msg
     */
    public void sendSyncMsg(String topic, String msg){
        rocketMQTemplate.syncSend(topic,msg,100000);
    }

    /**
     * 消息的發送模式2:異步消息

     */
    public void sendASyncMsg(String topic,String msg){
        /*異步消息需要設置回調對象,消息發送成功/失敗后,會由另外一個線程調用對象中的方法*/
        rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                throwable.printStackTrace();
            }
        },100000);
    }
}

調用生產者發送消息

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ProducerSimpleTest {
    @Autowired
    private ProducerSimple producerSimple;
    //測試發送同步消息
    @Test
    public void testSendSyncMsg(){
        this.producerSimple.sendSyncMsg("testTopic", "第3條同步消息");
        System.out.println("end...");
    }
    // 測試發送異步消息
    @Test
    public void testSendASyncMsg(){
        this.producerSimple.sendASyncMsg("testTopic","第一條異步消息");
        try{
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

生產者配置

server:
  port: 8182 #服務端口
  servlet:
    context‐path: /rocketmq‐consumer

spring:
  application:
    name: rocketmq‐consumer #指定服務名
rocketmq:
  consumer:
    group: demo_consumer_group           # 必須配置才能注入RocketMQTemplate模板
  name-server: 49.52.10.41:9876
消費者代碼

消費者

package com.shanjupay.test.rocketmq.message;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;


/**
 * 監聽消息隊列需要指定:
 * 1)topic:監聽的主題
 * 2)consumerGroup:消費組,相同消費組的消費者共同消費該主題的消息,它們組成一個集群(配置文件中設置),與之對應
 * 生成者需要配置producer group.
 */
@Component
@RocketMQMessageListener(topic = "testTopic",consumerGroup = "demo_consumer_group")
public class ConsumerSimple implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println(s);
    }
}

消費者配置

server:
  port: 8181 #服務端口
  servlet:
    context‐path: /rocketmq‐producer

spring:
  application:
    name: rocketmq‐producer
rocketmq:
  producer:
    group: demo-producer-group
  name-server: 49.52.10.41:9876

1-4 RocketMQ的消息傳遞流程與消費模式

消息發送流程

step1:從Nameserver獲取路由信息,選擇消息隊列: Broker會將信息上報給Nameserver,因此NameServer中存有每個broker的topic以及隊列,producer發送前根據topic從NameServer查詢所有消息隊列。如果該topic沒有隊列則會新建,通常一個topic會查詢到多個隊列,因此會按照一定的算法選擇一個隊列發送。

根據topic查詢的結果如下所示:
[
    {"brokerName":"Broker‐1","queueId":0},
    {"brokerName":"Broker‐1","queueId":1},
    {"brokerName":"Broker‐2","queueId":0},
    {"brokerName":"Broker‐2","queueId":1}
]

step2:檢驗並發送消息

  • 發送消息前進行校驗,比如消息的內容長度不能為0、消息最大長度、消息必要的屬性是否具備等

  • 若topic下還沒有隊列則自動創建,默認一個topic下自動創建4個寫隊列,4個讀隊列

多個隊列的動機:高可用(一個隊列掛了,還有其他),高性能(並發度高)

問題:為什么設置producer group?

方便在事務消息中broker(代理)需要回查producer(回調),同一個生產組的producer組成一個集群,提高並發能力

step3:consumer處於監聽隊列狀態,消費消息

辨析三個概念: topic, consumer group,consumer

1)一個消費組可以包括多個消費者,一個消費組可以訂閱多個主題。
2)一個隊列同時只允許一個消費者消費,一個消費者可以消費多個隊列中的消息。
問題:消息隊列的消費模式(廣播模式的推拉模式)?
1)集群模式(點對點模式):一個消費組內的消費者組成一個集群,主題下的一條消息只能被一個消費者消費。
2)廣播模式(發布訂閱模式):主題下的一條消息能被消費組下的所有消費者消費,消費者和broker之間通過推模式和拉模式接收消息

廣播模式下的消息消費方式?

推模式:broker主動將消息推送給消費者

拉模式:消費者從broker中查詢消息

1-5 延遲消息的應用與實現

典型應用場景:訂單的關閉

  • 延遲消息也叫做定時消息,比如在電商項目的交易系統中,當用戶下單之后超過一段時間之后仍然沒有支付,此時就需要將該訂單關l閉。

功能實現:可以在用戶創建訂單時就發送一條包含訂單內容的延遲消息,該消息在一段時間之后投遞給消息消費者,當消息消費者接收到該消息后,
判斷該訂單的支付狀態,如果處於未支付狀態,則將該訂單關閉,商品回庫(刪除訂單)

RocketMQ的延遲等級:RocketMQ的延遲消息實現非常簡單,只需要發送消息前設置延遲的時間,延遲時間存在十八個等級
(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h),調用setDelayTimeLevel()設置與時間相對應的延遲級別即可

import com.shanjupay.test.rocketmq.model.OrderExt;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;         // spring的message對象
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class ProducerSimple {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 消息發送的模式1: 同步消息
     * 應用場景:
     * 向topic隊列發送同步消息
     * @param topic
     * @param msg
     */
    public void sendSyncMsg(String topic, String msg){
        rocketMQTemplate.syncSend(topic,msg,100000);
    }

    /**
     * 消息的發送模式2:異步消息

     */
    public void sendASyncMsg(String topic,String msg){
        /*異步消息需要設置回調對象,消息發送成功/失敗后,會由另外一個線程調用對象中的方法*/
        rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                throwable.printStackTrace();
            }
        },100000);
    }


    /**
     * 將對象轉換為json字符串作為消息同步發送
     */
    public void sendMsgByJson(String topic, OrderExt orderExt){
        rocketMQTemplate.convertAndSend(topic,orderExt);
    }

    /**
     * 發送同步延遲消息(需要將對象轉換為spring的message對象)
     * @param topic     broker中隊列topic
     * @param orderExt  傳遞的消息對象內容
     */
    public void sendMsgByJsonDelay(String topic, OrderExt orderExt) {
        Message<OrderExt> message = MessageBuilder.withPayload(orderExt).build();      //發送同步消息,消息內容將orderExt轉為json
        this.rocketMQTemplate.syncSend(topic,message,1000,3); //指定發送超時時間(毫秒)和延遲等級
        System.out.printf("send msg : %s",orderExt);
    }
    
}
延遲隊列的實現流程
1)如果消息的延遲級別大於0,則表示該消息為延遲消息,修改該消息的主題為SCHEDULE_TOPIC_XXXX,隊列Id為延遲級別減1。
2)消息進入SCHEDULE_TOPIC_XXXX的隊列中。
3)定時任務根據上次拉取的偏移量不斷從隊列中取出所有消息。
4)根據消息的物理偏移量和大小再次獲取消息。
5)根據消息屬性重新創建消息,清除延遲級別,恢復原主題和隊列Id。
6)重新發送消息到原主題的隊列中,供消費者進行消費。

基本思想:通過定時任務+隊列來實現消息延時發送到broker

1-6 消費重試

消費重試定義:producer線程成功將消息發送到Broker,被consumer消費時,發生意外情況,沒有被正常消費,此時需要進行消費重試

何時需要消費重試?
1)消息沒有被消費者接收,比如消費者與broker存在網絡異常。此種情況消息會一直被消費重試。
2)消息接受成功,但執行時產生異常,無法向broker返回結果,這個時候也會消費重試(實際場景更為常見的問題)。
broker是如何知道消息消費的成功與否的?
broker會從消費者獲取信息消費的結果,如果沒有返回消費成功的狀態,那么消費者就會進行重試。
消費者拋出異常,該如何處理?
當消息在消費時出現異常,此時消息被有限次的重試消費。
默認策略:消息會按照延遲消息的延遲時間等級(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)從第3級開始重試,每試一次如果還不成功則延遲等級加1,因此最多重試16次,如果依舊無法消費成功,那么該消息會進入到死信隊列中。
實際開發中如何處理消費失敗的情況?

默認策略:進行有限次的消費重試,每次重試仍然消費失敗的話,延遲下一次重試的時間。

實際開發策略基本思想:實際生產中不會讓消息重試這么多次,通常在重試一定的次數后將消息寫入數據庫,由另外單獨的程序或人工去處

這種處理失敗的情況,通常屬於線上的異常情況,當重試次數達到一定的閾值,則首先需要保存消息,便於定位問題,維護系統
/*處理的邏輯如下:*/
public class ConsumerSimple implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
        	 //取出當前重試次數
        	int reconsumeTimes = messageExt.getReconsumeTimes();
        	    //當大於一定的次數后將消息寫入數據庫,由單獨的程序或人工去處理
            if(reconsumeTimes >=2){
                //將消息寫入數據庫,之后正常返回
                return;
            }
    		throw new RuntimeException(String.format("第%s次處理失敗..",reconsumeTimes));
    }
}

參考資料

RocketMQ的延遲消息
消息隊列的冪等性
消息重復,消息丟失,消息積壓的解決策略
RocketMQ的基礎課程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM