RabbitMQ與.net core(二)Producer與Exchange


Producer:消息的生產者,也就是創建消息的對象

Exchange:消息的接受者,也就是用來接收消息的對象,Exchange接收到消息后將消息按照規則發送到與他綁定的Queue中。下面我們來定義一個Producer與Exchange。

1.新建.netcore console項目,並引入RabbitMQ.Client的Nuget包

2.創建Exchange

using RabbitMQ.Client;

namespace RabbitMQConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchange = "change2";
            var route = "route2";
            var queue = "queue2";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type:"direct", durable: true, autoDelete: false);   //創建Exchange
                    
                }
            }
        }
    }
}

可以看到Echange的參數有:

type:可選項為,fanout,direct,topic,headers。區別如下:

    fanout:發送到所有與當前Exchange綁定的Queue中

    direct:發送到與消息的routeKey相同的Rueue中

    topic:fanout的模糊版本

    headers:發送到與消息的header屬性相同的Queue中

durable:持久化

autoDelete:當最后一個綁定(隊列或者exchange)被unbind之后,該exchange自動被刪除。

 運行程序,可以在可視化界面看到change2

接下來我們可以創建與change2綁定的queue

3.創建Queue

                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);  #創建queue2
                    channel.QueueBind(queue, exchange, route);  #將queue2綁定到exchange2
                }

可以看到Echange的參數有:

durable:持久化

exclusive:如果為true,則queue只在channel存在時存在,channel關閉則queue消失

autoDelete:當最后一個綁定(隊列或者exchange)被unbind之后,該exchange自動被刪除。

去可視化界面看Queue

4.發送消息

                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queue, exchange, route);
                    var props = channel.CreateBasicProperties();
                    props.Persistent = true; #持久化
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));
                }

5.消費消息

using RabbitMQ.Client;
using System;
using System.Text;

namespace RabbitMQClient
{
    class Program
    {
        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            HostName = "39.**.**.**",
            Port = 5672,
            UserName = "root",
            Password = "root",
            VirtualHost = "/"
        };
        static void Main(string[] args)
        {
            var exchange = "change2";
            var route = "route2";
            var queue = "queue2";


            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false);
                channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                channel.QueueBind(queue, exchange, route);
                while (true)
                {
                    var message = channel.BasicGet(queue, true);  #第二個參數說明自動釋放消息,如為false需手動釋放消息
                    if(message!=null)
                    {
                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    }
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }
            }
        }
    }
}

運行查看結果

查看可視化界面

6.手動釋放消息

                while (true)
                {
                    var message = channel.BasicGet(queue, false);#設置為手動釋放
                    if(message!=null)
                    {
                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    }
                    channel.BasicAck(message.DeliveryTag, false); #手動釋放
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }

我們再發一條消息,然后開始消費,加個斷點調試一下

查看一下Queue中消息狀態

然后直接取消調試,不讓程序走到釋放的那一步,再查看一下消息狀態

這么說來只要不走到 channel.BasicAck(message.DeliveryTag, false);這一行,消息就不會被釋放掉,我們讓程序直接走到這一行代碼,查看一下消息的狀態

如圖已經被釋放了

7.讓失敗的消息回到隊列中

                while (true)
                {
                    var message = channel.BasicGet(queue, false);
                    if(message!=null)
                    {
                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                        Console.WriteLine(message.DeliveryTag);    #當前消息被處理的次序數
                        if (1==1)
                            channel.BasicReject(message.DeliveryTag, true);
                    }
                    
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }

重新發送4條消息

開始消費

我們可以看到消息一直沒有沒消費,因為消息被處理之后又放到了隊尾

8.監聽消息

 using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false);
                channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                channel.QueueBind(queue, exchange, route);

                channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);  #一次接受10條消息,否則rabbit會把所有的消息一次性推到client,會增大client的負荷
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    Byte[] body = ea.Body;
                    String message = Encoding.UTF8.GetString(body);
                    Console.WriteLine( message+Thread.CurrentThread.ManagedThreadId);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };

                channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
                Console.ReadLine();
            }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM