C# rabbitmq 安裝步驟以及使用方法


1. 因為 rabbit 是用erlang 語言寫的 需要分別下載 erlang的運行庫和rabbitmq 服務器
       a.erlang的運行庫: https://www.erlang.org/downloads

              

 

 

        b.rabbitmq 服務器: https://www.rabbitmq.com/

           第一步。 點擊get  started 

                                                    

          第二步 點擊下載並且選擇環境 

                                                 

 

 

 

 

                                                

 

                                                           

 

 

       注意需要 rabbitmq 的版本與erlang的版本要對應
2. 安裝並配置環境
       a.安裝erlang OTP 環境
       b.配置環境變量 Path 添加第一步安裝的文件位置

              

 

 


       c.打開cmd 輸入erl 看是否配置成功

                         

 

 


      d.安裝rabbitmq server


      e.在安裝目錄下 運行cmd 輸入 rabbitmq-plugins enable rabbitmq_management 啟動界面管理服務

           

 

 


      f.啟動以后 在瀏覽器中輸入 http://127.0.0.1:15672/,進入管理頁面,賬戶密碼都是guest。

                            

 

 


      g.rabbitmq server 默認端口是 udp的5672端口 如果是遠程連接需要開啟防火牆
      h.guest 用戶是本地賬號如果 不在同一台服務器上無法連接 需要新建一個賬號並且給這個賬號對應的權限

        

 

 

 

   直接點擊設置默認權限

 

 

       現在權限就有了

 

 


3. 在vs中引用客戶端 RabbitMQ.Client 可以在官網下載 也可以在 vs的nuget中下載

 

 

 ConnectionFactory Factory1 = new ConnectionFactory();
            Factory1.HostName = "127.0.0.1";
            Factory1.UserName = "xy";
            Factory1.Password = "xy";
            Factory1.Port = 5672;
            var conn1 = Factory1.CreateConnection();
            //創建一個channel通道
            var channel1 = conn1.CreateModel();
            channel1.QueueDeclare("csdata", false, false, false, null);//創建一個 csdata的隊列
            var pro = channel1.CreateBasicProperties();
            pro.DeliveryMode = 1; //設置消息不持久保存
            Task.Run(() =>
            {
                while (true)
                {
                    channel1.BasicPublish("", "csdata", pro, Encoding.ASCII.GetBytes(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffffff"))); //生產消息
                    Thread.Sleep(100);
                }
            });

            ConnectionFactory Factory = new ConnectionFactory();
            Factory.HostName = "127.0.0.1";
            Factory.UserName = "xy";
            Factory.Password = "xy";
            Factory.Port = 5672;
            var conn = Factory.CreateConnection();

            //創建一個channel通道
            var channel = conn.CreateModel();
            var consumer = new EventingBasicConsumer(channel);//消費者
            channel.BasicConsume("csdata", true, consumer);//消費消息
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body.ToArray());
                Console.WriteLine($"send {message} rcv {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffffff")}");
            };


            Console.ReadLine();

注意如果需要消息持久化   需要 吧隊列設置為持久化 並且每次發送消息都需要設置為持久化,重啟以后會自動去加載隊列以及隊列的消息

 

一 不用交換機的隊列

這里寫圖片描述

生產者示例:

        //聲明隊列 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //發送10條消息,依次在消息后面附加1-10個點 
        for (int i = 6; i > 0; i--)  
        {  
            String message = "helloworld";
            channel.basicPublish("", QUEUE_NAME,null, message.getBytes());  
        }   
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

消費者示例:

        //聲明隊列 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        // 指定消費隊列 
        channel.basicConsume(QUEUE_NAME, true, consumer);  
        while (true)
        {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            doWork(message);
        }  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

API介紹
(1) channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments) ;
可以看到生產者和消費者用同樣的參數聲明了隊列,官方推薦該做法,事實上對於一個已經存在的隊列即使該方法試圖用不同的參數去創建隊列也不會有任何效果,這意味着不會改變隊列更不會影響隊列現在的工作。
queue:隊列名字
durable:隊列持久化標志,true為持久化隊列
exclusive:exclusive:排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:其一,排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一個連接創建的排他隊列的。其二,“首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同。其三,即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的。這種隊列適用於只限於一個客戶端發送讀取消息的應用場景。
autoDelete:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。
arguments:Map類型,關於隊列及隊列中消息的詳細設置

arguments鍵 意義
x-message-ttl 數字類型,標志時間,以豪秒為單位 標志隊列中的消息存活時間,也就是說隊列中的消息超過了制定時間會被刪除
x-expires 數字類型,標志時間,以豪秒為單位 隊列自身的空閑存活時間,當前的queue在指定的時間內,沒有consumer、basic.get也就是未被訪問,就會被刪除。
x-max-length和x-max-length-bytes 數字 最大長度和最大占用空間,設置了最大長度的隊列,在超過了最大長度后進行插入會刪除之前插入的消息為本次的留出空間,相應的最大占用大小也是這個道理,當超過了這個大小的時候,會刪除之前插入的消息為本次的留出空間。
x-dead-letter-exchange和x-dead-letter-routing-key 字符串 消息因為超時或超過限制在隊列里消失,這樣我們就丟失了一些消息,也許里面就有一些是我們做需要獲知的。而rabbitmq的死信功能則為我們帶來了解決方案。設置了dead letter exchange與dead letter routingkey(要么都設定,要么都不設定)那些因為超時或超出限制而被刪除的消息會被推動到我們設置的exchange中,再根據routingkey推到queue中
x-max-priority 數字 隊列所支持的優先級別,列如設置為5,表示隊列支持0到5六個優先級別,5最高,0最低,當然這需要生產者在發送消息時指定消息的優先級別,消息按照優先級別從高到低的順序分發給消費者

(2)channel.basicPublish(exchange, routingKey, mandatory, immediate, basicProperties, body);
exchange: 交換機名
routingKey: 路由鍵
mandatory:當mandatory標志位設置為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那么會調用basic.return方法將消息返還給生產者, channel.addReturnListener添加一個監聽器,當broker執行basic.return方法時,會回調handleReturn方法,這樣我們就可以處理變為死信的消息了;當mandatory設為false時,出現上述情形broker會直接將消息扔掉;通俗的講,mandatory標志告訴broker代理服務器至少將消息route到一個隊列中,否則就將消息return給發送者。
immediate: rabbitMq3.0已經不在支持了,3.0以前這個標志告訴服務器如果該消息關聯的queue上有消費者,則馬上將消息投遞給它,如果所有queue都沒有消費者,直接把消息返還給生產者,不用將消息入隊列等待消費者了。
basicProperties:消息的詳細屬性,例如優先級別、持久化、到期時間,headers類型的exchange要用到的是其中的headers字段。

            public BasicProperties(
            String contentType,//消息類型如:text/plain
            String contentEncoding,//編碼
            Map<String,Object> headers,
            Integer deliveryMode,//1:nonpersistent 2:persistent
            Integer priority,//優先級
            String correlationId,
            String replyTo,//反饋隊列
            String expiration,//expiration到期時間
            String messageId,
            Date timestamp,
            String type,
            String userId,
            String appId,
            String clusterId)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

body:消息實體,字節數組。
(3)QueueingConsumer:這是一個已經實現好了的Consumer,相比於自己實現Consumer接口,這是個比較安全快捷的方式。該類基於jdk的BlockingQueue實現,handleDelivery方法中將收到的消息封裝成Delivery對象,並存放到BlockingQueue中,這相當於消費者本地存放了一個消息緩存隊列。nextDelivery()方法底層調用的BlockingQueue的阻塞方法take()。
(4)channel.basicConsume(queue, autoAck, consumer);
queue:隊列名。
autoAck:自動應答標志,true為自動應答。
consumer:消費者對象,可以自己實現Consumer接口,建議使用QueueingConsumer。

二 fanout類型的交換機

這里寫圖片描述
1、消息與隊列匹配規則:fanout類型交換機會將接收到的消息廣播給所有與之綁定的隊列。
2、現在我們來演示一下如圖所示的消息廣播機制,不難注意到這種情況生產者P只關心消息發送給哪個交換機,由交換機X決定消息發送到哪些隊列,,而消費者C只關注訂閱哪個隊列。

生產者示例:

        // 聲明轉發器和類型 
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );     
        String message = "消息1";  
        // 往轉發器上發送消息 
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); 
  • 1
  • 2
  • 3
  • 4
  • 5

消費者示例:

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
        // 創建一個非持久的、唯一的且自動刪除的隊列 
        String queueName = channel.queueDeclare().getQueue();  
        // 為轉發器指定隊列,設置binding 
        channel.queueBind(queueName, EXCHANGE_NAME, "");  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        // 指定接收者,第二個參數為自動應答,無需手動應答 
        channel.basicConsume(queueName, true, consumer);  
        while (true)  
        {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  
            doSomething(message);  
        }  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

示例代碼所用API介紹:
(1)channel.exchangeDeclare(exchange, type );
exchange: 交換機名。
type:交換機類型,值為fanout、direct、topic、headers其中之一。
(2)channel.queueDeclare():創建一個非持久的、唯一的、自動刪除的隊列且隊列名稱由服務器隨機產生。
(3)channel.queueBind(queue, exchange, bindingKey);
queue:隊列名。
exchange:交換機名。
bindingKey:綁定鍵。
(4)channel.basicPublish(exchange, routingKey, basicProperties, body)
exchange:交換機名。
routingKey:消息綁定的路由鍵。
basicProperties:消息屬性,詳細字段見上一節不用交換機的隊列
body:消息實體,字節數組。

三 direct類型的交換機

這里寫圖片描述
1、消息分發規則:消息會被推送至綁定鍵(binding key)和消息發布附帶的選擇鍵(routing key)完全匹配的隊列。
2、圖示說明:消息1附帶路由鍵“error”、與綁定鍵“error”匹配,而隊列Q4、Q5與交換機X間都存在綁定鍵“error”所以消息1被分發到Q4、Q5;消息2附帶路由鍵“info”,而隊列Q4與交換機間存在綁定建“info”,所以消息2被分發到隊列Q4。

3、分發到隊列的消息不再帶有綁定鍵,事實上分發到隊列的消息不再帶有發送者的任何信息,當然如果消息實體里面包含了發送者消息,那么消費者可以獲取發送者信息。

生產者示例:

        chanel.exchangeDeclare(EXCHANGE_NAME, "direct"); 
        // 發布消息至轉發器,指定routingkey 
        chanel.basicPublish(EXCHANGE_NAME, "error", null, message1.getBytes()); 
        chanel.basicPublish(EXCHANGE_NAME, "info", null, message2.getBytes()); 
  • 1
  • 2
  • 3
  • 4

消費者示例:

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");  
        String queueName = channel.queueDeclare().getQueue();  
        // 指定binding_key 
        channel.queueBind(queueName, EXCHANGE_NAME, "error");   
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        channel.basicConsume(queueName, true, consumer);  
        while (true)  
        {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody()); 
            doSomething(message);
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

四 topic類型的交換機

這里寫圖片描述

1、消息分發規則:一個附帶特殊的選擇鍵將會被轉發到綁定鍵與之匹配的隊列中。

2、routingKey於bindingKey匹配規則:
routingKey必須是由點隔開的一系列的標識符組成。標識符可以是任何東西,但是一般都與消息的某些特性相關。一些合法的選擇鍵的例子:”stock.usd.nyse”, “nyse.vmw”,”quick.orange.rabbit”.你可以定義任何數量的標識符,上限為255個字節。
綁定鍵和選擇鍵的形式一樣。。需要注意的是:關於綁定鍵有兩種特殊的情況。

  • *可以匹配一個標識符。
  • #可以匹配0個或多個標識符。

3、圖示說明:
消息1附帶路由鍵“fast.orange.*”與綁定鍵“#”、“*.orange.*”匹配,所以消息1被分發給隊列Q6、Q7;消息2附帶路由鍵“lazy.orange.a.b”與綁定鍵“#”、“lazy.#”匹配,所以消息2被分發給隊列Q6、Q8。

代碼示例與direct類型轉發器基本雷同,只是路由鍵和綁定鍵格式不一樣,這里不再贅述。

四 headers類型的交換機

這里寫圖片描述

1、消息分發規則:headers類型的交換機分發消息不依賴routingKey,是使用發送消息時basicProperties對象中的headers來匹配的。headers是一個鍵值對類型,發送者發送消息時將這些鍵值對放到basicProperties對象中的headers字段中,隊列綁定交換機時綁定一些鍵值對,當兩者匹配時,隊列就可以收到消息。匹配模式有兩種,在隊列綁定到交換機時用x-match來指定,all代表定義的多個鍵值對都要滿足,而any則代碼只要滿足一個就可以了。fanout,direct,topic exchange的routingKey都需要要字符串形式的,而headers exchange則沒有這個要求,因為鍵值對的值可以是任何類型。

2、圖示說明:
消息1附帶的鍵值對與Q9綁定鍵值對的color匹配、speed不匹配,但是Q9的x-match設置為any,因此只要有一項匹配消息1就可以被分發到Q9。消息1與Q10完全匹配,消息2與Q10部分匹配,由於Q10的x-match設置為all,所以只能接受到消息1。

3、代碼示例

生產者示例:

        //聲明轉發器和類型headers 
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.HEADERS,false,true,null);  
        String message = "消息1";  
        Map<String,Object> headers =  new Hashtable<String, Object>();  
        headers.put("aaa", "01234");  
        Builder properties = new BasicProperties.Builder();  
        properties.headers(headers);  
        // 指定消息發送到的轉發器,綁定鍵值對headers鍵值對 
        channel.basicPublish(EXCHANGE_NAME, "",properties.build(),message.getBytes()); 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

消費者示例:

        //聲明轉發器和類型headers 
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.HEADERS,false,true,null);  
        channel.queueDeclare(QUEUE_NAME,false, false, true,null);  

        Map<String, Object> headers = new Hashtable<String, Object>();  
        headers.put("x-match", "any");//all any 
        headers.put("aaa", "01234");  
        headers.put("bbb", "56789");  
        // 為轉發器指定隊列,設置binding 綁定header鍵值對 
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"", headers);  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        // 指定接收者,第二個參數為自動應答,無需手動應答 
        channel.basicConsume(QUEUE_NAME, true, consumer);  
        while (true) {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  
            System.out.println(message);  
        }   
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

示例代碼API介紹:
(1) channel.exchangeDeclare(exchange, type, durable, autoDelete, arguments);
exchange:交換機名
type:交換機類型
durable:持久化標志
autoDelete:自動刪除
arguments:擴展參數,具體如下表

擴展參數鍵 意義
alternate-exchange 下面簡稱AE,當一個消息不能被route的時候,如果exchange設定了AE,則消息會被投遞到AE。如果存在AE鏈,則會按此繼續投遞,直到消息被route或AE鏈結束或遇到已經嘗試route過消息的AE。

(2)channel.queueBind(queue, exchange, routingKey, arguments);
queue:隊列名
exchange:交換機名
routingKey:選擇鍵(路由鍵)

arguments:headers類型交換機綁定時指定的鍵值對 

 

 

 

 

queueDeclare(String queue, boolean durable, boolean exclusive, Map<String, Object> arguments);
    • queue: 隊列名稱

    • durable: 是否持久化, 隊列的聲明默認是存放到內存中的,如果rabbitmq重啟會丟失,如果想重啟之后還存在就要使隊列持久化,保存到Erlang自帶的Mnesia數據庫中,當rabbitmq重啟之后會讀取該數據庫

    • exclusive:是否排外的,有兩個作用,一:當連接關閉時connection.close()該隊列是否會自動刪除;二:該隊列是否是私有的private,如果不是排外的,可以使用兩個消費者都訪問同一個隊列,沒有任何問題,如果是排外的,會對當前隊列加鎖,其他通道channel是不能訪問的,如果強制訪問會報異常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等於true的話用於一個隊列只能有一個消費者來消費的場景

    • autoDelete:是否自動刪除,當最后一個消費者斷開連接之后隊列是否自動被刪除,可以通過RabbitMQ Management,查看某個隊列的消費者數量,當consumers = 0時隊列就會自動刪除

    • arguments: 
      隊列中的消息什么時候會自動被刪除?

      • Message TTL(x-message-ttl):設置隊列中的所有消息的生存周期(統一為整個隊列的所有消息設置生命周期), 也可以在發布消息的時候單獨為某個消息指定剩余生存時間,單位毫秒, 類似於redis中的ttl,生存時間到了,消息會被從隊里中刪除,注意是消息被刪除,而不是隊列被刪除, 特性Features=TTL, 單獨為某條消息設置過期時間AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”); 
        channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));

      • Auto Expire(x-expires): 當隊列在指定的時間沒有被訪問(consume, basicGet, queueDeclare…)就會被刪除,Features=Exp

      • Max Length(x-max-length): 限定隊列的消息的最大值長度,超過指定長度將會把最早的幾條刪除掉, 類似於mongodb中的固定集合,例如保存最新的100條消息, Feature=Lim

      • Max Length Bytes(x-max-length-bytes): 限定隊列最大占用的空間大小, 一般受限於內存、磁盤的大小, Features=Lim B

      • Dead letter exchange(x-dead-letter-exchange): 當隊列消息長度大於最大長度、或者過期的等,將從隊列中刪除的消息推送到指定的交換機中去而不是丟棄掉,Features=DLX

      • Dead letter routing key(x-dead-letter-routing-key):將刪除的消息推送到指定交換機的指定路由鍵的隊列中去, Feature=DLK

      • Maximum priority(x-max-priority):優先級隊列,聲明隊列時先定義最大優先級值(定義最大值一般不要太大),在發布消息的時候指定該消息的優先級, 優先級更高(數值更大的)的消息先被消費,

      • Lazy mode(x-queue-mode=lazy): Lazy Queues: 先將消息保存到磁盤上,不放在內存中,當消費者開始消費的時候才加載到內存中
      • Master locator(x-queue-master-locator)

一 不用交換機的隊列

這里寫圖片描述

生產者示例:

        //聲明隊列 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //發送10條消息,依次在消息后面附加1-10個點 
        for (int i = 6; i > 0; i--)  
        {  
            String message = "helloworld";
            channel.basicPublish("", QUEUE_NAME,null, message.getBytes());  
        }   
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

消費者示例:

        //聲明隊列 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        // 指定消費隊列 
        channel.basicConsume(QUEUE_NAME, true, consumer);  
        while (true)
        {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            doWork(message);
        }  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

API介紹
(1) channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments) ;
可以看到生產者和消費者用同樣的參數聲明了隊列,官方推薦該做法,事實上對於一個已經存在的隊列即使該方法試圖用不同的參數去創建隊列也不會有任何效果,這意味着不會改變隊列更不會影響隊列現在的工作。
queue:隊列名字
durable:隊列持久化標志,true為持久化隊列
exclusive:exclusive:排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:其一,排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一個連接創建的排他隊列的。其二,“首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同。其三,即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的。這種隊列適用於只限於一個客戶端發送讀取消息的應用場景。
autoDelete:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。
arguments:Map類型,關於隊列及隊列中消息的詳細設置

arguments鍵 意義
x-message-ttl 數字類型,標志時間,以豪秒為單位 標志隊列中的消息存活時間,也就是說隊列中的消息超過了制定時間會被刪除
x-expires 數字類型,標志時間,以豪秒為單位 隊列自身的空閑存活時間,當前的queue在指定的時間內,沒有consumer、basic.get也就是未被訪問,就會被刪除。
x-max-length和x-max-length-bytes 數字 最大長度和最大占用空間,設置了最大長度的隊列,在超過了最大長度后進行插入會刪除之前插入的消息為本次的留出空間,相應的最大占用大小也是這個道理,當超過了這個大小的時候,會刪除之前插入的消息為本次的留出空間。
x-dead-letter-exchange和x-dead-letter-routing-key 字符串 消息因為超時或超過限制在隊列里消失,這樣我們就丟失了一些消息,也許里面就有一些是我們做需要獲知的。而rabbitmq的死信功能則為我們帶來了解決方案。設置了dead letter exchange與dead letter routingkey(要么都設定,要么都不設定)那些因為超時或超出限制而被刪除的消息會被推動到我們設置的exchange中,再根據routingkey推到queue中
x-max-priority 數字 隊列所支持的優先級別,列如設置為5,表示隊列支持0到5六個優先級別,5最高,0最低,當然這需要生產者在發送消息時指定消息的優先級別,消息按照優先級別從高到低的順序分發給消費者

(2)channel.basicPublish(exchange, routingKey, mandatory, immediate, basicProperties, body);
exchange: 交換機名
routingKey: 路由鍵
mandatory:當mandatory標志位設置為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那么會調用basic.return方法將消息返還給生產者, channel.addReturnListener添加一個監聽器,當broker執行basic.return方法時,會回調handleReturn方法,這樣我們就可以處理變為死信的消息了;當mandatory設為false時,出現上述情形broker會直接將消息扔掉;通俗的講,mandatory標志告訴broker代理服務器至少將消息route到一個隊列中,否則就將消息return給發送者。
immediate: rabbitMq3.0已經不在支持了,3.0以前這個標志告訴服務器如果該消息關聯的queue上有消費者,則馬上將消息投遞給它,如果所有queue都沒有消費者,直接把消息返還給生產者,不用將消息入隊列等待消費者了。
basicProperties:消息的詳細屬性,例如優先級別、持久化、到期時間,headers類型的exchange要用到的是其中的headers字段。

            public BasicProperties(
            String contentType,//消息類型如:text/plain
            String contentEncoding,//編碼
            Map<String,Object> headers,
            Integer deliveryMode,//1:nonpersistent 2:persistent
            Integer priority,//優先級
            String correlationId,
            String replyTo,//反饋隊列
            String expiration,//expiration到期時間
            String messageId,
            Date timestamp,
            String type,
            String userId,
            String appId,
            String clusterId)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

body:消息實體,字節數組。
(3)QueueingConsumer:這是一個已經實現好了的Consumer,相比於自己實現Consumer接口,這是個比較安全快捷的方式。該類基於jdk的BlockingQueue實現,handleDelivery方法中將收到的消息封裝成Delivery對象,並存放到BlockingQueue中,這相當於消費者本地存放了一個消息緩存隊列。nextDelivery()方法底層調用的BlockingQueue的阻塞方法take()。
(4)channel.basicConsume(queue, autoAck, consumer);
queue:隊列名。
autoAck:自動應答標志,true為自動應答。
consumer:消費者對象,可以自己實現Consumer接口,建議使用QueueingConsumer。

二 fanout類型的交換機

這里寫圖片描述
1、消息與隊列匹配規則:fanout類型交換機會將接收到的消息廣播給所有與之綁定的隊列。
2、現在我們來演示一下如圖所示的消息廣播機制,不難注意到這種情況生產者P只關心消息發送給哪個交換機,由交換機X決定消息發送到哪些隊列,,而消費者C只關注訂閱哪個隊列。

生產者示例:

        // 聲明轉發器和類型 
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );     
        String message = "消息1";  
        // 往轉發器上發送消息 
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); 
  • 1
  • 2
  • 3
  • 4
  • 5

消費者示例:

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
        // 創建一個非持久的、唯一的且自動刪除的隊列 
        String queueName = channel.queueDeclare().getQueue();  
        // 為轉發器指定隊列,設置binding 
        channel.queueBind(queueName, EXCHANGE_NAME, "");  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        // 指定接收者,第二個參數為自動應答,無需手動應答 
        channel.basicConsume(queueName, true, consumer);  
        while (true)  
        {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  
            doSomething(message);  
        }  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

示例代碼所用API介紹:
(1)channel.exchangeDeclare(exchange, type );
exchange: 交換機名。
type:交換機類型,值為fanout、direct、topic、headers其中之一。
(2)channel.queueDeclare():創建一個非持久的、唯一的、自動刪除的隊列且隊列名稱由服務器隨機產生。
(3)channel.queueBind(queue, exchange, bindingKey);
queue:隊列名。
exchange:交換機名。
bindingKey:綁定鍵。
(4)channel.basicPublish(exchange, routingKey, basicProperties, body)
exchange:交換機名。
routingKey:消息綁定的路由鍵。
basicProperties:消息屬性,詳細字段見上一節不用交換機的隊列
body:消息實體,字節數組。

三 direct類型的交換機

這里寫圖片描述
1、消息分發規則:消息會被推送至綁定鍵(binding key)和消息發布附帶的選擇鍵(routing key)完全匹配的隊列。
2、圖示說明:消息1附帶路由鍵“error”、與綁定鍵“error”匹配,而隊列Q4、Q5與交換機X間都存在綁定鍵“error”所以消息1被分發到Q4、Q5;消息2附帶路由鍵“info”,而隊列Q4與交換機間存在綁定建“info”,所以消息2被分發到隊列Q4。

3、分發到隊列的消息不再帶有綁定鍵,事實上分發到隊列的消息不再帶有發送者的任何信息,當然如果消息實體里面包含了發送者消息,那么消費者可以獲取發送者信息。

生產者示例:

        chanel.exchangeDeclare(EXCHANGE_NAME, "direct"); 
        // 發布消息至轉發器,指定routingkey 
        chanel.basicPublish(EXCHANGE_NAME, "error", null, message1.getBytes()); 
        chanel.basicPublish(EXCHANGE_NAME, "info", null, message2.getBytes()); 
  • 1
  • 2
  • 3
  • 4

消費者示例:

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");  
        String queueName = channel.queueDeclare().getQueue();  
        // 指定binding_key 
        channel.queueBind(queueName, EXCHANGE_NAME, "error");   
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        channel.basicConsume(queueName, true, consumer);  
        while (true)  
        {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody()); 
            doSomething(message);
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

四 topic類型的交換機

這里寫圖片描述

1、消息分發規則:一個附帶特殊的選擇鍵將會被轉發到綁定鍵與之匹配的隊列中。

2、routingKey於bindingKey匹配規則:
routingKey必須是由點隔開的一系列的標識符組成。標識符可以是任何東西,但是一般都與消息的某些特性相關。一些合法的選擇鍵的例子:”stock.usd.nyse”, “nyse.vmw”,”quick.orange.rabbit”.你可以定義任何數量的標識符,上限為255個字節。
綁定鍵和選擇鍵的形式一樣。。需要注意的是:關於綁定鍵有兩種特殊的情況。

  • *可以匹配一個標識符。
  • #可以匹配0個或多個標識符。

3、圖示說明:
消息1附帶路由鍵“fast.orange.*”與綁定鍵“#”、“*.orange.*”匹配,所以消息1被分發給隊列Q6、Q7;消息2附帶路由鍵“lazy.orange.a.b”與綁定鍵“#”、“lazy.#”匹配,所以消息2被分發給隊列Q6、Q8。

代碼示例與direct類型轉發器基本雷同,只是路由鍵和綁定鍵格式不一樣,這里不再贅述。

四 headers類型的交換機

這里寫圖片描述

1、消息分發規則:headers類型的交換機分發消息不依賴routingKey,是使用發送消息時basicProperties對象中的headers來匹配的。headers是一個鍵值對類型,發送者發送消息時將這些鍵值對放到basicProperties對象中的headers字段中,隊列綁定交換機時綁定一些鍵值對,當兩者匹配時,隊列就可以收到消息。匹配模式有兩種,在隊列綁定到交換機時用x-match來指定,all代表定義的多個鍵值對都要滿足,而any則代碼只要滿足一個就可以了。fanout,direct,topic exchange的routingKey都需要要字符串形式的,而headers exchange則沒有這個要求,因為鍵值對的值可以是任何類型。

2、圖示說明:
消息1附帶的鍵值對與Q9綁定鍵值對的color匹配、speed不匹配,但是Q9的x-match設置為any,因此只要有一項匹配消息1就可以被分發到Q9。消息1與Q10完全匹配,消息2與Q10部分匹配,由於Q10的x-match設置為all,所以只能接受到消息1。

3、代碼示例

生產者示例:

        //聲明轉發器和類型headers 
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.HEADERS,false,true,null);  
        String message = "消息1";  
        Map<String,Object> headers =  new Hashtable<String, Object>();  
        headers.put("aaa", "01234");  
        Builder properties = new BasicProperties.Builder();  
        properties.headers(headers);  
        // 指定消息發送到的轉發器,綁定鍵值對headers鍵值對 
        channel.basicPublish(EXCHANGE_NAME, "",properties.build(),message.getBytes()); 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

消費者示例:

        //聲明轉發器和類型headers 
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.HEADERS,false,true,null);  
        channel.queueDeclare(QUEUE_NAME,false, false, true,null);  

        Map<String, Object> headers = new Hashtable<String, Object>();  
        headers.put("x-match", "any");//all any 
        headers.put("aaa", "01234");  
        headers.put("bbb", "56789");  
        // 為轉發器指定隊列,設置binding 綁定header鍵值對 
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"", headers);  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        // 指定接收者,第二個參數為自動應答,無需手動應答 
        channel.basicConsume(QUEUE_NAME, true, consumer);  
        while (true) {  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  
            System.out.println(message);  
        }   
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

示例代碼API介紹:
(1) channel.exchangeDeclare(exchange, type, durable, autoDelete, arguments);
exchange:交換機名
type:交換機類型
durable:持久化標志
autoDelete:自動刪除
arguments:擴展參數,具體如下表

擴展參數鍵 意義
alternate-exchange 下面簡稱AE,當一個消息不能被route的時候,如果exchange設定了AE,則消息會被投遞到AE。如果存在AE鏈,則會按此繼續投遞,直到消息被route或AE鏈結束或遇到已經嘗試route過消息的AE。

(2)channel.queueBind(queue, exchange, routingKey, arguments);
queue:隊列名
exchange:交換機名
routingKey:選擇鍵(路由鍵)
arguments:headers類型交換機綁定時指定的鍵值對


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM