.Net中使用 RocketMQ隊列


封裝的RocketMQ類庫已開源在github: https://github.com/antaintan/easyrocketmq

公司的一位同事推薦使用RocketMQ, 並給出了幾個流行MQ的對比, 資料來源, 阿里雲幫助文檔

目前RocketMQ已經成為Apache頂級項目, 作為開源軟件免費提供. 但是阿里雲提供的RocketMQ是收費的, 一個Topic 2元/天, 2元100萬消息.具體價格,可以參照阿里雲的報價.

詳細對比資料地址: https://help.aliyun.com/document_detail/52577.htmlhttps://help.aliyun.com/document_detail/52577.html

(注: 阿里雲的技術文檔寫得不錯, 可以學習到很多知識)

 

 

  RocketMQ 是一個款非常強大的MQ, 它同時支持, 無序消息, 分區順序消息, 全局順序消息, 並支持TCP, Http協議接入.

  客戶端支持Java, C++/.Net(.Net客戶端是通過封裝C++DLL,通過PInvoke調用來實現的, 基於x64位, 所以我們自己項目編譯時也要選擇x64)

 

特色功能

  • 事務消息,實現類似 X/Open XA 的分布事務功能,以達到事務最終一致性狀態。
  • 定時(延時)消息,允許消息生產者指定消息進行定時(延時)投遞,最長支持40天。
  • 大消息,目前默認支持最大 256KB 消息,華北2 地域支持最大 4MB 消息。
  • 消息軌跡,通過消息軌跡,用戶能清晰定位消息從發布者發出,經由 MQ 服務端,投遞給消息訂閱者的完整鏈路,方便定位排查問題。
  • 廣播消息,允許一個 Consumer ID 所標識的所有 Consumer 都會各自消費某條消息一次。
  • 順序消息,允許消息消費者按照消息發送的順序對消息進行消費。
  • 重置消費進度,根據時間重置消費進度,允許用戶進行消息回溯或者丟棄堆積消息。

 

多協議接入

  • 支持 HTTP 協議:支持 RESTful 風格 HTTP 協議完成收發消息,可以解決跨語言使用 MQ 問題。
  • 支持 TCP 協議:區別於 HTTP 簡單的接入方式,提供更為專業、可靠、穩定的 TCP 協議的 SDK 接入。

 注意: Java客戶端的TCP協議是功能支持最全的, .Net TCP要少一些功能支持如順序消息, 而Http協議支持的更少, 具體情況, 請詳細查閱阿里雲的幫助文檔.

消息隊列的幾個核心對象:

  • Topic: 消息主題
  • Consumer: 消息消費者, 消費訂閱的主題消息
  • Producer: 消息生產者, 生產不同的消息

  阿里提供的.Net類庫幾個核心類庫:

    NSClient4CPP.lib, c++類庫

    ONSClient4CPP.dll, c++類庫

    ONSClient4CPP.pdb c++類庫

    ons.dll: Net類庫, 利用開源軟件 SWIG 生成 PINVOKE 封裝代碼

  如果系統沒有c++運行時類庫, 還需要安裝vc_redist.x64.exe.

下面是生產者示例代碼:

private static ProducerClient producerClient = new ProducerClient(AccessKeyId, AccessKeySecret, ProducerId);

private static void Main(string[] args)
{
    producerClient.Start();

    var stopWatch = new Stopwatch();
    stopWatch.Start();

    var taskList = new List<Task>();
    for (int threadIndex = 1; threadIndex <= ProducerThreadCount; threadIndex++)
    {
        // 生產消費
        var task = Task.Factory.StartNew(() => {
            for (int messageIndex = 1; messageIndex <= MessageCountPerThread; messageIndex++)
            {
                string content = "線程ID=" + Thread.CurrentThread.ManagedThreadId + ", 我要測試rocketmq message";
                //producerClient.SendMessage(ShardingKey, Topic, content, Tag);
                producerClient.SendMessage(Topic, content, Tag);

                Console.WriteLine(content);
            }
        }, TaskCreationOptions.LongRunning);

        taskList.Add(task);
    }

    Task.WaitAll(taskList.ToArray());
    stopWatch.Stop();

    // 一定要關閉,不然會有內存泄漏
    producerClient.Shutdown();

    Console.WriteLine("發送消息:{0}條, 使用時間{1}毫秒", MessageCountPerThread * ProducerThreadCount, stopWatch.ElapsedMilliseconds);
    Console.ReadLine();
}
下面是消費者示例代碼
private static PushConsumerClient consumerClient = new PushConsumerClient(AccessKeyId, AccessKeySecret, Topic, ConsumerId, SubExpression);

private static int count = 0;

private class MyMsgListener : DefaultMessageListener
{
    public override ons.Action consume(Message message, ConsumeContext context)
    {
        Console.WriteLine("消息序號: {0}, 當前線程ID = {1}, 內容為: {2}", ++count, Thread.CurrentThread.ManagedThreadId, message.getBody());
        return ons.Action.CommitMessage;
    }
}

private static void Main(string[] args)
{
    var listener = new MyMsgListener();
    consumerClient.setMessageListener(listener);
    consumerClient.Start();

    Console.ReadLine();
    consumerClient.Shutdown();
}

消費者有一個消息監聽類, 有個consume方法, 這里趟過一個大坑, 因為為了省去一個類文件, 就用了匿名方法來代替上面的監聽類, 結果只要消息一條消息后, 程序就自動死, 沒有任何.net錯誤信息, 只有一個很底層的錯誤, 得不到任何有價值的信息, 花了很多時間, 才發現是因為匿名方法通過PPInvoke調用會有問題.后面換成監聽類, 問題就解決了.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM