前言
在使用Spring整合RabbitMQ時我們主要關注三個核心接口:
- RabbitAdmin: 用於聲明交換機 隊列 綁定等
- RabbitTemplate: 用於RabbitMQ消息的發送和接收
- MessageListenerContainer: 監聽容器 為消息入隊提供異步處理
依賴
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.9.RELEASE</version> </dependency>
配置
可通過以下兩種方式進行配置。
- rabbitmq.xml配置文件
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd"> <!-- 消費者和生產者通用配置(begin) -->
<!-- 創建連接工廠 --> <rabbit:connection-factory id="rabbitmqConnectionFactory" host="${rabbitmq_host}" port="${rabbitmq_port}" username="${rabbitmq_user}" password="${rabbitmq_passwd}" virtual-host="${rabbitmq_vhost}" /> <!-- 創建rabbitAdmin --> <rabbit:admin id="connectAdmin" connection-factory="rabbitmqConnectionFactory"/> <!-- 定義消息對象json轉換類 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> <!-- 消費者和生產者通用配置(end) --> <!-- 財務記賬接口配置(begin) --> <!-- 創建rabbitTemplate消息發送模版 --> <rabbit:template id="rabbitTemplate" exchange="${mmc_topic_exchange_name}" routing-key="${mmc_routingkey_name}" connection-factory="rabbitmqConnectionFactory" message-converter="jsonMessageConverter"/> <!-- 財務記賬接口配置(end) -->
<!-- 關閉訂單配置(begin) --> <!-- 創建rabbitTemplate消息發送模版--> <rabbit:template id="orderDelayTemplate" exchange="mmc.order.delay.exchange-delay" connection-factory="rabbitmqConnectionFactory" message-converter="jsonMessageConverter" routing-key="mmc.order.delay.routingkey" /> <!-- 聲明監聽的Queue的名稱 --> <rabbit:queue id="orderQueue" name="mmc.order.delay.queue"/> <!-- 聲明exchange的類型為topic --> <rabbit:direct-exchange name="mmc.order.delay.exchange-delay" declared-by="connectAdmin" delayed="true"> <rabbit:bindings> <rabbit:binding queue="mmc.order.delay.queue" key="mmc.order.delay.routingkey"/> </rabbit:bindings> </rabbit:direct-exchange>
<!-- 創建消息監控容器 --> <bean id="messageHandler" class="com.zat.mmc.service.rabbitmq.MessageHandler"/> <rabbit:listener-container connection-factory="rabbitmqConnectionFactory" message-converter="jsonMessageConverter"> <rabbit:listener queues="mmc.order.delay.queue" ref="messageHandler" /> </rabbit:listener-container> <!-- 關閉訂單配置(end) --> </beans>
配置文件其實還可以更簡單點:
<!-- 消費者和生產者通用配置(begin) --> <!-- 創建連接 --> <rabbit:connection-factory id="rabbitmqConnectionFactory" host="${rabbitmq_host}" port="${rabbitmq_port}" username="${rabbitmq_user}" password="${rabbitmq_passwd}" virtual-host="${rabbitmq_vhost}" /> <!-- 創建rabbitAdmin --> <rabbit:admin id="connectAdmin" connection-factory="rabbitmqConnectionFactory"/> <!-- 消息對象json轉換類 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> <!-- 消費者和生產者通用配置(end) --> <!-- 財務記賬接口配置(begin) --> <!-- 創建rabbitTemplate消息發送模版 --> <rabbit:template id="rabbitTemplate" exchange="${mmc_topic_exchange_name}" routing-key="${mmc_routingkey_name}" connection-factory="rabbitmqConnectionFactory" message-converter="jsonMessageConverter"/> <!-- 財務記賬接口配置(end) -->
中間定義了交換機,定義了隊列,其實都可以省略的,可以到mq的管理后台創建。在代碼中定義交換機和隊列的好處是在使用時會自動創建它。
- Java配置類
spring在啟動時會掃描到Configuration這個注解是一個配置文件的注解。
@Configuration public class RabbitMQConfig { public final static String QUEUE_NAME = "spring-queue"; public final static String EXCHANGE_NAME = "spring-exchange"; public final static String ROUTING_KEY = "spring-key"; // 創建隊列 @Bean public Queue queue() { return new Queue(QUEUE_NAME); } // 創建一個 topic 類型的交換器 @Bean public TopicExchange exchange() { return new TopicExchange(EXCHANGE_NAME); } // 使用路由鍵(routingKey)把隊列(Queue)綁定到交換器(Exchange) @Bean public Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); } @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("xx", 5670); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; }
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 注意,autoStartup 必須設置為 true,否則 Spring 容器不會加載 RabbitAdmin 類
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } }
生產者
@Component public class DemoProducer { @Resource(name = "rabbitTemplate") private RabbitTemplate rabbitTemplate; public void sendProducer() { // 生產者發送消息(因為在配置文件里面已經為rabbitTemplate指定了交換機和routing,所以可以省去它們) rabbitTemplate.convertAndSend(jsonObject.toJSONString());
// 生產者往solutionInfo_exchange這個交換機,這個info_queue_key路由中發送消息 rabbitTemplate.convertAndSend("solutionInfo_exchange","info_queue_key", jsonObject.toJSONString()); } }
消費者
package com.rabbitmq.demo @Component public class DemoReceiver extends AbstractAdaptableMessageListener { @Override public void onMessage(Message message, Channel channel) { try { String result=new String(message.getBody(),"UTF-8") log.info("bodyJson:" + result); } catch (Throwable e) { log.error("WechatMsgReceiver exception", e); } } }
其它
- 批量自動聲明交換機、隊列和綁定
可以批量創建Queue和Exchange,批量創建綁定關系並將其放進List集合中返回使用:
- 手動聲明交換機、隊列和綁定
可通過RabbitAdmin來實現:
/** * 聲明一個direct類型的、持久化、非排他的交換器 */ rabbitAdmin.declareExchange(new DirectExchange(EXCHANGE_NAME, true, false, new HashMap<String, Object>()));
/** * 聲明一個持久化、非排他、非自動刪除的隊列 */ rabbitAdmin.declareQueue(new Queue(QUEUE_NAME, true, false, false, new HashMap<String, Object>()));
/** * 將交換器和隊列綁定 */ rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_NAME)). to(new DirectExchange(EXCHANGE_NAME)).with(ROUTING_KEY));