第二節:RabbitMq基本使用(生產消費者、優先級隊列、發布訂閱等)


一. RabbitMq基本使用

1. 條件准備

 (1).通過指令【net start rabbitmq】啟動服務

 (2).准備1個生產者程序Producer, 1個消費者程序Consumer01

 (3).通過Nuget給三個程序安裝 【RabbitMQ.Client 6.2.1

 (4).通過地址:http://127.0.0.1:15672 訪問RabbitMq的管理系統,進行監控,賬號和密碼都是guest

 (5).設置程序的啟動順序,先啟動Producer,然后延遲2s啟動Consumer01

2. 核心代碼剖析

 (1). 創建連接工廠ConnectionFactory,指定HostName、UserName、Password(連接地址、賬號、密碼),也可以指定VirtualHost。

PS:默認情況向,RabbitMq的信息都是在“/”這一虛擬機中,比如我可以指定Virtual為“/ypf”,當然需要先去可視化界面中創建/ypf,否則程序會報錯

(關於 RabbitMq、Queue、Exchange、Virtual之間的關系,詳見第一節:xxxxx)

 (2).創建連接 factory.CreateConnection() 和 創建傳輸信道 connection.CreateModel()

 (3).創建隊列: QueueDeclare

channel.QueueDeclare(queue: "SimpleProducerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);

A. queue: 隊列名稱

B. durable:是否持久化到硬盤, true 則設置隊列為持久化,持久化的隊列會存磁盤,在服務器重啟的時候可以保證不丟失相關信息。

C.exclusive:設置隊列是否排他。為 true 則設置隊列為排他的。如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除

 這里需要注意 3點:

  ① 排他隊列是基於連接( Connection) 可見的,同 個連接的不同信道 (Channel) 是可以同時訪問同一連接創建的排他隊列;

  ② "首次"是指如果1個連接己經聲明了 排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同:

  ③ 即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除,這種隊列適用於一個客戶端同時發送和讀取消息的應用場景

D. autoDelete:至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會 自動刪除

E. arguments:設置隊列的其他一些參數,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes 等等。

 (4).創建交換機(交換機):ExchangeDeclare

 channel.ExchangeDeclare(exchange: "SimpleProducerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

A. exchange:交換機名稱。

B. type:交換機類型,主要有(Direct、Fanout、Topic、Header)

C. durable:是否持久化到磁盤

D. autoDelete:自動刪除 ,至少有一個隊列與這個交換機綁定,之后,所有與這個交換機綁定的隊列都與此解綁,才會觸發刪除

E. arguments:設置交換機的一些參數。

 (5).隊列和交換機綁定 :QueueBind

  channel.QueueBind(queue: "SimpleProducerQueue", exchange: "SimpleProducerExChange", routingKey: string.Empty, arguments: null);

A. queue:需要綁定的隊列名稱

B. exchange:需要綁定的交換機名稱

C. routingKey:路由key,用於指定發送到隊列的規則。

D. arguments:設置一些參數

 (6).發送消息:BasicPublish

IBasicProperties basicProperties = channel.CreateBasicProperties();
basicProperties.Persistent = true;  //配置消息持久化
//basicProperties.DeliveryMode = 2;
string message = $"ypf{i}";
byte[] body = Encoding.UTF8.GetBytes(message);
//發消息(不指定路由key)
channel.BasicPublish(exchange: "SimpleProducerExChange",  routingKey: string.Empty,  basicProperties: basicProperties,body: body);

A. queue:需要綁定的隊列名稱

B. exchange:需要綁定的交換機名稱

C. routingKey:綁定路由key,用於指定發送的規則。

D. arguments:設置一些參數

 (7).接收消息:事件模式,BasicConsume+Received

//channel.QueueDeclare(queue: "SimpleProducerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
//channel.ExchangeDeclare(exchange: "SimpleProducerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
//channel.QueueBind(queue: "SimpleProducerQueue", exchange: "SimpleProducerExChange", routingKey: string.Empty, arguments: null);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
   var body = ea.Body;
   var message = Encoding.UTF8.GetString(body.ToArray());
   Console.WriteLine($"消費者01 接收消息: {message}");
};
channel.BasicConsume(queue: "SimpleProducerQueue", autoAck: true, consumer: consumer);  //進行消費

A. queue:消費的隊列(這里只能指定一個隊列哦)

B. autoAck:true 接收到傳遞過來的消息后acknowledged(應答服務器),false 接收到消息后不應答服務器.

C. consumer:指定消費者。

注意:消費者可以不用再次聲明 路由、交換機、綁定,前提是生產者已經執行,該消費的隊列在RabbitMq中已經存在了。

 

二. 幾個場景

1. 生產者-消費者

(1). 1個生產者-1個消費者

模擬:生產者生產的同時,消費者進行消費。

剖析:這里采用的是ExchangeType.Direct,但是綁定的時候不指定路由key

生產者代碼

{
                //設置控制台的顏色
                Console.ForegroundColor = ConsoleColor.Red;

                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服務在本地運行
                factory.UserName = "guest";//用戶名
                factory.Password = "guest";//密碼 
                //factory.VirtualHost = "/ypf";
                using (IConnection connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        //創建隊列
                        channel.QueueDeclare(queue: "SimpleProducerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        //創建交換機(Direct路由)
                        channel.ExchangeDeclare(exchange: "SimpleProducerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        //隊列和路由綁定
                        channel.QueueBind(queue: "SimpleProducerQueue", exchange: "SimpleProducerExChange", routingKey: string.Empty, arguments: null);

                        Console.WriteLine("------------------------------下面是生產者開始生產消息(2s后開始)------------------------------------------");
                        Thread.Sleep(2000);
                        for (int i = 1; i <= 100; i++)
                        {
                            IBasicProperties basicProperties = channel.CreateBasicProperties();
                            basicProperties.Persistent = true;
                            //basicProperties.DeliveryMode = 2;
                            string message = $"ypf{i}";
                            byte[] body = Encoding.UTF8.GetBytes(message);
                            //發消息(不指定路由key)
                            channel.BasicPublish(exchange: "SimpleProducerExChange",
                                                 routingKey: string.Empty,
                                                 basicProperties: basicProperties,
                                                 body: body);
                            Console.WriteLine($"消息:{message} 已發送~");
                            Thread.Sleep(500);
                        }
                    }
                }
            }
View Code

消費者代碼

 {
                Thread.Sleep(2000);  //休眠兩秒,等待生產者
                Console.ForegroundColor = ConsoleColor.Green;

                var factory = new ConnectionFactory();
                factory.HostName = "localhost";//RabbitMQ服務在本地運行
                factory.UserName = "guest";//用戶名
                factory.Password = "guest";//密碼 
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {                     
                        try
                        {
                            //channel.QueueDeclare(queue: "SimpleProducerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                            //channel.ExchangeDeclare(exchange: "SimpleProducerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                            //channel.QueueBind(queue: "SimpleProducerQueue", exchange: "SimpleProducerExChange", routingKey: string.Empty, arguments: null);

                            var consumer = new EventingBasicConsumer(channel);
                            consumer.Received += (model, ea) =>
                            {
                                var body = ea.Body;
                                var message = Encoding.UTF8.GetString(body.ToArray());
                                Console.WriteLine($"消費者01 接收消息: {message}");
                            };
                            channel.BasicConsume(queue: "SimpleProducerQueue", autoAck: true, consumer: consumer);
                            Console.WriteLine(" Press [enter] to exit.");
                            Console.ReadLine();
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(ex.Message);
                        }
                    }
                }
            }
View Code

運行效果

(2). 多個生產者-多個消費者 

模擬:1個隊列,利用多線程開始多個生產者生產的同時,多個消費者進行消費。

剖析:這里采用的是ExchangeType.Direct,但是綁定的時候不指定路由key

生產者代碼:

 /// <summary>
    /// 模擬多個生產者
    /// </summary>
    public class ManyProducer
    {
        /// <summary>
        /// 生產者
        /// </summary>
        /// <param name="producerName">生產者名稱</param>
        /// <param name="num">模擬消息內容</param>
        public static void Show(string producerName, int num)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "ManyProducerConsumerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.ExchangeDeclare(exchange: "ManyProducerConsumerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                    channel.QueueBind(queue: "ManyProducerConsumerQueue", exchange: "ManyProducerConsumerExChange", routingKey: string.Empty, arguments: null);
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine($"生產者{producerName}已准備就緒~~~");
                    for (int i = num; i <= num + 100; i++)
                    {
                        string message = $"生產者{producerName}:消息{i}";
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "ManyProducerConsumerExChange",
                                             routingKey: string.Empty,
                                             basicProperties: null,
                                             body: body);
                        Console.WriteLine($"消息:{message} 已發送~");

                        Thread.Sleep(800);
                    }
                }
            }
        }
    }
View Code
{
                //模擬多個生產者,向同一個隊列里生產消息(這里使用的一定一個隊列, 路由可以1個或多個)
                Task.Run(() =>
                {
                    //生產者ypfProducer1從10開始生產消息
                    ManyProducer.Show("ypfProducer1", 10);
                });
                Task.Run(() =>
                {
                    //生產者ypfProducer2從500開始生產消息
                    ManyProducer.Show("ypfProducer2", 500);
                });
}

消費者代碼

 /// <summary>
    /// 模擬多個消費者
    /// </summary>
    public class ManyConsumer
    {
        /// <summary>
        /// 消費者01
        /// </summary>
        public static void Show01()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    try
                    {
                        //channel.QueueDeclare(queue: "ManyProducerConsumerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                        //channel.ExchangeDeclare(exchange: "ManyProducerConsumerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                        //channel.QueueBind(queue: "ManyProducerConsumerQueue", exchange: "ManyProducerConsumerExChange", routingKey: string.Empty, arguments: null);
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"消費者001 接受消息: {message}");
                        };
                        channel.BasicConsume(queue: "ManyProducerConsumerQueue", autoAck: true, consumer: consumer);
                        Console.WriteLine(" Press [enter] to exit. 001");
                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }
                }
            }

        }



        /// <summary>
        /// 消費者02
        /// </summary>
        public static void Show02()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();
            Console.ForegroundColor = ConsoleColor.Green;
            try
            {
                //channel.QueueDeclare(queue: "ManyProducerConsumerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                //channel.ExchangeDeclare(exchange: "ManyProducerConsumerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                //channel.QueueBind(queue: "ManyProducerConsumerQueue", exchange: "ManyProducerConsumerExChange", routingKey: string.Empty, arguments: null);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    Console.WriteLine($"消費者002 接受消息: {message}");
                };
                channel.BasicConsume(queue: "ManyProducerConsumerQueue",
                             autoAck: true,
                             consumer: consumer);
                Console.WriteLine(" Press [enter] to exit. 002");
                Console.ReadLine();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        /// <summary>
        /// 消費者03
        /// </summary>
        public static void Show03()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();
            Console.ForegroundColor = ConsoleColor.Green;
            try
            {
                //channel.QueueDeclare(queue: "ManyProducerConsumerQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                //channel.ExchangeDeclare(exchange: "ManyProducerConsumerExChange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                //channel.QueueBind(queue: "ManyProducerConsumerQueue", exchange: "ManyProducerConsumerExChange", routingKey: string.Empty, arguments: null);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    Console.WriteLine($"消費者003 接受消息: {message}");
                };
                channel.BasicConsume(queue: "ManyProducerConsumerQueue",
                             autoAck: true,
                             consumer: consumer);
                Console.WriteLine(" Press [enter] to exit 003.");
                Console.ReadLine();

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }
View Code
 {
                Thread.Sleep(2000);  //休眠2秒,等待生產者
                Task.Run(() =>
                {
                    ManyConsumer.Show01();
                });
                Task.Run(() =>
                {
                    ManyConsumer.Show02();
                });
                Task.Run(() =>
                {
                    ManyConsumer.Show03();
                });
}

運行效果

2. 優先級隊列

模擬:用戶購買東西,依次下單,進行排隊,但是svip級別最高,可以先拿到東西,vip級別次之,普通用戶最后。

剖析:這里采用的是ExchangeType.Direct,指定路由key,通過對了的arguments參數配置支持優先級隊列,然后發送消息的時候,通過Priority設置級別,數值越大級別越高。

生產者代碼:

 /// <summary>
    /// 優先級隊列
    /// </summary>
    public class PriorityQueue
    {
        public static void Show()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "PriorityQueue", durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>() {
                             {"x-max-priority",10 }  //指定隊列要支持優先級設置;
                       });
                    channel.ExchangeDeclare(exchange: "PriorityQueueExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                    channel.QueueBind(queue: "PriorityQueue", exchange: "PriorityQueueExchange", routingKey: "PriorityKey");
                    Console.ForegroundColor = ConsoleColor.Red;
                    //下面發送消息,並設置消息的優先級
                    {
                        string[] questionList = {  "普通用戶A購買東西", "vip用戶B購買東西", "普通用戶C購買東西", "普通用戶D購買東西", "svip用戶F購買東西", "vip用戶G購買東西" };
                        //設置消息優先級
                        IBasicProperties props = channel.CreateBasicProperties();
                        foreach (string questionMsg in questionList)
                        {
                            //包含vip的優先級設置的高一些
                            if (questionMsg.StartsWith("svip"))
                            {
                                props.Priority = 9;                  //svip設置級別最高
                                channel.BasicPublish(exchange: "PriorityQueueExchange",
                                               routingKey: "PriorityKey",
                                               basicProperties: props,
                                               body: Encoding.UTF8.GetBytes(questionMsg));
                            }
                            else if (questionMsg.StartsWith("vip"))
                            {
                                props.Priority = 5;                 //vip級別次之
                                channel.BasicPublish(exchange: "PriorityQueueExchange",
                                               routingKey: "PriorityKey",
                                               basicProperties: props,
                                               body: Encoding.UTF8.GetBytes(questionMsg));
                            }
                            else
                            {
                                props.Priority = 1;               //普通用戶最后   
                                channel.BasicPublish(exchange: "PriorityQueueExchange",
                                               routingKey: "PriorityKey",
                                               basicProperties: props,
                                               body: Encoding.UTF8.GetBytes(questionMsg));
                            }
                            Console.WriteLine($"{questionMsg} 已發送~~");
                        }
                    }
                    Console.Read();
                }
            }
        }
    }
View Code
{
      PriorityQueue.Show();
}

消費者代碼:

 /// <summary>
    /// 優先級隊列,消費者
    /// </summary>
    public class PriorityQueue
    {
        public static void Show()
        {
            Console.ForegroundColor = ConsoleColor.Green;

            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //channel.QueueDeclare(queue: "PriorityQueue", durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>() {
                    //         {"x-max-priority",10 }  //指定隊列要支持優先級設置;
                    //   });
                    //channel.ExchangeDeclare(exchange: "PriorityQueueExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                    //channel.QueueBind(queue: "PriorityQueue", exchange: "PriorityQueueExchange", routingKey: "PriorityKey");

                   
                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        string msg = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine(msg);
                        Thread.Sleep(300);
                    };
                    Console.WriteLine("消費者准備就緒....");
                    //處理消息
                    channel.BasicConsume(queue: "PriorityQueue", autoAck: true, consumer: consumer);
                    Console.ReadKey();

                }
            }
        }
    }
View Code
{
                Thread.Sleep(3000);
                PriorityQueue.Show();
}

運行結果:svip雖然是第5個下單的,但是消費的時候是第1個消費的,然后vip次之,普通用戶最后。

3. 發布訂閱模式 

 也可以叫做觀察者模式,實質上就是一個交換機綁定多個隊列,每個隊列就是一個訂閱者,發布者每發布一條消息,同時向多個訂閱者的隊列中發送消息,然后每個訂閱者分別去自己的隊列中消費即可。

剖析:這里采用Fanout交換機的模式處理這個場景最為恰當(下一節會詳細介紹)

發布者代碼

/// <summary>
    /// 發布訂閱模式-發布者
    /// </summary>
    public class PublishSubscribeConsumer
    {
        public static void Show()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "PublishSubscrib01", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: "PublishSubscrib02", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.ExchangeDeclare(exchange: "PublishSubscribExChange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                    channel.QueueBind(queue: "PublishSubscrib01", exchange: "PublishSubscribExChange", routingKey: string.Empty, arguments: null);
                    channel.QueueBind(queue: "PublishSubscrib02", exchange: "PublishSubscribExChange", routingKey: string.Empty, arguments: null);
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("開始發布消息~~~~"); 
                    for (int i = 1; i <= 20; i++)
                    { 
                        string message = $"發布第{i}條消息...";
                        byte[] body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "PublishSubscribExChange",
                                        routingKey: string.Empty,
                                        basicProperties: null,
                                        body: body); 
                        Console.WriteLine(message);
                        Thread.Sleep(200);
                    }
                }
            }
        }
    }
View Code
{
     PublishSubscribeConsumer.Show();
}

訂閱者代碼

 /// <summary>
    /// 發布訂閱-訂閱者
    /// </summary>
    public class PublishSubscribeConsumer
    {
        /// <summary>
        /// 訂閱者1
        /// </summary>
        public static void Show1()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                { 
                    Console.ForegroundColor = ConsoleColor.Green;
                    //channel.QueueDeclare(queue: "PublishSubscrib01", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //channel.ExchangeDeclare(exchange: "PublishSubscribExChange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                    //channel.QueueBind(queue: "PublishSubscrib01", exchange: "PublishSubscribExChange", routingKey: string.Empty, arguments: null);                     
                    Console.WriteLine("訂閱者01 已經准備就緒~~");
                    try
                    {
                        var consumer = new EventingBasicConsumer(channel); 
                        consumer.Received += (model, ea) =>
                        { 
                                var body = ea.Body;
                                var message = Encoding.UTF8.GetString(body.ToArray());
                                Console.WriteLine($"訂閱者01收到消息:{message} ~");  
                        };
                        channel.BasicConsume(queue: "PublishSubscrib01", autoAck: true, consumer: consumer); 
                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    } 
                }
            }
        }

        /// <summary>
        /// 訂閱者2
        /// </summary>
        public static void Show2()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服務在本地運行
            factory.UserName = "guest";//用戶名
            factory.Password = "guest";//密碼 
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    //channel.QueueDeclare(queue: "PublishSubscrib02", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //channel.ExchangeDeclare(exchange: "PublishSubscribExChange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);
                    //channel.QueueBind(queue: "PublishSubscrib02", exchange: "PublishSubscribExChange", routingKey: string.Empty, arguments: null);

                    Console.WriteLine("訂閱者02 已經准備就緒~~");
                    try
                    {
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"訂閱者02收到消息:{message} ~");
                        };
                        channel.BasicConsume(queue: "PublishSubscrib02", autoAck: true, consumer: consumer);
                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }
                }
            }
        }

    }
View Code
 {
                Thread.Sleep(2000);

                Task.Run(() =>
                {
                    PublishSubscribeConsumer.Show1();
                });
                Task.Run(() =>
                {
                    PublishSubscribeConsumer.Show2();
                });
}

運行結果 (訂閱者1和訂閱者2分別拿到自己的消息)

 

 

 

 

 

 

 

 

 

!

  • 作       者 : Yaopengfei(姚鵬飛)
  • 博客地址 : http://www.cnblogs.com/yaopengfei/
  • 聲     明1 : 如有錯誤,歡迎討論,請勿謾罵^_^。
  • 聲     明2 : 原創博客請在轉載時保留原文鏈接或在文章開頭加上本人博客地址,否則保留追究法律責任的權利。
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM