Ø 簡介
C# 實現消息隊列的方式有很多種,比如:MSMQ、RabbitMQ、EQueue 等,本文主要介紹使用 RabbitMQ 實現消息隊列的基礎入門。包括如下內容:
1. 什么是消息隊列?
2. 什么是 RabbitMQ?
3. 安裝 RabbitMQ Server
4. RabbitMQ 的基本運用
5. 遠程部署 RabbitMQ 服務
6. RabbitMQ 常用命令
1. 什么是消息隊列?
消息隊列 MQ(全稱為 Message Queue),可實現兩個應用程序之間進行通信,MQ 是生產者與消費者模型的典型代表,一端往消息隊列中寫入消息,另一端可以讀取或者訂閱隊列中的消息。MQ 遵循的是 AMQP 協議(高級消息隊列協議:使得遵從該規范的客戶端應用和消息中間件服務器的全功能互操作成為可能)的具體實現和產品。
MQ 是一種消息中間件技術,所以它能夠支持多種類型的語言開發,同時也是跨平台的通信機制,也就是說 MQ 支持將信息轉化為 XML 或者 Json 等類型的數據存儲到消息隊列中,然后可以使用不同的語言來處理消息隊列中的消息,這樣就很容易的做到了信息的通信,同時也為信息的通信起到了緩沖的作用,經常會在金融項目中使用這種通信機制。
MQ 的主要作用用於提高系統的並發性,將一些不需要及時響應客戶端且占用較多資源的操作放入隊列,再由另外一個線程去異步處理這些隊列,可極大的提高系統的並發能力。
消息傳遞相比較文件傳遞與遠程調用(RPC)而言,視乎更勝一籌,因為跟平台無關,並能夠很好的支持並發與異步調用,所以通常用於以下情況:
1. 對操作的實時性要求不高;
2. 緩沖(消息/數據);
3. 需要執行的任務比較耗時;
4. 存在異構系統間的整合;
2. 什么是 RabbitMQ?
RabbitMQ 是一個在 AMQP 基礎上完整的,可復用的企業消息系統,它遵循 Mozilla Public License 開源協議。
RabbitMQ 是部署最廣泛的開源消息代理。RabbitMQ 擁有成千上萬的用戶,是最受歡迎的開源消息代理之一。RabbitMQ 在全球范圍內的小型初創企業和大型企業中都得到使用。
RabbitMQ 輕巧,易於在內部和雲中部署。它支持多種消息傳遞協議。RabbitMQ 可以部署在分布式和聯合配置中,以滿足大規模,高可用性的要求。RabbitMQ 官方網址:https://www.rabbitmq.com/
1) RabbitMQ 的特點
支持多種消息傳遞協議;
分布式部署;
支持多平台部署,例如:Windows Server、Linux 等;
支持多語言運用,例如:C#、Java、PHP、Python 等;
3. 安裝 RabbitMQ Server
1) 進入官網下載地址
https://www.rabbitmq.com/#getstarted
2) 點擊 Download + Installation
3) 點擊官網推薦的 Windows 系統的安裝包
4) Windows 上安裝 RabbitMQ,推薦了兩個安裝選項:
1. 使用 Chocolatey 安裝;
Chocolatey 類似於 Nuget 軟件包管理工具,這里不使用該方式安裝。
2. 使用官方安裝程序;
這里使用該方式進行安裝,點擊【Using the official installer】。
5) 這里說了,在 Windows 上運行 RabbitMQ 依賴於 Erlang 框架,所以先下載並安裝它:
說明:RabbitMQ 服務是用 Erlang 語言編寫的,Erlang 類似於 C# 調用的 .NET Framework 框架。
1. 點擊 OTP 22.1 Windows 64-bit Binary File (264) 進行下載
2. 下載完成后進行安裝(注意:需要以管理員身份安裝)
默認安裝在 C:\Program Files\erl10.5 目錄下
3. 安裝完成后,設置環境變量
系統變量 -> Path -> 編輯 -> 新建,輸入:C:\Program Files\erl10.5\bin,保存即可。
4. 檢查是否安裝成功,運行 CMD
6) 依然是剛才的頁面,下載 RabbitMQ Server
7) 下載后直接安裝
默認安裝在 C:\Program Files\RabbitMQ Server 目錄下
8) 檢查是否安裝成功,運行 CMD
CD 到 RabbitMQ 的 sbin 目錄下,輸入:CD C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.1\sbin
檢查安裝狀態,輸入:rabbitmqctl status
啟用 RabbitMQ 管理工具,輸入:rabbitmq-plugins enable rabbitmq_management
進入管理頁面,瀏覽器地址欄輸入:http://127.0.0.1:15672/
另外,在 Windows 服務列表中也可以找到 RabbitMQ Server
4. RabbitMQ 的基本運用
1) 首先創建兩個控制台應用程序
RabbitMQProducer(生產者),用於向消息隊列中發送消息;
RabbitMQConsumer(消費者),用於接收消息隊列中的消息。
2) 添加 RabbitMQ.Client 客戶端程序集
提示:對於 .NET Framework 4.0 的目標項目,RabbitMQ.Client 最高支持的版本是 RabbitMQ.Client.3.5.7。
3) 向消息隊列中發送消息
1. C# 代碼
using RabbitMQ.Client;
var factory = new ConnectionFactory(); //連接 RabbitMQ 工廠實例
factory.HostName = "localhost"; //要連接到的主機,默認為 localhost
factory.Port = 5672; //連接斷開,默認為 -1(5672)
factory.UserName = "guest"; //RabbitMQ 連接用戶名,默認為 guest
factory.Password = "guest"; //RabbitMQ 連接密碼,默認為 guest
//創建連接對象
using (var connection = factory.CreateConnection())
{
//創建一個新的通道、會話和模型
using (var channel = connection.CreateModel())
{
/*
* 創建一個名為 myQueue1 的消息隊列,如果名稱相同不會重復創建,參數解釋:
* 參1:myQueue1, 消息隊列名稱;
* 參2:false, 是否持久化,持久化的隊列會存盤,服務器重啟后任然存在;
* 參3:false, 是否為排他隊列,排他隊列表示僅對首次聲明它的連接可見,並在連接斷開時自動刪除。這種隊列適用於一個客戶端同時發送和讀取消息的應用場景。
* 參4:false, 是否自動刪除,自動刪除的前提是:至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除。
* 參5:設置隊列的其他一些參數,如 x-rnessage-ttl、x-expires、x-rnax-length、x-rnax-length-bytes、x-dead-letter-exchange、x-deadletter-routing-key、x-rnax-priority 等。
*/
channel.QueueDeclare("myQueue1", false, false, false, null);
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 1;
string message = "Hello RabbitMQ"; //消息內容
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "myQueue1", properties, body); //發送(生產)消息
Console.WriteLine($"Send Message: {message}");
}
}
Console.ReadLine();
2. 運行以上代碼,使用 rabbitmqctl list_queues 命令查看結果:
3. 也可以使用管理工具查看
4) 接收消息隊列中的消息
var factory = new ConnectionFactory(); //連接 RabbitMQ 工廠實例
factory.HostName = "localhost"; //要連接到的主機,默認為 localhost
factory.Port = 5672; //連接斷開,默認為 -1(5672)
factory.UserName = "guest"; //RabbitMQ 連接用戶名,默認為 guest
factory.Password = "guest"; //RabbitMQ 連接密碼,默認為 guest
//不能放入 using 語句中,否則當 using 語句結束后會 Close 連接,EventingBasicConsumer.Received 事件將不會被觸發
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare("myQueue1", false, false, false, null);
var consumer = new EventingBasicConsumer(channel); //消費者(指定消息通道)
channel.BasicConsume("myQueue1", true, consumer); //消費消息(在當前通道中監聽 myQueue1 隊列,並進行消費)
//該事件在接收到消息時觸發
consumer.Received += (sender, e) =>
{
byte[] body = e.Body; //消息字節數組
string message = Encoding.UTF8.GetString(body); //消息內容
Console.WriteLine($"Received: {message}, {DateTime.Now.ToString("HH: mm:ss fff")}");
};
Console.ReadLine();
connection.Close();
channel.Close();
5) 模擬頻繁發送/接收消息
1. 修改代碼
for (int i = 0; i < 100; i++)
{
string message = $"Hello RabbitMQ {i}";
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "myQueue1", properties, body);
Console.WriteLine($"Send: {message}, {DateTime.Now.ToString("HH:mm:ss fff")}");
System.Threading.Thread.Sleep(1000); //間隔1秒鍾發送一次
}
2. 運行效果
5. 遠程部署 RabbitMQ 服務
我們之前是將 RabbitMQ 服務部署在本機,以 localhost 的主機名去訪問。在實際工作通常是將 RabbitMQ 服務部署在遠程的服務器上,提供給更多的客戶端去訪問。下面是具體的操作步驟:
1) 具體的安裝步驟與本地安裝一致(可以參考本地安裝)
2) 開放 5672 端口(這是 RabbitMQ 服務的默認端口)
1. 打開防火牆,開放 5672 端口的訪問;
2. 如果是阿里雲 ECS 服務器,還需要在安全組規則開放對 5672 端口訪問。
3) RabbitMQ 默認不支持 guest 用戶遠程訪問
4) 解決辦法很簡單,添加一個新用戶即可
1. 打開 RabbitMQ 管理工具,進入用戶管理 Tab
2. 添加用戶后,默認是沒有權限的,點擊用戶名進行權限設置
3. 點擊 Set permission 即可
4. 另外,也可以使用命令進行權限設置
rabbitmqctl set_permissions -p "/" abeam "." "." ".*"
6. RabbitMQ 常用命令
首先,進入 RabbitMQ 命令
CMD: CD C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.1\sbin
創建 abeam 用戶並指定密碼 |
rabbitmqctl add_user abeam abeamli |
設置 abeam 用戶讀寫所有隊列的權限 |
rabbitmqctl set_permissions abeam ".*" ".*" ".*" |
設置 abeam 用戶所屬的用戶組 |
rabbitmqctl set_user_tags abeam administrator |
查看用戶列表 |
rabbitmqctl list_users |
修改 abeam 用戶的密碼為 abeam123 |
rabbitmqctl change_password abeam abeam123 |
刪除 abeam 用戶 |
rabbitmqctl delete_user abeam |