RabbitMQ實例C#


驅動組件.NET版本

官網推薦驅動:RabbitMQ.Client

https://www.rabbitmq.com/devtools.html#dotnet-dev

Connection和Channel

Connection是一個TCP連接,一般服務器這種資源都是很寶貴的,所以提供了Channel,完成消息的發布消費。這樣Connection就可以做成單例模式的。

1、事件

Connection和Channel里面包含了幾個事件。分別在不同的情況下觸發

 

 其他時間執行發生異常,就會執行這個

Connection.CallbackException

恢復連接成功

Connection.RecoverySucceeded

連接恢復異常時會觸發這個事件

Connection.ConnectionRecoveryError

RabbitMQ出於自身保護策略,通過阻塞方式限制寫入,導致了生產者應用“假死”,不對外服務。比若說CPU  IO RAM下降,隊列堆積,導致堵塞。  就會觸發這個事件

Connection.ConnectionBlocked

阻塞解除會觸發這個事件

Connection.ConnectionUnblocked

connection斷開連接時候

Connection.ConnectionShutdown

-------------------------------------------------------------------------------------------------------------------------------------------------------

.NET RabbitMQ.Client中Channel叫做Model

 

channel斷開連接時候觸發

Channel.ModelShutdown

其他時間執行發生異常,就會執行這個

Channel.CallbackException

broker 發現當前消息無法被路由到指定的 queues 中(如果設置了 mandatory 屬性,則 broker 會先發送 basic.return)

Channel.BasicReturn

Channel.BasicRecoverOk

Signalled when a Basic.Nack command arrives from the broker.

Channel.BasicNacks

Signalled when a Basic.Ack command arrives from the broker.

Channel.BasicAcks

Channel.FlowControl

2、屬性

最大channel數量

connetion.ChannelMax

服務上這個連接的對象屬性

connetion.ClientProperties

服務器上這個連接的名字

connetion.ClientProvidedName

關閉原因

connetion.CloseReason

端口

connetion.Endpoint

和客戶端通信時所允許的最大的frame size

connetion.FrameMax

連接的心跳包

connetion.Heartbeat

是否打開

connetion.IsOpen

獲取vhost

connetion.KnownHosts

本地端口

connetion.LocalPort

連接串使用的協議

connetion.Protocol

遠程端口,服務器

connetion.RemotePort

服務器屬性

connetion.ServerProperties

關停信息

connetion.ShutdownReport

----------------------------------------------------------------------------------------------------------------------------------------

channel編號

channel.ChannelNumber

關閉原因

channel.CloseReason

連接超時時間

channel.ContinuationTimeout

channel.IsClosed

channel.IsOpen

下一個消息編號

channel.NextPublishSeqNo

3、方法

終止連接以及他們的channel,可以指定時間長度。

connetion.Abort()

關閉連接以及他的channel

connetion.Close()

創建channel

connetion.CreateModel()

connetion.HandleConnectionBlocked()

connetion.HandleConnectionUnblocked()

 

發送消息Confirm模式

目的確認消息是否到達消息隊列中 

 

1、mandatory

 broker 發現當前消息無法被路由到指定的 queues 中(如果設置了 mandatory 屬性,則 broker 會先發送 basic.return)

channel.BasicReturn += Channel_BasicReturn;
channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);

 

private static void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e)
{
    Console.WriteLine("Channel_BasicReturn");
}

 

2、普通Confirm模式

channel.ConfirmSelect();

channel.BasicPublish("amq.direct", routingKey: "MyRoutKey1",mandatory:true, basicProperties: null, body: body);

if (channel.WaitForConfirms())
{
    Console.WriteLine("普通發送方確認模式");
}

 

3、批量Confirm模式

channel.ConfirmSelect();

channel.BasicPublish("amq.direct", routingKey: "MyRoutKey",mandatory:true, basicProperties: null, body: body);
channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);

channel.WaitForConfirmsOrDie();
Console.WriteLine("普通發送方確認模式");

 

 

4、異步Confirm模式

java版本組件有

5、事物

 

try
{
    //聲明事物
    channel.TxSelect();
    channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
    channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
    channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
    //提交事物
    channel.TxCommit();
}
catch (Exception)
{
    //回滾
    channel.TxRollback();

}

上面說的是生產者發布消息確認,那么消費者消費如何確認呢,大家都知道消費者有ack機制,但是用到事物的時候,是怎樣的呢

1.autoAck=false手動應對的時候是支持事務的,也就是說即使你已經手動確認了消息已經收到了,但在確認消息會等事務的返回解決之后,在做決定是確認消息還是重新放回隊列,如果你手動確認現在之后,又回滾了事務,那么已事務回滾為主,此條消息會重新放回隊列;
2.autoAck=true如果自定確認為true的情況是不支持事務的,也就是說你即使在收到消息之后在回滾事務也是於事無補的,隊列已經把消息移除了;

 

事物比較耗性能

 

簡單消息發送
static void Main(string[] args)
        {


            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.140.161";
            factory.Port = 5672;
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "TestVHost";
            //創建connetion
            using (var connetion = factory.CreateConnection())
            {
                connetion.CallbackException += Connetion_CallbackException;
                connetion.RecoverySucceeded += Connetion_RecoverySucceeded;
                connetion.ConnectionRecoveryError += Connetion_ConnectionRecoveryError;
                connetion.ConnectionBlocked += Connetion_ConnectionBlocked;
                connetion.ConnectionUnblocked += Connetion_ConnectionUnblocked;
                //連接關閉的時候
                connetion.ConnectionShutdown += Connetion_ConnectionShutdown;



                //創建channel
                using (var channel = connetion.CreateModel())
                {

                    //消息會在何時被 confirm?
                    //The broker will confirm messages once:
                    //broker 將在下面的情況中對消息進行 confirm :
                    //it decides a message will not be routed to queues
                    //(if the mandatory flag is set then the basic.return is sent first) or
                    //broker 發現當前消息無法被路由到指定的 queues 中(如果設置了 mandatory 屬性,則 broker 會先發送 basic.return)
                    //a transient message has reached all its queues(and mirrors) or
                    //非持久屬性的消息到達了其所應該到達的所有 queue 中(和鏡像 queue 中)
                    //a persistent message has reached all its queues(and mirrors) and been persisted to disk(and fsynced) or
                    //持久消息到達了其所應該到達的所有 queue 中(和鏡像 queue 中),並被持久化到了磁盤(被 fsync)
                    //a persistent message has been consumed(and if necessary acknowledged) from all its queues
                    //持久消息從其所在的所有 queue 中被 consume 了(如果必要則會被 acknowledge)


                    //broker 發現當前消息無法被路由到指定的 queues 中(如果設置了 mandatory 屬性,則 broker 會先發送 basic.return)
                    channel.BasicReturn += Channel_BasicReturn;

                    //(可以不聲明)如果不聲明交換機 ,那么就使用默認的交換機  (每一個vhost都會有一個默認交換機)
                    //channel.ExchangeDeclare("amq.direct", ExchangeType.Direct,true);

                    //創建一個隊列  bool durable(持久化), bool exclusive(專有的), bool autoDelete(自動刪除)
                    //channel.QueueDeclare("TestQueue", true, false, false, null);
                    //不做綁定的話,使用默認的交換機。
                    //channel.QueueBind("TestQueue", "amq.direct", "MyRoutKey", null);

                    //發布消息
                    var body = Encoding.UTF8.GetBytes("西伯利亞的狼");


                    channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: null, body: body);
                }

                Console.WriteLine("Hello World!");
                Console.ReadKey();
            }
        }

private static void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e)
        {
            Console.WriteLine("Channel_BasicReturn");
        }

private static void Connetion_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            Console.WriteLine("Connetion_ConnectionShutdown");
        }

private static void Connetion_ConnectionUnblocked(object sender, EventArgs e)
        {
            Console.WriteLine("Connetion_ConnectionUnblocked");
        }

private static void Connetion_ConnectionBlocked(object sender, RabbitMQ.Client.Events.ConnectionBlockedEventArgs e)
        {
            Console.WriteLine("Connetion_ConnectionBlocked");
        }

private static void Connetion_ConnectionRecoveryError(object sender, RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs e)
        {
            Console.WriteLine("Connetion_ConnectionRecoveryError");
        }

private static void Connetion_RecoverySucceeded(object sender, EventArgs e)
{
    Console.WriteLine("Connetion_RecoverySucceeded");
}

private static void Connetion_CallbackException(object sender, RabbitMQ.Client.Events.CallbackExceptionEventArgs e)
{
    Console.WriteLine("Connetion_CallbackException");
}

 

 

場景分析

 

 

消息持久化

Broker持久化、交換機持久化、隊列持久化  。目的是維持重啟后  這些東西的存在。

消息持久化,才是把消息持久化到硬盤中,因為消息在隊列中,所以需要隊列持久化。

設置消息持久化,需要設置basicProperties的DeliveryMode=2 (Non-persistent (1) or persistent (2)).  默認的就是持久化。

//發布消息
var body = Encoding.UTF8.GetBytes("西伯利亞的狼");
BasicProperties pro = new BasicProperties();
pro.DeliveryMode = 2;

channel.BasicPublish("amq.direct", routingKey: "MyRoutKey", mandatory: true, basicProperties: pro, body: body);

 

消費者消費消息

為了確保一個消息永遠不會丟失,RabbitMQ支持消息確認(message acknowledgments)。當消費端接收消息並且處理完成后,會發送一個ack(消息確認)信號到RabbitMQ,RabbitMQ接收到這個信號后,就可以刪除掉這條已經處理的消息任務。但如果消費端掛掉了(比如,通道關閉、連接丟失等)沒有發送ack信號。RabbitMQ就會明白某個消息沒有正常處理,RabbitMQ將會重新將消息入隊,如果有另外一個消費端在線,就會快速的重新發送到另外一個消費端。

RabbitMQ中沒有消息超時的概念,只有當消費端關閉或奔潰時,RabbitMQ才會重新分發消息。

ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "192.168.140.161";
factory.Port = 5672;
factory.UserName = "admin";
factory.Password = "admin";
factory.VirtualHost = "TestVHost";
//創建connetion
using (var connetion = factory.CreateConnection())
{

    using (var channel = connetion.CreateModel())
    {
        //構造消費者實例
        var consumer = new EventingBasicConsumer(channel);
        //綁定消息接收后的事件委托
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body);
            Console.WriteLine(" [x] Received {0}", message);
            Thread.Sleep(6000);//模擬耗時
            Console.WriteLine(" [x] Done");
            // 主要改動的是將 autoAck:true修改為autoAck:fasle,以及在消息處理完畢后手動調用BasicAck方法進行手動消息確認。
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };
        //啟動消費者
        channel.BasicConsume(queue: "TestQueue", autoAck: false, consumer: consumer);
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();

    }
}

using (var connetion = factory.CreateConnection())
{

    using (var channel = connetion.CreateModel())
    {
        //構造消費者實例
        var consumer = new EventingBasicConsumer(channel);
        //綁定消息接收后的事件委托
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body);
            Console.WriteLine(" [x] Received {0}", message);
            Thread.Sleep(6000);//模擬耗時
            Console.WriteLine(" [x] Done");
        };
        //啟動消費者
        channel.BasicConsume(queue: "TestQueue", autoAck: true, consumer: consumer);
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();

    }
}

 

 

消費負載均衡

1、當一個隊列有多個消費者時,隊列會以循環(round-robin)的方式發送給消費者。每條消息只會給一個訂閱的消費者。

2、默認情況下,RabbitMQ將按順序將每條消息發送給下一個消費者。平均每個消費者將獲得相同數量的消息。這種分發消息的方式叫做循環(round-robin)。

3、RabbitMQ的消息分發默認按照消費端的數量,按順序循環分發。這樣僅是確保了消費端被平均分發消息的數量,但卻忽略了消費端的閑忙情況。這就可能出現某個消費端一直處理耗時任務處於阻塞狀態,某個消費端一直處理一般任務處於空置狀態,而只是它們分配的任務數量一樣。

但我們可以通過channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
設置prefetchCount : 1來告知RabbitMQ,在未收到消費端的消息確認時,不再分發消息,也就確保了當消費端處於忙碌狀態時,不再分配任務。

分庫分表模式

比如說客戶積分同步。

一般電商中這種數據量比較大,及時性比較高。

ID 編號 1-10000的用戶積分表更放在隊列1,10001-20000放在隊列2,不同的消費者消費不同隊列。以此類推...

RPC

第一步,主要是進行遠程調用的客戶端需要指定接收遠程回調的隊列,並申明消費者監聽此隊列。
第二步,遠程調用的服務端除了要申明消費端接收遠程調用請求外,還要將結果發送到客戶端用來監聽的結果的隊列中去。

遠程調用客戶端:

//申明唯一guid用來標識此次發送的遠程調用請求
 var correlationId = Guid.NewGuid().ToString();
 //申明需要監聽的回調隊列
 var replyQueue = channel.QueueDeclare().QueueName;
 var properties = channel.CreateBasicProperties();
 properties.ReplyTo = replyQueue;//指定回調隊列
 properties.CorrelationId = correlationId;//指定消息唯一標識
 string number = args.Length > 0 ? args[0] : "30";
 var body = Encoding.UTF8.GetBytes(number);
 //發布消息
 channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
 Console.WriteLine($"[*] Request fib({number})");
 // //創建消費者用於處理消息回調(遠程調用返回結果)
 var callbackConsumer = new EventingBasicConsumer(channel);
 channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);
 callbackConsumer.Received += (model, ea) =>
 {
      //僅當消息回調的ID與發送的ID一致時,說明遠程調用結果正確返回。
     if (ea.BasicProperties.CorrelationId == correlationId)
     {
         var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
         Console.WriteLine($"[x]: {responseMsg}");
     }
 };

遠程調用服務端:

//申明隊列接收遠程調用請求
channel.QueueDeclare(queue: "rpc_queue", durable: false,
    exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine("[*] Waiting for message.");
//請求處理邏輯
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    int n = int.Parse(message);
    Console.WriteLine($"Receive request of Fib({n})");
    int result = Fib(n);
    //從請求的參數中獲取請求的唯一標識,在消息回傳時同樣綁定
    var properties = ea.BasicProperties;
    var replyProerties = channel.CreateBasicProperties();
    replyProerties.CorrelationId = properties.CorrelationId;
    //將遠程調用結果發送到客戶端監聽的隊列上
    channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
        basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
    //手動發回消息確認
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine($"Return result: Fib({n})= {result}");
};
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM