本篇主要讲述Spring Boot与RabbitMQ的整合,内容非常简单,纯API的调用操作。 操作之间需要加入依赖Jar
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
消息生产者
不论是创建消息消费者或生产者都需要ConnectionFactory
ConnectionFactory配置
创建AmqpConfig文件AmqpConfig.java(后期的配置都在该文件中)
@Configuration public class AmqpConfig { public static final String EXCHANGE = "spring-boot-exchange"; public static final String ROUTINGKEY = "spring-boot-routingKey"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); //必须要设置 return connectionFactory; } }
这里需要显示调用
connectionFactory.setPublisherConfirms(true);才能进行消息的回调。
RabbitTemplate
通过使用RabbitTemplate来对开发者提供API操作@Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必须是prototype类型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; }这里设置为原型,具体的原因在后面会讲到
在发送消息时通过调用RabbitTemplate中的如下方法
public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)
- exchange:交换机名称
-
routingKey:路由关键字
-
object:发送的消息内容
-
correlationData:消息ID
因此生产者代码详单简洁
Send.java
@Component public class Send { private RabbitTemplate rabbitTemplate; /** * 构造方法注入 */ @Autowired public Send(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId); } }
如果需要在生产者中添加消息消费后的回调,需要对rabbitTemplate设置ConfirmCallback对象,由于不同的生产者需要对应不同的ConfirmCallback,如果rabbitTemplate设置为单例bean,则所有的rabbitTemplate
实际的ConfirmCallback为最后一次申明的ConfirmCallback。
下面给出完整的生产者代码:
package com.u51.lkl.springboot.amqp; import java.util.UUID; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 消息生产者 * * @author liaokailin * @version $Id: Send.java, v 0.1 2015年11月01日 下午4:22:25 liaokailin Exp $ */ @Component public class Send implements RabbitTemplate.ConfirmCallback { private RabbitTemplate rabbitTemplate; /** * 构造方法注入 */ @Autowired public Send(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容 } public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId); } /** * 回调 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println(" 回调id:" + correlationData); if (ack) { System.out.println("消息成功消费"); } else { System.out.println("消息消费失败:" + cause); } } }
消息消费者
消费者负责申明交换机(生产者也可以申明)、队列、两者的绑定操作。
交换机
/** * 针对消费者配置 FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 HeadersExchange :通过添加属性key-value匹配 DirectExchange:按照routingkey分发到指定队列 TopicExchange:多关键字匹配 */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE); }
在Spring Boot中交换机继承AbstractExchange类
队列
@Bean public Queue queue() { return new Queue("spring-boot-queue", true); //队列持久 }
绑定
@Bean public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY); }
完成以上工作后,在spring boot中通过消息监听容器实现消息的监听,在消息到来时执行回调操作。
消息消费
@Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("receive msg : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费 } }); return container; }
下面给出完整的配置文件:
package com.u51.lkl.springboot.amqp; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; import com.rabbitmq.client.Channel; /** * Qmqp Rabbitmq * * http://docs.spring.io/spring-amqp/docs/1.4.5.RELEASE/reference/html/ * * @author lkl * @version $Id: AmqpConfig.java, v 0.1 2015年11月01日 下午2:05:37 lkl Exp $ */ @Configuration public class AmqpConfig { public static final String EXCHANGE = "spring-boot-exchange"; public static final String ROUTINGKEY = "spring-boot-routingKey"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); //必须要设置 return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必须是prototype类型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } /** * 针对消费者配置 * 1. 设置交换机类型 * 2. 将队列绑定到交换机 * * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 HeadersExchange :通过添加属性key-value匹配 DirectExchange:按照routingkey分发到指定队列 TopicExchange:多关键字匹配 */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE); } @Bean public Queue queue() { return new Queue("spring-boot-queue", true); //队列持久 } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY); } @Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("receive msg : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费 } }); return container; } }
以上完成 Spring Boot与RabbitMQ的整合
自动配置
在Spring Boot中实现了RabbitMQ的自动配置,在配置文件中添加如下配置信息
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=test spring.rabbitmq.password=test spring.rabbitmq.virtualHost=test
后会自动创建ConnectionFactory以及RabbitTemplate对应Bean,为什么上面我们还需要手动什么呢?
自动创建的ConnectionFactory无法完成事件的回调,即没有设置下面的代码
connectionFactory.setPublisherConfirms(true);