RabbitMQ介紹


(一)RabbitMQ基本概念

  RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。我曾經對這門語言挺有興趣,學過一段時間,后來沒堅持。RabbitMQ是 AMQP(高級消息隊列協議)的標准實現。如果不熟悉AMQP,直接看RabbitMQ的文檔會比較困難。不過它也只有幾個關鍵概念,這里簡單介紹。

RabbitMQ的結構圖如下:

1、幾個概念說明:

Broker:簡單來說就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。

2、消息隊列的使用過程大概如下:

(1)客戶端連接到消息隊列服務器,打開一個channel。
(2)客戶端聲明一個exchange,並設置相關屬性。
(3)客戶端聲明一個queue,並設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關系。
(5)客戶端投遞消息到exchange。

exchange接收到消息后,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。

exchange也有幾個類型,完全根據key進行投遞的叫做Direct交換機,例如,綁定時設置了routing key為”abc”,那么客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。對key進行模式匹配后進行投遞的叫做Topic交換機,符 號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還 有一種不需要key的,叫做Fanout交換機,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。

3、關聯關系

從示意圖可以看出消息生產者並沒有直接將消息發送給消息隊列,而是通過建立與Exchange的Channel,將消息發送給Exchange,Exchange根據規則,將消息轉發給指定的消息隊列。消費者通過建立與消息隊列相連的Channel,從消息隊列中獲取消息。

這里談到的Channel可以理解為建立在生產者/消費者和RabbitMQ服務器之間的TCP連接上的虛擬連接,一個TCP連接上可以建立多個Channel。 RabbitMQ服務器的Exchange對象可以理解為生產者發送消息的郵局,消息隊列可以理解為消費者的郵箱。Exchange對象根據它定義的規則和消息包含的routing key以及header信息將消息轉發到消息隊列。channel下圖中淺紅色框起來的兩塊所示:

根據轉發消息的規則不同,RabbitMQ服務器中使用的Exchange對象有四種,Direct Exchange, Fanout Exchange, Topic Exchange, Header Exchange,如果定義Exchange時沒有指定類型和名稱, RabbitMQ將會為每個消息隊列設定一個Default Exchange,它的Routing Key是消息隊列名稱。

RabbitMQ Java Client的官網示例有6個,本篇只使用三個例程,分別是使用默認Default Exchange的消息生產/消費,使用Direct Exchange的消息生產/消費,以及RPC方式的消息生產/消費。

為了測試方便,我們新定義了一個virutal host,名字是test_vhosts,定義了兩個用戶rabbitmq_producer和rabbitmq_consumer, 設置其user_tag為administrator(可以進行遠程連接), 為它們設置了訪問test_vhosts下所有資源的權限。

創建virutal host,在Admin-->Virtual Hosts(右側的導航欄上)打開:

創建用戶:

為用戶設置權限:(在用戶列表上點擊某個用戶進入設置頁面)

 

 

使用默認Default Exchange的消息生產/消費

我們定義一個生產者程序,一個消費者程序。

生產者程序代碼如下:

package com.gl365.payment.util.rabbitmq.demo1;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import io.netty.handler.timeout.TimeoutException;

public class ProducerApp {
    public static void main(String[] args) throws IOException, TimeoutException {  
        Connection connection = null;  
        Channel channel = null;  
        try  
        {  
            ConnectionFactory factory = new ConnectionFactory();  
            factory.setHost("localhost");  
            factory.setPort(5672);  
            factory.setUsername("rabbitmq_producer");  
            factory.setPassword("rabbitmq_producer");  
            factory.setVirtualHost("test_vhosts");  
   
            //創建與RabbitMQ服務器的TCP連接  
            connection  = factory.newConnection();  
            channel = connection.createChannel();  
            channel.queueDeclare("firstQueue", true, false, false, null);  
            String message = "First Message";             
            channel.basicPublish("", "firstQueue", null, message.getBytes());  
            System.out.println("Send Message is:'" + message + "'");              
        }  
        catch(Exception ex)  
        {  
            ex.printStackTrace();  
        }  
        finally  
        {  
            if(channel != null)  
            {  
                try {
                    channel.close();
                } catch (java.util.concurrent.TimeoutException e) {
                    e.printStackTrace();
                }  
            }  
            if(connection != null)  
            {  
                connection.close();  
            }  
        }  
    }
}

關於生產者的代碼有幾點說明:

1) RabbitMQ Java Client示例提供的ConnectionFactory屬性設置的代碼只有一句:

factory.setHost("localhost");  

這句代碼表示使用rabbitmq服務器默認的virutal host(“/”),默認的用戶guest/guest進行連接,但是如果這段代碼運行在遠程機器上時, 將因為guest用戶不能用於遠程連接RabbitMQ服務器而運行失敗,上面提供的代碼是可以進行建立遠程連接的代碼。

2)Channel建立后,調用Channel.queueDeclare方法創建消息隊列firstQueue。

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,  
                 Map<String, Object> arguments) throws IOException; 

這個方法的第二個參數durable表示建立的消息隊列是否是持久化(RabbitMQ重啟后仍然存在,並不是指消息的持久化),第三個參數exclusive 表示建立的消息隊列是否只適用於當前TCP連接,第四個參數autoDelete表示當隊列不再被使用時,RabbitMQ是否可以自動刪除這個隊列。 第五個參數arguments定義了隊列的一些參數信息,主要用於Headers Exchange進行消息匹配時。

 

3)生產者發送消息使用Channel.basicPublish方法。

void basicPublish(String exchange, String routingKey,   
 BasicProperties props, byte[] body) throws IOException; 

第一個參數exchange是消息發送的Exchange名稱,如果沒有指定,則使用Default Exchange。 第二個參數routingKey是消息的路由Key,是用於Exchange將消息路由到指定的消息隊列時使用(如果Exchange是Fanout Exchange,這個參數會被忽略), 第三個參數props是消息包含的屬性信息。RabbitMQ的消息屬性和消息體是分開的,不像JMS消息那樣同時包含在javax.jms.Message對象中,這一點需要特別注意。 第四個參數body是RabbitMQ消息體。 我們這里調用basicPublish方法發送消息時,props參數為null,因而我們發送的消息是非持久化消息,如果要發送持久化消息,我們需要進行如下設置:

AMQP.BasicProperties props =  
                    new AMQP.BasicProperties("text/plain",  
                            "UTF-8",  
                            null,  
                            2,  
                            0, null, null, null,  
                            null, null, null, null,  
                            null, null);  
 channel.basicPublish("", "firstQueue", props, message.getBytes());  

定義props時的參數2表示消息的類型為持久化消息。 運行生產者程序后,我們可以執行rabbitmqctl命令查看隊列消息,我們看到firstQueue隊列有一條消息。

消費者代碼如下:

package com.gl365.payment.util.rabbitmq.demo1;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class ConsumerApp {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("rabbitmq_consumer");
            factory.setPassword("rabbitmq_consumer");
            factory.setVirtualHost("test_vhosts");
            connection = factory.newConnection();
            channel = connection.createChannel();

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" Consumer have received '" + message + "'");
                }
            };
            channel.basicConsume("firstQueue", true, consumer);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

消費者代碼中,建立Connection,Channel的代碼和生產者程序類似。它主要定義了一個Consumer對象,這個對象重載了DefaultCustomer類 的handleDelivery方法:

void handleDelivery(String consumerTag,  
                        Envelope envelope,  
                        AMQP.BasicProperties properties,  
                        byte[] body) 
 

handleDelivery方法的第一個參數consumerTag是接收到消息時的消費者Tag,如果我們沒有在basicConsume方法中指定Consumer Tag,RabbitMQ將使用隨機生成的Consumer Tag(如下圖所示)

 

第二個參數envelope是消息的打包信息,包含了四個屬性:

1._deliveryTag,消息發送的編號,表示這條消息是RabbitMQ發送的第幾條消息,我們可以看到這條消息是發送的 第一條消息。

2._redeliver,重傳標志,確認在收到對消息的失敗確認后,是否需要重發這條消息,我們這里的值是false,不需要重發。

3._exchange,消息發送到的Exchange名稱,正如我們上面發送消息時一樣,exchange名稱為空,使用的是Default Exchange。

4._routingKey,消息發送的路由Key,我們這里是發送消息時設置的“firstQueue”。

第三個參數properties就是上面使用basicPublish方法發送消息時的props參數,由於我們上面設置它為null,這里接收到的properties 是默認的Properties,只有bodySize,其他全是null。

第四個參數body是消息體.

我們這里重載的handleDelivery方法僅僅打印出了生產者發送的消息內容,實際使用時可以轉發給后台程序進行處理。

在Consumer對象定義后,我們調用了Channel.basicConsume方法將Consumer與消息隊列綁定,否則Consumer無法從消息隊列獲取消息。

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException  

basicConsume方法的第一個參數是Consumer綁定的隊列名,第二個參數是自動確認標志,如果為true,表示Consumer接受到消息后,會自動發確認消息(Ack消息)給消息隊列,消息隊列會將這條消息從消息隊列里刪除,第三個參數就是Consumer對象,用於處理接收到的消息。

如果我們想讓消費者接收到消息后對消息進行手動確認(Manual Ack),我們需要對代碼進行兩處改動:

1)在調用basicConsume方法時,將autoAck屬性設置為false。

channel.basicConsume("firstQueue", false, consumer);  

2)在handleDelivery方法中調用Channel.basicAck方法,發送手動確認消息給消息隊列。

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)  
                        throws IOException  
{  
      this.getChannel().basicAck(envelope.getDeliveryTag(), false);  
}  

basicAck方法有兩個參數,第一個參數deliverTag是消息的發送編號,第二個參數multiple是消息確認方式,如果值為true,表示對消息隊列里所有編號小於或等於當前消息編號的未確認消息進行手動確認,如果為false,表示僅確認當前消息。

消費者代碼執行后,我們可以看到消費者程序的控制台輸出了這條消息的內容,而且使用rabbitmqctl命令查看隊列消息時,隊列里的消息數為0。

使用Direct Exchange的消息生產/消費

使用Direct Exchange的生產者/消費者代碼與Default Exchange比較類似,不過生產者程序的代碼需要添加創建Direct Exchange和 將Exchange和消息隊列綁定的代碼,具體添加和修改的代碼如下:

package com.gl365.payment.util.rabbitmq.demo2;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import io.netty.handler.timeout.TimeoutException;

public class ProducerApp {
    public static void main(String[] args) throws IOException, TimeoutException {  
        Connection connection = null;  
        Channel channel = null;  
        try  
        {  
            ConnectionFactory factory = new ConnectionFactory();  
            factory.setHost("localhost");  
            factory.setPort(5672);  
            factory.setUsername("rabbitmq_producer");  
            factory.setPassword("rabbitmq_producer");  
            factory.setVirtualHost("test_vhosts");  
   
            //創建與RabbitMQ服務器的TCP連接  
            connection  = factory.newConnection();  
            channel = connection.createChannel(); 
            channel.exchangeDeclare("directExchange", "direct"); channel.queueDeclare("directQueue", true, false, false, null); channel.queueBind("directQueue", "directExchange", "directMessage");  
            String message = "First Direct Message";  
               
            channel.basicPublish("directExchange", "directMessage", null, message.getBytes());  
            System.out.println("Send Direct Message is:'" + message + "'");
        }  
        catch(Exception ex)  
        {  
            ex.printStackTrace();  
        }  
        finally  
        {  
            if(channel != null)  
            {  
                try {
                    channel.close();
                } catch (java.util.concurrent.TimeoutException e) {
                    e.printStackTrace();
                }  
            }  
            if(connection != null)  
            {  
                connection.close();  
            }  
        }  
    }
}

首先我們調用Channel.exchangeDeclare方法創建名為“directExchange”的Direct Exchange。

Exchange.DeclareOk exchangeDeclare(String exchange, String type,boolean durable) throws IOException 

exchangeDeclare方法的第一個參數exchange是exchange名稱,第二個參數type是Exchange類型,有“direct”,“fanout”,“topic”,“headers”四種,分別對應RabbitMQ的四種Exchange。第三個參數durable是設置Exchange是否持久化( 即在RabbitMQ服務器重啟后Exchange是否仍存在,如果沒有設置,默認是非持久化的)

創建“directQueue”消息隊列后,我們再調用Channel.queueBind方法,將我們創建的Direct Exchange和消息隊列綁定。

Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;  

queueBind方法第一個參數queue是消息隊列的名稱,第二個參數exchange是Exchange的名稱,第三個參數routingKey是消息隊列和Exchange之間綁定的路由key,我們這里綁定的路由key是“directMessage”。從Exchange過來的消息,只有routing key為“directMessage”的消息會被轉到消息隊列“directQueue”,其他消息將不會被轉發,下面將證實這一點。

運行ProducerApp程序,使用rabbitmq_producer用戶登錄管理頁面,我們可以看到名為“directExchange”的Direct Exchange被創建出來。

消息隊列directQueue與它綁定,routing key為directMessage。

消息隊列directQueue里有一條消息

我們修改ProducerApp的程序,將消息的routing key改為“indirectMessage”

package com.gl365.payment.util.rabbitmq.demo2;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import io.netty.handler.timeout.TimeoutException;

public class ProducerApp {
    public static void main(String[] args) throws IOException, TimeoutException {  
        Connection connection = null;  
        Channel channel = null;  
        try  
        {  
            ConnectionFactory factory = new ConnectionFactory();  
            factory.setHost("localhost");  
            factory.setPort(5672);  
            factory.setUsername("rabbitmq_producer");  
            factory.setPassword("rabbitmq_producer");  
            factory.setVirtualHost("test_vhosts");  
   
            //創建與RabbitMQ服務器的TCP連接  
            connection  = factory.newConnection();  
            channel = connection.createChannel(); 
            channel.exchangeDeclare("directExchange", "direct");  
            channel.queueDeclare("directQueue", true, false, false, null);  
            channel.queueBind("directQueue", "directExchange", "directMessage");  
            //String message = "First Direct Message";  
            String message = "First Indirect Message";  
            channel.basicPublish("directExchange", "indirectMessage", null, message.getBytes());  
            System.out.println("Send Indirect Message is:'" + message + "'"); 
            
            //channel.basicPublish("directExchange", "indirectQueue", null, message.getBytes());  
            //System.out.println("Send Direct Message is:'" + message + "'");
        }  
        catch(Exception ex)  
        {  
            ex.printStackTrace();  
        }  
        finally  
        {  
            if(channel != null)  
            {  
                try {
                    channel.close();
                } catch (java.util.concurrent.TimeoutException e) {
                    e.printStackTrace();
                }  
            }  
            if(connection != null)  
            {  
                connection.close();  
            }  
        }  
    }
}

再次運行程序后,打開管理頁面,我們看到“directQueue”隊列里仍然只有一條消息。

 

我們向Exchange發送的第二條消息由於和綁定的routing key不一致,沒有被轉發到“directQueue”消息隊列,說明被RabbitMQ丟棄了

 

我們通過管理界面再創建一個消息隊列“indirectQueue”,在它和“directExchange”之間建立bind關系,routingkey為“indirectMessage” 。

再綁定一個

 

再次運行ProducerApp程序,我們可以看到“directQueue”消息隊列消息數仍是1,但“indirectQueue”消息隊列接收到了從Exchange轉發來的消息。

 

使用RPC方式的消息生產/消費

RPC方式的消息生產和消費示意圖如下:

 

 

在這種方式下,生產者和消費者之間的消息發送/接收流程如下:

1)生產者在發送消息的同時,將返回消息的消息隊列名(replyTo中指定)以及消息關聯Id(correlationId)附帶在消息Properties中發送給消費者。

2)消費者在接收到消息,處理完成后,將結果作為返回消息發送到replyTo指定的返回消息隊列中,同時附帶接收消息中的corrleationId, 以便讓生產者接收到到返回消息后,根據corrleationId確認是針對1)中發送消息的返回消息,如果correlationId確認一致,則將返回消息 取出,進行后續處理。

示意圖中的生產者和消費者在發送消息時使用的都是Default Exchange,我們接下來的程序做一點改動,使用Direct Exchange。

在我們的程序中,生產者發送一個數字給消費者,消費者接收到消息后,計算這個數字的階乘結果,返回給生產者。 生產者程序的主要代碼如下:

[java]  view plain  copy
 
  1.   //創建RPC發送消息的Direct Exchange,消息隊列和綁定關系。  
  2.   channel.exchangeDeclare("rpcSendExchange", "direct",true);  
  3.   channel.queueDeclare("rpcSendQueue", true, false, false, null);  
  4.   channel.queueBind("rpcSendQueue", "rpcSendExchange", "rpcSendMessage");  
  5.   
  6.   //建立RPC返回消息的Direct Exchange, 消息隊列和綁定關系           
  7.   channel.exchangeDeclare("rpcReplyExchange", "direct",true);  
  8.   channel.queueDeclare("rpcReplyQueue", true, false, false, null);  
  9.   channel.queueBind("rpcReplyQueue", "rpcReplyExchange", "rpcReplyMessage");  
  10.   
  11.   //創建接收RPC返回消息的消費者,並將它與RPC返回消息隊列相關聯。  
  12.   QueueingConsumer replyCustomer = new QueueingConsumer(channel);  
  13.   channel.basicConsume("rpcReplyQueue", true,replyCustomer);  
  14.   
  15.   String number = "10";  
  16.   
  17.   //生成RPC請求消息的CorrelationId  
  18.   String correlationId = UUID.randomUUID().toString();  
  19.   //在RabbitMQ消息的Properties中設置RPC請求消息的CorrelationId以及  
  20.   //ReplyTo名稱(我們這里使用的是Exchange名稱,  
  21.   //而不是消息隊列名稱)  
  22.   BasicProperties props = new BasicProperties  
  23.                       .Builder()  
  24.                       .correlationId(correlationId)  
  25.                       .replyTo("rpcReplyExchange")  
  26.                       .build();  
  27.   
  28.   System.out.println("The send message's correlation id is:" + correlationId);              
  29.   channel.basicPublish("rpcSendExchange", "rpcSendMessage", props, number.getBytes());  
  30.   
  31.   String response = null;  
  32.   
  33.   while(true)  
  34.   {  
  35.           //從返回消息中取一條消息  
  36.    Delivery delivery = replyCustomer.nextDelivery();  
  37.    //如果消息的CorrelationId與發送消息的CorrleationId一致,表示這條消息是  
  38.           //發送消息對應的返回消息,是階乘運算的計算結果。  
  39.           System.out.println("The received reply message's correlation id is:" + messageCorrelationId);  
  40.           String messageCorrelationId = delivery.getProperties().getCorrelationId();  
  41.    if (!Strings.isNullOrEmpty(messageCorrelationId) && messageCorrelationId.equals(correlationId))   
  42.           {  
  43.     response = new String(delivery.getBody());  
  44.     break;  
  45.    }  
  46.   }  
  47.   
  48.   //輸出階乘運算結果  
  49.   if(!Strings.isNullOrEmpty(response))  
  50.   {  
  51. System.out.println("Factorial(" + number + ") = " + response);  
  52.   }  
消費者程序的主要代碼如下:
[java] view plain copy
 
  1.  Consumer consumer = new DefaultConsumer(channel)  
  2.  {  
  3.     @Override  
  4.     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException  
  5.     {  
  6.        //獲取返回消息發送到的Exchange名稱  
  7.        String replyExchange = properties.getReplyTo();  
  8.    
  9.        //設置返回消息的Properties,附帶發送消息的CorrelationId.  
  10.        AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()  
  11.                             .correlationId(properties.getCorrelationId())  
  12.                             .build();  
  13.    
  14.        String message = new String(body,"UTF-8");  
  15.        System.out.println("The received message is:" + message);  
  16.        System.out.println("The received message's correlation id is:" + properties.getCorrelationId());  
  17.    
  18.        //計算階乘,factorial方法是計算階乘的方法。  
  19.        int number = Integer.parseInt(message);  
  20.        String response = factorial(number);  
  21.    
  22.        //將階乘消息發送到Reply Exchange  
  23.        this.getChannel().basicPublish(replyExchange, "rpcReplyMessage",replyProps, response.getBytes());  
  24.    }  
  25. };  
  26.    
  27. channel.basicConsume("rpcSendQueue", true, consumer);  

先運行生產者程序,發送請求消息到Send Exchange,然后等待消費者發送的返回消息。 
再啟動消費者程序,計算階乘並返回結果給Reply Exchange。 兩個程序的控制台信息如下圖所示
生產者程序控制台

消費者程序控制台

從控制台信息可以看出生產者端根據返回消息中包含的Correlation Id判斷出這是發送消息對應的返回消息,獲取了階乘的計算結果。

這個例子只是簡單的生產者和消費者之間的方法調用,實際使用時,我們可以基於這個實例,實現更為復雜的操作。

 

RabbitMQ Client的重連機制

 

RabbitMQ Java Client提供了重連機制,不過在RabbitMQ Java Client 4.0版本之前,自動重連默認是關閉的。從Rabbit Client 4.0版本開始,自動重連默認是打開的。控制自動重連的屬性是com.rabbitmq.client.ConnectionFactory類的automaticRecovery和topologyRecovery屬性。

設置automaticRecovery屬性為true時,會執行以下recovery:

1)Connection的重連。

2)偵聽Connection的Listener的恢復。

3)重新建立在Connection基礎上的Channel。

4)偵聽Channel的Listener的恢復。

5)Channel上的設置,如basicQos,publisher confirm以及事務屬性等的恢復。

當設置topologyRecovery屬性為true時,會執行以下recovery:

1)exchange的重新定義(不包含預定義的exchange)

2)queue的重新定義(不包含預定義的queue)

3)binding的重新定義(不包含預定義的binding)

4)所有Consumer的恢復

我們定義一個帶auto recovery的消費者程序,我們使用RabbitMQ Java Client 4.0.0版本,這個版本引入了AutorecoveringConnection和

AutorecoveringChannel類,可以添加RecoveryListener對Recovery過程進行監控。

 

[java] view plain copy
 
  1. public class RecoveryConsumerApp  
  2. {  
  3.     public static void main( String[] args ) throws IOException, TimeoutException {  
  4.             ConnectionFactory connectionFactory = new ConnectionFactory();  
  5.             ...................  
  6.    
  7.             AutorecoveringConnection connection = (AutorecoveringConnection)connectionFactory.newConnection();  
  8.             String originalLocalAddress =  
  9.                     connection.getLocalAddress() + ":" + connection.getLocalPort();  
  10.             System.out.println("The origin connection's local address is:" + originalLocalAddress);  
  11.    
  12.             AutorecoveringChannel  channel = (AutorecoveringChannel)connection.createChannel();  
  13.             System.out.println("The origin channel's channel number is:" + channel.getChannelNumber());  
  14.    
  15.             channel.exchangeDeclare("recoveryExchange", BuiltinExchangeType.DIRECT, false, true ,null);  
  16.             channel.queueDeclare("recoveryQueue", false, false, true,null);  
  17.             channel.queueBind("recoveryQueue", "recoveryExchange", "recoveryMessage");  
  18.    
  19.             connection.addRecoveryListener(new RecoveryListener() {  
  20.                 public void handleRecovery(Recoverable recoverable) {  
  21.                     System.out.println("Connection handleRecovery method is called");  
  22.                     AutorecoveringConnection recoveredConnection =  
  23.                             (AutorecoveringConnection)recoverable;  
  24.                     String recoveredLocalAddress =  
  25.                             recoveredConnection.getLocalAddress() + ":" + recoveredConnection.getLocalPort();  
  26.                     System.out.println("The recovered connection's local address is:" + recoveredLocalAddress);  
  27.                 }  
  28.    
  29.                 public void handleRecoveryStarted(Recoverable recoverable) {  
  30.                     System.out.println("Connection handleRecoveryStarted method is called");  
  31.                 }  
  32.             });  
  33.    
  34.             channel.addRecoveryListener(new RecoveryListener() {  
  35.                     public void handleRecovery(Recoverable recoverable) {  
  36.                         System.out.println("Channel handleRecovery method is called");  
  37.                         AutorecoveringChannel recoveryChannel =  
  38.                                 (AutorecoveringChannel)recoverable;  
  39.                         System.out.println("The recovered Channel's number is:" + recoveryChannel.getChannelNumber());  
  40.                     }  
  41.    
  42.                     public void handleRecoveryStarted(Recoverable recoverable) {  
  43.                         System.out.println("Channel handleRecoveryStarted method is called");  
  44.                     }  
  45.             });  
  46.    
  47.     }  
  48. }  

 

這個程序中Exchange, Queue都是非持久化並且自動刪除的。 我們為Connection和Channel分別添加了Recovery Listener匿名對象,

便於確認他們確實進行了Recovery操作。

啟動程序后,我們可以看到recoveryExchange和recoveryQueue都被創建出來,且Binding關系建立了。

 

連接的本地地址是0.0.0.0:8109,Channel編號是1

此時我們關閉RabbitMQ服務器,再重啟RabbitMQ服務器,我們可以從控制台界面看到有連接超時的警告信

息以及重連信息。

從重連日志信息中我們可以看出Channel的編號還是1,但是Connection的本地地址已經變成了0.0.0.0:8470,證明進行了重連。

連接到recoveryQueue隊列上的Consumer Tag也進行了恢復,而且Consumer Tag與之前的Consumer Tag一致,這是因為設置了

topologyRecovery屬性為true。

我們再在生產者程序中使用重連機制,依然使用Rabbit Java Client 4.0版本 生產者程序的片段如下:

 

[java] view plain copy
 
  1. <span style="font-size: 17.5px;">  </span>factory.setAutomaticRecoveryEnabled(true);  
  2.    factory.setNetworkRecoveryInterval(60000);  
  3.    factory.setTopologyRecoveryEnabled(true);  
  4.    
  5.    AutorecoveringConnection connection = (AutorecoveringConnection)factory.newConnection();  
  6.    AutorecoveringChannel channel = (AutorecoveringChannel)connection.createChannel();     
  7.    //設置Channel為Publish Confirm模式  
  8.    channel.confirmSelect();  <span style="font-size: 17.5px;">  </span>  


 

登錄管理界面,我們可以看到生產者建立的Channel是Confirm模式(圖中Mode列用C表示)

我們關掉RabbitMQ服務器,再重啟RabbitMQ服務器,可以看到生產者Channel被恢復,但是本地端口號已經從13684變成了13874,

說明這是重新創建的Channel,創建的Channel仍然是Confirm模式,和最初的Channel一致。

如果我們設置Channel為Transaction模式(調用Channel.txSelect()方法),重連后恢復的Channel的模式也仍然是Transaction模式。

RabbitMQ支持消息的持久化,也就是數據寫在磁盤上,為了數據安全考慮,我想大多數用戶都會選擇持久化。消息隊列持久化包括3個部分:
(1)exchange持久化,在聲明時指定durable => 1
(2)queue持久化,在聲明時指定durable => 1
(3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)

如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。

(二、基本概念介紹)

      AMQP(高級消息隊列協議) 是一個異步消息傳遞所使用的應用層協議規范,作為線路層協議,而不是API(例如JMS),AMQP 客戶端能夠無視消息的來源任意發送和接受信息。AMQP的原始用途只是為金融界提供一個可以彼此協作的消息協議,而現在的目標則是為通用消息隊列架構提供通用構建工具。因此,面向消息的中間件 (MOM)系統,例如發布/訂閱隊列,沒有作為基本元素實現。反而通過發送簡化的AMQ實體,用戶被賦予了構建例如這些實體的能力。這些實體也是規范的一 部分,形成了在線路層協議頂端的一個層級:AMQP模型。這個模型統一了消息模式,諸如之前提到的發布/訂閱,隊列,事務以及流數據,並且添加了額外的特性,例如更易於擴展,基於內容的路由。

AMQP當中有四個概念非常重要

  1. virtual host,虛擬主機
  2. exchange,交換機
  3. queue,隊列
  4. binding,綁定

一個虛擬主機持有一組交換機、隊列和綁定。

為什么需要多個虛擬主機呢?因為RabbitMQ當中,用戶只能在虛擬主機的粒度進行權限控制。因此,如果需要禁止A組訪問B組的交換機/隊列/綁定,必須為A和B分別創建一個虛擬主機。每一個RabbitMQ服務器都有一個默認的虛擬主機/

何謂虛擬主機(virtual host),交換機(exchange),隊列(queue)和綁定(binding)

隊列(Queues)是你的消息(messages)的終點,可以理解成裝消息的容器。消息就一直在里面,直到有客戶端(也就是消費者,Consumer)連接到這個隊列並且將其取走為止。不過,也可以將一個隊列配置成這樣的:一旦消息進入這個隊列,此消息就被刪除。

隊列是由消費者(Consumer)通過程序建立的,不是通過配置文件或者命令行工具。這沒什么問題,如果一個消費者試圖創建一個已經存在的隊列,RabbitMQ會直接忽略這個請求。因此我們可以將消息隊列的配置寫在應用程序的代碼里面。

而要把一個消息放進隊列前,需要有一個交換機(Exchange)。

交換機(Exchange)可以理解成具有路由表的路由程序。每個消息都有一個稱為路由鍵(routing key)的屬性,就是一個簡單的字符串。交換機當中有一系列的綁定(binding),即路由規則(routes)。(例如,指明具有路由鍵 “X” 的消息要到名為timbuku的隊列當中去。)

消費者程序(Consumer)要負責創建你的交換機。交換機可以存在多個,每個交換機在自己獨立的進程當中執行,因此增加多個交換機就是增加多個進程,可以充分利用服務器上的CPU核以便達到更高的效率。例如,在一個8核的服務器上,可以創建5個交換機來用5個核,另外3個核留下來做消息處理。類似的,在RabbitMQ的集群當中,你可以用類似的思路來擴展交換機一邊獲取更高的吞吐量。

交換機如何判斷要把消息送到哪個隊列?你需要路由規則,即綁定(binding)。一個綁定就是一個類似這樣的規則:將交換機“desert(沙漠)”當中具有路由鍵“阿里巴巴”的消息送到隊列“hideout(山洞)”里面去。換句話說,一個綁定就是一個基於路由鍵將交換機和隊列連接起來的路由規則。例如,具有路由鍵“audit”的消息需要被送到兩個隊列,“log-forever”和“alert-the-big-dude”。要做到這個,就需要創建兩個綁定,每個都連接一個交換機和一個隊列,兩者都是由“audit”路由鍵觸發。在這種情況下,交換機會復制一份消息並且把它們分別發送到兩個隊列當中。交換機不過就是一個由綁定構成的路由表。

交換機有多種類型。他們都是做路由的,但是它們接受不同類型的綁定。為什么不創建一種交換機來處理所有類型的路由規則呢?因為每種規則用來做匹配分子的CPU開銷是不同的。例如,一個“topic”類型的交換機試圖將消息的路由鍵與類似“dogs.*”的模式進行匹配。匹配這種末端的通配符比直接將路由鍵與“dogs”比較(“direct”類型的交換機)要消耗更多的CPU。如果你不需要“topic”類型的交換機帶來的靈活性,你可以通過使用“direct”類型的交換機獲取更高的處理效率。那么有哪些類型,他們又是怎么處理的呢?

  Exchange

  1. Exchange Direct

     

    Exchange Fanout

    Exchange Topic

     


持久化

你花了大量的時間來創建隊列、交換機和綁定,然后,服務器程序掛了。你的隊列、交換機和綁定怎么樣了?還有,放在隊列里面但是尚未處理的消息們呢?

如果你是用默認參數構造的這一切的話,那么,他們都灰飛煙滅了。RabbitMQ重啟之后會干凈的像個新生兒。你必須重做所有的一切,亡羊補牢,如何避免將來再度發生此類杯具?

隊列和交換機有一個創建時候指定的標志durable。durable的唯一含義就是具有這個標志的隊列和交換機會在重啟之后重新建立,它不表示說在隊列當中的消息會在重啟后恢復。那么如何才能做到不只是隊列和交換機,還有消息都是持久的呢?

但是首先需要考慮的問題是:是否真的需要消息的持久化?如果需要重啟后消息可以回復,那么它需要被寫入磁盤。但即使是最簡單的磁盤操作也是要消耗時間的。所以需要衡量判斷。

當你將消息發布到交換機的時候,可以指定一個標志“Delivery Mode”(投遞模式)。根據你使用的AMQP的庫不同,指定這個標志的方法可能不太一樣。簡單的說,就是將Delivery Mode設置成2,也就是持久的(persistent)即可。一般的AMQP庫都是將Delivery Mode設置成1,也就是非持久的。所以要持久化消息的步驟如下:

  1. 將交換機設成 durable。
  2. 將隊列設成 durable。
  3. 將消息的 Delivery Mode 設置成2 。

綁定(Bindings)怎么辦?綁定無法在創建的時候設置成durable。沒問題,如果你綁定了一個durable的隊列和一個durable的交換機,RabbitMQ會自動保留這個綁定。類似的,如果刪除了某個隊列或交換機(無論是不是durable),依賴它的綁定都會自動刪除。

注意:

  • RabbitMQ 不允許你綁定一個非堅固(non-durable)的交換機和一個durable的隊列。反之亦然。要想成功必須隊列和交換機都是durable的。
  • 一旦創建了隊列和交換機,就不能修改其標志了。例如,如果創建了一個non-durable的隊列,然后想把它改變成durable的,唯一的辦法就是刪除這個隊列然后重現創建。因此,最好仔細檢查創建的標志。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM