RabbitMq入門與基本使用


這兩天工作項目中用到了rabbitmq,順便學習了一下。

RabbitMq主要的使用模式有三種:工作隊列,發布訂閱和RPC遠程調用。

1.工作隊列

 

生產者:

using System;
using RabbitMQ.Client;
using System.Text;

class NewTask
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
        //一定要聲明隊列,向隊列發送消息
            channel.QueueDeclare(queue: "task_queue",
                                 durable: true,    //隊列是否持久化
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);

            var properties = channel.CreateBasicProperties();    
            properties.SetPersistent(true);    //消息是否持久化    

            channel.BasicPublish(exchange: "",    //沒有定義exchange,會使用系統默認的exchange
                                 routingKey: "task_queue",
                                 basicProperties: properties,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
    }
}

在方法

channel.BasicPublish("", "task_queue", null, bytes);
中的第一個參數是需要輸入一個exchange。在RabbitMQ中,所有的消息都必須要通過exchange發送到各個queue里面去。發送者發送消息,其實也就是把消息放到exchange中去。而exchange知道應該把消息放到哪里去。在這個方法中,我們沒有輸入exchange的名稱,只是定義了一個空的echange,而在第二個參數routeKey中輸入了我們目標隊列的名稱。RabbitMQ會幫我定義一個默認的exchange,這個exchange會把消息直接投遞到
我們輸入的隊列中,這樣服務端只需要直接去這個定義了的隊列中獲取消息就可以了。

 消費者:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;

class Worker
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "task_queue",
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            //修改分發機制(原先是輪詢分發), prefetchCount = 1 變為 不向正在處理的worker發發任務,誰先有空就給誰
            //In order to defeat that we can use the basicQos method with the prefetchCount = 1 setting. 
            //This tells RabbitMQ not to give more than one message to a worker at a time. 
            //Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. 
            //Instead, it will dispatch it to the next worker that is not still busy.
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);

                int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 1000);

                Console.WriteLine(" [x] Done");
          
          //當noAck為false起作用,手動告知應答處理完成
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: "task_queue",
                                 noAck: false,   //是否不要手動應答(no manual Ack),ture自動應答,自動刪除處理消息;false手動應答,服務器的消息會等待應答結果才消除
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

這里要注意的,如果沒有宿主進程,比如一個Console的后台程序,這個 Console.ReadLine(); 不能少,而且一定要加在這里。否則:

1.程序自動退出。2.相關的變量出了生命周期范圍,已經釋放!筆者在這里吃過虧,找了半天才發現。

2.發布訂閱

Exchange類型為四種:direct,fanout,topic,headers。此模式中,由於是通過exchange和routingkey發送給多個隊列,所以Publish中不用聲明隊列,只需聲明exchange。

1、Routing - Exchange類型direct

 

他是根據交換器名稱與routingkey來找隊列的。

Publish:

using System;
using System.Linq;
using RabbitMQ.Client;
using System.Text;

class EmitLogDirect
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "direct_logs",
                                    type: "direct");

            var severity = (args.Length > 0) ? args[0] : "info";
            var message = (args.Length > 1)
                          ? string.Join(" ", args.Skip( 1 ).ToArray())
                          : "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "direct_logs",
                                 routingKey: severity,        //傳來參數,指定的routekey
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

subscribe

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogsDirect
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "direct_logs",
                                    type: "direct");
            var queueName = channel.QueueDeclare().QueueName;

            if(args.Length < 1)
            {
                Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                                        Environment.GetCommandLineArgs()[0]);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
                Environment.ExitCode = 1;
                return;
            }
            
            //同時綁定多個指定的routekey
            foreach(var severity in args)
            {
                channel.QueueBind(queue: queueName,
                                  exchange: "direct_logs",
                                  routingKey: severity);
            }

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received '{0}':'{1}'",
                                  routingKey, message);
            };
            channel.BasicConsume(queue: queueName,
                                 noAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

 

2、Publish/Subscribe - Exchange類型fanout

 

這個類型忽略Routingkey,他為廣播模式。

廣播式時,Publish可以不指定queue和routekey。

using System;
using RabbitMQ.Client;
using System.Text;

class EmitLog
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "logs",
                                 routingKey: "",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0)
               ? string.Join(" ", args)
               : "info: Hello World!");
    }
}

 subscribe可以只用臨時隊列接收

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogs
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");
            
            //這里生成了一個隨機隊列(string queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true)
            //In the .NET client, when we supply no parameters to queueDeclare() we create a non-durable, exclusive, 
            //autodelete queue with a generated name:
            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName,
                              exchange: "logs",
                              routingKey: "");

            Console.WriteLine(" [*] Waiting for logs.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] {0}", message);
            };
            channel.BasicConsume(queue: queueName,
                                 noAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

有一種簡寫的方式,用Subscription類

/// <summary>
/// 獲取消息並處理
/// </summary>
/// <param name="queueName">隊列名稱</param>
/// <param name="action">接收到消息后的Action</param>
public void Receive(string queueName, Action<byte[]> action, bool multThread = true)
        {
            ConnectionFactory cf = new ConnectionFactory();
            cf.UserName = this.UserName;
            cf.Password = this.PassWord;
            cf.HostName = this.HostName;
            cf.Port = this.Port;
            cf.VirtualHost = this.VitualHost;

            using (IConnection conn = cf.CreateConnection())
            {
                using (IModel ch = conn.CreateModel())
                {
                    //聲明交換器
                    ch.ExchangeDeclare(exchange: "e_linke1", type: "direct",durable: false);
                    ch.QueueDeclare(queue: queueName,
                                         durable: false,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);

                    //將隊列綁定到交換器上
                    ch.QueueBind(queue: queueName,
                                      exchange: "e_linke1",
                                      routingKey: "elk");
                    using (Subscription sub = new Subscription(ch, queueName, true))
                    {
                        foreach (BasicDeliverEventArgs e in sub)
                        {
                            // handle the message contained in e ...
                            // ... and finally acknowledge it
                            if (multThread)
                            {
                                System.Threading.Tasks.Task.Factory.StartNew(() => { action(e.Body); });
                            }
                            else
                            {
                                action(e.Body);
                            }

                            sub.Ack(e);
                        }
                    }
                }
            }
        }

 注:

如果有兩個接收程序都是用了同一個的queue和相同的routingKey去綁定direct exchange的話,分發的行為是負載均衡的,也就是說第一個是程序1收到,第二個是程序2收到,以此類推。

如果有兩個接收程序用了各自的queue,但使用相同的routingKey去綁定direct exchange的話,分發的行為是復制的,也就是說每個程序都會收到這個消息的副本。行為相當於fanout類型的exchange。

3、Exchange類型topic

 

這個類型的路由規則如果你掌握啦,那是相當的好用,與靈活。他是根據RoutingKey的設置,來做匹配的,其中這里還有兩個通配符為:

*,代表任意的一個詞。例如topic.zlh.*,他能夠匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....

#,代表任意多個詞。例如topic.#,他能夠匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....

 

4、Headers Exchange

Headers類型的exchange使用的比較少,它也是忽略routingKey的一種路由方式。是使用Headers來匹配的。Headers是一個鍵值對,可以定義成Hashtable。發送者在發送的時候定義一些鍵值對,接收者也可以再綁定時候傳入一些鍵值對,兩者匹配的話,則對應的隊列就可以收到消息。匹配有兩種方式all和any。這兩種方式是在接收端必須要用鍵值"x-mactch"來定義。all代表定義的多個鍵值對都要滿足,而any則代碼只要滿足一個就可以了。之前的幾種exchange的routingKey都需要要字符串形式的,而headers exchange則沒有這個要求,因為鍵值對的值可以是任何類型。代碼示例如下:

發送端:

channel.ExchangeDeclare("X1", "headers");
 
IBasicProperties properties = channel.CreateBasicProperties();
properties.Headers = new Hashtable();
properties.Headers.Add("Key1", 123);
properties.Headers.Add("Key2", 345);
 
XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
MemoryStream ms = new MemoryStream();
xs.Serialize(ms, message);
byte[] bytes = ms.ToArray();
 
channel.BasicPublish("X1", "", properties, bytes);

接收端:

channel.ExchangeDeclare("X1", "headers");
//隨機創建一個隊列
string queue_name = channel.QueueDeclare("headerssubscriber2", true, false, false, null);
//綁定
IDictionary ht = new Hashtable();
ht.Add("x-match", "any");
ht.Add("Key1", 12345);
ht.Add("Key2", 34567);
channel.QueueBind(queue_name, "X1", "", ht);
//定義這個隊列的消費者
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queue_name, true, consumer);
 
while (true)
{
    BasicDeliverEventArgs ea =
        (BasicDeliverEventArgs)consumer.Queue.Dequeue();
 
    byte[] bytes = ea.Body;
 
    XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
    using (MemoryStream ms = new MemoryStream(bytes))
    {
        RequestMessage message = (RequestMessage)xs.Deserialize(ms);
        Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message);
    }
}

 

3.RPC遠程調用

 

 

參考鏈接:

.Net使用RabbitMQ詳解

RabbitMQ消息隊列(三):任務分發機制[轉]

一個winform帶你玩轉rabbitMQ

.Net下RabbitMQ的使用(4) -- 訂閱和發布  *

.Net下RabbitMQ的使用(5) -- 路由機制 *

.Net下RabbitMQ的使用(6) -- 持久化 *

.Net下RabbitMQ的使用(7) -- 消息的傳輸控制 *

.NET/C# Client API Guide [官網]

RabbitMQ Tutorials [官網]

.NET/C# Client API Guide [官網]


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM