前一篇中我們介紹了使用RabbitMQ Java Client訪問RabbitMQ的方法。但是使用這種方式訪問RabbitMQ,開發者在程序中需要自己管理Connection,Channel對象,Consumer對象的創建,銷毀,這樣會非常不方便。我們下面介紹使用spring AMQP連接RabbitMQ,進行消息的接收和發送。
Spring AMQP是一個Spring子項目,它提供了訪問基於AMQP協議的消息服務器的解決方案。它包含兩部分,spring-ampq是基於AMQP協議的消息發送和接收的高層實現,spring-rabbit是基於RabbitMQ的具體實現。這兩部分我們下面都會使用到。
Spring-AMQP中的基礎類/接口
spring-amqp中定義了幾個基礎類/接口,Message,Exchange,Queue,Binding
Message
- public class Message implements Serializable
- {
- private final MessageProperties messageProperties;
- private final byte[] body;
spring-amqp中的Message類類似於javax的Message類,封裝了消息的Properties和消息體。
Exchange
spring-amqp定義了Exchange接口
- public interface Exchange extends Declarable {
- //Exchange名稱
- String getName();
- //Exchange的類型
- String getType();
- //Exchange是否持久化
- boolean isDurable();
- //Exchange不再被使用時(沒有任何綁定的情況下),是否由RabbitMQ自動刪除
- boolean isAutoDelete();
- //Exchange相關的參數
- Map<String, Object> getArguments();
這個接口和RabbitMQ Client中的Exchange類相似。 spring-amqp中的Exchange繼承關系如下圖所示

AbstractExchange類是所有Exchange類的父類,實現Exchange接口的具體方法。 CustomExchange針對用戶自定義的Exchange對象。其他四個Exchange類,分別對應四種Exchange。 我們在Spring配置文件中配置Exchange對象時,使用的就是這幾種Exchange類。
Queue
spring-amqp定義了Queue類,和RabbitMQ Client中的Queue相似,對應RabbitMQ中的消息隊列。
- public class Queue extends AbstractDeclarable {
- private final String name;
- private final boolean durable;
- private final boolean exclusive;
- private final boolean autoDelete;
- private final java.util.Map<java.lang.String, java.lang.Object> arguments;
- public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
- this(name, durable, exclusive, autoDelete, null);
- }
Binding
Binding類是對RabbitMQ中Exchange-Exchange以及Exchange-Queue綁定關系的抽象。
- public class Binding extends AbstractDeclarable
- {
- public enum DestinationType {
- QUEUE, EXCHANGE;
- }
- private final String destination;
- private final String exchange;
- private final String routingKey;
- private final Map<String, Object> arguments;
- private final DestinationType destinationType;
- public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
- Map<String, Object> arguments) {
- this.destination = destination;
- this.destinationType = destinationType;
- this.exchange = exchange;
- this.routingKey = routingKey;
- this.arguments = arguments;
- }
對照RabbitMQ Java Client中Channel接口的queueBind和ExchangeBind方法
- Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments)
- Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
我們可以看出Binding類實際是對底層建立的Exchange-Queue和Exchange-Exchange綁定關系的高層抽象記錄類,它使用枚舉類型DestinationType區分Exchange-Queue和Exchange-Exchange兩種綁定。
Spring AMQP搭建消費者應用
消費者應用程序框架搭建
我們接下來使用spring-amqp搭建一個RabbitMQ的消費者Web應用,我們先創建一個maven webapp應用程序,再添加一個dependency。
- <dependency>
- <groupId>org.springframework.amqp</groupId>
- <artifactId>spring-rabbit</artifactId>
- <version>1.6.5.RELEASE</version>
- </dependency>
spring-rabbit庫的引入是為了使用它里面的RabbitAdmin類,創建Exchange,Queue和Binding對象,在導入這個庫的時候同時引入了 spring-ampq和rabbitmq-client的庫,不需要另行導入。
在src/main/resources目錄下創建application.properties文件,用於記錄RabbitMQ的配置信息。
- mq.ip=localhost
- mq.port=5672
- mq.userName=rabbitmq_consumer
- mq.password=123456
- mq.virutalHost=test_vhosts
在src/main/resource目錄下創建applicationContext.xml文件:
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:util="http://www.springframework.org/schema/util"
- xsi:schemaLocation="
- http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
- http://www.springframework.org/schema/util
- http://www.springframework.org/schema/util/spring-util-4.0.xsd
- http://www.springframework.org/schema/context
- http://www.springframework.org/schema/context/spring-context-4.0.xsd" >
- <context:annotation-config/>
- <context:property-placeholder
- ignore-unresolvable="true" location="classpath*:/application.properties" />
- <!--從RabbitMQ Java Client創建RabbitMQ連接工廠對象-->
- <bean id="rabbitMQConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
- <property name="username" value="${mq.userName}" />
- <property name="password" value="${mq.password}" />
- <property name="host" value="${mq.ip}" />
- <property name="port" value="${mq.port}" />
- <property name="virtualHost" value="${mq.virutalHost}" />
- </bean>
- <!--基於RabbitMQ連接工廠對象構建spring-rabbit的連接工廠對象Wrapper-->
- <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
- <constructor-arg name="rabbitConnectionFactory" ref="rabbitMQConnectionFactory" />
- </bean>
- <!--構建RabbitAmdin對象,它負責創建Queue/Exchange/Bind對象-->
- <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
- <constructor-arg name="connectionFactory" ref="connectionFactory" />
- <property name="autoStartup" value="true"></property>
- </bean>
- <!--構建Rabbit Template對象,用於發送RabbitMQ消息,本程序使用它發送返回消息-->
- <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
- <constructor-arg name="connectionFactory" ref="connectionFactory" />
- </bean>
- <!--RabbitMQ消息轉化器,用於將RabbitMQ消息轉換為AMQP消息,我們這里使用基本的Message Converter -->
- <bean id="serializerMessageConverter"
- class="org.springframework.amqp.support.converter.SimpleMessageConverter" />
- <!--Message Properties轉換器,用於在spring-amqp Message對象中的Message Properties和RabbitMQ的
- Message Properties對象之間互相轉換 -->
- <bean id="messagePropertiesConverter"
- class="org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter" />
- <!--定義AMQP Queue-->
- <bean id="springMessageQueue" class="org.springframework.amqp.core.Queue">
- <constructor-arg name="name" value="springMessageQueue" />
- <constructor-arg name="autoDelete" value="false" />
- <constructor-arg name="durable" value="true" />
- <constructor-arg name="exclusive" value="false" />
- <!--定義AMQP Queue創建所需的RabbitAdmin對象-->
- <property name="adminsThatShouldDeclare" ref="rabbitAdmin" />
- <!--判斷是否需要在連接RabbitMQ后創建Queue-->
- <property name="shouldDeclare" value="true" />
- </bean>
- <!--定義AMQP Exchange-->
- <bean id="springMessageExchange" class="org.springframework.amqp.core.DirectExchange">
- <constructor-arg name="name" value="springMessageExchange" />
- <constructor-arg name="durable" value="true" />
- <constructor-arg name="autoDelete" value="false" />
- <!--定義AMQP Queue創建所需的RabbitAdmin對象-->
- <property name="adminsThatShouldDeclare" ref="rabbitAdmin" />
- <!--判斷是否需要在連接RabbitMQ后創建Exchange-->
- <property name="shouldDeclare" value="true" />
- </bean>
- <util:map id="emptyMap" map-class="java.util.HashMap" />
- <!--創建Exchange和Queue之間的Bind-->
- <bean id="springMessageBind" class="org.springframework.amqp.core.Binding">
- <constructor-arg name="destination" value="springMessageQueue" />
- <constructor-arg name="destinationType" value="QUEUE" />
- <constructor-arg name="exchange" value="springMessageExchange" />
- <constructor-arg name="routingKey" value="springMessage" />
- <constructor-arg name="arguments" ref="emptyMap" />
- </bean>
- <!--偵聽springMessageQueue隊列消息的Message Listener-->
- <bean id="consumerListener"
- class="com.qf.rabbitmq.listener.RabbitMQConsumer" />
- <!--創建偵聽springMessageQueue隊列的Message Listener Container-->
- <bean id="messageListenerContainer"
- class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
- <property name="messageConverter" ref="serializerMessageConverter" />
- <property name="connectionFactory" ref="connectionFactory" />
- <property name="messageListener" ref="consumerListener" />
- <property name="queues" ref="springMessageQueue" />
- <!--設置消息確認方式為自動確認-->
- <property name="acknowledgeMode" value="AUTO" />
- </bean>
- </beans>
我們定義了偵聽消息隊列的Message Listener類RabbitMQConsumer
- public class RabbitMQConsumer implements MessageListener
- {
- @Autowired
- private MessagePropertiesConverter messagePropertiesConverter;
- @Override
- public void onMessage(Message message)
- {
- try
- {
- //spring-amqp Message對象中的Message Properties屬性
- MessageProperties messageProperties = message.getMessageProperties();
- //使用Message Converter將spring-amqp Message對象中的Message Properties屬性
- //轉換為RabbitMQ 的Message Properties對象
- AMQP.BasicProperties rabbitMQProperties =
- messagePropertiesConverter.fromMessageProperties(messageProperties, "UTF-8");
- System.out.println("The message's correlationId is:" + rabbitMQProperties.getCorrelationId());
- String messageContent = null;
- messageContent = new String(message.getBody(),"UTF-8");
- System.out.println("The message content is:" + messageContent);
- }
- catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- }
- }
上面的Listener類是實現了MessageListener接口的類,當容器接收到消息后,會自動觸發onMessage方法。 如果我們想使用普通的POJO類作為Message Listener,需要引入org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter類
- public class MessageListenerAdapter extends AbstractAdaptableMessageListener {
- public MessageListenerAdapter(Object delegate) {
- doSetDelegate(delegate);
- }
- }
這里的delegate對象就是我們的POJO對象。 假設我們定義一個Delegate類ConsumerDelegate
- public class ConsumerDelegate
- {
- public void processMessage(Object message)
- {
- //這里接收的消息對象僅是消息體,不包含MessageProperties
- //如果想獲取帶MessageProperties的消息對象,需要在Adpater中
- //定義MessageConverter屬性。
- String messageContent = message.toString();
- System.out.println(messageContent);
- }
- }
在applicationContext.xml中定義Adapter對象,引用我們的Delegate對象。
- <bean id="consumerDelegate"
- class="com.qf.rabbitmq.listener.ConsumerDelegate" />
- <bean id="consumerListenerAdapter"
- class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
- <property name="delegate" ref="consumerDelegate" />
- <!--指定delegate處理消息的默認方法 -->
- <property name="defaultListenerMethod" value="processMessage" />
- </bean>
最后將Message Listener Container中的Message Listener指向Adapter對象。
- <bean id="messageListenerContainer"
- class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
- <property name="messageConverter" ref="serializerMessageConverter" />
- <property name="connectionFactory" ref="connectionFactory" />
- <!--設置Message Listener為Adapter對象 -->
- <property name="messageListener" ref="consumerListenerAdapter"/>
- <property name="queues" ref="springMessageQueue" />
- <property name="acknowledgeMode" value="AUTO" />
- </bean>
啟動Web應用后,我們從啟動日志信息可以看出應用連接上了RabbitMQ服務器

從RabbitMQ的管理界面(用rabbitmq_consumer用戶登錄)可以看到springMessageExchange和springMessageQueue已經創建,綁定關系也已經創建。



Consumer Tag自定義
連接springMessageQueue的消費者Tag是RabbitMQ隨機生成的Tag名

如果我們想設置消費者Tag為指定Tag,我們可以在Message Listener Container中 設置自定義consumer tag strategy。首先我們需要定義一個Consumer Tag Strategy類,它實現了ConsumerTagStrategy接口。
- public class CustomConsumerTagStrategy implements ConsumerTagStrategy
- {
- @Override
- public String createConsumerTag(String queue) {
- String consumerName = "Consumer1";
- return consumerName + "_" + queue;
- }
- }
在applicationContext.xml中設定自定義ConsumerTagStrategy
- <bean id="consumerTagStrategy" class="com.qf.rabbitmq.strategy.CustomConsumerTagStrategy" />
- <!--創建偵聽springMessageQueue隊列的Message Listener Container-->
- <bean id="messageListenerContainer"
- class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
- <property name="messageConverter" ref="serializerMessageConverter" />
- <property name="connectionFactory" ref="connectionFactory" />
- <property name="messageListener" ref="consumerListener" />
- <property name="queues" ref="springMessageQueue" />
- <property name="acknowledgeMode" value="AUTO" />
- <property name="consumerTagStrategy" ref="consumerTagStrategy" />
- </bean>
再次啟動Web應用,查看RabbitMQ管理界面,我們可以看到Consumer Tag已經變成“Consumer1_springMessageQueue”,正如我們在CustomConsumerTagStrategy中設定的那樣。

消費者應用接收消息驗證
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- factory.setPort(5672);
- factory.setUsername("rabbitmq_producer");
- factory.setPassword("123456");
- factory.setVirtualHost("test_vhosts");
- //創建與RabbitMQ服務器的TCP連接
- connection = factory.newConnection();
- channel = connection.createChannel();
- String message = "First Web RabbitMQ Message";
- String correlationId = UUID.randomUUID().toString();
- AMQP.BasicProperties props = new AMQP.BasicProperties
- .Builder()
- .correlationId(correlationId)
- .build();
- channel.basicPublish("springMessageExchange","springMessage", props, message.getBytes());
啟動消費者Web應用,從控制台輸出信息可以看到消費者接收到了生產者發送的消息。
設置消息手動確認模式
到目前為止,消費者端的Web應用對消息的確認是自動確認模式,如果我們想改為手動確認方式,需要做以下兩點改動:
1)修改applicationContext.xml文件中Message Listener Container的acknowledgeMode屬性的值為MANUAL。
- <bean id="messageListenerContainer"
- class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
- ......
- <property name="acknowledgeMode" value="MANUAL" />
- </bean>
2)將自定義的Message Listener類從實現org.springframework.amqp.core.MessageListener接口,改為實現 org.springframework.amqp.rabbit.core.ChannelAwareMessageListener接口,實現它的 onMessage(Message,Channel)方法。
- public class RabbitMQConsumer implements ChannelAwareMessageListener
- {
- ...........
- @Override
- public void onMessage(Message message, Channel channel)
- {
- try
- {
- //spring-amqp Message對象中的Message Properties屬性
- MessageProperties messageProperties = message.getMessageProperties();
- //使用Message Converter將spring-amqp Message對象中的Message Properties屬性
- //轉換為RabbitMQ 的Message Properties對象
- AMQP.BasicProperties rabbitMQProperties =
- messagePropertiesConverter.fromMessageProperties(messageProperties, "UTF-8");
- System.out.println("The message's correlationId is:" + rabbitMQProperties.getCorrelationId());
- String messageContent = null;
- messageContent = new String(message.getBody(),"UTF-8");
- System.out.println("The message content is:" + messageContent);
- channel.basicAck(messageProperties.getDeliveryTag(), false);
- }
- catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
onMessage方法的最后一句代碼調用Channel.basicAck方法對消息進行手動確認。再次運行生產者和消費者程序后,我們登錄管理界面,從管理界面中可以看到springMessageQueue隊列中未確認消息條數 (圖中Unacked列)為0條,說明消費者接收消息后已經手動確認。

RPC模式設置
如果生產者和消費者Web應用之間使用RPC模式,即消費者接收消息后要向指定Exchange/Queue發送返回消息,我們需要修改生產者和消費者的程序。 消費者程序修改點如下:
1)在applicationContext.xml中定義返回消息對應的Exchange,Queue和Bind。
- <!--定義AMQP Reply Queue-->
- <bean id="springReplyMessageQueue" class="org.springframework.amqp.core.Queue">
- <constructor-arg name="name" value="springReplyMessageQueue" />
- <constructor-arg name="autoDelete" value="false" />
- <constructor-arg name="durable" value="true" />
- <constructor-arg name="exclusive" value="false" />
- <property name="adminsThatShouldDeclare" ref="rabbitAdmin" />
- <property name="shouldDeclare" value="true" />
- </bean>
- <!--定義AMQP Reply Exchange-->
- <bean id="springReplyMessageExchange" class="org.springframework.amqp.core.DirectExchange">
- <constructor-arg name="name" value="springReplyMessageExchange" />
- <constructor-arg name="durable" value="true" />
- <constructor-arg name="autoDelete" value="false" />
- <!--定義AMQP Queue創建所需的RabbitAdmin對象-->
- <property name="adminsThatShouldDeclare" ref="rabbitAdmin" />
- <property name="shouldDeclare" value="true" />
- </bean>
- <!--創建Reply Exchange和Reply Queue之間的Bind-->
- <bean id="springReplyMessageBind" class="org.springframework.amqp.core.Binding">
- <constructor-arg name="destination" value="springReplyMessageQueue" />
- <constructor-arg name="destinationType" value="QUEUE" />
- <constructor-arg name="exchange" value="springReplyMessageExchange" />
- <constructor-arg name="routingKey" value="springReplyMessage" />
- <constructor-arg name="arguments" ref="emptyMap" />
- </bean>
- public void onMessage(Message message, Channel channel) {
- try
- {
- ......................
- String replyMessageContent = "Consumer1 have received the message '" + messageContent + "'";
- channel.basicPublish(rabbitMQProperties.getReplyTo(), "springReplyMessage",
- rabbitMQProperties, replyMessageContent.getBytes());
- ......................
這里發送返回消息直接使用接收消息時創建的Channel通道,不過如果我們的Message Listener類是繼承自MessageListener接口,無法獲得Channel對象時,我們需要使用RabbitTemplate對象進行返回消息的發送(我們前面已經在applicationContext.xml中定義了這個對象)
- public class RabbitMQConsumer implements MessageListener
- {
- @Autowired
- private MessagePropertiesConverter messagePropertiesConverter;
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Override
- public void onMessage(Message message)
- {
- ..........
- //創建返回消息的RabbitMQ Message Properties
- AMQP.BasicProperties replyRabbitMQProps =
- new AMQP.BasicProperties("text/plain",
- "UTF-8",
- null,
- 2,
- 0, rabbitMQProperties.getCorrelationId(), null, null,
- null, null, null, null,
- null, null);
- //創建返回消息的信封頭
- Envelope replyEnvelope =
- new Envelope(messageProperties.getDeliveryTag(), true,
- "springReplyMessageExchange", "springReplyMessage");
- //創建返回消息的spring-amqp Message Properties屬性
- MessageProperties replyMessageProperties =
- messagePropertiesConverter.toMessageProperties(replyRabbitMQProps,
- replyEnvelope,"UTF-8");
- //構建返回消息(spring-amqp消息)
- Message replyMessage = MessageBuilder.withBody(replyMessageContent.getBytes())
- .andProperties(replyMessageProperties)
- .build();
- rabbitTemplate.send("springReplyMessageExchange","springReplyMessage", replyMessage);
- String correlationId = UUID.randomUUID().toString();
- AMQP.BasicProperties props = new AMQP.BasicProperties
- .Builder()
- .correlationId(correlationId)
- .replyTo("springReplyMessageExchange")
- .build();
- channel.basicPublish("springMessageExchange","springMessage", props, message.getBytes());
- QueueingConsumer replyCustomer = new QueueingConsumer(channel);
- channel.basicConsume("springReplyMessageQueue",true,"Producer Reply Consumer", replyCustomer);
- String responseMessage = null;
- while(true)
- {
- QueueingConsumer.Delivery delivery = replyCustomer.nextDelivery();
- String messageCorrelationId = delivery.getProperties().getCorrelationId();
- if (messageCorrelationId != null && messageCorrelationId.equals(correlationId))
- {
- responseMessage = new String(delivery.getBody());
- System.out.println("The reply message's correlation id is:" + messageCorrelationId);
- break;
- }
- }
- if(responseMessage != null)
- {
- System.out.println("The repsonse message is:'" + responseMessage + "'");
- }
消費者控制台

生產者控制台

消費者並發數設置
到目前為止,消費者Web應用消費消息時,只有一個消費者接收並消費springMessageQueue隊列的消息(如下圖所示)

如果發送的消息量比較大時,我們需要增加消費者的數目。
增加消費者數目要修改Message Listener Container的concurrentConsumers和maxConcurrentConsumers屬性,concurrentConsumers屬性是Message Listener Container創建時創建的消費者數目,maxConcurrentConsumers屬性是容器最大的消費者數目,我們下面把這兩個屬性都設置為5,使Message Listener Container中有5個消費者,同時修改CustomerConsumerTagStrategy類,在Tag中加入線程名,以區分不同的消費者。
- <bean id="messageListenerContainer"
- class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
- ............
- <property name="consumerTagStrategy" ref="consumerTagStrategy" />
- <property name="concurrentConsumers" value="5" />
- <property name="maxConcurrentConsumers" value="5" />
- </bean>
- public class CustomConsumerTagStrategy implements ConsumerTagStrategy
- {
- @Override
- public String createConsumerTag(String queue) {
- String consumerName = "Consumer_" + Thread.currentThread().getName();
- return consumerName + "_" + queue;
- }
- }
修改生產者程序,循環發送50條消息
- ReplyConsumer replyCustomer = new ReplyConsumer(channel);
- channel.basicConsume("springReplyMessageQueue",true,"Producer Reply Consumer", replyCustomer);
- for(int i=0; i<50; i++)
- {
- String correlationId = UUID.randomUUID().toString();
- String message = "Web RabbitMQ Message " + i;
- AMQP.BasicProperties props =
- new AMQP.BasicProperties
- .Builder()
- .contentType("text/plain")
- .deliveryMode(2)
- .correlationId(correlationId)
- .replyTo("springReplyMessageExchange")
- .build();
- channel.basicPublish("springMessageExchange","springMessage", props, message.getBytes());
- }
- public class ReplyConsumer extends DefaultConsumer
- {
- public ReplyConsumer(Channel channel)
- {
- super(channel);
- }
- @Override
- public void handleDelivery(String consumerTag,
- Envelope envelope,
- AMQP.BasicProperties properties,
- byte[] body)
- throws IOException
- {
- String consumerName = properties.getAppId();
- String replyMessageContent = new String(body, "UTF-8");
- System.out.println("The reply message's sender is:" + consumerName);
- System.out.println("The reply message is '" + replyMessageContent + "'");
- }
- }
- public void onMessage(Message message, Channel channel)
- {
- try
- {
- String consumerTag = messageProperties.getConsumerTag();
- String replyMessageContent = consumerTag + " have received the message '" + messageContent + "'";
- AMQP.BasicProperties replyRabbitMQProps =
- new AMQP.BasicProperties("text/plain",
- "UTF-8",
- null,
- 2,
- 0, rabbitMQProperties.getCorrelationId(), null, null,
- null, null, null, null,
- consumerTag, null);
- .............
- public class CustomConsumerTagStrategy implements ConsumerTagStrategy
- {
- @Override
- public String createConsumerTag(String queue) {
- String consumerName = "Consumer_" + Thread.currentThread().getName();
- return consumerName;
- }
- }

消費者消息預取數設置
上述的消費者Web應用中,每個消費者每次從隊列中獲取1條消息,如果我們想讓每個消費者一次性從消息隊列獲取多條消息,需要修改Message Listener Container的prefetchCount屬性,這樣可以提高RabbitMQ的消息處理吞吐量。
- <span style="font-size:10px;"><bean id="messageListenerContainer"
- class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
- <property name="prefetchCount" value="5" />
- </bean></span>
- /**
- * Request a specific prefetchCount "quality of service" settings
- * for this channel.
- *
- * @see #basicQos(int, int, boolean)
- * @param prefetchCount maximum number of messages that the server
- * will deliver, 0 if unlimited
- * @throws java.io.IOException if an error is encountered
- */
- void basicQos(int prefetchCount) throws IOException
這個方法設置從Channel上一次性可以讀取多少條消息,我們在Container設置的PrefetchCount值為5,表示從一個消費者Channel上,一次性可以與預讀取5條消息,按我們上面設置的5個消費者,5個消費者Channel計算,一次性可以預讀取25條消息。為了證實這一點,我們修改消費者的代碼,延長它處理一條消息的時間。
需要說明的是,對於每個消費者而言,只有一條預取的消息被接收且確認后,消費者才會再從消息隊列中讀取消息,並不是消費者在消息沒有確認完成前,每次都從隊列里預讀取prefetchCount條消息。
- public void onMessage(Message message, Channel channel) {
- try
- {
- ...........
- String messageContent = null;
- messageContent = new String(message.getBody(),"UTF-8");
- String consumerTag = messageProperties.getConsumerTag();
- String replyMessageContent = consumerTag + " have received the message '" + messageContent + "'";
- Thread.sleep(60000);
- ...........
- rabbitTemplate.send("springReplyMessageExchange","springReplyMessage", replyMessage);
- channel.basicAck(messageProperties.getDeliveryTag(), false);
我們在onMessage方法中添加Thread.sleep(60000),使得處理一條消息時間時間大於1分鍾,便於查看消息預取的效果,而且使用手動確認方式。
生產者程序改為一次性發送200條消息。
啟動生產者程序,發送200條消息,我們可以看到springMessageQueue隊列里有200條處於Ready狀態的消息

啟動消費者程序,我們可以看到springMessageQueue隊列里有25條消息被預取了,Ready狀態的消息從200條變成了175條,而未確認狀態的消息數(Unacked列)變成了25條,即25條被預取,但是沒有被確認的消息。

過了一段時間,等5個消費者確認了5條消息后,又從消息隊列預讀取了5條消息,Ready狀態的消息數變成170條,這時的消息隊列的消息數如下圖所示:

未確認的消息數仍然是25條,但是總的消息數變成了195條,表示已經有5條消息被處理且確認了。
隨着消息逐漸被處理,確認,消費者會逐漸從消息隊列預取新的消息,直到所有的消息都被處理和確認完成。

rabbit標簽使用
上面的消費者Web應用使用了Spring傳統的beans元素定義,spring-rabbit提供了rabbit namespace,我們可以在applicationContext.xml中使用rabbit:xxx形式的元素標簽,簡化我們的xml配置。 我們首先在applicationContext.xml的namespace定義中添加rabbit namespace定義:- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:context="http://www.springframework.org/schema/context"
- xmlns:util="http://www.springframework.org/schema/util"
- xmlns:rabbit="http://www.springframework.org/schema/rabbit"
- xsi:schemaLocation="
- http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
- http://www.springframework.org/schema/util
- http://www.springframework.org/schema/util/spring-util-4.0.xsd
- http://www.springframework.org/schema/rabbit
- http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd
- http://www.springframework.org/schema/context
- http://www.springframework.org/schema/context/spring-context-4.0.xsd" >
- <pre name="code" class="html"><span style="font-size:10px;"><rabbit:connection-factory id ="connectionFactory" connection-factory="rabbitMQConnectionFactory" /></span></pre>
- <pre></pre>
- <span style="color:rgb(51,51,51); font-family:Arial,sans-serif"><span style="font-size:10px">修改RabbitAdmin bean對象定義,使用rabbit:admin標簽</span></span>
- <pre></pre>
- <pre></pre>
- <pre></pre>
- <rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" auto-startup="true"/>
- <rabbit:template connection-factory="connectionFactory" />
MessageConverter和MessageProperties對象沒有對應的rabbit標簽,仍然使用bean標簽。
修改Queue,Exchange和Bind定義,分別使用rabbit:queue,rabbit:exchange標簽,Bind的內容放到了Exchange bean定義內部。
- <rabbit:queue id="springMessageQueue" name="springMessageQueue" auto-delete="false"
- durable="true" exclusive="false" auto-declare="false" declared-by="rabbitAdmin" />
- <rabbit:direct-exchange id="springMessageExchange" name="springMessageExchange" durable="true"
- auto-declare="false" auto-delete="false" declared-by="rabbitAdmin">
- <rabbit:bindings>
- <rabbit:binding queue="springMessageQueue" key="springMessage"></rabbit:binding>
- </rabbit:bindings>
- </rabbit:direct-exchange>
- <rabbit:listener-container message-converter="serializerMessageConverter"
- connection-factory="connectionFactory"
- acknowledge="manual"
- consumer-tag-strategy="consumerTagStrategy"
- concurrency="5"
- max-concurrency="5"
- prefetch="5">
- <rabbit:listener ref="consumerListener" queues="springMessageQueue"/>
- </rabbit:listener-container>
- <rabbit:listener ref="consumerListener" queue-names="springMessageQueue"/>
這里如果Listener關聯多個隊列,設置queues屬性或者queue-names屬性時可以用逗號進行分割,例如:
- <pre name="code" class="html" style="color: rgb(51, 51, 51);"><rabbit:listener ref="consumerListener" queue-names="messageQueue1,messageQueue2"/></pre>
- <pre></pre>
- <pre></pre>
- <pre></pre>
- <pre></pre>
使用rabbit標簽雖然可以簡化RabbitMQ相關對象的bean定義,但是它也有局限性:
1)標簽對應的bean對象類型是固定的,例如rabbit:listener-container標簽對應的Listener Container是SimpleMessageListenerContainer類,如果我們想使用其他MessageListenerContainer類或者自定義Message Listener Container類,就不能使用rabbit標簽。
2)有的標簽無法設置id和name屬性,這樣一旦有多個同類型的bean對象定義時,就不能使用rabbit標簽。
RabbitMQ的Channel和Connection緩存
- <rabbit:connection-factory id ="connectionFactory"
- connection-factory="rabbitMQConnectionFactory"
- cache-mode="CHANNEL"
- channel-cache-size="30" />
- <rabbit:connection-factory id ="connectionFactory"
- connection-factory="rabbitMQConnectionFactory"
- cache-mode="CONNECTION"
- connection-cache-size="10" />
- public class CachingConnectionFactory extends AbstractConnectionFactory
- {
- ................
- private volatile int connectionLimit = Integer.MAX_VALUE;
這個屬性默認值是Integer.MAX_VALUE,可以理解為無上限,我們可以在applicationContext.xml中設置這個值為10。
- <rabbit:connection-factory id ="connectionFactory"
- connection-factory="rabbitMQConnectionFactory"
- connection-limit="10"
- cache-mode="CONNECTION"
- connection-cache-size="10" />
- <span style="font-size:10px;"><rabbit:listener-container
- .............
- concurrency="4"
- max-concurrency="4">
- <rabbit:listener ref="Listener1" queues="messageQueue1"/>
- <rabbit:listener ref="Listener2" queues="messageQueue2"/>
- <rabbit:listener ref="Listener3" queues="messageQueue3"/>
- </rabbit:listener-container></span>
例如上面的Container中,一共定義了三個Listener,每個Listener的並發數是4,總的並發數為12,超過了上線10,因此拋出以下異常:
- 一月 03, 2017 10:15:28 上午 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer redeclareElementsIfNecessary
- 嚴重: Failed to check/redeclare auto-delete queue(s).
- org.springframework.amqp.AmqpTimeoutException: Timed out attempting to get a connection
- at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:575)
- ..............

Spring AMQP的重連機制
我們在使用1中介紹了RabbitMQ Java Client提供的重連機制,Spring AMQP也提供了重連機制。我們可以使用Rabbit Java Client的重連設置,我們修改applicationContext.xml中“rabbitMQConnectionFactory”的重連屬性設置。
- <bean id="rabbitMQConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
- ...................
- <property name="automaticRecoveryEnabled" value="true" />
- <property name="topologyRecoveryEnabled" value="true" />
- <property name="networkRecoveryInterval" value="60000" />
- </bean>
我們啟動消費者應用程序,打開管理頁面,可以看到消費者應用創建了5個Connection,每個Connection下分別創建了一個Channel,對應5個Consumer。



我們停止RabbitMQ服務器,可以看到消費者控制台輸出連接異常信息,不停試圖恢復Consumer。


重新啟動RabbitMQ服務器,從日志信息可以看出連接被重置,消費者被恢復。
點開一條Channel進去,可以看到連接Channel的Consumer Tag與最初的Consumer Tag也不一致,這可能是因為我們使用了自定義ConsumerTagStrategy,使用線程名為Tag名的原因。

我們也可以禁用RabbitMQ Java Client的重連設置,設置automaticRecoveryEnabled和topologyRecoveryEnabled屬性為false。
- <span style="font-size:10px;"><bean id="rabbitMQConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
- <property name="automaticRecoveryEnabled" value="false" />
- <property name="topologyRecoveryEnabled" value="false" />
- </bean></span>

當我們重啟RabbitMQ服務器后,發現只有4個Connection恢復,5個Channel被恢復,但是有兩個Channel復用同一個Connection,這一點與 使用RabbitMQ Java Client的重連機制時有所不同。


當執行RabbitMQ重連時,Message Listener Container也會對Consumer進行重新恢復,它的恢復間隔是由recoveryBackOff屬性決定的。
- public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer
- implements ApplicationEventPublisherAware {
- ..........
- private BackOff recoveryBackOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);
SimpleMessageListenerContainer類的recoveryBackOff屬性對象有兩個屬性,一個是恢復間隔,默認值是DEFAULT_RECOVERY_INTERVAL常量(5000ms,即每5秒試圖進行一次恢復),還有一個嘗試恢復次數,默認值是FixedBackOff.UNLIMITED_ATTEMPTS(Long.MaxValue,可以認為是無限次嘗試)。我們可以根據需要 設置自己的recoveryBackOff屬性,例如下面我們把恢復間隔設置為60000ms,嘗試次數設置為100次。
- <bean id="backOff" class="org.springframework.util.backoff.FixedBackOff">
- <constructor-arg name="interval" value="60000" />
- <constructor-arg name="maxAttempts" value="100" />
- </bean>
- <rabbit:listener-container message-converter="serializerMessageConverter"
- ..........
- recovery-back-off="backOff">
- <rabbit:listener ref="consumerListener" queues="springMessageQueue"/>
- </rabbit:listener-container>
修改后啟動消費者應用,停掉RabbitMQ服務器,我們從異常日志可以看出Message Listener Container的重試間隔變成了1分鍾,而不是默認的5000ms。(為了便於查看重試間隔起見,我們將Container的並發數調整為1)

