.NET Core 使用RabbitMQ


1.什么是RabbitMQ

  RabbitMQ是一個開源的,基於AMQP(Advanced Message Queuing Protocol)協議的完整,可復用的企業級消息隊列(Message Queue 一種應用程序與應用程序之間的一種通信方法)系統,RabbitMQ可以實現點對點,發布訂閱等消息處理模式

2.安裝RabbitMQ

  網上有許多RabbitMQ的安裝博客,所以在此不介紹   LINUX安裝 WINDOWS安裝

3..NET Core中使用RabbitMQ

  RabbitMQ從信息接收者角度可以看做三種模式,一對一,一對多(此一對多並不是發布訂閱,而是每條信息只有一個接收者)發布訂閱。其中一對一是簡單隊列模式,一對多是Worker模式,而發布訂閱包括發布訂閱模式,路由模式和通配符模式,為什么說發布訂閱模式包含三種模式呢,其實發布訂閱,路由,通配符三種模式都是使用只是交換機(Exchange)類型不一致

3.1 簡單隊列

  首先,我們需要創建兩個控制台項目.Send(發送者)和Receive(接收者),然后為兩個項目安裝RabbitMQ.Client驅動

install-package rabbitmq.client

  然后在Send和Receive項目中編寫我們的消息隊列代碼

  發送者代碼

using RabbitMQ.Client;
using System;
using System.Text;

namespace Send
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Start");
            IConnectionFactory conFactory = new ConnectionFactory//創建連接工廠對象
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//端口號
                UserName = "yan",//用戶賬號
                Password = "yan"//用戶密碼
            };
            using (IConnection con = conFactory.CreateConnection())//創建連接對象
            {
                using (IModel channel = con.CreateModel())//創建連接會話對象
                {
                    String queueName = String.Empty;
                    if (args.Length > 0)
                        queueName = args[0];
                    else
                        queueName = "queue1";
                    //聲明一個隊列
                    channel.QueueDeclare(
                      queue: queueName,//消息隊列名稱
                      durable: false,//是否緩存
                      exclusive: false,
                      autoDelete: false,
                      arguments: null
                       );
                    while (true)
                    {
                        Console.WriteLine("消息內容:");
                        String message = Console.ReadLine();
                        //消息內容
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        //發送消息
                        channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
                        Console.WriteLine("成功發送消息:" + message);
                    }
                }
            }
        }
    }
}
View Code  

  可以看到RabbitMQ使用了IConnectionFactory,IConnection和IModel來創建鏈接和通信管道,IConnection實例對象只負責與Rabbit的連接,而發送接收這些實際操作全部由會話通道進行,

  而后使用QueneDeclare方法進行創建消息隊列,創建完成后可以在RabbitMQ的管理工具中看到此隊列,QueneDelare方法需要一個消息隊列名稱的必須參數.后面那些參數則代表緩存,參數等信息.

  最后使用BasicPublish來發送消息,在一對一中routingKey必須和 queueName一致

  接收者代碼

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Receive1
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Start");
            IConnectionFactory connFactory = new ConnectionFactory//創建連接工廠對象
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//端口號
                UserName = "yan",//用戶賬號
                Password = "yan"//用戶密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    String queueName = String.Empty;
                    if (args.Length > 0)
                        queueName = args[0];
                    else
                        queueName = "queue1";
                    //聲明一個隊列
                    channel.QueueDeclare(
                      queue: queueName,//消息隊列名稱
                      durable: false,//是否緩存
                      exclusive: false,
                      autoDelete: false,
                      arguments: null
                       );
                    //創建消費者對象
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        byte[] message = ea.Body;//接收到的消息
                        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message));
                    };
                    //消費者開啟監聽
                    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }
    }
}
View Code

  在接收者中是定義一個EventingBasicConsumer對象的消費者(接收者),這個消費者與會話對象關聯,

  然后定義接收事件,輸出從消息隊列中接收的數據,

  最后使用會話對象的BasicConsume方法來啟動消費者監聽.消費者的定義也是如此簡單.

  不過注意一點,可以看到在接收者代碼中也有聲明隊列的方法,其實這句代碼可以去掉,但是如果去掉的話接收者在程序啟動時監聽隊列,而此時這個隊列還未存在,所以會出異常,所以往往會在消費者中也添加一個聲明隊列方法

  此時,簡單消息隊列傳輸就算寫好了,我們可以運行代碼就行測試

  

  

3.2 Worker模式

  Worker模式其實是一對多的模式,但是這個一對多並不是像發布訂閱那種,而是信息以順序的傳輸給每個接收者,我們可以使用上個例子來運行worker模式甚至,只需要運行多個接收者即可

  

  

  

  可以看到運行兩個接收者,然后發送者發送了1-5這五個消息,第一個接收者接收的是奇數,而第二個接收者接收的是偶數,但是現在的worker存在這很大的問題,

    1.丟失數據:一旦其中一個宕機,那么另外接收者的無法接收原本這個接收者所要接收的數據

    2.無法實現能者多勞:如果其中的接收者接收的較慢,那么便會極大的浪費性能,所以需要實現接收快的多接收

  下面針對上面的兩個問題進行處理

  首先我們先來看一下所說的宕機丟失數據一說,我們在上個例子Receive接收事件中添加線程等待

 consumer.Received += (model, ea) =>
                    {
                        Thread.Sleep(1000);//等待1秒,
                        byte[] message = ea.Body;//接收到的消息
                        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message));
                    };

  然后再次啟動兩個接收者進行測試

      

  

  

  可以看到發送者發送了1-9的數字,第二個接收者在接收數據途中宕機,第一個接收者也並沒有去接收第二個接收者宕機后的數據,有的時候我們會有當接收者宕機后,其余數據交給其它接收者進行消費,那么該怎么進行處理呢,解決這個問題得方法就是改變其消息確認模式

  在Rabbit中存在兩種消息確認模式,

    自動確認:只要消息從隊列獲取,無論消費者獲取到消息后是否成功消費,都認為是消息成功消費,也就是說上面第二個接收者其實已經消費了它所接收的數據

    手動確認:消費從隊列中獲取消息后,服務器會將該消息處於不可用狀態,等待消費者反饋

  也就是說我們只要將消息確認模式改為手動即可,改為手動確認方式只需改兩處,1.開啟監聽時將autoAck參數改為false,2.消息消費成功后返回確認

 consumer.Received += (model, ea) =>
                    {
                        Thread.Sleep(1000);//等待1秒,
                        byte[] message = ea.Body;//接收到的消息
                        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message));
                        //返回消息確認
                        channel.BasicAck(ea.DeliveryTag, true);
                    };
                    //消費者開啟監聽
                    //將autoAck設置false 關閉自動確認
                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

  然后再次測試便會出現下面結果

   

  

  能者多勞是建立在手動確認基礎上,下面修改一下代碼中等待的時間

 consumer.Received += (model, ea) =>
                    {
                        Thread.Sleep((new Random().Next(1,6))*1000);//隨機等待,實現能者多勞,
                        byte[] message = ea.Body;//接收到的消息
                        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message));
                        //返回消息確認
                        channel.BasicAck(ea.DeliveryTag, true);
                    };

  然后只需要再添加BasicQos方法即可

 //聲明一個隊列
                    channel.QueueDeclare(
                      queue: queueName,//消息隊列名稱
                      durable: false,//是否緩存
                      exclusive: false,
                      autoDelete: false,
                      arguments: null
                       );
                    //告訴Rabbit每次只能向消費者發送一條信息,再消費者未確認之前,不再向他發送信息
                    channel.BasicQos(0, 1, false);

      

  

  可以看到此時已實現能者多勞

  worker模式接收者完整代碼

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace Receive1
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Start");
            IConnectionFactory connFactory = new ConnectionFactory//創建連接工廠對象
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//端口號
                UserName = "yan",//用戶賬號
                Password = "yan"//用戶密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    String queueName = String.Empty;
                    if (args.Length > 0)
                        queueName = args[0];
                    else
                        queueName = "queue1";
                    //聲明一個隊列
                    channel.QueueDeclare(
                      queue: queueName,//消息隊列名稱
                      durable: false,//是否緩存
                      exclusive: false,
                      autoDelete: false,
                      arguments: null
                       );
                    //告訴Rabbit每次只能向消費者發送一條信息,再消費者未確認之前,不再向他發送信息
                    channel.BasicQos(0, 1, false);
                    //創建消費者對象
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        Thread.Sleep((new Random().Next(1,6))*1000);//隨機等待,實現能者多勞,
                        byte[] message = ea.Body;//接收到的消息
                        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message));
                        //返回消息確認
                        channel.BasicAck(ea.DeliveryTag, true);
                    };
                    //消費者開啟監聽
                    //將autoAck設置false 關閉自動確認
                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }
    }
}
View Code

3.3 Exchange模式(發布訂閱模式,路由模式,通配符模式)

   前面說過發布,路由,通配符這三種模式其實可以算為一種模式,區別僅僅是交互機類型不同.在這里出現了一個交換機的東西,發送者將消息發送發送到交換機,接收者創建各自的消息隊列綁定到交換機,

       發布訂閱模式

     路由模式        

     通配符模式

  通過上面三幅圖可以看出這三種模式本質就是一種訂閱模式,路由,通配符模式只是訂閱模式的變種模式。使其可以選擇發送訂閱者中的接收者。

  注意:交換機本身並不存儲數據,數據存儲在消息隊列中,所以如果向沒有綁定消息隊列的交換機中發送信息,那么信息將會丟失

  下面依次來看一下這三種模式

  發布訂閱模式(fanout)

  發送者代碼

using RabbitMQ.Client;
using System;
using System.Text;

namespace Send3
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Start");
            IConnectionFactory connFactory = new ConnectionFactory//創建連接工廠對象
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//端口號
                UserName = "yan",//用戶賬號
                Password = "yan"//用戶密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using(IModel channel = conn.CreateModel())
                {
                    //交換機名稱
                    String exchangeName = String.Empty;
                    if (args.Length > 0)
                        exchangeName = args[0];
                    else
                        exchangeName = "exchange1";
                    //聲明交換機
                    channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
                    while (true)
                    {
                        Console.WriteLine("消息內容:");
                        String message = Console.ReadLine();
                        //消息內容
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        //發送消息
                        channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
                        Console.WriteLine("成功發送消息:" + message);
                    }
                }
            }
        }
    }
}
View Code

  發送者代碼與上面沒有什么差異,只是由上面的消息隊列聲明變成了交換機聲明(交換機類型為fanout),也就說發送者發送消息從原來的直接發送消息隊列變成了發送到交換機

  接收者代碼

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Receive3
{
    class Program
    {
        static void Main(string[] args)
        {
            //創建一個隨機數,以創建不同的消息隊列
            int random = new Random().Next(1, 1000);
            Console.WriteLine("Start"+random.ToString());
            IConnectionFactory connFactory = new ConnectionFactory//創建連接工廠對象
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//端口號
                UserName = "yan",//用戶賬號
                Password = "yan"//用戶密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    //交換機名稱
                    String exchangeName = String.Empty;
                    if (args.Length > 0)
                        exchangeName = args[0];
                    else
                        exchangeName = "exchange1";
                    //聲明交換機
                    channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
                    //消息隊列名稱
                    String queueName = exchangeName + "_" + random.ToString();
                    //聲明隊列
                    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    //將隊列與交換機進行綁定
                    channel.QueueBind(queue: queueName, exchange: exchangeName,routingKey:"");
                    //聲明為手動確認
                    channel.BasicQos(0, 1, false);
                    //定義消費者
                    var consumer = new EventingBasicConsumer(channel);
                    //接收事件
                    consumer.Received += (model, ea) =>
                    {
                        byte[] message = ea.Body;//接收到的消息
                        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message));
                        //返回消息確認
                        channel.BasicAck(ea.DeliveryTag, true);
                    };
                    //開啟監聽
                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }
    }
}
View Code

 

  可以看到接收者代碼與上面有些差異

  首先是聲明交換機(同上面一樣,為了防止異常)

  然后聲明消息隊列並對交換機進行綁定,在這里使用了隨機數,目的是聲明不重復的消息隊列,如果是同一個消息隊列,則就變成worker模式,也就是說對於發布訂閱模式有多少接收者就有多少個消息隊列,而這些消息隊列共同從一個交換機中獲取數據

  然后同時開兩個接收者,結果就如下

  

  

  

  路由模式(direct)

  上面說過路由模式是訂閱模式的一個變種模式,以路由進行匹配發送,例如將消息1發送給A,B兩個消息隊列,或者將消息2發送給B,C兩個消息隊列,路由模式的交換機是direct

  發送者代碼

using RabbitMQ.Client;
using System;
using System.Text;

namespace Send3
{
    class Program
    {
        static void Main(string[] args)
        {
            if (args.Length == 0) throw new ArgumentException("args");
            Console.WriteLine("Start");
            IConnectionFactory connFactory = new ConnectionFactory//創建連接工廠對象
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//端口號
                UserName = "yan",//用戶賬號
                Password = "yan"//用戶密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using(IModel channel = conn.CreateModel())
                {
                    //交換機名稱
                    String exchangeName = "exchange2";
                    //路由名稱
                    String routeKey = args[0];
                    //聲明交換機   路由交換機類型direct
                    channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
                    while (true)
                    {
                        Console.WriteLine("消息內容:");
                        String message = Console.ReadLine();
                        //消息內容
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        //發送消息  發送到路由匹配的消息隊列中
                        channel.BasicPublish(exchange: exchangeName, routingKey: routeKey, basicProperties: null, body: body);
                        Console.WriteLine("成功發送消息:" + message);
                    }
                }
            }
        }
    }
}
View Code

  發送者代碼相比上面只改了兩處

    1.將交換機類型改為了direct類型

    2.將運行時的第一個參數改成了路由名稱,然后發送數據時由指定路由的消息隊列進行獲取數據

  接收者代碼

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Receive3
{
    class Program
    {
        static void Main(string[] args)
        {
            if (args.Length == 0) throw new ArgumentException("args");
            //創建一個隨機數,以創建不同的消息隊列
            int random = new Random().Next(1, 1000);
            Console.WriteLine("Start"+random.ToString());
            IConnectionFactory connFactory = new ConnectionFactory//創建連接工廠對象
            {
                HostName = "47.104.206.56",//IP地址
                Port = 5672,//端口號
                UserName = "yan",//用戶賬號
                Password = "yan"//用戶密碼
            };
            using (IConnection conn = connFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    //交換機名稱
                    String exchangeName = "exchange2";
                    //聲明交換機
                    channel.ExchangeDeclare(exchange: exchangeName, type:"direct");
                    //消息隊列名稱
                    String queueName = exchangeName + "_" + random.ToString();
                    //聲明隊列
                    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    //將隊列與交換機進行綁定
                    foreach (var routeKey in args)
                    {//匹配多個路由
                        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKey);
                    }
                    //聲明為手動確認
                    channel.BasicQos(0, 1, false);
                    //定義消費者
                    var consumer = new EventingBasicConsumer(channel);
                    //接收事件
                    consumer.Received += (model, ea) =>
                    {
                        byte[] message = ea.Body;//接收到的消息
                        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message));
                        //返回消息確認
                        channel.BasicAck(ea.DeliveryTag, true);
                    };
                    //開啟監聽
                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                    Console.ReadKey();
                }
            }
        }
    }
}
View Code

   在接收者代碼中的改動點也是與發送者一致,但是一個接收者消息隊列可以聲明多個路由與交換機進行綁定

  運行情況如下

  

  

  

  通配符模式(topic)

  通配符模式與路由模式一致,只不過通配符模式中的路由可以聲明為模糊查詢,RabbitMQ擁有兩個通配符

    #:匹配0-n個字符語句

    *:匹配一個字符語句

    注意:RabbitMQ中通配符並不像正則中的單個字符,而是一個以“.”分割的字符串,如 ”topic1.*“匹配的規則以topic1開始並且"."后只有一段語句的路由  例:“topic1.aaa”,“topic1.bb”

  發送者代碼

//交換機名稱
String exchangeName = "exchange3";
//路由名稱
String routeKey = args[0];
//聲明交換機   通配符類型為topic
channel.ExchangeDeclare(exchange: exchangeName, type: "topic");
while (true)
{
    Console.WriteLine("消息內容:");
    String message = Console.ReadLine();
     //消息內容
     byte[] body = Encoding.UTF8.GetBytes(message);
     //發送消息  發送到路由匹配的消息隊列中
     channel.BasicPublish(exchange: exchangeName, routingKey: routeKey, basicProperties: null, body: body);
     Console.WriteLine("成功發送消息:" + message);
}
View Code

   修改了兩點:交換機名稱(每個交換機只能聲明一種類型,如果還用exchang2的話就會出異常),交換機類型改為topic

  接收者代碼

//交換機名稱
String exchangeName = "exchange3";
//聲明交換機    通配符類型為topic
channel.ExchangeDeclare(exchange: exchangeName, type:"topic");
//消息隊列名稱
String queueName = exchangeName + "_" + random.ToString();
//聲明隊列
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
//將隊列與交換機進行綁定
foreach (var routeKey in args)
{//匹配多個路由
    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKey);
}
//聲明為手動確認
channel.BasicQos(0, 1, false);
//定義消費者
var consumer = new EventingBasicConsumer(channel);
//接收事件
consumer.Received += (model, ea) =>
{
   byte[] message = ea.Body;//接收到的消息
   Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message));
    //返回消息確認
    channel.BasicAck(ea.DeliveryTag, true);
};
//開啟監聽
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
Console.ReadKey();
View Code

  接收者修改與發送者一致

  運行結果如下

  

  

  

 

  

 

 

 

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM