net core3.1使用RocketMQ


 工具:

vs2019 netcore3.1  阿里雲RocketMQ V1.0.1

參考:阿里雲官方示例:https://github.com/aliyunmq/mq-http-csharp-sdk

           Apache RocketMQ開發者指南

      RocketMQ學習筆記

RocketMQ 是什么

Github 上關於 RocketMQ 的介紹:
RcoketMQ 是一款低延遲、高可靠、可伸縮、易於使用的消息中間件。具有以下特性:

  1. 支持發布/訂閱(Pub/Sub)和點對點(P2P)消息模型
  2. 在一個隊列中可靠的先進先出(FIFO)和嚴格的順序傳遞
  3. 支持拉(pull)和推(push)兩種消息模式
  4. 單一隊列百萬消息的堆積能力
  5. 支持多種消息協議,如 JMS、MQTT 等
  6. 分布式高可用的部署架構,滿足至少一次消息傳遞語義
  7. 提供 docker 鏡像用於隔離測試和雲集群部署
  8. 提供配置、指標和監控等功能豐富的 Dashboard

專業術語

Producer
消息生產者,生產者的作用就是將消息發送到 MQ,生產者本身既可以產生消息,如讀取文本信息等。也可以對外提供接口,由外部應用來調用接口,再由生產者將收到的消息發送到 MQ。

Producer Group
生產者組,簡單來說就是多個發送同一類消息的生產者稱之為一個生產者組。在這里可以不用關心,只要知道有這么一個概念即可。

Consumer
消息消費者,簡單來說,消費 MQ 上的消息的應用程序就是消費者,至於消息是否進行邏輯處理,還是直接存儲到數據庫等取決於業務需要。

Consumer Group
消費者組,和生產者類似,消費同一類消息的多個 consumer 實例組成一個消費者組。

Topic
Topic 是一種消息的邏輯分類,比如說你有訂單類的消息,也有庫存類的消息,那么就需要進行分類,一個是訂單 Topic 存放訂單相關的消息,一個是庫存 Topic 存儲庫存相關的消息。

Message
Message 是消息的載體。一個 Message 必須指定 topic,相當於寄信的地址。Message 還有一個可選的 tag 設置,以便消費端可以基於 tag 進行過濾消息。也可以添加額外的鍵值對,例如你需要一個業務 key 來查找 broker 上的消息,方便在開發過程中診斷問題。

Tag
標簽可以被認為是對 Topic 進一步細化。一般在相同業務模塊中通過引入標簽來標記不同用途的消息。

Broker
Broker 是 RocketMQ 系統的主要角色,其實就是前面一直說的 MQ。Broker 接收來自生產者的消息,儲存以及為消費者拉取消息的請求做好准備。

Name Server
Name Server 為 producer 和 consumer 提供路由信息。

第一步:阿里雲創建實例

 

 

 創建完實例后,可以得到我們需要的下面這些參數值。

// 設置HTTP接入域名(此處以公共雲生產環境為例)
private const string _endpoint = "${HTTP_ENDPOINT}";
// AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建
private const string _accessKeyId = "${ACCESS_KEY}";
// SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建
private const string _secretAccessKey = "${SECRET_KEY}";
// 所屬的 Topic
private const string _topicName = "${TOPIC}";
// Topic所屬實例ID,默認實例為空
private const string _instanceId = "${INSTANCE_ID}";
// 您在控制台創建的 Consumer ID(Group ID)
private const string _groupId = "${GROUP_ID}";

 

 

第二步:引用RocketMQ

打開vs2019創建控制台應用程序,打開Nuget包管理器,引用AliyunMQ.Http,選擇最新版安裝即可。

第三步:創建消息生產類:

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;

namespace Aliyun.MQ.Sample
{
    public class ProducerSample
    {
        // 設置HTTP接入域名(此處以公共雲生產環境為例)
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建
        private const string _accessKeyId = "${ACCESS_KEY}";
        // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建
        private const string _secretAccessKey = "${SECRET_KEY}";
        // 所屬的 Topic
        private const string _topicName = "${TOPIC}";
        // Topic所屬實例ID,默認實例為空
        private const string _instanceId = "${INSTANCE_ID}";

        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);

        static MQProducer producer = _client.GetProducer(_instanceId, _topicName);

        static void Main(string[] args)
        {
            try
            {
                // 循環發送4條消息
                for (int i = 0; i < 4; i++)
                {
                    TopicMessage sendMsg;
                    if (i % 2 == 0)
                    {
                        sendMsg = new TopicMessage("dfadfadfadf");
                        // 設置屬性
                        sendMsg.PutProperty("a", i.ToString());
                        // 設置KEY
                        sendMsg.MessageKey = "MessageKey";
                    }
                    else
                    {
                        sendMsg = new TopicMessage("dfadfadfadf", "tag");
                        // 設置屬性
                        sendMsg.PutProperty("a", i.ToString());
                        // 定時消息, 定時時間為10s后
                        sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp() + 10 * 1000;
                    }
                    TopicMessage result = producer.PublishMessage(sendMsg);
                    Console.WriteLine("publis message success:" + result);
                }
            }
            catch (Exception ex)
            {
                Console.Write(ex);
            }
        }
    }
}
View Code

 

第四步,創建消息消費者類

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;

namespace Aliyun.MQ.Sample
{
    public class ConsumerSample
    {
        // 設置HTTP接入域名(此處以公共雲生產環境為例)
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建
        private const string _accessKeyId = "${ACCESS_KEY}";
        // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建
        private const string _secretAccessKey = "${SECRET_KEY}";
        // 所屬的 Topic
        private const string _topicName = "${TOPIC}";
        // Topic所屬實例ID,默認實例為空
        private const string _instanceId = "${INSTANCE_ID}";
        // 您在控制台創建的 Consumer ID(Group ID)
        private const string _groupId = "${GROUP_ID}";

        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
        static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);

        static void Main(string[] args)
        {
            // 在當前線程循環消費消息,建議是多開個幾個線程並發消費消息
            while (true)
            {
                try
                {
                    // 長輪詢消費消息
                    // 長輪詢表示如果topic沒有消息則請求會在服務端掛住3s,3s內如果有消息可以消費則立即返回
                    List<Message> messages = null;

                    try
                    {
                        messages = consumer.ConsumeMessage(
                            3, // 一次最多消費3條(最多可設置為16條)
                            3 // 長輪詢時間3秒(最多可設置為30秒)
                        );
                    }
                    catch (Exception exp1)
                    {
                        if (exp1 is MessageNotExistException)
                        {
                            Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId);
                            continue;
                        }
                        Console.WriteLine(exp1);
                        Thread.Sleep(2000);
                    }

                    if (messages == null)
                    {
                        continue;
                    }

                    List<string> handlers = new List<string>();
                    Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");
                    // 處理業務邏輯
                    foreach (Message message in messages)
                    {
                        Console.WriteLine(message);
                        Console.WriteLine("Property a is:" + message.GetProperty("a"));
                        handlers.Add(message.ReceiptHandle);
                    }
                    // Message.nextConsumeTime前若不確認消息消費成功,則消息會重復消費
                    // 消息句柄有時間戳,同一條消息每次消費拿到的都不一樣
                    try
                    {
                        consumer.AckMessage(handlers);
                        Console.WriteLine("Ack message success:");
                        foreach (string handle in handlers)
                        {
                            Console.Write("\t" + handle);
                        }
                        Console.WriteLine();
                    }
                    catch (Exception exp2)
                    {
                        // 某些消息的句柄可能超時了會導致確認不成功
                        if (exp2 is AckMessageException)
                        {
                            AckMessageException ackExp = (AckMessageException)exp2;
                            Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);
                            foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
                            {
                                Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage);
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    Thread.Sleep(2000);
                }
            }
        }
    }
}
View Code

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM