使用Spring AMQP開發消費者應用


前一篇中我們介紹了使用RabbitMQ Java Client訪問RabbitMQ的方法。但是使用這種方式訪問RabbitMQ,開發者在程序中需要自己管理Connection,Channel對象,Consumer對象的創建,銷毀,這樣會非常不方便。我們下面介紹使用spring AMQP連接RabbitMQ,進行消息的接收和發送。

Spring AMQP是一個Spring子項目,它提供了訪問基於AMQP協議的消息服務器的解決方案。它包含兩部分,spring-ampq是基於AMQP協議的消息發送和接收的高層實現,spring-rabbit是基於RabbitMQ的具體實現。這兩部分我們下面都會使用到。

 

Spring-AMQP中的基礎類/接口

spring-amqp中定義了幾個基礎類/接口,Message,Exchange,Queue,Binding

 

 

Message

[java]  view plain  copy
 
  1. public class Message implements Serializable   
  2. {  
  3.   private final MessageProperties messageProperties;  
  4.    
  5.   private final byte[] body;  

 

spring-amqp中的Message類類似於javax的Message類,封裝了消息的Properties和消息體。

 

Exchange

spring-amqp定義了Exchange接口

 

 

[java] view plain copy
 
  1. public interface Exchange extends Declarable {  
  2.         //Exchange名稱  
  3.     String getName();  
  4.         //Exchange的類型  
  5.     String getType();  
  6.         //Exchange是否持久化  
  7.     boolean isDurable();  
  8.         //Exchange不再被使用時(沒有任何綁定的情況下),是否由RabbitMQ自動刪除  
  9.     boolean isAutoDelete();  
  10.         //Exchange相關的參數  
  11.     Map<String, Object> getArguments();  


這個接口和RabbitMQ Client中的Exchange類相似。 spring-amqp中的Exchange繼承關系如下圖所示

 

AbstractExchange類是所有Exchange類的父類,實現Exchange接口的具體方法。 CustomExchange針對用戶自定義的Exchange對象。其他四個Exchange類,分別對應四種Exchange。 我們在Spring配置文件中配置Exchange對象時,使用的就是這幾種Exchange類。

 

Queue

spring-amqp定義了Queue類,和RabbitMQ Client中的Queue相似,對應RabbitMQ中的消息隊列。

 

 

[java]  view plain  copy
 
  1. public class Queue extends AbstractDeclarable {  
  2.    
  3.     private final String name;  
  4.    
  5.     private final boolean durable;  
  6.    
  7.     private final boolean exclusive;  
  8.    
  9.     private final boolean autoDelete;  
  10.    
  11.     private final java.util.Map<java.lang.String, java.lang.Object> arguments;  
  12.    
  13.         public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {  
  14.         this(name, durable, exclusive, autoDelete, null);  
  15.     }   

 

 

Binding

Binding類是對RabbitMQ中Exchange-Exchange以及Exchange-Queue綁定關系的抽象。

 

 

[java]  view plain  copy
 
  1. public class Binding extends AbstractDeclarable   
  2. {  
  3.    
  4.     public enum DestinationType {  
  5.         QUEUE, EXCHANGE;  
  6.     }  
  7.    
  8.     private final String destination;  
  9.    
  10.     private final String exchange;  
  11.    
  12.     private final String routingKey;  
  13.    
  14.     private final Map<String, Object> arguments;  
  15.    
  16.     private final DestinationType destinationType;  
  17.    
  18.     public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,  
  19.             Map<String, Object> arguments) {  
  20.         this.destination = destination;  
  21.         this.destinationType = destinationType;  
  22.         this.exchange = exchange;  
  23.         this.routingKey = routingKey;  
  24.         this.arguments = arguments;  
  25.     }  


對照RabbitMQ Java Client中Channel接口的queueBind和ExchangeBind方法

 

 

[java]  view plain  copy
 
  1. Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments)   
  2.    
  3. Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)  


我們可以看出Binding類實際是對底層建立的Exchange-Queue和Exchange-Exchange綁定關系的高層抽象記錄類,它使用枚舉類型DestinationType區分Exchange-Queue和Exchange-Exchange兩種綁定。

 

 

Spring AMQP搭建消費者應用

消費者應用程序框架搭建

我們接下來使用spring-amqp搭建一個RabbitMQ的消費者Web應用,我們先創建一個maven webapp應用程序,再添加一個dependency。

 

 

[html]  view plain  copy
 
  1. <dependency>  
  2.     <groupId>org.springframework.amqp</groupId>  
  3.     <artifactId>spring-rabbit</artifactId>  
  4.     <version>1.6.5.RELEASE</version>  
  5.  </dependency>   

 

 

spring-rabbit庫的引入是為了使用它里面的RabbitAdmin類,創建Exchange,Queue和Binding對象,在導入這個庫的時候同時引入了 spring-ampq和rabbitmq-client的庫,不需要另行導入。

在src/main/resources目錄下創建application.properties文件,用於記錄RabbitMQ的配置信息。

 

[plain]  view plain  copy
 
  1. mq.ip=localhost  
  2. mq.port=5672  
  3. mq.userName=rabbitmq_consumer  
  4. mq.password=123456  
  5. mq.virutalHost=test_vhosts  

在src/main/resource目錄下創建applicationContext.xml文件:

 

 

[html]  view plain  copy
 
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2.    
  3. <beans xmlns="http://www.springframework.org/schema/beans"  
  4.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  5.        xmlns:context="http://www.springframework.org/schema/context"  
  6.        xmlns:util="http://www.springframework.org/schema/util"  
  7.        xsi:schemaLocation="  
  8.          http://www.springframework.org/schema/beans  
  9.          http://www.springframework.org/schema/beans/spring-beans-4.0.xsd  
  10.          http://www.springframework.org/schema/util  
  11.          http://www.springframework.org/schema/util/spring-util-4.0.xsd  
  12.          http://www.springframework.org/schema/context  
  13.          http://www.springframework.org/schema/context/spring-context-4.0.xsd" >  
  14.    
  15.     <context:annotation-config/>  
  16.    
  17.     <context:property-placeholder  
  18.             ignore-unresolvable="true" location="classpath*:/application.properties" />  
  19.    
  20.     <!--從RabbitMQ Java Client創建RabbitMQ連接工廠對象-->  
  21.     <bean id="rabbitMQConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">  
  22.         <property name="username" value="${mq.userName}" />  
  23.         <property name="password" value="${mq.password}" />  
  24.         <property name="host" value="${mq.ip}" />  
  25.         <property name="port" value="${mq.port}" />  
  26.         <property name="virtualHost" value="${mq.virutalHost}" />  
  27.     </bean>  
  28.    
  29.     <!--基於RabbitMQ連接工廠對象構建spring-rabbit的連接工廠對象Wrapper-->  
  30.     <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">  
  31.         <constructor-arg name="rabbitConnectionFactory" ref="rabbitMQConnectionFactory" />  
  32.     </bean>  
  33.    
  34.     <!--構建RabbitAmdin對象,它負責創建Queue/Exchange/Bind對象-->  
  35.     <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">  
  36.         <constructor-arg name="connectionFactory" ref="connectionFactory" />  
  37.         <property name="autoStartup" value="true"></property>  
  38.     </bean>  
  39.    
  40.     <!--構建Rabbit Template對象,用於發送RabbitMQ消息,本程序使用它發送返回消息-->  
  41.     <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">  
  42.         <constructor-arg name="connectionFactory" ref="connectionFactory" />  
  43.     </bean>  
  44.    
  45.     <!--RabbitMQ消息轉化器,用於將RabbitMQ消息轉換為AMQP消息,我們這里使用基本的Message Converter -->  
  46.     <bean id="serializerMessageConverter"  
  47.           class="org.springframework.amqp.support.converter.SimpleMessageConverter" />  
  48.    
  49.     <!--Message Properties轉換器,用於在spring-amqp Message對象中的Message Properties和RabbitMQ的  
  50.      Message Properties對象之間互相轉換 -->        
  51.     <bean id="messagePropertiesConverter"  
  52.           class="org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter" />        
  53.    
  54.     <!--定義AMQP Queue-->  
  55.     <bean id="springMessageQueue" class="org.springframework.amqp.core.Queue">  
  56.         <constructor-arg name="name" value="springMessageQueue" />  
  57.         <constructor-arg name="autoDelete" value="false" />  
  58.         <constructor-arg name="durable" value="true" />  
  59.         <constructor-arg name="exclusive" value="false" />  
  60.         <!--定義AMQP Queue創建所需的RabbitAdmin對象-->  
  61.         <property name="adminsThatShouldDeclare" ref="rabbitAdmin" />  
  62.         <!--判斷是否需要在連接RabbitMQ后創建Queue-->  
  63.         <property name="shouldDeclare" value="true" />  
  64.     </bean>  
  65.    
  66.     <!--定義AMQP Exchange-->  
  67.     <bean id="springMessageExchange" class="org.springframework.amqp.core.DirectExchange">  
  68.         <constructor-arg name="name" value="springMessageExchange" />  
  69.         <constructor-arg name="durable" value="true" />  
  70.         <constructor-arg name="autoDelete" value="false" />  
  71.         <!--定義AMQP Queue創建所需的RabbitAdmin對象-->  
  72.         <property name="adminsThatShouldDeclare" ref="rabbitAdmin" />  
  73.         <!--判斷是否需要在連接RabbitMQ后創建Exchange-->  
  74.         <property name="shouldDeclare" value="true" />  
  75.     </bean>  
  76.    
  77.     <util:map id="emptyMap" map-class="java.util.HashMap" />  
  78.    
  79.     <!--創建Exchange和Queue之間的Bind-->  
  80.     <bean id="springMessageBind" class="org.springframework.amqp.core.Binding">  
  81.         <constructor-arg name="destination" value="springMessageQueue" />  
  82.         <constructor-arg name="destinationType" value="QUEUE" />  
  83.         <constructor-arg name="exchange" value="springMessageExchange" />  
  84.         <constructor-arg name="routingKey" value="springMessage" />  
  85.         <constructor-arg name="arguments" ref="emptyMap" />  
  86.     </bean>  
  87.    
  88.     <!--偵聽springMessageQueue隊列消息的Message Listener-->  
  89.     <bean id="consumerListener"   
  90.         class="com.qf.rabbitmq.listener.RabbitMQConsumer" />  
  91.    
  92.     <!--創建偵聽springMessageQueue隊列的Message Listener Container-->  
  93.     <bean id="messageListenerContainer"  
  94.           class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">  
  95.         <property name="messageConverter" ref="serializerMessageConverter" />  
  96.         <property name="connectionFactory" ref="connectionFactory" />  
  97.         <property name="messageListener" ref="consumerListener" />  
  98.         <property name="queues" ref="springMessageQueue" />  
  99.         <!--設置消息確認方式為自動確認-->  
  100.         <property name="acknowledgeMode" value="AUTO" />  
  101.     </bean>  
  102. </beans>  

我們定義了偵聽消息隊列的Message Listener類RabbitMQConsumer

 

 

[java]  view plain  copy
 
  1. public class RabbitMQConsumer implements MessageListener  
  2. {  
  3.     @Autowired  
  4.     private MessagePropertiesConverter messagePropertiesConverter;  
  5.    
  6.     @Override  
  7.     public void onMessage(Message message)  
  8.     {  
  9.         try   
  10.         {  
  11.              //spring-amqp Message對象中的Message Properties屬性  
  12.              MessageProperties messageProperties = message.getMessageProperties();               
  13.              //使用Message Converter將spring-amqp Message對象中的Message Properties屬性  
  14.              //轉換為RabbitMQ 的Message Properties對象  
  15.              AMQP.BasicProperties rabbitMQProperties =  
  16.                 messagePropertiesConverter.fromMessageProperties(messageProperties, "UTF-8");               
  17.              System.out.println("The message's correlationId is:" + rabbitMQProperties.getCorrelationId());  
  18.              String messageContent = null;  
  19.              messageContent = new String(message.getBody(),"UTF-8");  
  20.              System.out.println("The message content is:" + messageContent);  
  21.         }   
  22.         catch (UnsupportedEncodingException e) {  
  23.             e.printStackTrace();  
  24.         }  
  25.     }  
  26. }  

上面的Listener類是實現了MessageListener接口的類,當容器接收到消息后,會自動觸發onMessage方法。 如果我們想使用普通的POJO類作為Message Listener,需要引入org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter類

 

 

[java]  view plain  copy
 
  1. public class MessageListenerAdapter extends AbstractAdaptableMessageListener {  
  2.    
  3.   public MessageListenerAdapter(Object delegate) {  
  4.         doSetDelegate(delegate);  
  5.     }  
  6. }  

這里的delegate對象就是我們的POJO對象。 假設我們定義一個Delegate類ConsumerDelegate

 

 

[java]  view plain  copy
 
  1. public class ConsumerDelegate  
  2. {  
  3.     public void processMessage(Object message)  
  4.     {  
  5.        //這里接收的消息對象僅是消息體,不包含MessageProperties  
  6.        //如果想獲取帶MessageProperties的消息對象,需要在Adpater中  
  7.        //定義MessageConverter屬性。  
  8.        String messageContent = message.toString();  
  9.        System.out.println(messageContent);  
  10.     }  
  11. }  

在applicationContext.xml中定義Adapter對象,引用我們的Delegate對象。

 

 

[html]  view plain  copy
 
  1. <bean id="consumerDelegate"  
  2.          class="com.qf.rabbitmq.listener.ConsumerDelegate" />  
  3.   
  4. <bean id="consumerListenerAdapter"  
  5.          class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">  
  6.        <property name="delegate" ref="consumerDelegate" />  
  7.        <!--指定delegate處理消息的默認方法 -->  
  8.        <property name="defaultListenerMethod" value="processMessage" />  
  9. </bean>  

最后將Message Listener Container中的Message Listener指向Adapter對象。

 

 

[html] view plain copy
 
  1. <bean id="messageListenerContainer"  
  2.           class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">  
  3.         <property name="messageConverter" ref="serializerMessageConverter" />  
  4.         <property name="connectionFactory" ref="connectionFactory" />  
  5.         <!--設置Message Listener為Adapter對象 -->  
  6.         <property name="messageListener" ref="consumerListenerAdapter"/>  
  7.         <property name="queues" ref="springMessageQueue" />  
  8.         <property name="acknowledgeMode" value="AUTO" />  
  9.  </bean>  

啟動Web應用后,我們從啟動日志信息可以看出應用連接上了RabbitMQ服務器

 



從RabbitMQ的管理界面(用rabbitmq_consumer用戶登錄)可以看到springMessageExchange和springMessageQueue已經創建,綁定關系也已經創建。







 

 

Consumer Tag自定義

連接springMessageQueue的消費者Tag是RabbitMQ隨機生成的Tag名

 

 

如果我們想設置消費者Tag為指定Tag,我們可以在Message Listener Container中 設置自定義consumer tag strategy。首先我們需要定義一個Consumer Tag Strategy類,它實現了ConsumerTagStrategy接口。

 

[java]  view plain  copy
 
  1. public class CustomConsumerTagStrategy implements ConsumerTagStrategy  
  2. {  
  3.     @Override  
  4.     public String createConsumerTag(String queue) {  
  5.         String consumerName = "Consumer1";  
  6.         return consumerName + "_" + queue;  
  7.     }  
  8. }  

在applicationContext.xml中設定自定義ConsumerTagStrategy

 

 

[html]  view plain  copy
 
  1. <bean id="consumerTagStrategy" class="com.qf.rabbitmq.strategy.CustomConsumerTagStrategy" />  
  2.  <!--創建偵聽springMessageQueue隊列的Message Listener Container-->  
  3.  <bean id="messageListenerContainer"  
  4.           class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">  
  5.      <property name="messageConverter" ref="serializerMessageConverter" />  
  6.      <property name="connectionFactory" ref="connectionFactory" />  
  7.      <property name="messageListener" ref="consumerListener" />  
  8.      <property name="queues" ref="springMessageQueue" />  
  9.      <property name="acknowledgeMode" value="AUTO" />  
  10.      <property name="consumerTagStrategy" ref="consumerTagStrategy" />  
  11.   </bean>  


再次啟動Web應用,查看RabbitMQ管理界面,我們可以看到Consumer Tag已經變成“Consumer1_springMessageQueue”,正如我們在CustomConsumerTagStrategy中設定的那樣。

 



消費者應用接收消息驗證

我們編寫了一個生產者程序,向springMessageExchange發送消息。 生產者的主要代碼如下,由於Exchange,Queue,Bind已經由消費者Web應用創建,因此生產者程序不再創建。
 
[java]  view plain  copy
 
  1. ConnectionFactory factory = new ConnectionFactory();  
  2. factory.setHost("localhost");  
  3. factory.setPort(5672);  
  4. factory.setUsername("rabbitmq_producer");  
  5. factory.setPassword("123456");  
  6. factory.setVirtualHost("test_vhosts");  
  7.    
  8. //創建與RabbitMQ服務器的TCP連接  
  9. connection  = factory.newConnection();  
  10. channel = connection.createChannel();  
  11.    
  12. String message = "First Web RabbitMQ Message";  
  13.    
  14. String correlationId = UUID.randomUUID().toString();  
  15. AMQP.BasicProperties props = new AMQP.BasicProperties  
  16.                     .Builder()  
  17.                     .correlationId(correlationId)  
  18.                     .build();  
  19.    
  20. channel.basicPublish("springMessageExchange","springMessage", props, message.getBytes());  

啟動消費者Web應用,從控制台輸出信息可以看到消費者接收到了生產者發送的消息。

設置消息手動確認模式

到目前為止,消費者端的Web應用對消息的確認是自動確認模式,如果我們想改為手動確認方式,需要做以下兩點改動:

1)修改applicationContext.xml文件中Message Listener Container的acknowledgeMode屬性的值為MANUAL。

 

[html]  view plain  copy
 
  1. <bean id="messageListenerContainer"  
  2.           class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">  
  3.     ......  
  4.     <property name="acknowledgeMode" value="MANUAL" />   
  5. </bean>  

 

2)將自定義的Message Listener類從實現org.springframework.amqp.core.MessageListener接口,改為實現 org.springframework.amqp.rabbit.core.ChannelAwareMessageListener接口,實現它的 onMessage(Message,Channel)方法。

 

[java]  view plain  copy
 
  1. public class RabbitMQConsumer implements ChannelAwareMessageListener  
  2. {  
  3.     ...........  
  4.    
  5.     @Override  
  6.     public void onMessage(Message message, Channel channel)   
  7.     {  
  8.         try   
  9.         {  
  10.              //spring-amqp Message對象中的Message Properties屬性  
  11.              MessageProperties messageProperties = message.getMessageProperties();               
  12.              //使用Message Converter將spring-amqp Message對象中的Message Properties屬性  
  13.              //轉換為RabbitMQ 的Message Properties對象  
  14.              AMQP.BasicProperties rabbitMQProperties =  
  15.                     messagePropertiesConverter.fromMessageProperties(messageProperties, "UTF-8");               
  16.              System.out.println("The message's correlationId is:" + rabbitMQProperties.getCorrelationId());  
  17.              String messageContent = null;  
  18.              messageContent = new String(message.getBody(),"UTF-8");  
  19.              System.out.println("The message content is:" + messageContent);  
  20.              channel.basicAck(messageProperties.getDeliveryTag(), false);  
  21.         }  
  22.         catch (IOException e) {  
  23.             e.printStackTrace();  
  24.         }  
  25.     }  
  26. }  

 

 

onMessage方法的最后一句代碼調用Channel.basicAck方法對消息進行手動確認。再次運行生產者和消費者程序后,我們登錄管理界面,從管理界面中可以看到springMessageQueue隊列中未確認消息條數 (圖中Unacked列)為0條,說明消費者接收消息后已經手動確認。

 

RPC模式設置

如果生產者和消費者Web應用之間使用RPC模式,即消費者接收消息后要向指定Exchange/Queue發送返回消息,我們需要修改生產者和消費者的程序。 消費者程序修改點如下:

1)在applicationContext.xml中定義返回消息對應的Exchange,Queue和Bind。

 

[java]  view plain  copy
 
  1. <!--定義AMQP Reply Queue-->  
  2. <bean id="springReplyMessageQueue" class="org.springframework.amqp.core.Queue">  
  3.         <constructor-arg name="name" value="springReplyMessageQueue" />  
  4.         <constructor-arg name="autoDelete" value="false" />  
  5.         <constructor-arg name="durable" value="true" />  
  6.         <constructor-arg name="exclusive" value="false" />  
  7.         <property name="adminsThatShouldDeclare" ref="rabbitAdmin" />  
  8.         <property name="shouldDeclare" value="true" />  
  9.     </bean>  
  10.    
  11.     <!--定義AMQP Reply Exchange-->  
  12.     <bean id="springReplyMessageExchange" class="org.springframework.amqp.core.DirectExchange">  
  13.         <constructor-arg name="name" value="springReplyMessageExchange" />  
  14.         <constructor-arg name="durable" value="true" />  
  15.         <constructor-arg name="autoDelete" value="false" />  
  16.         <!--定義AMQP Queue創建所需的RabbitAdmin對象-->  
  17.         <property name="adminsThatShouldDeclare" ref="rabbitAdmin" />  
  18.         <property name="shouldDeclare" value="true" />  
  19.     </bean>  
  20.    
  21.     <!--創建Reply Exchange和Reply Queue之間的Bind-->  
  22.     <bean id="springReplyMessageBind" class="org.springframework.amqp.core.Binding">  
  23.         <constructor-arg name="destination" value="springReplyMessageQueue" />  
  24.         <constructor-arg name="destinationType" value="QUEUE" />  
  25.         <constructor-arg name="exchange" value="springReplyMessageExchange" />  
  26.         <constructor-arg name="routingKey" value="springReplyMessage" />  
  27.         <constructor-arg name="arguments" ref="emptyMap" />  
  28. </bean>  
2)修改自定義Message Listener類的onMessage方法,添加發送返回消息的代碼

 

 

[java]  view plain  copy
 
  1. public void onMessage(Message message, Channel channel) {  
  2. try   
  3.   {  
  4.     ......................  
  5.     String replyMessageContent = "Consumer1 have received the message '" + messageContent + "'";  
  6.     channel.basicPublish(rabbitMQProperties.getReplyTo(), "springReplyMessage",  
  7.     rabbitMQProperties, replyMessageContent.getBytes());  
  8.     ......................  

這里發送返回消息直接使用接收消息時創建的Channel通道,不過如果我們的Message Listener類是繼承自MessageListener接口,無法獲得Channel對象時,我們需要使用RabbitTemplate對象進行返回消息的發送(我們前面已經在applicationContext.xml中定義了這個對象)

 

 

[java]  view plain  copy
 
  1. public class RabbitMQConsumer implements MessageListener  
  2. {   
  3.    @Autowired  
  4.    private MessagePropertiesConverter messagePropertiesConverter;  
  5.    
  6.    @Autowired  
  7.    private RabbitTemplate rabbitTemplate;  
  8.    
  9.    @Override  
  10.    public void onMessage(Message message)   
  11.    {  
  12.     ..........  
  13.     //創建返回消息的RabbitMQ Message Properties  
  14.     AMQP.BasicProperties replyRabbitMQProps =  
  15.              new AMQP.BasicProperties("text/plain",  
  16.                            "UTF-8",  
  17.                             null,  
  18.                             2,  
  19.                             0, rabbitMQProperties.getCorrelationId(), null, null,  
  20.                             null, null, null, null,  
  21.                             null, null);  
  22.     //創建返回消息的信封頭  
  23.     Envelope replyEnvelope =  
  24.              new Envelope(messageProperties.getDeliveryTag(), true,   
  25.                         "springReplyMessageExchange", "springReplyMessage");  
  26.    
  27.     //創建返回消息的spring-amqp Message Properties屬性  
  28.     MessageProperties replyMessageProperties =  
  29.              messagePropertiesConverter.toMessageProperties(replyRabbitMQProps,   
  30.                         replyEnvelope,"UTF-8");  
  31.    
  32.     //構建返回消息(spring-amqp消息)  
  33.     Message replyMessage = MessageBuilder.withBody(replyMessageContent.getBytes())  
  34.                                          .andProperties(replyMessageProperties)  
  35.                                          .build();  
  36.    
  37.     rabbitTemplate.send("springReplyMessageExchange","springReplyMessage", replyMessage);   
生產者程序添加對返回消息隊列偵聽的Consumer

 

 

[java]  view plain  copy
 
  1. String correlationId = UUID.randomUUID().toString();  
  2. AMQP.BasicProperties props = new AMQP.BasicProperties  
  3.                     .Builder()  
  4.                     .correlationId(correlationId)  
  5.                     .replyTo("springReplyMessageExchange")  
  6.                     .build();  
  7.    
  8. channel.basicPublish("springMessageExchange","springMessage", props, message.getBytes());  
  9.    
  10. QueueingConsumer replyCustomer = new QueueingConsumer(channel);  
  11. channel.basicConsume("springReplyMessageQueue",true,"Producer Reply Consumer", replyCustomer);  
  12.    
  13. String responseMessage = null;  
  14.    
  15. while(true)  
  16. {  
  17.    QueueingConsumer.Delivery delivery = replyCustomer.nextDelivery();  
  18.    String messageCorrelationId = delivery.getProperties().getCorrelationId();  
  19.    if (messageCorrelationId != null && messageCorrelationId.equals(correlationId))   
  20.    {  
  21.        responseMessage = new String(delivery.getBody());  
  22.        System.out.println("The reply message's correlation id is:" + messageCorrelationId);  
  23.        break;  
  24.    }  
  25. }  
  26. if(responseMessage != null)  
  27. {  
  28.   System.out.println("The repsonse message is:'" + responseMessage + "'");  
  29. }  
啟動修改后的生產者和消費者程序,我們從生產者的控制台界面可以看到它接收到了消費者發送的返回消息。

 

消費者控制台

生產者控制台

 

消費者並發數設置

到目前為止,消費者Web應用消費消息時,只有一個消費者接收並消費springMessageQueue隊列的消息(如下圖所示)

 

 

如果發送的消息量比較大時,我們需要增加消費者的數目。

增加消費者數目要修改Message Listener Container的concurrentConsumers和maxConcurrentConsumers屬性,concurrentConsumers屬性是Message Listener Container創建時創建的消費者數目,maxConcurrentConsumers屬性是容器最大的消費者數目,我們下面把這兩個屬性都設置為5,使Message Listener Container中有5個消費者,同時修改CustomerConsumerTagStrategy類,在Tag中加入線程名,以區分不同的消費者。

 

[html]  view plain  copy
 
  1. <bean id="messageListenerContainer"  
  2.           class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">  
  3.         ............  
  4.         <property name="consumerTagStrategy" ref="consumerTagStrategy" />  
  5.         <property name="concurrentConsumers" value="5" />  
  6.         <property name="maxConcurrentConsumers" value="5" />  
  7. </bean>  

[java]  view plain  copy
 
  1. public class CustomConsumerTagStrategy implements ConsumerTagStrategy  
  2. {  
  3.     @Override  
  4.     public String createConsumerTag(String queue) {  
  5.         String consumerName = "Consumer_" + Thread.currentThread().getName();  
  6.         return consumerName + "_" + queue;  
  7.     }  
  8. }  
啟動消費者Web應用,從管理頁面可以看到連接springMessageQueue隊列的有5個消費者。

 

修改生產者程序,循環發送50條消息

 

[java]  view plain  copy
 
  1. ReplyConsumer replyCustomer = new ReplyConsumer(channel);  
  2. channel.basicConsume("springReplyMessageQueue",true,"Producer Reply Consumer", replyCustomer);  
  3.    
  4. for(int i=0; i<50; i++)  
  5. {  
  6.    String correlationId = UUID.randomUUID().toString();  
  7.    String message = "Web RabbitMQ Message " + i;  
  8.    
  9.    AMQP.BasicProperties props =   
  10.                    new AMQP.BasicProperties  
  11.                         .Builder()  
  12.                         .contentType("text/plain")  
  13.                         .deliveryMode(2)  
  14.                         .correlationId(correlationId)  
  15.                         .replyTo("springReplyMessageExchange")  
  16.                         .build();  
  17.    
  18.    channel.basicPublish("springMessageExchange","springMessage", props, message.getBytes());  
  19. }  
在修改的生產者代碼中,我們將Consumer代碼抽出,定義了ReplyCustomer類

 

 

[java]  view plain  copy
 
  1. public class ReplyConsumer extends DefaultConsumer  
  2. {  
  3.     public ReplyConsumer(Channel channel)  
  4.     {  
  5.         super(channel);  
  6.     }  
  7.    
  8.     @Override  
  9.     public void handleDelivery(String consumerTag,  
  10.                                          Envelope envelope,  
  11.                                          AMQP.BasicProperties properties,  
  12.                                          byte[] body)  
  13.             throws IOException  
  14.     {  
  15.         String consumerName = properties.getAppId();  
  16.         String replyMessageContent = new String(body, "UTF-8");  
  17.         System.out.println("The reply message's sender is:" + consumerName);  
  18.         System.out.println("The reply message is '" + replyMessageContent + "'");  
  19.     }  
  20. }  
修改消費者的Message Listener消息,將Consumer Tag作為參數,放在返回消息的Properties中,返回給生產者。
[java]  view plain  copy
 
  1. public void onMessage(Message message, Channel channel)  
  2. {  
  3.  try   
  4.  {  
  5.    String consumerTag = messageProperties.getConsumerTag();  
  6.    String replyMessageContent = consumerTag + " have received the message '" + messageContent + "'";  
  7.    
  8.    AMQP.BasicProperties replyRabbitMQProps =  
  9.                     new AMQP.BasicProperties("text/plain",  
  10.                             "UTF-8",  
  11.                             null,  
  12.                             2,  
  13.                             0, rabbitMQProperties.getCorrelationId(), null, null,  
  14.                             null, null, null, null,  
  15.                             consumerTag, null);   
  16.    .............      
修改消費者的CustomConsumerTagStrategy類,用“Consumer” + “_” + 線程名作為Consumer Tag。

 

 

[java]  view plain  copy
 
  1. public class CustomConsumerTagStrategy implements ConsumerTagStrategy  
  2. {  
  3.     @Override  
  4.     public String createConsumerTag(String queue) {  
  5.         String consumerName = "Consumer_" + Thread.currentThread().getName();  
  6.         return consumerName;  
  7.     }  
  8. }  
修改完成后,啟動生產者和消費者程序,通過查看生產者的控制台輸出,我們可以看到多個消費者接收了生產者發送的消息,發送了返回消息給生產者。

 



消費者消息預取數設置

上述的消費者Web應用中,每個消費者每次從隊列中獲取1條消息,如果我們想讓每個消費者一次性從消息隊列獲取多條消息,需要修改Message Listener Container的prefetchCount屬性,這樣可以提高RabbitMQ的消息處理吞吐量。

 

 

[html]  view plain  copy
 
  1. <span style="font-size:10px;"><bean id="messageListenerContainer"  
  2.           class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">  
  3.           <property name="prefetchCount" value="5" />  
  4. </bean></span>  
這個屬性值最終被設置為底層Rabbit Client的Channel接口的basicQos方法參數

 

 

[java]  view plain  copy
 
  1. /** 
  2.      * Request a specific prefetchCount "quality of service" settings 
  3.      * for this channel. 
  4.      * 
  5.      * @see #basicQos(int, int, boolean) 
  6.      * @param prefetchCount maximum number of messages that the server 
  7.      * will deliver, 0 if unlimited 
  8.      * @throws java.io.IOException if an error is encountered 
  9. */  
  10. void basicQos(int prefetchCount) throws IOException  

 

這個方法設置從Channel上一次性可以讀取多少條消息,我們在Container設置的PrefetchCount值為5,表示從一個消費者Channel上,一次性可以與預讀取5條消息,按我們上面設置的5個消費者,5個消費者Channel計算,一次性可以預讀取25條消息。為了證實這一點,我們修改消費者的代碼,延長它處理一條消息的時間。

需要說明的是,對於每個消費者而言,只有一條預取的消息被接收且確認后,消費者才會再從消息隊列中讀取消息,並不是消費者在消息沒有確認完成前,每次都從隊列里預讀取prefetchCount條消息。

 

[java]  view plain  copy
 
  1. public void onMessage(Message message, Channel channel) {  
  2. try   
  3.     {  
  4.      ...........  
  5.      String messageContent = null;  
  6.      messageContent = new String(message.getBody(),"UTF-8");  
  7.      String consumerTag = messageProperties.getConsumerTag();  
  8.      String replyMessageContent = consumerTag + " have received the message '" + messageContent + "'";  
  9.    
  10.      Thread.sleep(60000);  
  11.    
  12.      ...........  
  13.      rabbitTemplate.send("springReplyMessageExchange","springReplyMessage", replyMessage);  
  14.      channel.basicAck(messageProperties.getDeliveryTag(), false);   

 

我們在onMessage方法中添加Thread.sleep(60000),使得處理一條消息時間時間大於1分鍾,便於查看消息預取的效果,而且使用手動確認方式。

生產者程序改為一次性發送200條消息。

啟動生產者程序,發送200條消息,我們可以看到springMessageQueue隊列里有200條處於Ready狀態的消息

啟動消費者程序,我們可以看到springMessageQueue隊列里有25條消息被預取了,Ready狀態的消息從200條變成了175條,而未確認狀態的消息數(Unacked列)變成了25條,即25條被預取,但是沒有被確認的消息。

過了一段時間,等5個消費者確認了5條消息后,又從消息隊列預讀取了5條消息,Ready狀態的消息數變成170條,這時的消息隊列的消息數如下圖所示:

 

未確認的消息數仍然是25條,但是總的消息數變成了195條,表示已經有5條消息被處理且確認了。

隨着消息逐漸被處理,確認,消費者會逐漸從消息隊列預取新的消息,直到所有的消息都被處理和確認完成。

rabbit標簽使用

上面的消費者Web應用使用了Spring傳統的beans元素定義,spring-rabbit提供了rabbit namespace,我們可以在applicationContext.xml中使用rabbit:xxx形式的元素標簽,簡化我們的xml配置。 我們首先在applicationContext.xml的namespace定義中添加rabbit namespace定義:
[html]  view plain  copy
 
  1. <beans xmlns="http://www.springframework.org/schema/beans"  
  2.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  3.        xmlns:context="http://www.springframework.org/schema/context"  
  4.        xmlns:util="http://www.springframework.org/schema/util"  
  5.        xmlns:rabbit="http://www.springframework.org/schema/rabbit"  
  6.        xsi:schemaLocation="  
  7.          http://www.springframework.org/schema/beans  
  8.          http://www.springframework.org/schema/beans/spring-beans-4.0.xsd  
  9.          http://www.springframework.org/schema/util  
  10.          http://www.springframework.org/schema/util/spring-util-4.0.xsd  
  11.          http://www.springframework.org/schema/rabbit  
  12.          http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd  
  13.          http://www.springframework.org/schema/context  
  14.          http://www.springframework.org/schema/context/spring-context-4.0.xsd" >  
RabbitMQ Client ConnectionFactory的bean定義不需要修改,我們修改CachingConnectionFactory bean對象的定義
[html]  view plain  copy
 
  1. <pre name="code" class="html"><span style="font-size:10px;"><rabbit:connection-factory id ="connectionFactory" connection-factory="rabbitMQConnectionFactory" /></span></pre>  
  2. <pre></pre>  
  3. <span style="color:rgb(51,51,51); font-family:Arial,sans-serif"><span style="font-size:10px">修改RabbitAdmin bean對象定義,使用rabbit:admin標簽</span></span>  
  4. <pre></pre>  
  5. <pre></pre>  
  6. <pre></pre>  
[html]  view plain  copy
 
  1. <rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" auto-startup="true"/>  
修改rabbitTemplate定義,使用rabbit:template標簽
[html]  view plain  copy
 
  1. <rabbit:template connection-factory="connectionFactory" />  

MessageConverter和MessageProperties對象沒有對應的rabbit標簽,仍然使用bean標簽。
修改Queue,Exchange和Bind定義,分別使用rabbit:queue,rabbit:exchange標簽,Bind的內容放到了Exchange bean定義內部。

[html]  view plain  copy
 
  1. <rabbit:queue id="springMessageQueue" name="springMessageQueue" auto-delete="false"  
  2.            durable="true" exclusive="false" auto-declare="false" declared-by="rabbitAdmin" />  
  3.    
  4. <rabbit:direct-exchange id="springMessageExchange" name="springMessageExchange" durable="true"  
  5.                             auto-declare="false" auto-delete="false" declared-by="rabbitAdmin">  
  6.     <rabbit:bindings>  
  7.         <rabbit:binding queue="springMessageQueue" key="springMessage"></rabbit:binding>  
  8.     </rabbit:bindings>  
  9. </rabbit:direct-exchange>  
最后使用rabbit:listener-container修改Message Listener Container bean對象。
[html]  view plain  copy
 
  1. <rabbit:listener-container message-converter="serializerMessageConverter"  
  2.                                connection-factory="connectionFactory"  
  3.                                acknowledge="manual"  
  4.                                consumer-tag-strategy="consumerTagStrategy"  
  5.                                concurrency="5"  
  6.                                max-concurrency="5"  
  7.                                prefetch="5">  
  8.         <rabbit:listener ref="consumerListener" queues="springMessageQueue"/>  
  9. </rabbit:listener-container>  
如果上面沒有創建queue的bean對象,這里的rabbit:listener中的queues屬性也可以改成queueNames屬性
[html]  view plain  copy
 
  1. <rabbit:listener ref="consumerListener" queue-names="springMessageQueue"/>  

這里如果Listener關聯多個隊列,設置queues屬性或者queue-names屬性時可以用逗號進行分割,例如:

[html]  view plain  copy
 
  1. <pre name="code" class="html" style="color: rgb(51, 51, 51);"><rabbit:listener ref="consumerListener" queue-names="messageQueue1,messageQueue2"/></pre>  
  2. <pre></pre>  
  3. <pre></pre>  
  4. <pre></pre>  
  5. <pre></pre>  

使用rabbit標簽雖然可以簡化RabbitMQ相關對象的bean定義,但是它也有局限性:

1)標簽對應的bean對象類型是固定的,例如rabbit:listener-container標簽對應的Listener Container是SimpleMessageListenerContainer類,如果我們想使用其他MessageListenerContainer類或者自定義Message Listener Container類,就不能使用rabbit標簽。
 

2)有的標簽無法設置id和name屬性,這樣一旦有多個同類型的bean對象定義時,就不能使用rabbit標簽。

 

 

RabbitMQ的Channel和Connection緩存

spring-rabbit中的CachingConnectionFactory類提供了Connection和Channel級別的緩存,如果我們沒有做任何設置,默認的緩存模式是Channel模式,Channel緩存默認最大數是25,所有的Channel復用一個Connection。我們在Message Listener Container中設置並發數為5,啟動消費者應用后,我們從管理界面可以看到一個消費者Connection,5個Channel。
 
 
 
 
重新啟動消費者應用后,我們可以看到有30個Channel被創建,但是只能有25個Channel被緩存,其他5個Channel只是臨時存在於內存中,一旦不被使用,會被自動銷毀,不會被回收到Channel緩存池中被復用。
 
 
如果我們想修改Channel模式下的最大緩存數的話,我們可以進行如下修改:
 
[html]  view plain  copy
 
  1. <rabbit:connection-factory id ="connectionFactory"  
  2.                                 connection-factory="rabbitMQConnectionFactory"  
  3.                                 cache-mode="CHANNEL"  
  4.                                 channel-cache-size="30" />  
 
我們也可以設置緩存模式為Connection模式,設置最大連接緩存數為10
[html]  view plain  copy
 
  1. <rabbit:connection-factory id ="connectionFactory"  
  2.                                 connection-factory="rabbitMQConnectionFactory"  
  3.                                 cache-mode="CONNECTION"  
  4.                                 connection-cache-size="10" />  
如果我們的Message Listener Container的消費者並發數小於最大緩存數,例如為5,管理界面中只顯示有5個Connection,每個Connection上一條Channel。
 

如果消費者並發數大於最大緩存數,例如並發數為20,會出現與並發數對應的連接數,但是只有5個Connection能夠被緩存,其他Connection,如果不再被使用,會被RabbitMQ自動銷毀。
 
 
我們還可以設置Connection的上限,使用CachingConnectionFactory的connectionLimit屬性
 
[java]  view plain  copy
 
  1. public class CachingConnectionFactory extends AbstractConnectionFactory  
  2. {  
  3.   ................  
  4.   private volatile int connectionLimit = Integer.MAX_VALUE;  

這個屬性默認值是Integer.MAX_VALUE,可以理解為無上限,我們可以在applicationContext.xml中設置這個值為10。
[html]  view plain  copy
 
  1. <rabbit:connection-factory id ="connectionFactory"  
  2.                                 connection-factory="rabbitMQConnectionFactory"  
  3.                                 connection-limit="10"  
  4.                                 cache-mode="CONNECTION"  
  5.                                 connection-cache-size="10" />  
此時如果Message Listener Container的Message Listener總並發數大於這個上限,會拋出無法獲取連接的異常。
[html]  view plain  copy
 
  1. <span style="font-size:10px;"><rabbit:listener-container   
  2.                                .............  
  3.                                concurrency="4"  
  4.                                max-concurrency="4">  
  5.         <rabbit:listener ref="Listener1" queues="messageQueue1"/>  
  6.     <rabbit:listener ref="Listener2" queues="messageQueue2"/>  
  7.     <rabbit:listener ref="Listener3" queues="messageQueue3"/>  
  8. </rabbit:listener-container></span>  

例如上面的Container中,一共定義了三個Listener,每個Listener的並發數是4,總的並發數為12,超過了上線10,因此拋出以下異常:

 

[java]  view plain  copy
 
  1. 一月 03, 2017 10:15:28 上午 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer redeclareElementsIfNecessary  
  2. 嚴重: Failed to check/redeclare auto-delete queue(s).  
  3. org.springframework.amqp.AmqpTimeoutException: Timed out attempting to get a connection  
  4.     at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:575)  
  5.     ..............  
此時,消費者應用與RabbitMQ服務器之間的Connection數只有上限數10條。

 

Spring AMQP的重連機制

我們在使用1中介紹了RabbitMQ Java Client提供的重連機制,Spring AMQP也提供了重連機制。我們可以使用Rabbit Java Client的重連設置,我們修改applicationContext.xml中“rabbitMQConnectionFactory”的重連屬性設置。

 

 

[html]  view plain  copy
 
  1. <bean id="rabbitMQConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">  
  2.         ...................  
  3.         <property name="automaticRecoveryEnabled" value="true" />  
  4.         <property name="topologyRecoveryEnabled" value="true" />  
  5.         <property name="networkRecoveryInterval" value="60000" />  
  6. </bean>  

我們啟動消費者應用程序,打開管理頁面,可以看到消費者應用創建了5個Connection,每個Connection下分別創建了一個Channel,對應5個Consumer。

 

 

我們停止RabbitMQ服務器,可以看到消費者控制台輸出連接異常信息,不停試圖恢復Consumer。


重新啟動RabbitMQ服務器,從日志信息可以看出連接被重置,消費者被恢復。

 

登錄管理界面,可以看到原先的5條Channel已經被恢復,但是本地連接端口號與之前的Channel不再一致。

點開一條Channel進去,可以看到連接Channel的Consumer Tag與最初的Consumer Tag也不一致,這可能是因為我們使用了自定義ConsumerTagStrategy,使用線程名為Tag名的原因。

我們也可以禁用RabbitMQ Java Client的重連設置,設置automaticRecoveryEnabled和topologyRecoveryEnabled屬性為false。

 

[html]  view plain  copy
 
  1. <span style="font-size:10px;"><bean id="rabbitMQConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">  
  2.    <property name="automaticRecoveryEnabled" value="false" />  
  3.    <property name="topologyRecoveryEnabled" value="false" />  
  4. </bean></span>  
我們再啟動消費者應用,可以看到初始有5個Connection,5個Channel,每個Channel對應一個Connection。


 

 

當我們重啟RabbitMQ服務器后,發現只有4個Connection恢復,5個Channel被恢復,但是有兩個Channel復用同一個Connection,這一點與 使用RabbitMQ Java Client的重連機制時有所不同。

當執行RabbitMQ重連時,Message Listener Container也會對Consumer進行重新恢復,它的恢復間隔是由recoveryBackOff屬性決定的。

 

[java]  view plain  copy
 
  1. public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer  
  2.         implements ApplicationEventPublisherAware {  
  3.       ..........  
  4.       private BackOff recoveryBackOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);  

 

 

SimpleMessageListenerContainer類的recoveryBackOff屬性對象有兩個屬性,一個是恢復間隔,默認值是DEFAULT_RECOVERY_INTERVAL常量(5000ms,即每5秒試圖進行一次恢復),還有一個嘗試恢復次數,默認值是FixedBackOff.UNLIMITED_ATTEMPTS(Long.MaxValue,可以認為是無限次嘗試)。我們可以根據需要 設置自己的recoveryBackOff屬性,例如下面我們把恢復間隔設置為60000ms,嘗試次數設置為100次。

 

[html]  view plain  copy
 
  1. <bean id="backOff" class="org.springframework.util.backoff.FixedBackOff">  
  2.         <constructor-arg name="interval" value="60000" />  
  3.         <constructor-arg name="maxAttempts" value="100" />  
  4. </bean>  
  5. <rabbit:listener-container message-converter="serializerMessageConverter"  
  6.                                ..........  
  7.                                recovery-back-off="backOff">   
  8.     <rabbit:listener ref="consumerListener" queues="springMessageQueue"/>  
  9. </rabbit:listener-container>  

 

修改后啟動消費者應用,停掉RabbitMQ服務器,我們從異常日志可以看出Message Listener Container的重試間隔變成了1分鍾,而不是默認的5000ms。(為了便於查看重試間隔起見,我們將Container的並發數調整為1)


 




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM