《分布式消息隊列XXL-MQ》
一、簡介
1.1 概述
XXL-MQ是一款輕量級分布式消息隊列,支持 "並發消息、串行消息、廣播消息、延遲消息、事務消息、失敗重試、超時控制" 等消息特性。現已開放源代碼,開箱即用。
1.2 特性
- 1、簡單易用: 一行代碼即可發布一條消息; 一行注解即可訂閱一個消息主題;
- 2、輕量級: 部署簡單,不依賴第三方服務,一分鍾上手;
- 3、消息中心HA:消息中心支持集群部署,可大大提高系統可用性,以及消息吞吐能力;
- 4、消費者HA:消費者支持集群部署,保證消費者可用性;
- 5、三種消息模式:
- 並行消息:消息平均分配在該主題在線消費者,分片方式並行消費;適用於吞吐量較大的消息場景,如郵件發送、短信發送等業務邏輯
- 串行消息:消息固定分配給該主題在線消費者中其中一個,FIFO方式串行消費;適用於嚴格限制並發的消息場景,如秒殺、搶單等排隊業務邏輯;
- 廣播消息:消息將會廣播發送給該主題在線消費者分組,全部分組都會消費該消息,但是一個分組下只會消費一次;適用於廣播場景,如廣播更新緩存等
- 6、延時消息: 支持設置消息的延遲生效時間, 到達設置的生效時間時該消息才會被消費;適用於延時消費場景,如訂單超時取消等;
- 7、事務性: 消費者開啟事務開關后,消息事務性保證只會成功執行一次;
- 8、失敗重試: 支持設置消息的重試次數, 在消息執行失敗后將會按照設置的值進行消息重試執行,直至重試次數耗盡或者執行成功;
- 9、超時控制: 支持自定義消息超時時間,消息消費超時將會主動中斷;
- 10、吞吐量: 依賴於部署的消費中心集群和DB性能;DB可借助多表提升性能,不考慮DB的情況下,吞吐量可以無限橫向擴展;可參考示例項目性能測試用例,單機TPS過萬;
- 11、消息可見: 系統中每一條消息可通過Web界面在線查看,甚至支持編輯消息內容和消息狀態;
- 12、消息可追蹤: 支持追蹤每一條消息的執行路徑, 便於排查業務問題;
- 13、消息失敗告警:支持以Topic粒度監控消息,存在失敗消息時主動推送告警郵件;默認提供郵件方式失敗告警,同時預留擴展接口,可方面的擴展短信、釘釘等告警方式;
- 14、容器化:提供官方docker鏡像,並實時更新推送dockerhub,進一步實現產品開箱即用;
- 15、消息持久化:全部消息持久化存儲,消息中心支持通過配置選擇是否清理過期消息。
- 16、訪問令牌(accessToken):為提升系統安全性,消息中心和客戶端進行安全性校驗,雙方AccessToken匹配才允許通訊;
1.3 發展
於2015年中,我在github上創建XXL-MQ項目倉庫並提交第一個commit,隨之進行系統結構設計,UI選型,交互設計……
至今,XXL-MQ已接入多家公司的線上產品線,截止2016-09-18為止,XXL-MQ已接入的公司包括不限於:
- 1、農信互聯
- ……
更多接入的公司,歡迎在 登記地址 登記,登記僅僅為了產品推廣。
歡迎大家的關注和使用,XXL-MQ也將擁抱變化,持續發展。
Why MQ
- 異步: 很多場景下,不會立即處理消息,此時可以在MQ中存儲message,並在某一時刻再進行處理;
- 解耦: 不同進程間添加一層實現解耦,方便今后的擴展。
- 消除峰值: 在高並發環境下,由於來不及同步處理,請求往往會發生堵塞,比如大量的insert,update之類的請求同時到達mysql,直接導致無數的行鎖表鎖,甚至最后請求會堆積過多,從而觸發too manyconnections錯誤。通過使用消息隊列,我們可以異步處理請求,從而緩解系統的壓力。
- 耗時業務: 在一些比較耗時的業務場景中, 可以耗時較多的業務解耦通過異步隊列執行, 提高系統響應速度和吞吐量;
Why XXL-MQ
目前流行的ActiveMQ、RabbitMQ和ZeroMQ等消息隊列的軟件中,大多為了實現AMQP,STOMP,XMPP之類的協議,變得極其重量級(如新版本Activemq建議分配內存達1G+),但在很多Web應用中的實際情況是:我們只是想找到一個緩解高並發請求的解決方案,一個輕量級的消息隊列實現方式才是我們真正需要的。
1.4 下載
文檔地址
源碼倉庫地址
| 源碼倉庫地址 | Release Download |
|---|---|
| https://github.com/xuxueli/xxl-mq | Download |
| https://gitee.com/xuxueli0323/xxl-mq | Download |
技術交流
中央倉庫地址
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-mq-client</artifactId>
<version>{最新Release版本}</version>
</dependency>
1.5 環境
- Maven3+
- Jdk1.7+
- Mysql5.6+
二、快速入門
2.1 初始化"消息中心數據庫"
請下載項目源碼並解壓,獲取 "消息中心數據庫初始化SQL腳本" 並執行即可
"消息中心數據庫初始化SQL腳本" 位置為:
/xxl-mq/doc/db/xxl-mq-mysql.sql
消息中心支持集群部署,集群情況下各節點務必連接同一個mysql實例;
2.2 編譯項目
解壓源碼,按照maven格式將源碼導入IDE, 使用maven進行編譯即可,源碼結構如下:
- /xxl-mq-admin :消息中心,提供消息Broker、服務注冊、消息在線管理功能;
- /xxl-mq-client :客戶端核心依賴, 提供API開發Producer和Consumer;
- /xxl-mq-samples :接入項目參考示例, 可自行參考學習並使用;
- /xxl-mq-samples-frameless :無框架示例項目,不依賴第三方框架,只需main方法即可啟動運行;
- /xxl-mq-samples-springboot :springboot版本示例項目;
2.3 配置部署“消息中心”
步驟一:消息中心配置:
消息中心配置文件地址:
/xxl-mq/xxl-mq-admin/src/main/resources/application.properties
消息中心配置內容說明:
### 數據庫配置
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-mq?Unicode=true&characterEncoding=UTF-8
### 告警郵箱發送方配置
spring.mail.username=xxx@qq.com
spring.mail.password=xxx
### 注冊心跳時間
xxl.mq.registry.beattime=10
### 注冊信息磁盤存儲目錄,務必擁有讀寫權限;
xxl.mq.registry.data.filepath=/data/applogs/xxl-mq/registrydata
### 消息中心Broker服務RPC通訊地址,為空則自動獲取
xxl-mq.rpc.remoting.ip=
### 消息中心Broker服務RPC通訊端口
xxl-mq.rpc.remoting.port=7080
### 日志保存天數,超過該閾值的成功消息將會被自動清理;大於等於3時生效
xxl.mq.log.logretentiondays=3
### 登陸信息配置
xxl.mq.login.username=admin
xxl.mq.login.password=123456
步驟二:部署項目:
如果已經正確進行上述配置,可將項目編譯打包部署。 消息中心訪問地址:http://localhost:8080/xxl-mq-admin (該地址接入方項目將會使用到,作為注冊地址),登錄后運行界面如下圖所示

至此“消息中心”項目已經部署成功。
步驟三:消息中心集群(可選):
消息中心支持集群部署,提升消息系統容災和可用性。
消息中心集群部署時,幾點要求和建議:
- DB配置保持一致;
- 登陸賬號配置保持一致;
- 集群機器時鍾保持一致(單機集群忽視);
- 建議:推薦通過nginx為消息中心集群做負載均衡,分配域名。消息中心訪問、客戶端使用等操作均通過該域名進行。
其他:Docker 鏡像方式搭建消息中心:
- 下載鏡像
// Docker地址:https://hub.docker.com/r/xuxueli/xxl-mq-admin/
docker pull xuxueli/xxl-mq-admin
- 創建容器並運行
docker run -p 8080:8080 -v /tmp:/data/applogs --name xxl-mq-admin -d xuxueli/xxl-mq-admin
/**
* 如需自定義 mysql 等配置,可通過 "PARAMS" 指定,參數格式 RAMS="--key=value --key2=value2" ;
* 配置項參考文件:/xxl-mq/xxl-mq-admin/src/main/resources/application.properties
*/
docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-mq?Unicode=true&characterEncoding=UTF-8" -p 8080:8080 -p 7080 -v /tmp:/data/applogs --name xxl-mq-admin -d xuxueli/xxl-mq-admin
2.4 接入XXL-MQ並使用
接入XXL-MQ項目:"xxl-mq-samples-springboot" (提供多種版本示例項目供參考選擇,現以springboot版本為例講解)
作用:生產消息、消費消息;可直接部署,也可以將集成到現有業務項目中。
步驟一:maven依賴
確認pom文件中引入了 "xxl-mq-client" 的maven依賴;
步驟二:"消息接入方",屬性配置
消息接入方配置,配置文件地址:
/xxl-mq/xxl-mq-samples/xxl-mq-samples-springboot/src/main/resources/application.properties
消息接入方配置,配置內容說明:
# 消息中心跟地址;支持配置多個,建議域名方式配置;
xxl.mq.admin.address=http://localhost:8080/xxl-mq-admin
步驟三:"消息接入方",組件配置
@Bean
public XxlMqSpringClientFactory getXxlMqConsumer(){
XxlMqSpringClientFactory xxlMqSpringClientFactory = new XxlMqSpringClientFactory();
xxlMqSpringClientFactory.setAdminAddress(adminAddress);
return xxlMqSpringClientFactory;
}
步驟四:部署"消息接入方"項目:
如果已經正確進行上述配置,可將項目編譯打包部署。 springboot版本示例項目,訪問地址:http://localhost:8081/
至此“消息接入方”示例項目已經部署結束。
步驟五:"消息接入方"集群(可選):
消息接入方支持集群部署,提升消息系統可用性,同時提升消息處理能力。
消息接入方集群部署時,要求和建議:
- 消息中心跟地址(xxl.mq.admin.address)需要保持一致;
2.5 生產消息、消費消息
生產消息
/**
* 生產消息:並行消息
*/
XxlMqProducer.produce(new XxlMqMessage(topic, data));
/**
* 生產消息:串行消費( ShardingId 保持一致即可;如秒殺消息,可將 ShardingId 設置為商品ID,則該商戶全部消息固定在一台機器消費;)
*/
XxlMqMessage mqMessage = new XxlMqMessage();
mqMessage.setTopic(topic);
mqMessage.setData(data);
mqMessage.setShardingId(1);
XxlMqProducer.produce(mqMessage);
/**
* 生產消息:廣播消費( 消費者 IMqConsumer 注解的 group 屬性修改不一致即可;一條消息將會廣播給該主題全部在線 group,每個group都會消費,單個group只會消費一次; )
*/
XxlMqProducer.broadcast(new XxlMqMessage(topic, data));
/**
* 生產消息:延時消費( EffectTime 設置為固定時間點即可;如訂單30min超時取消,可將 EffectTime 設置為30min后的時間點,到時將會自動消費;)
*/
XxlMqMessage mqMessage = new XxlMqMessage();
mqMessage.setTopic(topic);
mqMessage.setData(data);
mqMessage.setEffectTime(effectTime);
XxlMqProducer.produce(mqMessage);
/**
* 生產消息:失敗重試消費( RetryCount 設置重試次數即可;如發送短信消息,第三方服務不穩定時失敗很常見,可設置 RetryCount 為3,失敗是將會自動重試指定次數;)
*/
XxlMqMessage mqMessage = new XxlMqMessage();
mqMessage.setTopic(topic);
mqMessage.setData(data);
mqMessage.setRetryCount(3);
XxlMqProducer.produce(mqMessage);
……
更多消息屬性、場景,可參考章節 "4.2 Message設計";
消費消息
@MqConsumer(topic = "topic_1")
@Service
public class DemoAMqComsumer implements IMqConsumer {
private Logger logger = LoggerFactory.getLogger(DemoAMqComsumer.class);
@Override
public MqResult consume(String data) throws Exception {
logger.info("[DemoAMqComsumer] 消費一條消息:{}", data);
return MqResult.SUCCESS;
}
}
系統中每個消費者以 "IMqConsumer" 的形式存在, 規定如下:
- 1、每個 "IMqConsumer" 需要繼承 "com.xxl.mq.client.consumer.IMqConsumer" 接口;
- 2、需要掃描為Spring的Bean實例, 需加上 "@Service" 注解並被Spring掃描;
- 3、需要加上注解 "com.xxl.mq.client.consumer.annotation.MqConsumer"。該注解 "value" 值為訂閱的消息主題, "type" 值為消息類型(TOPIC廣播消息、QUEUE並發消息隊列 和 SERIAL_QUEUE串行消息隊列);
更多消費者屬性、場景,可參考章節 "4.6 Consumer設計";
2.6 功能測試 & 性能測試
首選啟動消息中心,然后啟動 "springboot版本示例項目";
訪問部署成功的 "springboot版本示例項目" 地址,瀏覽器訪問展示如下如下:

該示例項目已經提供了多個消息生產與消費的實例:
a、"並行消費" 測試:連續點擊 "並行消費" 按鈕4次,將會生產4條並行消息;
進入消息中心 "消息記錄" 菜單,消息列表如下: 
逐個查看消息流轉日志如下:

可以注意 "鎖定消息" 的 "消費者信息",可以查看到當前消費者在集群中的排序 "rank"。
逐個查看每條消息對應消費者的 "rank" 屬性,可以看到上面4條消息平局分配給不同 "rank" 的消費者,即平均分配給了不同消費者。測試正常;
b、"串行消費" 測試:連續點擊 "串行消費" 按鈕4次,將會生產4條串行消費;
操作步驟同 "並行消息"。最后一步逐個查看每條消息對應消費者的 "rank" 屬性,會發現全部一致,即固定分配給了一個消費者。測試正常
c、"廣播消息":點擊 "廣播消息" 按鈕一次,將會生產一條廣播消息;
進入消息中心 "消息記錄" 菜單,消息列表如下:

一條廣播消息將會廣播給該主題全部在線group,該消息主題存在2個消息group,所以會每個group創建一條,即兩條消息。測試正常。
d、"延時消息":點擊 “延時消息” 按鈕一次,將會生產一條延時消息;
進入消息中心 "消息記錄" 菜單,可以查看消息 “生效時間”屬性為 5min 之后,最終該消息在 5min 之后被消費執行。測定正常。
e、"性能測試" 測試:點擊 “性能測試”按鈕,將會批量發送10000條消息;
點擊按鈕后,頁面下方展示文案 “Cost = 1055”,說明在 1055ms 之內客戶端發送了 1000 條消息;
但是,由於測試代碼中采用異步方式發送,消息發送事件與是否成功需要在消息中心中確認。
進入消息中心 “消息記錄” 菜單,如下圖,可以看到 10000 條消息創建事件最大為 “2018-12-02 04:51:54”,最小為 “2018-12-02 04:51:55”。說明在 1s 左右客戶端成功發送了 10000 條消息,且 100% 投遞成功,即單機TPS過萬;

然后進入 “運行報表” 界面,如下圖,點擊成功比例圖可知,成功消費 10000 條,比例 100%。說明客戶端發送的 10000 條消息 100% 消費成功。

其他測試
如延時消息、重試消息 …… 可自行參考示例代碼測試;
三、消息中心,操作指南
3.1 運行報表:
運行報表界面,展示消息中心系統信息,如業務線、消息主題、消息數量等;支持日期分布圖、成功比例圖方式查看;

3.2 消息主題
消息主題界面,可查看在線消息主題列表;底層會周期性掃描消息記錄,發型並錄入新的消息主題,並展示在這里; 
消息主題界面,支持為消息主題設置一些附屬參數,提供一些增強功能;如負責人、告警郵箱等;

消息主題屬性:
- 業務線:該消息所屬業務線,方便分組管理;
- 負責人:該消息所屬負責人;
- 告警郵箱:一個或多個,多個逗號分隔;消息消費失敗時,將會周期性發送告警郵件;
3.3 消息記錄
消息記錄界面,可查看在線消息記錄;支持篩選、查看消息流轉軌跡; 
- 消息在線管理功能:支持在線 "新增"、"編輯" 和 "刪除" 消息記錄;
消息新增如下圖所示,消息屬性說明,可參考章節 "4.2 Message設計";

- 消息手動清理:支持在線清理消息,可選擇消息主題、狀態、清理類型等;

3.4 業務線
業務先界面,可查看在線業務線列表,並管理維護;可通過自定義業務線,綁定消息主題,從而方便消息主題的分組管理; 
四、系統設計
4.1 系統架構圖

角色解釋:
- Message : 消息實體;
- Broker : 消息代理中心, 負責連接Producer和Consumer;
- Topic : 消息主題, 每個消息隊列的唯一性標示;
- Topic segment : 消息分段, 同一個Topic的消息隊列,將會根據訂閱的Consumer進行分片分組,每個Consumer擁有的消息片即一個segment;
- Producer : 消息生產者, 綁定一個消息Topic, 並向該Topic消息隊列中生產消息;
- Consumer : 消息消費者, 綁定一個消息Topic, 只能消費該Topic消息隊列中的消息;
- Consumer Group : 消費者分組,隔離消息;同一個Topic下一條消息消費一次;
架構圖模塊解讀:
- Server
- Broker: 消息代理中心, 系統核心組成模塊, 負責接受消息生產者Producer推送生產的消息, 同時負責提供RPC服務供消費者Consumer使用來消費消息;
- Message Queue: 消息存儲模塊, 目前底層使用mysql消息表;
- Registry Center
- Broker Registry Center: Broker注冊中心子模塊, 供Broker注冊RPC服務使用;
- Consumer Registry Center: Consumer注冊中心子模塊, 供Consumer注冊消費節點使用;
- Client
- Producer: 消息生產者模塊, 負責提供API接口供開發者調用,並生成和發送隊列消息;
- Consumer: 消息消費者模塊, 負責訂閱消息並消息;
4.2 Message設計
| 消息核心屬性 | 說明 |
|---|---|
| topic | 消息主題 |
| group | 消息分組, 分組一致時消息僅消費一次;存在多個分組時,多個分組時【廣播消費】; |
| data | 消息數據 |
| retryCount | 重試次數, 執行失敗且大於0時生效,每重試一次減一; |
| shardingId | 分片ID, 大於0時啟用,否則使用消息ID;消費者通過該參數進行消息分片消費;分片ID不一致時分片【並發消費】、一致時【串行消費】; |
| timeout | 超時時間,單位秒;大於0時生效,處於鎖定運行狀態且運行超時時,將主動標記運行失敗; |
| effectTime | 生效時間, new Date()立即執行, 否則在生效時間點之后開始執行; |
4.3 Broker設計
Broker(消息代理中心):系統核心組成模塊, 負責接受消息生產者Producer推送生產的消息, 同時負責提供RPC服務供消費者Consumer使用來消費消息;
Broker支持集群部署, 集群節點之間地位平等, 集群部署情況下可大大提高系統的消息吞吐量。
Broker通過內置注冊中心實現集群功能, 各節點在啟動時會自動注冊到注冊中心, Producer或Consumer在生產消息或者消費消息時,將會通過內置注冊中心自動感知到在線的Broker節點。
Broker在接收到Produce的生產消息的RPC調用時, 並不會立即存儲該消息, 而是立即push到內存隊列中, 同時立即響應RPC調用。 內存隊列將會異步將隊列中的消息數據存儲到Mysql中。
Broker在接收到 "消息鎖定" 等同步RPC調用時, 將會觸發同步調用, 采用樂觀鎖方式鎖定消息;
4.4 Registry Center設計
Registry Center(注冊中心)主要分為兩個子模塊: Broker注冊中心、Consumer注冊中心;
- Broker注冊中心子模塊: 供Broker注冊RPC服務使用;
- Consumer注冊中心子模塊: 供Consumer注冊消費節點使用;
4.5 Producer設計
Producer(消息生產者), 兼容“異步批量多線程生產”+“同步生產”兩種方式,提升消息發送性能;
底層通訊全異步化:消息新增 + 消息新增接受 + 消息回調 + 消息回調接受;僅批量PULL消息與鎖消息非異步;
4.6 Consumer設計
| MqConsumer注解屬性 | 說明 |
|---|---|
| group | 消息分組, |
| topic | 消息主題 |
| transaction | 事務開關,開啟消息事務性保證只會成功執行一次;關閉時可能重復消費,性能較優; |
消費者通過 "多線程輪訓 + 消息分片 + PULL + 消息鎖定" 的方式來實現:
- 多線程輪訓: 該模式下每個Consumer將會存在一個線程, 如存在多個Consumer, 多個Consumer將會並行消息同一主題下的消息, 大大提高消息的消費速度;
- 輪訓延時自適應:線程輪訓方式PULL消息,如若獲取不到消息將會主動休眠,休眠時間依次遞增10s,最長60s;即消息生產之后距離被消費存在 0~60s 的時間差,分鍾范圍內;
- 消息分片 : 隊列中消息將會按照 "Registry Center" 中注冊的Consumer列表順序進行消息分段, 保證一條消息只會被分配給其中一個Consumer, 每個Consumer只會消費分配給自己的消息。 因此在多個Consumer並發消息時, 可以保證同一條消息不被多個Consumer競爭來重復消息。
- 分片函數: MOD("消息分片ID", #{在線消費者總數}) = #{當前消費者排名} ,
- 分片邏輯解釋: 每個Consumer通過注冊中心感知到在線所有的Consumer, 計算出在線Consumer總數total, 以及當前Consumer在所有Consumer中的排名rank; 把消息分片ID對在線Consumer總數total進行取模, 余數和當前Consumer排名rank一致的消息認定為分配給自己的消息;
- PULL : 每個Consumer將會輪訓PULL消息分片分配給自己的消息, 順序消費。
- 消息鎖定: Consumer在消費每一條消息時,開啟事務時,將會主動進行消息鎖定, 通過數據庫樂觀鎖來實現, 鎖定成功后消息狀態變更為執行中狀態, 將不會被Consumer再次PULL到。因此, 可以更進一步保證每條消息只會被消費一次;
- 消息狀態和日志: 消息執行結束后, 將會調用Broker的RPC服務修改消息狀態並追加消息日志, Broker將會通過內存隊列方式, 異步消息隊列中變更存儲到數據庫中。
4.7 延時消息
支持設置消息的延遲生效時間, 到達設置的生效時間時該消息才會被消費;適用於延時消費場景,如訂單超時取消等;
4.8 事務性
消費者開啟事務開關后,消息事務性保證只會成功執行一次;
4.9 失敗重試
支持設置消息的重試次數, 在消息執行失敗后將會按照設置的值進行消息重試執行,直至重試次數耗盡或者執行成功;
4.10 超時控制
支持自定義消息超時時間,消息消費超時將會主動中斷;
五、版本更新日志
5.1 版本V1.1.0 新特性
- 1、簡單易用: 一行代碼即可發布一條消息; 一行注解即可訂閱一個消息主題;
- 2、部署簡單: 除ZK之外不依賴第三方服務;
- 3、三種消息模式: TOPIC(廣播消息)模型、QUEUE(並發隊列)模型 和 SERIAL_QUEUE(串行隊列)模型,下文將會詳細講解:
- 4、Broker集群、HA: Broker支持集群部署, 可大大提高系統可用性,以及消息吞吐能力;
- 5、吞吐量: 依賴於部署的Broker集群和Mysql性能;
- 5、消息可追蹤: 支持追蹤每一條消息的執行路徑, 便於排查業務問題;
- 6、消息可見: 系統中每一條消息可通過Web界面在線查看,甚至支持編輯消息內容和消息狀態;
- 7、一致性: QUEUE(並發隊列)模型 和 SERIAL_QUEUE(串行隊列)模型的消息,保證只會成功執行一次;
- 8、Delay執行: 支持設置消息的延遲生效時間, 到達設置的Delay執行時間時該消息才會被消費 ,提供DelayQueue的功能;
- 9、消息重試: 支持設置消息的重試次數, 在消息執行失敗后將會按照設置的值進行消息重試執行,直至重試次數耗盡或者執行成功;
5.2 版本V1.1.1 特性
- 1、項目groupId改為com.xuxueli,為推送maven中央倉庫做准備;
- 2、項目推送Maven中央倉庫;
- 3、底層系統優化,CleanCode等;
- 4、修復confirm和alert彈框沖突導致消息列表錯亂的問題;
- 5、優化ZK注冊邏輯,ZK注冊基礎路徑提前初始化;
- 6、broadcast 廣播消息時ZK 發送方不進行watch, 否則發送方也會監聽到;
- 7、修復一處因ReentrantLock導致可能死鎖的問題;
5.3 版本V1.2.0 Release Notes[2018-11-28]
- 1、client端與Broker長鏈初始化優化,防止重復創建連接。
- 2、POM多項依賴升級;
- 3、UI組件升級;
- 4、規范項目目錄結構;
- 6、超時控制;
- 5、通訊遷移至 xxl-rpc;
- 6、除了springboot類型示例;新增無框架示例項目 "xxl-mq-samples-frameless"。不依賴第三方框架,只需main方法即可啟動運行;
- 7、消息生產,兼容“異步批量多線程生產”+“同步生產”兩種方式,提升消息發送性能;
- 8、底層通訊全異步化:消息新增 + 消息新增接受 + 消息回調 + 消息回調接受;僅批量PULL消息與鎖消息非異步;
- 9、串行消費優化,舊版本固定第一台消費,導致其壓力過大;新版支持自定義shardingId從而實現串行消息的負載均衡,緩解單台壓力;
- 10、廣播消息優化,舊版本不支持消息持久化,新版本支持消息持久化,而且廣播支持與串行結合實用,更加靈活;
- 11、並發消息、串行消息、廣播消息全部優化重構,底層邏輯統一,方便后續維護擴展;
- 串行:取消ZK依賴,廢棄舊版ZK鎖方式;優化為通過消息 shardingId 結合消費者排序取模方式;相同 shardingId 的消息將會固定被同一個消費者消費;
- 並行:沿用舊版消費者排序取模方式,不過取模參數新增支持 shardingId 參數;確保消息平均分配給在線消費者;
- 廣播:取消ZK依賴,廢棄舊版ZK方式;優化為通過消息 group 屬性群發方式;每個group都會消費該消息,但相同group下消息僅被消費一次;
- 12、Broker服務支持自定義指定注冊IP等信息,位置 "XxlMqBrokerImpl.initServer";
- 13、Topic自動發現:消息中心支持動態發現Topic,並展示在消息主題列表,延時1min;
- 14、運行報表:支持展示在線業務線、消息主題、消息記錄等信息、可在線查看消息日期分布圖,成功分布圖等;
- 15、業務線管理:支持設置業務線,用於分組管理消息主題;
- 16、消息主題管理:支持在線管理消息主題,自動發現消息主題;並支持完善消息主題擴展信息,如業務線、負責人、告警郵箱等;
- 17、消息記錄界面,交互優化重構,進一步優化消息篩選、管理交互;
- 18、自動重試優化,任務重試時,生效時間重置為1min之后,重試次數減一;
- 19、記住密碼功能優化,選中時永久記住;非選中時關閉瀏覽器即登出;
- 20、事務開關:支持設置消息事務開關,開啟時事務保證消息精准消費一次;未開啟時小概率存在重復消費,僅依靠注冊中心分片檢測避免重復,但性能略高;
- 21、告警功能:支持以Topic粒度監控消息,存在失敗消息時主動推送告警郵件;
- 22、軌跡Log優化,新增、更新時記錄核心數據;消息日志格式統一;
- 23、消息在線清理:在消息記錄界面,支持在線清理消息數據;
- 24、過期消息自動清理:消息中心新增參數 “xxl-mq.log.logretentiondays”設置消息過期天數,過期成功消息將會自動清理;
- 25、超時強化,除了客戶端支持超時控制外;服務端新增線程掃描,主動處理超時消息;消息超過 "生效時間 + 超時時間 + 1HOUT" 之后仍然未結束,將會主動標記為失敗;
- 26、左側菜單規范:運行報表(業務線,主題數,消息記錄數;總消息成功率,日分布柱狀圖,總分布餅圖) + 消息主題 + 消息記錄 + 使用教程;
- 27、注冊中心遷移至DB,基於 "long polling" 實現注冊機器實時感知;注冊中心代碼及邏輯來源自“XXL-RPC原生輕量級注冊中心”;
- 28、輕量級改造,移除對ZK依賴,僅依賴DB即可完整集群方式提供服務;缺點,非強一致性可能導致重復消費,開啟事務開關可以避免該問題;
- 29、文檔示例完善,包括:並發消息、串行消息、廣播消息、延遲消息、失敗重試消息、超時控制消息等;
- 30、文檔完善:消息模型說明,延時消息說明、事務消息說明、失敗重試、超時控制說明,
5.4 版本 v1.2.1 Release Notes[2018-12-02]
- 1、單機TPS過萬:示例項目中新增功能測試、性能測試用例,以及消息生產、消費、成功率等方便的數據分析;可參考示例項目性能測試用例(章節 “2.6 功能測試 & 性能測試”),單機TPS過萬;
- 2、底層long polling監控keys非法去重問題修復;
- 3、注冊邏輯優化,批量注冊,提高注冊性能,降低注冊中心壓力;
- 4、消息中心RPC服務支持自定義注冊IP地址;
- 5、消息中心內置注冊中心線程數優化,精簡;
5.5 版本 v1.2.2 Release Notes[2018-12-21]
- 1、訪問令牌(accessToken):為提升系統安全性,消息中心和客戶端進行安全性校驗,雙方AccessToken匹配才允許通訊;
- 2、支持批量注冊、摘除,提升注冊發現性能;升級 xxl-rpc 至 v1.3.1;
- 3、升級 pom 依賴至較新版本;
- 4、表結構調整提升兼容性,表名轉小寫;
- 5、客戶端取消Consumer非空的限制;
5.6 版本 v1.3.0 Release Notes[迭代中]
- 1、消息流轉日志格式優化;
TODO
- 會考慮移除 mysql 強依賴的,遷移 jpa 進一步提升通用型。
- producer消息,推送broker失敗,先緩存本次文件;
- producer消息,生成UUID,推送失敗重復推送,同時避免重復;
- 延遲消息方案優化:增加時間輪算法;
- 客戶端,Server端支持消息落磁盤;發送失敗,存儲失敗時,寫磁盤,避免消息丟失;LocalQueue消息可能丟失,考慮LocalFile;
- 消息數據、Log使用text字段存儲,為避免超長限制長度20000;后續考慮優化,盡量不限制數據長度、避免軌跡較多時Log超長問題;
- 消息告警功能增強,目前僅支持失敗告警,考慮支持消息堆積告警、阻塞告警等,Topic擴展屬性存儲閾值;30分鍾統計一次消息情況, 將會根據topic分組, 堆積超過閾值的topic將會在報警郵件報表中進行記錄;
- 消息主題界面,支持查看在線消費者列表;consumer:topic+group 在線展示;producer:在線展示;
- MQ存儲層進一步抽象,兼容支持Mysql、Oracle、Tidb等存儲方式;
六、其他
6.1 項目貢獻
歡迎參與項目貢獻!比如提交PR修復一個bug,或者新建 Issue 討論新特性或者變更。
6.2 用戶接入登記
更多接入的公司,歡迎在 登記地址 登記,登記僅僅為了產品推廣。
6.3 開源協議和版權
產品開源免費,並且將持續提供免費的社區技術支持。個人或企業內部可自由的接入和使用。
- Licensed under the GNU General Public License (GPL) v3.
- Copyright (c) 2015-present, xuxueli.
捐贈
無論金額多少都足夠表達您這份心意,非常感謝 :) 前往捐贈






