RabbitMQ Exchange Types Explained


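Introduction

In the previous article, we walked through how a message flows through RabbitMQ.

To use RabbitMQ in practice, we also need to understand exchange types, because each type applies different rules when routing messages to queues.

RabbitMQ has four exchange types: direct, topic, fanout, and headers.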

direct exchange

The routing rule for this exchange type is simple.

When a queue is bound to the exchange, the binding is given a routing key:

channel.QueueBind(queue: "create_pdf_queue",
                    exchange: "pdf_events",
                    routingKey: "pdf_create",
                    arguments: null);

Then, when we publish a message to the exchange, we set a routing key on the message as well:

channel.BasicPublish(exchange: "pdf_events",
                        routingKey: "pdf_create",
                        basicProperties: properties,
                        body: body);

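With a direct exchange, a message is routed through a binding only when the message's routing key is exactly equal to that binding's routing key.

The complete flow is easier to follow in code: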

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    // Declare a direct exchange named pdf_events
    channel.ExchangeDeclare(exchange: "pdf_events",
                            type: ExchangeType.Direct,
                            durable: true,
                            autoDelete: false,
                            arguments: null);

    // Declare the create_pdf_queue queue
    channel.QueueDeclare(queue: "create_pdf_queue",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    // Declare the pdf_log_queue queue
    channel.QueueDeclare(queue: "pdf_log_queue",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    // Bind pdf_events --> create_pdf_queue with routing key pdf_create
    channel.QueueBind(queue: "create_pdf_queue",
                        exchange: "pdf_events",
                        routingKey: "pdf_create",
                        arguments: null);

    // Bind pdf_events --> pdf_log_queue with routing key pdf_log
    channel.QueueBind(queue: "pdf_log_queue",
                        exchange: "pdf_events",
                        routingKey: "pdf_log",
                        arguments: null);


    var message = "Demo some pdf creating...";
    var body = Encoding.UTF8.GetBytes(message);
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;

    // Publish to exchange pdf_events with routing key pdf_create.
    // The key equals the binding key of create_pdf_queue, so the message is routed to create_pdf_queue.
    channel.BasicPublish(exchange: "pdf_events",
                routingKey: "pdf_create",
                basicProperties: properties,
                body: body);

    message = "pdf logging ...";
    body = Encoding.UTF8.GetBytes(message);
    properties = channel.CreateBasicProperties();
    properties.Persistent = true;

    // Publish to exchange pdf_events with routing key pdf_log.
    // The key equals the binding key of pdf_log_queue, so the message is routed to pdf_log_queue.
    channel.BasicPublish(exchange: "pdf_events",
            routingKey: "pdf_log",
            basicProperties: properties,
            body: body);

    
}

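topic exchange

This exchange type is similar to direct. The difference is that direct requires the routing keys to be exactly equal, whereas a topic binding's routing key may contain the wildcards '*' and '#'. Routing keys are treated as lists of words separated by dots.

'*' matches exactly one word, and '#' matches zero or more words.

Consider three bindings on the same exchange. The first binding: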

  • exchange: agreements
  • queue A:  berlin_agreements
  • binding routingkey: agreements.eu.berlin.#

The second binding:

  • exchange: agreements
  • queue B: all_agreements
  • binding routingkey: agreements.#

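The third binding:

  • exchange: agreements
  • queue C: headstore_agreements
  • binding routingkey: agreements.eu.*.headstore

A message with routing key agreements.eu.berlin therefore matches the first and second bindings but not the third, which requires exactly one word after eu followed by headstore. The code looks like this: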

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    // Declare a topic exchange named agreements
    channel.ExchangeDeclare(exchange: "agreements",
                            type: ExchangeType.Topic,
                            durable: true,
                            autoDelete: false,
                            arguments: null);

    // Declare the berlin_agreements queue
    channel.QueueDeclare(queue: "berlin_agreements",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    // Declare the all_agreements queue
    channel.QueueDeclare(queue: "all_agreements",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    // Declare the headstore_agreements queue
    channel.QueueDeclare(queue: "headstore_agreements",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    // Bind agreements --> berlin_agreements with routing key agreements.eu.berlin.#
    channel.QueueBind(queue: "berlin_agreements",
                        exchange: "agreements",
                        routingKey: "agreements.eu.berlin.#",
                        arguments: null);

    // Bind agreements --> all_agreements with routing key agreements.#
    channel.QueueBind(queue: "all_agreements",
                        exchange: "agreements",
                        routingKey: "agreements.#",
                        arguments: null);

    // Bind agreements --> headstore_agreements with routing key agreements.eu.*.headstore
    channel.QueueBind(queue: "headstore_agreements",
                        exchange: "agreements",
                        routingKey: "agreements.eu.*.headstore",
                        arguments: null);


    var message = "hello world";
    var body = Encoding.UTF8.GetBytes(message);
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;

    // Publish to exchange agreements with routing key agreements.eu.berlin.
    // agreements.eu.berlin matches agreements.eu.berlin.# and agreements.#
    // agreements.eu.berlin does not match agreements.eu.*.headstore
    // So the message is routed to berlin_agreements and all_agreements.
    channel.BasicPublish(exchange: "agreements",
                            routingKey: "agreements.eu.berlin",
                            basicProperties: properties,
                            body: body);

               
}
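
The wildcard rules can also be checked without a broker. The following is a minimal sketch of topic matching, written only to illustrate the '*' and '#' rules; it is not RabbitMQ's actual implementation, and TopicMatcher and TopicDemo are hypothetical names introduced here.

```csharp
using System;

// Hypothetical helper for illustration; not part of RabbitMQ.Client.
public static class TopicMatcher
{
    // Returns true when the dot-separated routing key satisfies the binding pattern.
    public static bool Matches(string pattern, string routingKey)
    {
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: succeed only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' may absorb zero or more words, so try every possible length.
            for (int next = ki; next <= k.Length; next++)
            {
                if (Match(p, pi + 1, k, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs one key word to consume.
        if (ki == k.Length) return false;
        if (p[pi] == "*" || p[pi] == k[ki]) return Match(p, pi + 1, k, ki + 1);
        return false;
    }
}

public static class TopicDemo
{
    public static void Main()
    {
        string key = "agreements.eu.berlin";
        Console.WriteLine(TopicMatcher.Matches("agreements.eu.berlin.#", key));    // True
        Console.WriteLine(TopicMatcher.Matches("agreements.#", key));              // True
        Console.WriteLine(TopicMatcher.Matches("agreements.eu.*.headstore", key)); // False
    }
}
```

The three calls mirror the three bindings above: the '#' in the first pattern absorbs zero words, the '#' in the second absorbs two, and the third pattern fails because no word is left for headstore.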

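fanout exchange

This exchange has the simplest routing rule: it routes every message to all queues bound to it, without comparing the message's routing key against anything.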
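
For completeness, here is a sketch of a fanout setup in the same style as the other examples. It assumes the same RabbitMQ.Client version used throughout this article (the one that exposes CreateModel) and a broker running on localhost. The exchange name pdf_broadcast and the queue names broadcast_queue_1 and broadcast_queue_2 are made up for this illustration.

```csharp
using System.Text;
using RabbitMQ.Client;

public static class FanoutDemo
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Declare a fanout exchange (hypothetical name: pdf_broadcast)
            channel.ExchangeDeclare(exchange: "pdf_broadcast",
                                    type: ExchangeType.Fanout,
                                    durable: true,
                                    autoDelete: false,
                                    arguments: null);

            // Declare two queues and bind both; the binding routing key is left empty
            foreach (var queue in new[] { "broadcast_queue_1", "broadcast_queue_2" })
            {
                channel.QueueDeclare(queue: queue,
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                channel.QueueBind(queue: queue,
                                  exchange: "pdf_broadcast",
                                  routingKey: string.Empty,
                                  arguments: null);
            }

            var body = Encoding.UTF8.GetBytes("hello everyone");
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            // A fanout exchange does not match on the routing key, so it is left empty here
            channel.BasicPublish(exchange: "pdf_broadcast",
                                 routingKey: string.Empty,
                                 basicProperties: properties,
                                 body: body);
        }
    }
}
```

With this setup, both queues should receive a copy of the message.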

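headers exchange

This exchange type differs from the other three: it routes on message headers rather than on the routing key. The headers to match against are supplied through the arguments parameter of QueueBind: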

Dictionary<string, object> aHeader = new Dictionary<string, object>();
aHeader.Add("format", "pdf");
aHeader.Add("type", "report");
aHeader.Add("x-match", "all");
channel.QueueBind(queue: "queue.A",
                    exchange: "agreements",
                    routingKey: string.Empty,
                    arguments: aHeader);

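x-match is a special entry. The value all means that every header in the binding must match the message's headers, while any means that a single matching header is enough.

When publishing, the message carries its header values in its properties: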

var properties = channel.CreateBasicProperties();
properties.Persistent = true;
Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
mHeader1.Add("format", "pdf");
mHeader1.Add("type", "report");
properties.Headers = mHeader1;

The following code shows the rules in full:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
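    // Declare a headers exchange named agreements.
    // Note: an existing exchange cannot be redeclared with a different type, so if the topic
    // example was run against the same broker, delete that exchange first or use another name.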
    channel.ExchangeDeclare(exchange: "agreements",
                            type: ExchangeType.Headers,
                            durable: true,
                            autoDelete: false,
                            arguments: null);

    // Declare the queue.A queue
    channel.QueueDeclare(queue: "queue.A", durable: true, exclusive: false, autoDelete: false, arguments: null);

    // Declare the queue.B queue
    channel.QueueDeclare(queue: "queue.B", durable: true, exclusive: false, autoDelete: false, arguments: null);

    // Declare the queue.C queue
    channel.QueueDeclare(queue: "queue.C", durable: true, exclusive: false, autoDelete: false, arguments: null);

    // Bind agreements --> queue.A with arguments (format=pdf, type=report, x-match=all)
    Dictionary<string, object> aHeader = new Dictionary<string, object>();
    aHeader.Add("format", "pdf");
    aHeader.Add("type", "report");
    aHeader.Add("x-match", "all");
    channel.QueueBind(queue: "queue.A",
                        exchange: "agreements",
                        routingKey: string.Empty,
                        arguments: aHeader);

    // Bind agreements --> queue.B with arguments (format=pdf, type=log, x-match=any)
    Dictionary<string, object> bHeader = new Dictionary<string, object>();
    bHeader.Add("format", "pdf");
    bHeader.Add("type", "log");
    bHeader.Add("x-match", "any");
    channel.QueueBind(queue: "queue.B",
                        exchange: "agreements",
                        routingKey: string.Empty,
                        arguments: bHeader);

    // Bind agreements --> queue.C with arguments (format=zip, type=report, x-match=all)
    Dictionary<string, object> cHeader = new Dictionary<string, object>();
    cHeader.Add("format", "zip");
    cHeader.Add("type", "report");
    cHeader.Add("x-match", "all");
    channel.QueueBind(queue: "queue.C",
                        exchange: "agreements",
                        routingKey: string.Empty,
                        arguments: cHeader);


    string message1 = "hello world";
    var body = Encoding.UTF8.GetBytes(message1);
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
    mHeader1.Add("format", "pdf");
    mHeader1.Add("type", "report");
    properties.Headers = mHeader1;
    // This message is routed to queue.A and queue.B:
    // queue.A's binding (format=pdf, type=report, x-match=all) matches on both headers
    // queue.B's binding (format=pdf, type=log, x-match=any) matches on format
    channel.BasicPublish(exchange: "agreements",
                            routingKey: string.Empty,
                            basicProperties: properties,
                            body: body);


    string message2 = "hello world";
    body = Encoding.UTF8.GetBytes(message2);
    properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    Dictionary<string, object> mHeader2 = new Dictionary<string, object>();
    mHeader2.Add("type", "log");
    properties.Headers = mHeader2;
    // This message is routed only to queue.B:
    // queue.B's binding (format=pdf, type=log, x-match=any) matches on type
    channel.BasicPublish(exchange: "agreements",
                    routingKey: string.Empty,
                    basicProperties: properties,
                    body: body);

    string message3= "hello world";
    body = Encoding.UTF8.GetBytes(message3);
    properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    Dictionary<string, object> mHeader3 = new Dictionary<string, object>();
    mHeader3.Add("format", "zip");
    properties.Headers = mHeader3;
    // No binding matches, so this message is not routed to any queue
    channel.BasicPublish(exchange: "agreements",
                    routingKey: string.Empty,
                    basicProperties: properties,
                    body: body);


}
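
The all/any rules can be modelled in a few lines, which makes it easy to check the three messages against the three bindings by hand. This is a simplified sketch, not the broker's implementation. HeadersMatcher and HeadersDemo are hypothetical names, and the sketch assumes that x-match defaults to all and that binding entries whose names start with "x-" are not compared.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of RabbitMQ.Client.
public static class HeadersMatcher
{
    public static bool Matches(IDictionary<string, object> bindingArgs,
                               IDictionary<string, object> messageHeaders)
    {
        // Assumption: x-match defaults to "all" when the binding omits it.
        object mode;
        bool matchAny = bindingArgs.TryGetValue("x-match", out mode) && "any".Equals(mode);

        int compared = 0;
        int matched = 0;
        foreach (KeyValuePair<string, object> arg in bindingArgs)
        {
            // Assumption: entries starting with "x-" (such as x-match) are not compared.
            if (arg.Key.StartsWith("x-", StringComparison.Ordinal)) continue;

            compared++;
            object value;
            if (messageHeaders.TryGetValue(arg.Key, out value) && object.Equals(arg.Value, value))
            {
                matched++;
            }
        }
        return matchAny ? matched > 0 : matched == compared;
    }
}

public static class HeadersDemo
{
    // Builds a header table from alternating key/value strings.
    private static Dictionary<string, object> Table(params string[] pairs)
    {
        var table = new Dictionary<string, object>();
        for (int i = 0; i + 1 < pairs.Length; i += 2) table[pairs[i]] = pairs[i + 1];
        return table;
    }

    public static void Main()
    {
        string[] queues = { "queue.A", "queue.B", "queue.C" };
        var bindings = new[]
        {
            Table("format", "pdf", "type", "report", "x-match", "all"),
            Table("format", "pdf", "type", "log", "x-match", "any"),
            Table("format", "zip", "type", "report", "x-match", "all"),
        };
        var messages = new[]
        {
            Table("format", "pdf", "type", "report"),
            Table("type", "log"),
            Table("format", "zip"),
        };

        foreach (var headers in messages)
        {
            var targets = new List<string>();
            for (int i = 0; i < queues.Length; i++)
            {
                if (HeadersMatcher.Matches(bindings[i], headers)) targets.Add(queues[i]);
            }
            Console.WriteLine(targets.Count == 0 ? "(unrouted)" : string.Join(", ", targets));
        }
    }
}
```

Run against the bindings and messages from the example above, the sketch reports queue.A and queue.B for the first message, queue.B for the second, and no queue for the third, which agrees with the comments in that code.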

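Summary

That covers the exchange types. In general, direct and topic are used to route messages to specific queues, and fanout is used to broadcast messages.

The headers type is used less often, but it is still worth knowing.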

