一、引言
本文只站在一個java程序員角度上,去了解Rocketmq和具體使用,不講搭建,這是運維人的事情。
二、介紹
(1)RocketMQ 不遵循任何規范,但是參考了各種規范不同類產品的設計思想。
(2)RocketMQ 是一款分布式、隊列模型的消息中間件,具有以下特點:
- 具有高性能、高可靠、高實時、分布式特點;
- RocketMQ 的Producer、Consumer、隊列都可以分布式;
- 能夠保證嚴格的消息順序;
- 提供豐富的消息拉取模式;
- 高效的訂閱者水平擴展能力;
- 實時的消息訂閱機制;
- 億級消息堆積能力;
- 較少的依賴
(3)選用理由:
- 強調集群無單點,可擴展,任意一點高可用,水平可擴展。
- 海量消息堆積能力,消息堆積后,寫入低延遲。
- 支持上萬個隊列
- 消息失敗重試機制
- 消息可查詢
- 成熟度(經過雙十一考驗)
三、Rocketmq關鍵概念
1、主題與標簽
-
主題Tpoic:第一級消息類型,用來標識一類消息 , (書的標題)
-
標簽Tags:第二級消息類型,區分一個 Topic 下的多種消息 , 可以基於Tag做消息過濾 , (書的目錄)
-------舉例-------
主題: 訂單交易
簽: 訂單交易-創建 、訂單交易-付款、 訂單交易-完成
2、發送與訂閱群組
-
消息生產者Producer : 消息生產者 , 負責生產消息,一般由業務系統負責產生消息。
-
消息消費者Consumer :消息消費者,負責消費消息,一般是后台系統負責異步消費。
分類:
(1) Pull Consumer:消息消費者應用主動調用 Consumer 的拉消息方法從 Broker拉消息,主動權由消息消費者應用控制。
(2) Push Consumer:消息消費者應用通常向 Consumer 對象注冊一個 Listener 接口,一旦收到消息,Consumer 對象立刻回調 Listener 接口方法。
-
生產組Producer Group:一類 Producer 的集合的名稱, 消息的發送。
可以通過運維工具查詢返個發送消息應用下有幾個 Producer 實例
發送分布式事務消息時,如果 Producer 中途意外宕機,Broker 會主動回調 Producer Group 內的任意一台機器來確認事務狀態。
-
消費組Consumer Group:一類 Consumer 的集合的名稱, 消息的訂閱處理。
一個 Consumer Group 下包含多個 Consumer 實例,可以是多台機器,也可以是多個進程,或者是一個進程的多個 Consumer 對象。
3、Broker與NameServer
NameServer:在系統中是做命名服務,更新和發現 broker 消息服務。
Broker-Master:broker 消息主機服務器。
Broker-Slave:broker 消息從機服務器。
關聯關系:
- Name Server 是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。
- Master 與 Slave 的對應關系通過指定相同的 BrokerName,不同的 BrokerId 來定義,BrokerId 為 0 表示 Master,非 0 表示 Slave。
- 每個 Broker 與 Name Server 集群中的所有節點建立長連接,定時注冊 Topic 信息到所有 Name Server
- Producer 與 Name Server 集群中的其中一個節點(隨機選擇)建立長連接,定期從 Name Server 取 Topic 路由信息,並想提供 Topic 服務的 Master 建立長連接,且定時向 Master發収送心跳。Producer 完全無狀態。
- Consumer 與 Name Server 集群中的其中一個節點(隨機選擇)建立長連接,定期從 Name Server 取 Topic 路由信息,並向提供 Topic 服務的 Master、Slave 建立長連接,且定時向 Master、Slave 發送心跳。Consumer 既可以從 Master 訂閱消息,也可以從 Slave 訂閱消息,訂閱規則由 Broker 配置決定。
4、廣播消費與集群消費
- 廣播消費: 一條消息被多個 Consumer 消費。
- 集群消費 : 一個 Consumer Group 中的 Consumer 平均分攤消費消息。例如某個 Topic 有 9 條消息,其中一個 Consumer Group 有 3 個實例(可能是 3 個進程,或者 3 台機器),那么每個實例只消費其中的 3 條消息。
5、消息隊列
-
消息隊列Message Queue : 在 RocketMQ 中,所有消息隊列都是持久化,長度無限的數據結構。
所謂長度無限是指隊列中的每個存儲單元都是定長,訪問其中的存儲單元使用 Offset 來訪問,offset 為 java long 類型,64 位,理論上在 100 年內不會溢出,所以認為是長度無限,另外隊列中只保存最近幾天的數據,之前的數據會按照過期時間來刪除。
也可以認為 Message Queue 是一個長度無限的數組,offset 就是下標。
6、集群方式
這里的Slave都不可寫,但可讀,類似於 Mysql 主備方式。
-
單個 Master
(1)定義:只有一個Master
(2)缺:這種方式風險較大,一旦Broker 重啟或者宕機時,會導致整個服務不可用,不建議線上環境使用。
-
多 Master 模式
(1)定義:一個集群無 Slave,全是 Master
(2)優點:配置簡單,單個Master 宕機或重啟維護對應用無影響。性能最高。異步刷盤丟失少量消息,同步刷盤一條不丟 。
(3)缺點:單台機器宕機期間,這台機器上未被消費的消息在機器恢復之前不可訂 閱,消息實時性會受到受到影響。
(4)啟動順序: 先啟動 NameServer,再在機器 A啟動第一個 Master 再在機器 B啟動第二個 Master。
-
多 Master 多 Slave 模式,異步復制
(1)定義: 每個 Master 配置一個 Slave,有多對Master-Slave,HA (高可用性集群)異步復制方式,主備有短暫消息延遲,毫秒級。
(2)優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,因為 Master 宕機后,消費者仍然可以從 Slave 消費,此過程對應用透明。不需要人工干預。性能同多 Master 模式幾乎一樣。
(3)缺點:Master 宕機,磁盤損壞情況,會丟失少量消息。
(4)啟動順序: 先啟動 NameServer ,再在機器 A啟動第一個 Master,再在機器 B啟動第二個 Master,再在機器 C啟動第一個 Slave,再在機器 D啟動第二個 Slave。
-
多 Master 多 Slave 模式,同步雙寫
(1)定義:每個 Master 配置一個 Slave,有多對Master-Slave,HA(高可用性集群) 采用同步雙寫方式,主備都寫成功,向應用返回成功。
(2)優點:數據與服務都無單點,Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高。
(3 缺點:性能比異步復制模式略低,大約低 10%左右,發送單個消息的 響應時間會略高。目前主宕機后,備機不能自動切換為主機,后續會支持自動切換功能 。
(4)啟動順序:先啟動 NameServer,再在機器 A啟動第一個 Master,再在機器 B啟動第二個 Master ,再在機器 C啟動第一個 Slave ,再在機器 D啟動第二個 Slave。
7、順序消息
消費消息的順序要同發送消息的順序一致,在 RocketMQ 中,主要指的是局部順序,即一類消息為滿足順序性,必須 Producer 單線程順序發送,且發送到同一個隊列,返樣 Consumer 就可以按照 Producer 發送 的順序去消費消息。分類:
(1) 普通順序消息: 正常情況下 RocketMQ 可以保證完全的順序消息,但是一旦發生通信異常,Broker 重啟,由於隊列總數發生發化,哈希取模后定位的隊列會發化,產生短暫的消息順序不一致。
如果業務能容忍在集群異常情況(如某個 Broker 宕機或者重啟)下,消息短暫的亂序,使用普通順序方式比較合適。
(2)嚴格順序消息: 無論正常異常情況都能保證順序,但是犧牲了分布式 Failover(故障轉移)特性,即 Broker 集群中只要有一台機器不可用,則整個集群都不可用,服務可用性大大降低。
如果服務器部署為同步雙寫模式,此缺陷可通過備機自動切換為主來避免,不過仍然會存在幾分鍾的服務不可用。
目前已知的應用只有數據庫 binlog 同步強依賴嚴格順序消息,其他應用絕大部分都可以容忍短暫亂序,推薦使用普通的順序消息。
8、數據存儲結構
commit Log: 操作日志,也可以理解為一個存數據的地方
消息隊列服務:存儲所有消息。
消息索引服務:存儲 offset和消息的匹配表。
事務狀態服務:存儲 每條消息的狀態
定時消息服務:管理 需要定時投遞的消息
四、所有消息中間件 涉及的業務問題(隨便看看)
這里內容很多,但是我希望大家盡量逼着自己讀完,有個印象。非常重點的內容這節后面會再次敘述。並且用代碼實際應用出來。
1、Publish/Subscribe 發布訂閱
發布訂閱是消息中間件的最基本功能,也是相對與傳統 RPC 通信而言。這里不再敘述。
2、Message Priority 消息優先級
在一個消息隊列中,每條消息都有不同的優先級,一般用整數來描述,優先級高的消息先投遞,如果消息完全在一個內存隊列中,那么在投遞前可以按照優先級排序,令優先級高的先投遞。
由於 RocketMQ 所有消息都是持久化的,所以如果按照優先級來排序,開銷會非常大,因此 RocketMQ 沒有特意支持消息優先級,但是可以通過變通的方式實現類似功能,即單獨配置一個優先級高的隊列,和一個普通優先級的隊列, 將不同優先級發送到不同隊列即可。
對於優先級問題,可以歸納為 2 類 :
(1) 只要達到優先級目的即可,不是嚴格意義上的優先級,通常將優先級划分為高、中、低,戒者再多幾個級別。每個優先級可以用不同的 topic 表示,發消息時,指定不同的 topic 來表示優先級,這種方式可以解決絕大部分的優先級問題,但是對業務的優先級精確性做了妥協。
(2) 嚴格的優先級,優先級用整數表示,例如 0 ~ 65535,這種優先級問題一般使用不同 topic 解決就非常不合適。
如果要讓 MQ 解決此問題,會對 MQ 的性能造成非常大的影響。
3、Message Order 消息有序
一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了 3 條消息,分別是訂單創建,訂單付款,訂單完成。消費時,要按照返個順序消費才能有意義。但是同時訂單之間是可以並行消費的。 RocketMQ 可以嚴格的保證消息有序 。
4、Message Filter 消息過濾
-
Broker 端消息過濾:在 Broker 中,按照 Consumer 的要求做過濾。
優點:減少了對於 Consumer 無用消息的網絡傳輸。
缺點:增加了 Broker 的負擔,實現相對復雜。(1) 淘寶 Notify 支持多種過濾方式,包含直接按照消息類型過濾,靈活的語法表達式過濾,幾乎可以滿足最苛刻的過濾需求。
(2) 淘寶 RocketMQ 支持按照簡單的 Message Tag 過濾,也支持按照 Message Header、body 進行過濾。
(3) CORBA Notification 規范中也支持靈活的詫法表達式過濾。
- Consumer 端消息過濾:消費者應用自己過濾。
優點:過濾方式可由消費者應用完全自定義實現,
缺點:很多無用的消息要傳輸到 Consumer 端
5、Message Persistence 消息持久化
消息中間件通常采用的幾種持久化方式:
(1) 持久化到數據庫,例如 Mysql。
(2) 持久化到 KV 存儲,例如 levelDB、伯克利 DB 等 KV 存儲系統。
(3) 文件記錄形式持久化,例如 RocketMQ, Kafka。
(4) 對內存數據做一個持久化鏡像,例如 beanstalkd,VisiNotify 。
(1)、(2)、(3)三種持久化方式都具有將內存隊列 Buffer(緩存) 進行擴展的能力,(4)只是一個內存的鏡像,作用是當 Broker 掛掉重啟后仍然能將之前內存的數據恢復出來。
JMS(Java消息服務應用程序接口) 和 CORBA Notification 規范沒有明確說明如何持久化,但是持久化部分的性能直接決定了整個消息中間件的性能。
RocketMQ 參考了 Kafka 的持久化方式,充分利用 Linux 文件系統內存 cache 來提高性能。
6、Message Reliablity 消息可靠性
影響消息可靠性的幾種情框:
(1) Broker 正常關閉
(2) Broker 異常 Crash
(3) OS Crash
(4) 機器掉電,但是能立即恢復供電情框。
(5) 機器無法開機(可能是 cpu、主板、內存等關鍵設備損壞)
(6) 磁盤設備損壞。
(1)、(2)、(3)、(4)四種情框都屬於硬件資源可立即恢復情況。RocketMQ 在返四種情況下能保證消息不丟,或者丟失少量數據(依賴刷盤方式是同步還是異步)。
(5)、(6)屬於單點故障,且無法恢復,一旦發生,在此單點上的消息全部丟失。RocketMQ 在這兩種情框下,通過異步復制,可保證 99%的消息不丟,但是仍然會有極少量的消息可能丟失。通過同步雙寫技術可以完全避免丟失, 同步雙寫勢必會影響性能,適合對消息可靠性要求極高的場合,例如與 Money 相關的應用。
7、Low Latency Messaging 低延遲消息傳遞
在消息不堆積情框下,消息到達 Broker 后,能立刻到達 Consumer。 RocketMQ 如果使用長輪詢 Pull 方式,可保證消息非常實時,消息實時性不低 Push。
8、At least Once 至少回一次
生產者消息投遞后,如果未能收到ack,則再次投遞。
消費者先pull消息到本地,消費完成后,才向服務器返回ack,如果沒有消費一定不會ack消息,RocketMQ可以很好的支持此特性。
9、Exactly Only Once 不重復
(1) 發送消息階段,不允許發送重復的消息。
(2) 消費消息階段,不允許消費重復的消息。
只有以上兩個條件都滿足情框下,才能人為消息是“Exactly Only Once”,而要實現以上兩點,在分布式系統環境下,不可避免要產生巨大的開銷。所以 RocketMQ 為了追求高性能,並不保證此特性,要求在業務上進行去重, 也就是說消費消息要做到冪等性。
RocketMQ 雖然不能嚴格保證不重復,但是正常情框下很少會出現重復發送、消費情況,只有網絡異常,Consumer 啟停等異常情況下會出現消息重復。 此問題的本質原因是網絡調用存在不確定性,即既不成功也不失敗的第三種狀態,所以才產生了消息重復性問題。
10、Broker 的 Buffer 滿了怎么辦?
Broker 的 Buffer 通常指的是 Broker 中一個隊列的內存 Buffer 大小,這類 Buffer 通常大小有限,如果 Buffer 滿 了以后怎么辦?
-
CORBA Notification 規范中處理方式:
(1)拒絕新來的消息,向 Producer 返回 RejectNewEvents 錯誤碼。
(2)按照特定策略丟棄已有消息 -
RocketMQ 沒有內存 Buffer 概念,RocketMQ 的隊列都是持久化磁盤,數據定期清除。 對於此問題的解決思路,RocketMQ 同其他 MQ 有非常顯著的區別,RocketMQ 的內存 Buffer 抽象成一個無限長度的隊列,不管有多少數據進來都能裝得下,這個無限是有前提的,Broker 會定期刪除過期的數據,例如 Broker 只保存 3 天的消,那么這個 Buffer 雖然長度無限,但是 3 天前的數據會被從隊尾刪除。
11、回溯消費
回溯消費:Consumer 已經消費成功的消息,由於業務上需求需要重新消費。
為了支持此功能,Broker 在向 Consumer 投遞成功消息后,消息仍然需要保留。並且重新消費一般是按照時間維度。
例如由於 Consumer 系統故障, 恢復后需要重新消費 1 小時前的數據,那么 Broker 要提供一種機制,可以按照時間維度來回退消費進度。
RocketMQ 支持按照時間回溯消費,時間維度精確到毫秒,可以向前回溯,也可以向后回溯。
12、消息堆積
(1) 消息中間件的主要功能是異步解耦,還有個重要功能是擋住前端的數據洪峰,保證后端系統的穩定性,這就要求消息中間件具有一定的消息堆積能力。
(2)消息堆積分以下兩種情況:
-
消息堆積在內存 Buffer,一旦超過內存 Buffer,可以根據一定的丟棄策略來丟棄消息,如 CORBA Notification 規范中描述。適合能容忍丟棄消息的業務,這種情
況消息的堆積能力主要在於內存 Buffer 大小,而且消息堆積后,性能下降不會太大,因為內存中數據多少對於對外提供的訪問能力影響有限。 -
消息堆積到持久化存儲系統中,例如 DB,KV 存儲,文件記錄形式。 當消息不能在內存 Cache 命中時,就不可避免的訪問磁盤,會產生大量讀 IO,讀 IO 的吞吐量直接決定了消息堆積后的訪問能力。
評估消息堆積能力主要有以下四點:
(1) 消息能堆積多少條,多少字節?即消息的堆積容量。
(2) 消息堆積后,發消息的吞吐量大小,是否會受堆積影響?
(3) 消息堆積后,正常消費的 Consumer 是否會受影響?
(4) 消息堆積后,訪問堆積在磁盤的消息時,吞吐量有多大?
13、分布式事務
(1)已知的幾個分布式事務規范,如 XA,JTA 等。
(2)分布式事務涉及到兩階段提交問題,在數據存儲方面的方面必然需要 KV 存儲的支持,因為第二階段的提交回滾需要修改消息狀態,一定涉及到根據 Key 去查找 Message 的動作。
(3)RocketMQ 在第二階段繞過了根據 Key 去查找 Message 的問題,采用第一階段發送 Prepared 消息時,拿到了消息的 Offset,第二階段通過 Offset 去訪問消息, 並修改狀態,Offset 就是數據的地址。
(4)RocketMQ 返種實現事務方式,沒有通過 KV 存儲做,而是通過 Offset 方式,存在一個顯著缺陷,即通過 Offset 更改數據,會令系統的臟頁過多,需要特別關注。
14、定時消息
(1)定時消息是指消息收到 Broker 后,不能立刻被 Consumer 消費,要到特定的時間點或者等待特定的時間后才能被消費。 允許消息生產者指定消息進行定時(延時)投遞,最長支持 40 天。
(2)如果要支持任意的時間精度,在 Broker 局面,必須要做消息排序,如果再涉及到持麗化,那舉消息排序要不可避免的產生巨大性能開銷。
(3)RocketMQ 支持定時消息,但是不支持任意時間精度,支持特定的 level,例如定時 5s,10s,1m 等。
15、消息重試
(1)Consumer 消費消息失敗后,要提供一種重試機制,令消息再消費一次。
(2)Consumer 消費消息失敗通常有以下幾種情況:
-
由於消息本身的原因,例如反序列化失敗,消息數據本身無法處理(例如話費充值,當前消息的手機號被注銷,無法充值)等。
返種錯誤通常需要跳過這條消息,再消費其他消息。因為這條失敗的消息即使立刻重試消費,99%也不成功, 所以最好提供一種定時重試機制,即過 10s 秒后再重試。
-
由於消費者應用服務不可用,例如 db 連接不可用,外系統網絡不可達等。
遇到返種錯誤,即使跳過當前失敗的消息,消費其他消息同樣也會報錯。這種情況建議應用 sleep 30s,再消費下一條消息,返樣可以減輕 Broker 重試消息的壓力。
四、具體代碼使用demo
(1)新建兩個springboot項目,分別叫做producer和consumer,這是一個非常簡單的hello初體驗項目,先運行consumer項目,再運行producer項目。
(2)修改producer的application.java
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(MyproducerApplication.class, args);
//1.創建一個生產者,需要指定Producer的分組,
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Group-Producer-1");
//2.設置命名服務的地址,默認是去讀取conf文件下的配置文件 rocketmq.namesrv.addr
defaultMQProducer.setNamesrvAddr("200.200.*.*:端口號");
try{
//3.啟動生產者
defaultMQProducer.start();
//循環發十條消息
for(int i=0;i<10;i++) {
String text = "this is my hello content "+i;
//4.最基本的生產模式 topic+文本信息
Message msg = new Message("topic_hello", "Tag-"+i, text.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 設置代表消息的業務關鍵屬性,請盡可能全局唯一
// 以方便您在無法正常收到消息情況下,可通過控制台查詢消息並補發。
// 注意:不設置也不會影響消息正常收發
msg.setKey("mykey");
//5.發送無序同步消息,發一個就立即獲取發送響應,然后繼續發
SendResult sendResult = defaultMQProducer.send(msg);
System.out.println("發送結果為:" + JSON.toJSONString(sendResult));
}
}catch (Exception e){
e.printStackTrace();
}finally {
//6.釋放生產者
defaultMQProducer.shutdown();
System.out.println("生產者釋放了");
}
}
}
SendResult sendResult值如下:
↓ 里面包含的內容包括 broker信息、消息隊列的信息、發送結果信息等
{
"messageQueue": {
"brokerName": "broker-16",
"queueId": 1,
"topic": "topic_orderCreate"
},
"msgId": "C8C8060A1C9018B4AAC27C008DB90000",
"offsetMsgId": "C8C8061000002A9F000000CBD7063EFA",
"queueOffset": 3,
"regionId": "DefaultRegion",
"sendStatus": "SEND_OK",
"traceOn": true
}
(3)修改consumerr的application.java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.List;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
//這里是push消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group-Consumer-1");
consumer.setNamesrvAddr("200.200.6.16:9876");
//CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息
//CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍
//CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
try {
//第二個參數表示消費匹配的tag * 表示topic所有的tag
// Tag1 || Tag2 || Tag3 表示訂閱 Tag1 或 Tag2 或 Tag3 的消息
consumer.subscribe("topic_hello", "*");
//2. 注冊消費者監聽
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* msgs 表示消息體
* @param msgs
* @param context
* @return
*/
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
try {
System.out.println( new String(messageExt.getBody(), "UTF-8"));
} catch (Exception e) {
e.printStackTrace();
}
}
//返回消費狀態
//CONSUME_SUCCESS 消費成功
//RECONSUME_LATER 消費失敗,需要稍后重新消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//3.consumer 啟動
consumer.start();
System.out.println("消費端起來了哈.........");
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
收到的某一條消息舉例如下,類型為MessageExt:
↓ 里面包含的內容有消息信息、隊列信息等
messageExt=[
queueId = 0
storeSize = 333
queueOffset = 650
sysFlag = 0
bornTimestamp = 1556174674734
bornHost = {InetSocketAddress@4406} "/生產者host:54192"
storeTimestamp = 1556174720581
storeHost = {InetSocketAddress@4407} "/mqhost:10911"
msgId = "C8C8061000002A9F000000CBD769DB5C"
commitLogOffset = 875492399964
bodyCRC = 1420010275
reconsumeTimes = 2
preparedTransactionOffset = 0
topic = "topic_hello"
flag = 0
properties = {HashMap@4410} size = 11
body = {byte[26]@4412}
]
運行結果:
(1)producer項目結果如下:
- 注意看圖片,queueId總共是0~3,而且一直是按順序循環交替存儲,由此可得默認有四條消息隊列提供存儲消息,順序循環交替存儲,具體存在隊列的哪個位置是隨機的
- 會自動給每條消息一個msgId
- offsetMsgId就是這個消息隊列當前的游標位置
(2)consumer項目結果如下:
注意看圖片,順序是6879,所以說明不是按照順序的
五、發送的消息分類
1、普通消息
普通消息也叫做無序消息,簡單來說就是沒有順序的消息,producer 只管發送消息,consumer 只管接收消息,至於消息和消息之間的順序並沒有保證,可能先發送的消息先消費,也可能先發送的消息后消費。
分類:可靠同步消息,可靠異步消息,單向發送。
1.1可靠同步發送
同步發送是指消息發送方發出數據后,會在收到接收方發回響應之后才發下一個數據包的通訊方式。
應用場景:此種方式應用場景非常廣泛,例如重要通知郵件、報名短信通知、營銷短信系統等。
上面第四節的demo例子就是普通消息,同步發送消息。
1.2可靠異步發送
異步發送是指發送方發出數據后,不等接收方發回響應,接着發送下個數據包的通訊方式。 如果發送方通過回調接口接收到了服務器響應,就對響應結果進行處理。
consumer類不變,改producer類為下面:
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MyproducerApplication {
public static void main(String[] args) {
SpringApplication.run(MyproducerApplication.class, args);
//1.創建一個生產者,需要指定Producer的分組,
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Group-Producer-1");
//2.設置命名服務的地址,默認是去讀取conf文件下的配置文件 rocketmq.namesrv.addr
defaultMQProducer.setNamesrvAddr("200.200.6.16:9876");
try{
//3.啟動生產者
defaultMQProducer.start();
for(int i=0;i<100;i++) {
String text = "this is my hello content "+i;
//4.最基本的生產模式 topic+文本信息
Message msg = new Message("topic_hello", "Tag-"+i, text.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 異步發送消息, 發送結果通過 callback 返回給客戶端。
defaultMQProducer.send(msg,new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("發送結果為:" + JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable throwable) {
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理
throwable.printStackTrace();
}
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
//6.釋放生產者
defaultMQProducer.shutdown();
System.out.println("生產者釋放了");
}
}
}
1.3單向(Oneway)發送
發送特點為發送方只負責發送消息,不等待服務器回應且沒有回調函數觸發
consumer類不變,改producer類為下面:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MyproducerApplication {
public static void main(String[] args) {
SpringApplication.run(MyproducerApplication.class, args);
//1.創建一個生產者,需要指定Producer的分組,
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Group-Producer-1");
//2.設置命名服務的地址,默認是去讀取conf文件下的配置文件 rocketmq.namesrv.addr
defaultMQProducer.setNamesrvAddr("200.200.*.*:端口號");
try{
//3.啟動生產者
defaultMQProducer.start();
for(int i=0;i<100;i++) {
String text = "this is my hello content "+i;
//4.最基本的生產模式 topic+文本信息
Message msg = new Message("topic_hello", "Tag-"+i, text.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 由於在 oneway 方式發送消息時沒有請求應答處理,一旦出現消息發送失敗,則會因為沒有重試而導致數據丟失。若數據不可丟,建議選用可靠同步或可靠異步發送方式。
defaultMQProducer.sendOneway(msg);
}
}catch (Exception e){
e.printStackTrace();
}finally {
//6.釋放生產者
defaultMQProducer.shutdown();
System.out.println("生產者釋放了");
}
}
}
2、有序消息
有序消息就是按照一定的先后順序的消息類型。也就是說生成者按什么順序發送,消費者就按什么順序消費。分類方法一:全局有序消息、局部有序消息;
2.1全局有序消息
所有消息都存在一個隊列里,那肯定就先進先出了。但是效率太低。很少用到,我就不舉例了。
2.2局部有序消息
比如一個訂單的順序必須是訂單創建、訂單付款、訂單完成,但是可以多個訂單同時進行,所以就同個orderid的順序放同一個隊列。效率高很多。
consumer類不變,改producer類為下面:
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.List;
@SpringBootApplication
public class MyproducerApplication {
public static void main(String[] args) {
SpringApplication.run(MyproducerApplication.class, args);
//1.創建一個生產者,需要指定Producer的分組,
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Group-Producer-1");
//2.設置命名服務的地址,默認是去讀取conf文件下的配置文件 rocketmq.namesrv.addr
defaultMQProducer.setNamesrvAddr("200.200.6.16:9876");
try{
//3.啟動生產者
defaultMQProducer.start();
int orderId =0;
for(int i=0;i<99;i++) {
//這里的意思是每三條的orderId是一樣的
if(i%3==0){
orderId++;
}
String text = "the orderId is order"+orderId;
//每三條的后綴分別是create,pay,finish
if(i%3==0){
text+="-create";
}else if(i%3==1){
text+="-pay";
}else if(i%3==2){
text+="-finish";
}
//4.最基本的生產模式 topic+文本信息
Message msg = new Message("topic_hello", "Tag-"+i, text.getBytes(RemotingHelper.DEFAULT_CHARSET));
//5.獲取發送響應
SendResult sendResult = defaultMQProducer.send(msg, new MessageQueueSelector() {
// 選擇發送消息的隊列
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// arg的值其實就是orderId
Integer id = (Integer) arg;
// mqs是隊列集合,也就是topic所對應的所有隊列
int index = id % mqs.size();
// 這里根據前面的id對隊列集合大小求余來返回所對應的隊列
return mqs.get(index);
}
}, orderId);
System.out.println("發送結果為:" + JSON.toJSONString(sendResult));
}
}catch (Exception e){
e.printStackTrace();
}finally {
//6.釋放生產者
defaultMQProducer.shutdown();
System.out.println("生產者釋放了");
}
}
}
消費者結果如下:
注意看圖,每個order都是先create再pay再finish,雖然可能各個order交替消費,比如order4和order5
3、延時消息和定時消息
3.1延時消息
延時消息,簡單來說就是當 producer 將消息發送到 broker 后,會延時一定時間后才投遞給 consumer 進行消費。RcoketMQ的延時等級為:1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。level=0,表示不延時。level=1,表示 1 級延時,對應延時 1s。level=2 表示 2 級延時,對應5s,以此類推。這種消息一般適用於消息生產和消費之間有時間窗口要求的場景。比如說我們網購時,下單之后是有一個支付時間,超過這個時間未支付,系統就應該自動關閉該筆訂單。那么在訂單創建的時候就會就需要發送一條延時消息(延時15分鍾)后投遞給 consumer,consumer 接收消息后再對訂單的支付狀態進行判斷是否關閉訂單。設置延時非常簡單,只需要在Message設置對應的延時級別即可:
consumer類不變,改producer類的發送部分為下面:
Message msg = new Message("topic_hello", "Tag",
text.getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setDelayTimeLevel(1);//1代表等級,不是1秒
SendResult sendResult = defaultMQProducer.send(msg);
3.2定時消息
定時消息可以做到在指定時間戳之后才可被消費者消費,適用於對消息生產和消費有時間窗口要求,或者利用消息出發定時任務的場景。
代碼示例:
java apache的rocketmq不支持,但是人家阿里雲rocketmq可以支持。
3.3區別和注意事項
定時消息和延時消息的使用在代碼編寫上存在略微的區別:
- 發送定時消息需要明確指定消息發送時間點之后的某一時間點作為消息投遞的時間點。
- 發送延時消息時需要設定一個延時時間長度,消息將從當前發送時間點開始延遲固定時間之后才開始投遞。
注意事項:
- 定時和延時消息的 msg.setStartDeliverTime 參數需要設置成當前時間戳之后的某個時刻(單位毫秒)。如果被設置成當前時間戳之前的某個時刻,消息將立刻投遞給消費者。
- 定時和延時消息的 msg.setStartDeliverTime 參數可設置40天內的任何時刻(單位毫秒),超過40天消息發送將失敗。
- StartDeliverTime 是服務端開始向消費端投遞的時間。 如果消費者當前有消息堆積,那么定時和延時消息會排在堆積消息后面,將不能嚴格按照配置的時間進行投遞。
- 由於客戶端和服務端可能存在時間差,消息的實際投遞時間與客戶端設置的投遞時間之間可能存在偏差。
- 設置定時和延時消息的投遞時間后,依然受 3 天的消息保存時長限制。例如,設置定時消息 5 天后才能被消費,如果第 5 天后一直沒被消費,那么這條消息將在第8天被刪除。
- 除 Java 語言支持延時消息外,其他語言都不支持延時消息。
五、push和pull消費者
1、push消費
Push Consumer:消息消費者應用通常向 Consumer 對象注冊一個 Listener 接口,一旦收到消息,Consumer 對象立刻回調 Listener 接口方法。
push-優點:及時性、服務端統一處理實現方便
push-缺點:容易造成堆積、負載性能不可控
上面所有例子都是push消費,我就再不舉例了
2、pull消費
Pull Consumer:消息消費者應用主動調用 Consumer 的拉消息方法從 Broker拉消息,主動權由消息消費者應用控制。
pull-優點:獲得消息狀態方便、負載均衡性能可控
pull-缺點:及時性差
舉例:
package com.rocket.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("Group-Consumer-1");
//默認CLUSTERING,所以不寫也可以
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setNamesrvAddr("200.200.6.16:9876");
try {
consumer.start();
System.out.println("消費端起來了哈.........");
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topic_hello");
for (MessageQueue mq : mqs) {
System.out.println("Consume from the queue: " + mq);
//開始拉取,offset是指多個隊列指向哪一個的游標;初始隊列offerset為0
PullResultExt pullResult = (PullResultExt) consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
//獲取下一個隊列offset
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
//開始拿消息
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
for (MessageExt m : messageExtList) {
System.out.println(new String(m.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break;
case OFFSET_ILLEGAL:
break;
default:
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static final Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offsetTable.put(mq, offset);
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offsetTable.get(mq);
if (offset != null)
return offset;
return 0;
}
}
六、消費者的消息過濾
看到這里你肯定知道消息過濾是什么啦,就是消費者依靠那個topic和tages來過濾消息,完全依賴consumer的subscribe(String topic, String tags, byte[] body)這個方法
舉例如下:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group-Consumer-1");
//第二個參數表示消費匹配的tag * 表示topic所有的tag
// Tag1 || Tag2 || Tag3 表示訂閱 Tag1 或 Tag2 或 Tag3 的消息
// Tag1 表示訂閱Tag1的消息
consumer.subscribe("topic_hello", "*");
七、消費者的廣播消費與集群消費
消費者訂閱消息的方式分廣播消費與集群消費,前面三、4節里面有講過廣播消費與集群消費,
1、廣播消費
(1)廣播消費模式:消息隊列 RocketMQ 會將每條消息推送給集群內所有注冊過的客戶端,保證消息至少被每台機器消費一次。
(2)注意事項:
- 廣播消費模式下不支持順序消息。
- 消費進度在客戶端維護,出現重復的概率稍大於集群模式。
- 廣播模式下,消息隊列 RocketMQ 保證每條消息至少被每台客戶端消費一次,但是並不會對消費失敗的消息進行失敗重投,因此業務方需要關注消費失敗的情況。
- 廣播模式下,客戶端第一次啟動時默認從最新消息消費。
- 客戶端的消費進度是被持久化在客戶端本地的隱藏文件中,因此不建議刪除該隱藏文件,否則會丟失部分消息。
- 廣播模式下,每條消息都會被大量的客戶端重復處理,因此推薦盡可能使用集群模式。目前僅 Java 客戶端支持廣播模式。
- 廣播模式下服務端不維護消費進度,所以消息隊列 RocketMQ 控制台不支持消息堆積查詢、消息堆積報警和訂閱關系查詢功能。
(3)代碼舉例:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group-Consumer-1");
consumer.setMessageModel(MessageModel.BROADCASTING);
2、集群消費
(1)集群消費:消息隊列 RocketMQ 認為任意一條消息只需要被集群內的任意一個消費者處理即可。
(2)注意事項:
由於消費進度在服務端維護,可靠性更高。
集群消費模式下,不保證每一次失敗重投的消息路由到同一台機器上,因此處理消息時不應該做任何確定性假設。
(3)代碼舉例:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group-Consumer-1");
//默認CLUSTERING,所以不寫也可以
consumer.setMessageModel(MessageModel.CLUSTERING);
八、rocketmq使用中指標限制
消息隊列 RocketMQ 對某些具體指標進行了約束和規范,使用時注意不要超過相應的限制值,以免程序出現異常。具體的限制項和限制值請參見下表:
九、真正項目開發中的規范化使用方法
前面所有例子都是為了了解和認識,但是真正項目是要很規范使用的,我們來一步一步看。目前現在很多還不是springboot項目,所以下面例子配置文件我都寫到xml里面,如果你們是spingboot項目,就自己改成對應的配置
1、生產者
(1)ProducerUtil工具類:這里面包含了初始化生產者、關閉生產者、常用發送方法(順序異步發送、順序超時異步發送、亂序超時異步發送,亂序發送)
package com.test.service.impl.mq;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 生產者工具
* 此生產者只是包含了四種常用發送方法,其他自己加
*/
public class ProducerUtil {
private static final Logger logger = LoggerFactory.getLogger(ProducerUtil.class);
//默認生產者
private DefaultMQProducer defaultMQProducer;
//namesrv 命名服務地址
private String namesrvAddr;
// 分組名稱
private String producerGroupName;
// 實例名稱
private String instanceName;
/**
* 初始化 生產者相關參數
* @throws MQClientException
*/
public void init() throws MQClientException {
// 參數信息
//logger.info("DefaultMQProducer initialize!");
logger.info("producerGroupName=" + producerGroupName + " &namesrvAddr=" + namesrvAddr + " &instanceName=" + instanceName);
// 初始化 設置相關參數
defaultMQProducer = new DefaultMQProducer(producerGroupName);
defaultMQProducer.setNamesrvAddr(namesrvAddr);
defaultMQProducer.setInstanceName(instanceName);
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(10);
defaultMQProducer.setRetryTimesWhenSendFailed(10);
defaultMQProducer.setRetryAnotherBrokerWhenNotStoreOK(true);
defaultMQProducer.setSendMsgTimeout(5000);
defaultMQProducer.start();
logger.info("[DefaultMQProudcer 生產者初始化成功!]");
}
/**
* 關閉生產者
*/
public void destroy() {
defaultMQProducer.shutdown();
logger.info("DefaultMQProudcer has stop success!");
}
/**
* 順序發送
* @param msg
* @param selector
* @param arg
* @return
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
return defaultMQProducer.send(msg, selector, arg);
}
/**
* 順序超時發送
* @param msg
* @param selector
* @param arg
* @param timeout
* @return
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducer.send(msg, selector, arg, timeout);
}
/**
* 超時發送
* @param msg
* @param timeout
* @return
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
public SendResult send(Message msg, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducer.send(msg, timeout);
}
/**
* 發送消息
* @param msg
* @return
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
public SendResult send(Message msg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducer.send(msg);
}
/***get and set***/
public DefaultMQProducer getDefaultMQProducer() {
return defaultMQProducer;
}
public void setDefaultMQProducer(DefaultMQProducer defaultMQProducer) {
this.defaultMQProducer = defaultMQProducer;
}
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getProducerGroupName() {
return producerGroupName;
}
public void setProducerGroupName(String producerGroupName) {
this.producerGroupName = producerGroupName;
}
public String getInstanceName() {
return instanceName;
}
public void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}
}
(2)我們在使用這個producerUtil類時,要先初始化好一些配置,所以創建一個rocketmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:p="http://www.springframework.org/schema/p" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd">
<context:property-placeholder location="classpath:rockertmq-producer.properties"/>
<!-- 生產者工具 -->
<bean id="producerUtil" class="com.test.service.impl.mq.ProducerUtil"
init-method="init"
destroy-method="destroy"
scope="singleton">
<property name="producerGroupName" value="#{producerGroupName}" />
<property name="namesrvAddr" value="#{namesrvAddr}" />
<property name="instanceName" value="#{instanceName}" />
</bean>
</beans>
(3)rockertmq-producer.properties文件,內容如下:
#namesrvAddr
namesrvAddr =寫自己的
#producerGroupName
producerGroupName = 寫自己的
#instanceName
instanceName = 寫自己的
# topic - name
busTopic =寫自己的
(4)使用的時候直接用注解初始化ProducerUtil producerUti ,然后調用send方法即可。
2、Push消費者
(1)這里用的是push消費者工具類
package com.test.service.impl.mq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* 推模式下的mq消費服務
*/
public class PushConsumer {
private static Logger logger = LoggerFactory.getLogger("PushConsumer");
private String topic;
private String consumerGroupName;
private String namesrvAddr;
private String instanceName;
private MessageListenerConcurrently messageListenerConcurrently;
private DefaultMQPushConsumer consumer;
public void init() throws MQClientException {
//先把任務隊列清空
logger.info("[PushConsumer 開始初始化消費者]");
//updateOffsetByTime();
consumer = new DefaultMQPushConsumer(consumerGroupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumerGroup(consumerGroupName);
consumer.setInstanceName(instanceName);
consumer.subscribe(topic, "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.registerMessageListener(messageListenerConcurrently);
consumer.start();
logger.info("[PushConsumer 初始化消費者成功]");
}
private void updateOffsetByTime(){
long currentTime = System.currentTimeMillis();
MqAdminExecutor.resetOffsetByTime(namesrvAddr, consumerGroupName, topic, currentTime);
}
public void destroy() throws MQClientException{
consumer.shutdown();
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public MessageListenerConcurrently getMessageListenerConcurrently() {
return messageListenerConcurrently;
}
public void setMessageListenerConcurrently(MessageListenerConcurrently messageListenerConcurrently) {
this.messageListenerConcurrently = messageListenerConcurrently;
}
public String getConsumerGroupName() {
return consumerGroupName;
}
public void setConsumerGroupName(String consumerGroupName) {
this.consumerGroupName = consumerGroupName;
}
public DefaultMQPushConsumer getConsumer() {
return consumer;
}
public void setConsumer(DefaultMQPushConsumer consumer) {
this.consumer = consumer;
}
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getInstanceName() {
return instanceName;
}
public void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}
}
(2)我們在使用這個PushConsumer類時,要先初始化好一些配置,所以創建一個rocketmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:p="http://www.springframework.org/schema/p" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd">
<context:property-placeholder location="classpath:rockertmq-comsumer.properties"/>
<!-- 拉取消息后的處理 -->
<bean id="pushConsumerHandler" class="com.test.service.impl.mq.PushConsumerHandler">
</bean>
<!-- 推送消費 -->
<bean id="pushConsumer" class="com.test.service.impl.mq.PushConsumer"
init-method="init"
destroy-method="destroy"
scope="singleton">
<property name="consumerGroupName" value="#{consumerGroupName}" />
<property name="namesrvAddr" value="#{namesrvAddr}" />
<property name="instanceName" value="#{instanceName}" />
<property name="topic" value="#{topic}" />
<property name="messageListenerConcurrently" ref="pushConsumerHandler" />
</bean>
</beans>
(3)rockertmq-consumer.properties文件,內容如下:
#namesrvAddr
namesrvAddr=寫自己的
#consumerGroupName
consumerGroupName=寫自己的
#instanceName
instanceName=寫自己的
#topic
topic=寫自己的
#pullThreadNums
pullThreadNums=25
#pullNextDelayTimeMillis
pullNextDelayTimeMillis=1000
(4)獲取到消息的結果處理器
public class PushConsumerHandler implements MessageListenerConcurrently {
@Overridepublic
ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//自己的消息結果處理
}
}
2、定時Pull消費者
(1)這里用的是push消費者工具類
import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
import org.apache.rocketmq.client.consumer.PullTaskCallback;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 定時拉取服務
*/
public class PullSchduleConsumer {
//日志
private static final Logger logger = LoggerFactory.getLogger(PullSchduleConsumer.class);
//定時拉取消費者服務
private MQPullConsumerScheduleService mqPullConsumerScheduleService;
//命名服務地址
private String namesrvAddr;
//消費組組名
private String consumerGroupName;
//實例名稱
private String instanceName;
//消費主題
private String topic;
//拉取線程數量
private Integer pullThreadNums;
//消費處理
private PullTaskCallback pullTaskCallbackHandler;
/**
* 初始化定時
* @throws MQClientException
*/
public void init() throws MQClientException{
logger.info("PullSchduleConsumer initialize!");
logger.info("consumerGroupName="+consumerGroupName);
logger.info("namesrvAddr="+namesrvAddr);
logger.info("instanceName="+instanceName);
logger.info("topic="+topic);
mqPullConsumerScheduleService = new MQPullConsumerScheduleService(consumerGroupName);
mqPullConsumerScheduleService.getDefaultMQPullConsumer().setNamesrvAddr(namesrvAddr);
mqPullConsumerScheduleService.getDefaultMQPullConsumer().setInstanceName(instanceName);
mqPullConsumerScheduleService.setMessageModel(MessageModel.CLUSTERING);//默認使用集群模式
mqPullConsumerScheduleService.registerPullTaskCallback(topic, pullTaskCallbackHandler);
mqPullConsumerScheduleService.setPullThreadNums(pullThreadNums);
mqPullConsumerScheduleService.start();
logger.info("----PullSchduleConsumer has start successfully!----");
}
/**
* 停止定時消費服務
*/
public void destroy(){
mqPullConsumerScheduleService.shutdown();
logger.info("----PullSchduleConsumer has stop successfully!----");
}
/*****get and set*******/
public MQPullConsumerScheduleService getMqPullConsumerScheduleService() {
return mqPullConsumerScheduleService;
}
public void setMqPullConsumerScheduleService(
MQPullConsumerScheduleService mqPullConsumerScheduleService) {
this.mqPullConsumerScheduleService = mqPullConsumerScheduleService;
}
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getConsumerGroupName() {
return consumerGroupName;
}
public void setConsumerGroupName(String consumerGroupName) {
this.consumerGroupName = consumerGroupName;
}
public String getInstanceName() {
return instanceName;
}
public void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public PullTaskCallback getPullTaskCallbackHandler() {
return pullTaskCallbackHandler;
}
public void setPullTaskCallbackHandler(PullTaskCallback pullTaskCallbackHandler) {
this.pullTaskCallbackHandler = pullTaskCallbackHandler;
}
public void setPullThreadNums(Integer pullThreadNums) {
this.pullThreadNums = pullThreadNums;
}
}
(2)我們在使用這個PullConsumer類時,要先初始化好一些配置,所以創建一個rocketmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:p="http://www.springframework.org/schema/p" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd">
<!-- 引入config -->
<context:property-placeholder location="classpath*:properties/rocketmq-consumer.properties" />
<!-- 拉取消息后的處理 -->
<bean id="pullSchduleConsumerHandler" class="com.test.service.impl.mq.PullSchduleConsumerHandler">
</bean>
<!-- 定時拉取消費者 -->
<bean id="pullSchduleConsumer" class="com.test.service.impl.mq.PullSchduleConsumer"
init-method="init"
destroy-method="destroy"
scope="singleton">
<property name="consumerGroupName" value="#{consumerGroupName}" />
<property name="namesrvAddr" value="#{namesrvAddr}" />
<property name="instanceName" value="#{nstanceName}" />
<property name="topic" value="#{topic}" />
<property name="pullThreadNums" value="#{pullThreadNums}" />
<property name="pullTaskCallbackHandler" ref="pullSchduleConsumerHandler" />
</bean>
</beans>
(3)rockertmq-consumer.properties文件,內容如下:
#namesrvAddr
namesrvAddr=寫自己的
#consumerGroupName
consumerGroupName=寫自己的
#instanceName
instanceName=寫自己的
#topic
topic=寫自己的
#pullThreadNums
pullThreadNums=25
#pullNextDelayTimeMillis
pullNextDelayTimeMillis=1000
(4)獲取到消息的結果處理器
public class PullSchduleConsumerHandler implements
PullTaskCallback {
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
//自己的消息結果處理
}
}