封裝的RocketMQ類庫已開源在github: https://github.com/antaintan/easyrocketmq
公司的一位同事推薦使用RocketMQ, 並給出了幾個流行MQ的對比, 資料來源, 阿里雲幫助文檔
目前RocketMQ已經成為Apache頂級項目, 作為開源軟件免費提供. 但是阿里雲提供的RocketMQ是收費的, 一個Topic 2元/天, 2元100萬消息.具體價格,可以參照阿里雲的報價.
詳細對比資料地址: https://help.aliyun.com/document_detail/52577.htmlhttps://help.aliyun.com/document_detail/52577.html
(注: 阿里雲的技術文檔寫得不錯, 可以學習到很多知識)
RocketMQ 是一個款非常強大的MQ, 它同時支持, 無序消息, 分區順序消息, 全局順序消息, 並支持TCP, Http協議接入.
客戶端支持Java, C++/.Net(.Net客戶端是通過封裝C++DLL,通過PInvoke調用來實現的, 基於x64位, 所以我們自己項目編譯時也要選擇x64)
特色功能
- 事務消息,實現類似 X/Open XA 的分布事務功能,以達到事務最終一致性狀態。
- 定時(延時)消息,允許消息生產者指定消息進行定時(延時)投遞,最長支持40天。
- 大消息,目前默認支持最大 256KB 消息,華北2 地域支持最大 4MB 消息。
- 消息軌跡,通過消息軌跡,用戶能清晰定位消息從發布者發出,經由 MQ 服務端,投遞給消息訂閱者的完整鏈路,方便定位排查問題。
- 廣播消息,允許一個 Consumer ID 所標識的所有 Consumer 都會各自消費某條消息一次。
- 順序消息,允許消息消費者按照消息發送的順序對消息進行消費。
- 重置消費進度,根據時間重置消費進度,允許用戶進行消息回溯或者丟棄堆積消息。
多協議接入
- 支持 HTTP 協議:支持 RESTful 風格 HTTP 協議完成收發消息,可以解決跨語言使用 MQ 問題。
- 支持 TCP 協議:區別於 HTTP 簡單的接入方式,提供更為專業、可靠、穩定的 TCP 協議的 SDK 接入。
注意: Java客戶端的TCP協議是功能支持最全的, .Net TCP要少一些功能支持如順序消息, 而Http協議支持的更少, 具體情況, 請詳細查閱阿里雲的幫助文檔.
消息隊列的幾個核心對象:
- Topic: 消息主題
- Consumer: 消息消費者, 消費訂閱的主題消息
- Producer: 消息生產者, 生產不同的消息
阿里提供的.Net類庫幾個核心類庫:
NSClient4CPP.lib, c++類庫
ONSClient4CPP.dll, c++類庫
ONSClient4CPP.pdb c++類庫
ons.dll: Net類庫, 利用開源軟件 SWIG 生成 PINVOKE 封裝代碼
如果系統沒有c++運行時類庫, 還需要安裝vc_redist.x64.exe.
下面是生產者示例代碼:
private static ProducerClient producerClient = new ProducerClient(AccessKeyId, AccessKeySecret, ProducerId); private static void Main(string[] args) { producerClient.Start(); var stopWatch = new Stopwatch(); stopWatch.Start(); var taskList = new List<Task>(); for (int threadIndex = 1; threadIndex <= ProducerThreadCount; threadIndex++) { // 生產消費 var task = Task.Factory.StartNew(() => { for (int messageIndex = 1; messageIndex <= MessageCountPerThread; messageIndex++) { string content = "線程ID=" + Thread.CurrentThread.ManagedThreadId + ", 我要測試rocketmq message"; //producerClient.SendMessage(ShardingKey, Topic, content, Tag); producerClient.SendMessage(Topic, content, Tag); Console.WriteLine(content); } }, TaskCreationOptions.LongRunning); taskList.Add(task); } Task.WaitAll(taskList.ToArray()); stopWatch.Stop(); // 一定要關閉,不然會有內存泄漏 producerClient.Shutdown(); Console.WriteLine("發送消息:{0}條, 使用時間{1}毫秒", MessageCountPerThread * ProducerThreadCount, stopWatch.ElapsedMilliseconds); Console.ReadLine(); }
下面是消費者示例代碼
private static PushConsumerClient consumerClient = new PushConsumerClient(AccessKeyId, AccessKeySecret, Topic, ConsumerId, SubExpression); private static int count = 0; private class MyMsgListener : DefaultMessageListener { public override ons.Action consume(Message message, ConsumeContext context) { Console.WriteLine("消息序號: {0}, 當前線程ID = {1}, 內容為: {2}", ++count, Thread.CurrentThread.ManagedThreadId, message.getBody()); return ons.Action.CommitMessage; } } private static void Main(string[] args) { var listener = new MyMsgListener(); consumerClient.setMessageListener(listener); consumerClient.Start(); Console.ReadLine(); consumerClient.Shutdown(); }
消費者有一個消息監聽類, 有個consume方法, 這里趟過一個大坑, 因為為了省去一個類文件, 就用了匿名方法來代替上面的監聽類, 結果只要消息一條消息后, 程序就自動死, 沒有任何.net錯誤信息, 只有一個很底層的錯誤, 得不到任何有價值的信息, 花了很多時間, 才發現是因為匿名方法通過PPInvoke調用會有問題.后面換成監聽類, 問題就解決了.