RabbitMQ整合Spring AMQP實戰
常用組件介紹
-
RabbitAdmin
-
Spring AMQP聲明 通過@Bean注解進行聲明
-
RabbitTemplate
-
SimpleMessageListenerContainer 對消息消費進行詳細配置和優化
-
MessageListenerAdapter 消息監聽適配器,建立在監聽器基礎之上
-
MessageConverter
RabbitAdmin
-
RabbitAdmin類可以很好的操作RabbitMQ,在Spring中直接進行注入即可
-
注意:autoSatrtup必須設置為true,否則spring容器不會加載RabbitAdmin類
-
RabbitAdmin底層實現就是從Spring容器中獲取Exchange、Bingding、RoutingKey以及Queue的@Bean聲明;
-
底層使用RabbitTemplate的execute方法執行對應的聲明、修改、刪除等一系列RabbitMQ基礎功能操作;
RabbitMQ簡單使用
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
@Configuration
public class RabbitMqConfig1 {
/**
* 設置連接
* @return ConnectionFactory
*/
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
/**
* 創建RabbitAdmin
* @return RabbitAdmin
*/
@Bean
public RabbitAdmin rabbitAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
//默認就是true
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
測試
@Autowired
private RabbitAdmin rabbitAdmin;
/**
* RabbitAdmin api應用
*/
@Test
public void testAdmin() {
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
//綁定
rabbitAdmin.declareBinding(new Binding("test.direct.queue",
Binding.DestinationType.QUEUE,
"test.direct", "direct", new HashMap<>()));
//使用 BindingBuilder 創建綁定
// https://docs.spring.io/spring-amqp/docs/2.1.16.BUILD-SNAPSHOT/reference/html/#builder-api
rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("test.topic.queue", false)) //直接創建隊列
.to(new TopicExchange("test.topic", false, false)) //直接創建交換機 建立關聯關系
.with("user.#")); //指定路由Key
//FanoutExchange 類型exchange不走路由鍵
rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("test.fanout.queue", false))
.to(new FanoutExchange("test.fanout", false, false)));
//清空隊列數據
// rabbitAdmin.purgeQueue("test.topic.queue", false);
}
SpringAMQP聲明(Exchange、Queue、Binding)
在RabbitMQ基礎AP里面聲明一個Exchange、聲明一個綁定、一個隊列
//基礎API聲明一個exchange
channel.exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments)
//基礎API 聲明一個隊列
channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments)
//基礎API 聲明binding
channel.queueBind(String queue, String exchange, String routingKey)
使用SpringAMQP去聲明,就需要使用SpringAMQP的如下模式,即聲明@Bean方式
//聲明Topic 類型的exchange
@Bean
public TopicExchange topicExchange() {
//exchange 持久化
// Exchange springEchange = ExchangeBuilder.topicExchange("spring_amqp_test_echange").durable(true).build();
return new TopicExchange("spring_amqp_test_echange", true, false);
}
//聲明隊列
@Bean
public Queue queue() {
// Queue spring_amqp_test_echange = QueueBuilder.durable("spring_amqp_test_echange").build();
return new Queue("spring_amqp_test_queue");
}
//建立綁定
@Bean
public Binding binding(TopicExchange topicExchange, Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange).with("spring.*");
}
消息模板 RabbitTemplate
-
RabbitTemplate,即消息模板。
-
在與SpringAMQP整合的時候進行發送消息的關鍵類
-
該類提供了豐富的發送消息的方法,包括可靠性投遞消息方法、回調監聽消息接口ConfirmCallback、返回值確認接口ReturnCallback等等。同樣我們需要進入注入到Spring容器中,然后直接使用;
-
在與Spring整合時需要實例化,但是在與SpringBoot整合時,在配置文件里添加配置即可;
RabbitTemplate簡單使用
配置
@Configuration
public class RabbitMqConfig3 {
/**
* 設置連接
*
* @return ConnectionFactory
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
/**
* 創建RabbitAdmin
*
* @return RabbitAdmin
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
//默認就是true
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
/**
* 消息模板
*
* @param connectionFactory connectionFactory
* @return RabbitTemplate
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
/**
* 針對消費者配置
* 1. 設置交換機類型
* 2. 將隊列綁定到交換機
* FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念
* HeadersExchange :通過添加屬性key-value匹配
* DirectExchange:按照routingkey分發到指定隊列
* TopicExchange:多關鍵字匹配
*/
@Bean
public TopicExchange exchange001() {
return new TopicExchange("topic001", true, false);
}
@Bean
public Queue queue001() {
return new Queue("queue001", true); //隊列持久
}
@Bean
public Binding binding001(TopicExchange exchange001, Queue queue001) {
return BindingBuilder.bind(queue001).to(exchange001).with("spring.*");
}
}
測試
@Test
public void testSendMessage() {
//1 創建消息
//AMQP消息的消息屬性
//MessageBuilder(也可以構建Message) 使用流利的API從byte[]主體或其他消息構建Spring AMQP消息。
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "信息描述..");
messageProperties.getHeaders().put("type", "自定義消息類型..");
Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.err.println("------添加額外的設置---------");
message.getMessageProperties().getHeaders().put("desc", "額外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr", "額外新加的屬性");
return message;
}
});
}
隊列queue001
@Test
public void testSendMessage2() throws Exception {
//1 創建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq 消息1234".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc", message);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
}
隊列queue001
隊列queue002
簡單消息監聽容器:SimpleMessageListenerContainer
- 這個類非常的強大,我們可以對他進行很多設置,對於消費者的配置項,這個類都可以滿足
- 監聽隊列(多個隊列)、自動啟動、自動聲明功能
- 設置事務特性、事務管理器、事務屬性、事務容量(並發)、是否開啟事務、回滾消息等
- 設置消費者數量、最小最大數量、批量消費
- 設置消息確認和自動確認模式、是否重回隊列、異常捕獲handler函數
- 設置消費者標簽生成策略、是否獨占模式、消費者屬性等
- 設置具體的監聽器、消息轉換器等等
注意:SimpleMessageListenerContainer可以進行動態設置,比如在運行中的應用可以動態的修改其消費者數量的大小、接收消息的模式等。很多基於RabbitMQ的自制定化后端管控台在進行動態設置的時候,也是根據這一特性去實現的。所以可以看出SpringAMQP非常的強大;
思考一下:SimpleMessageListenerContainer為什么可以動態感知配置變更?
配置
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// container.setQueueNames(); 接收字符串的隊列名
//
container.setQueues(queue001(), queue002(), queue003());
//當前消費者數量
container.setConcurrentConsumers(1);
//最大消費者數量
container.setMaxConcurrentConsumers(5);
//是否使用重隊列
container.setDefaultRequeueRejected(false);
//自動簽收
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setExposeListenerChannel(true);
//消費端的標簽策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//設置消息監聽
//必須設置消息監聽 否則 報 No message listener specified - see property 'messageListener'
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.err.println("----------消費者: " + msg);
//做消息處理....
}
});
return container;
}
消息監聽適配器:MessageListenerAdapter
通過MessageListenerAdapter的代碼我們可以看出如下核心屬性:
- defaultListenerMethod默認監聽方法名稱:用於設置監聽方法名稱
- Delegate委托對象:實際真實的委托對象,用於處理消息、
- queueOrTagToMethodName: 隊列標識與方法名稱組成的集合
- 可以一一進行隊列與方法名稱的匹配;
- 隊列和方法名稱綁定,即指定隊列里的消息會被綁定的方法所接收處理;
配置
public class MessageDelegate1 {
public void handleMessage(byte[] messageBody) {
System.err.println("默認方法, 消息內容:" + new String(messageBody));
}
public void consumeMessage(byte[] messageBody) {
System.err.println("字節數組方法, 消息內容:" + new String(messageBody));
}
public void consumeMessage(String messageBody) {
System.err.println("字符串方法, 消息內容:" + messageBody);
}
public void method1(String messageBody) {
System.err.println("method1 收到消息內容:" + new String(messageBody));
}
public void method2(String messageBody) {
System.err.println("method2 收到消息內容:" + new String(messageBody));
}
public void consumeMessage(Map messageBody) {
System.err.println("map方法, 消息內容:" + messageBody);
}
}
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// container.setQueueNames(); 接收字符串的隊列名
//
container.setQueues(queue001(), queue002(), queue003());
//當前消費者數量
container.setConcurrentConsumers(1);
//最大消費者數量
container.setMaxConcurrentConsumers(5);
//是否使用重隊列
container.setDefaultRequeueRejected(false);
//自動簽收
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setExposeListenerChannel(true);
//消費端的標簽策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//1 適配器方式. 默認是有自己的方法名字的:handleMessage
// 可以自己指定一個方法的名字: consumeMessage
// 也可以添加一個轉換器: 從字節數組轉換為String
//MessageDelegate1如何寫 MessageListenerAdapter 源碼里面也給出了一些建議
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate1());
//默認的方法是 public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";
adapter.setDefaultListenerMethod("consumeMessage");
//TextMessageConverter 自定義的消息轉換器
//new TextMessageConverter()-->consumeMessage(byte[] messageBody))->MessageProperties.setContentType("text/plian")
//new Jackson2JsonMessageConverter()--->consumeMessage(Map messageBody))->MessageProperties.setContentType("application/json")
// adapter.setMessageConverter(new Jackson2JsonMessageConverter());
container.setMessageListener(adapter);
return container;
}
MessageConverter消息轉換器
-
我們在進行發送消息的時候,正常情況下消息體為二進制的數據方式進行傳輸,如果希望內部幫我們進行轉換,或者指定自定義的轉換器,就需要用到MessageConverter;
-
自定義常用轉換器:MessageConverter,一般來講都需要實現這個接口
-
重寫下面兩個方法:
- toMessage:java對象轉換為Message
- fromMessage:Message對象轉換為java對象
-
MessageConverter消息轉換器:
-
Json轉換器:Jackson2JsonMessageConverter:可以進行java對象的轉換功能;
-
DefaultJackson2JavaTypeMapper映射器:可以進行java對象的映射關系;
-
自定義二進制轉換器:比如圖片類型、PDF、PPT、流媒體等
使用轉換器的目的是當傳入不同的類型的數據(如json,類,PDF,圖片等)時,在消息的接收方接收到時也總是以傳入的類型接收結果對象;我們通過寫入不同的轉換器以達到此種效果。具體可百度。
JSON格式轉換
默認監聽方法的參數為Map
public class Order {
private String id;
private String name;
private String content;
public Order() {
}
public Order(String id, String name, String content) {
this.id = id;
this.name = name;
this.content = content;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
配置
// 1.1 支持json格式的轉換器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
// public void consumeMessage(Map messageBody) {
// System.err.println("map方法, 消息內容:" + messageBody);
// }
//對應map參數方法
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
測試
@Test
public void testSendJsonMessage() throws Exception {
Order order = new Order();
order.setId("001");
order.setName("消息訂單");
order.setContent("描述信息");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.err.println("order 4 json: " + json);
MessageProperties messageProperties = new MessageProperties();
//這里注意一定要修改contentType為 application/json
messageProperties.setContentType("application/json");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.order", message);
}
JSON格式轉換支持Java對象
默認監聽方法的參數為Java對象
委托對象方法
public void consumeMessage(Order order) {
System.err.println("order對象, 消息內容, id: " + order.getId() +
", name: " + order.getName() +
", content: "+ order.getContent());
}
配置
// 1.2 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java對象轉換
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
//信任所有的包,否則會報 報不信任
javaTypeMapper.setTrustedPackages("*");
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
測試
@Test
public void testSendJavaMessage() throws Exception {
Order order = new Order();
order.setId("001");
order.setName("訂單消息");
order.setContent("訂單描述信息");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.err.println("order 4 json: " + json);
MessageProperties messageProperties = new MessageProperties();
//這里注意一定要修改contentType為 application/json
messageProperties.setContentType("application/json");
//__TypeId__ 這個是固定寫法
messageProperties.getHeaders().put("__TypeId__", "com.niugang.spring.entity.Order");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.order", message);
}
輸出
order對象, 消息內容, id: 001, name: 訂單消息, content: 訂單描述信息
JSON格式轉換支持Java對象(二)
委托對象方法
public void consumeMessage(Order order) {
System.err.println("order對象, 消息內容, id: " + order.getId() +
", name: " + order.getName() +
", content: "+ order.getContent());
}
public void consumeMessage(Packaged pack) {
System.err.println("package對象, 消息內容, id: " + pack.getId() +
", name: " + pack.getName() +
", content: "+ pack.getDescription());
}
配置
//1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java對象多映射轉換
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
idClassMapping.put("order", com.niugang.spring.entity.Order.class);
idClassMapping.put("packaged", com.niugang.spring.entity.Packaged.class);
javaTypeMapper.setIdClassMapping(idClassMapping);
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
測試
@Test
public void testSendMappingMessage() throws Exception {
ObjectMapper mapper = new ObjectMapper();
Order order = new Order();
order.setId("001");
order.setName("訂單消息");
order.setContent("訂單描述信息");
String json1 = mapper.writeValueAsString(order);
System.err.println("order 4 json: " + json1);
MessageProperties messageProperties1 = new MessageProperties();
//這里注意一定要修改contentType為 application/json
messageProperties1.setContentType("application/json");
messageProperties1.getHeaders().put("__TypeId__", "order");
Message message1 = new Message(json1.getBytes(), messageProperties1);
rabbitTemplate.send("topic001", "spring.order", message1);
Packaged pack = new Packaged();
pack.setId("002");
pack.setName("包裹消息");
pack.setDescription("包裹描述信息");
String json2 = mapper.writeValueAsString(pack);
System.err.println("pack 4 json: " + json2);
MessageProperties messageProperties2 = new MessageProperties();
//這里注意一定要修改contentType為 application/json
messageProperties2.setContentType("application/json");
messageProperties2.getHeaders().put("__TypeId__", "packaged");
Message message2 = new Message(json2.getBytes(), messageProperties2);
rabbitTemplate.send("topic001", "spring.pack", message2);
}
全局消息轉化器與自定義轉化器
自定義文本轉化器
public class TextMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(object.toString().getBytes(), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if(null != contentType && contentType.contains("text")) {
return new String(message.getBody());
}
return message.getBody();
}
}
自定義圖片轉化器
/**
* 圖片轉化器
*/
public class ImageMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.err.println("-----------Image MessageConverter----------");
Object _extName = message.getMessageProperties().getHeaders().get("extName");
String extName = _extName == null ? "png" : _extName.toString();
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
//目錄必須存在
String path = "d:/springbootlog/" + fileName + "." + extName;
File f = new File(path);
try {
//拷貝到指定路徑
Files.copy(new ByteArrayInputStream(body), f.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return f;
}
}
自定義pdf轉化器
public class PDFMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.err.println("-----------PDF MessageConverter----------");
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "d:/springbootlog/" + fileName + ".pdf";
File f = new File(path);
try {
Files.copy(new ByteArrayInputStream(body), f.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return f;
}
}
委托對象
public void consumeMessage(File file) {
System.err.println("文件對象 方法, 消息內容:" + file.getName());
}
配置
//1.4 ext convert
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
//全局的轉換器:
ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
TextMessageConverter textConvert = new TextMessageConverter();
convert.addDelegate("text", textConvert);
convert.addDelegate("html/text", textConvert);
convert.addDelegate("xml/text", textConvert);
convert.addDelegate("text/plain", textConvert);
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
convert.addDelegate("json", jsonConvert);
convert.addDelegate("application/json", jsonConvert);
ImageMessageConverter imageConverter = new ImageMessageConverter();
convert.addDelegate("image/png", imageConverter);
convert.addDelegate("image", imageConverter);
PDFMessageConverter pdfConverter = new PDFMessageConverter();
convert.addDelegate("application/pdf", pdfConverter);
adapter.setMessageConverter(convert);
container.setMessageListener(adapter);
測試
@Test
public void testSendExtConverterMessage() throws Exception {
byte[] body = Files.readAllBytes(Paths.get("C:\\Users\\Administrator\\Desktop\\公眾號", "spring.png"));
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("image/png");
messageProperties.getHeaders().put("extName", "png");
Message message = new Message(body, messageProperties);
rabbitTemplate.send("", "image_queue", message);
byte[] body1 = Files.readAllBytes(Paths.get("D:\\Documents\\技術書籍", "Java huashan-2019-06-20.pdf"));
MessageProperties messageProperties1 = new MessageProperties();
messageProperties.setContentType("application/pdf");
Message message1 = new Message(body1, messageProperties);
rabbitTemplate.send("", "pdf_queue", message1);
}
SpringBoot整合配置詳解(生產端)
- publisher-confirms,實現一個監聽器用於監聽Broker端給我們返回的確認請求:RabbitTemplate.ConfirmCallback
- publisher-returns,保證消息對Broker端是可達的,如果出現路由鍵不可達的情況,則使用監聽器對不可達的消息進行后續的處理,保證消息的路由成功: RabbitTemplate.ReturnCallback
注意一點,在發送消息的時候對template進行配置mandatory=true保證監聽有效;生產端還可以配置其他屬性,比如發送重試,超時時間、次數、間隔等。
生產端代碼示例
application.properties
spring.rabbitmq.addresses=localhost:5672
#spring.rabbitmq.host=localhost
#spring.rabbitmq.port=5762
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
# 消息確認模式
spring.rabbitmq.publisher-confirms=true
# 消息返回模式
spring.rabbitmq.publisher-returns=true
# 為true 消息返回模式才生效
spring.rabbitmq.template.mandatory=true
配置
/**
* springboot 消息生產者
*
* @author niugang
*/
@Configuration
public class RabbitMqConfig {
/**
* 自動注入RabbitTemplate模板類
*/
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 回調函數: confirm確認
*/
final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: " + correlationData);
System.err.println("ack: " + ack);
if (!ack) {
System.err.println("異常處理....");
}
}
};
/**
* 回調函數: return返回
*/
final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
String exchange, String routingKey) {
System.err.println("return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
}
};
/*
隊列監聽在消費者端配置,沒有將會自動創建
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("exchange-1");
}
@Bean
public Queue queue() {
return new Queue("queue-1");
}
@Bean
public Binding binding(Queue queue, TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with("springboot.#");
}*/
/**
* 發送消息方法調用: 構建Message消息
*
* @param message 消息體
* @param properties 消息屬性
*/
public void send(Object message, Map<String, Object> properties) {
MessageProperties messageProperties = new MessageProperties();
if (properties != null && properties.size() > 0) {
Set<Map.Entry<String, Object>> entries = properties.entrySet();
for (Map.Entry<String, Object> entry : entries) {
String key = entry.getKey();
Object value = entry.getValue();
messageProperties.setHeader(key, value);
}
}
//org.springframework.amqp.core
Message msg = MessageBuilder.withBody(message.toString().getBytes()).andProperties(messageProperties).build();
//id + 時間戳 全局唯一
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//routingKey修改 為 spring.abc 消息將走 returnCallback
rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
}
}
測試
在rabbitmq控制台新建,Exchange名為exchange-1,新建隊列queue-1,並建立兩者之間的綁定,routingKey為springboot.#
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@Test
public void contextLoads() {
}
@Autowired
private RabbitMqConfig rabbitMqConfig ;
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Test
public void testSender1() throws Exception {
Map<String, Object> properties = new HashMap<>();
properties.put("number", "12345");
properties.put("send_time", simpleDateFormat.format(new Date()));
rabbitMqConfig.send("Hello RabbitMQ For Spring Boot!"+System.currentTimeMillis(), properties);
}
}
注意:進行單元測試,ack一直是false;改為url請求,ack就正常了
SpringBoot整合配置詳解(消費端)
消費端核心配置
# NONE, MANUAL, AUTO; 手工消息消息確認
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#監聽器調用程序線程的最小數量。
spring.rabbitmq.listener.simple.concurrency=5
#監聽器調用程序線程的最大數量。
spring.rabbitmq.listener.simple.max-concurrency=10
# spring.rabbitmq.listener.type=simple 默認為 SimpleContainer 模式對應 spring.rabbitmq.listener.simple 前綴相關的
注意點
- 首先配置手工確認模式,用於ACK的手工處理,這樣我們可以保證消息的可靠性送達,或者在消費端消費失敗的時候可以做到重回隊列、根據業務記錄日志等處理
- 可以設置消費端的監聽個數和最大個數,用於控制消費端的並發情況。
@RabbitListener注解的使用
- 消費端監聽@RabbitMQListener注解,這個對於在實際工作中非常的好用。
- @RabbitListener是一個組合注解,里面可以注解配置
- @QueueBinding、@Queue、@Exchange直接通過這個組合注解一次性搞定消費端交換機、隊列、綁定、路由、並且配置監聽功能等。
消費者端代碼示例
類配置寫在代碼里非常不友好,所以強烈建議大家使用配置文件配置。
properties
#spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
# NONE, MANUAL, AUTO; 手工消息消息確認
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#監聽器調用程序線程的最小數量。
spring.rabbitmq.listener.simple.concurrency=5
#監聽器調用程序線程的最大數量。
spring.rabbitmq.listener.simple.max-concurrency=10
# spring.rabbitmq.listener.type=simple 默認為 SimpleContainer 模式對應 spring.rabbitmq.listener.simple 前綴相關的
spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*
配置
public class Order implements Serializable {
private String id;
private String name;
public Order() {
}
public Order(String id, String name) {
super();
this.id = id;
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
/**
* 消費者類
*
* @author niugang
*/
@Configuration
public class RabbitMQReceiver {
/**
* 從1.5.0版開始,您可以在類級別指定@RabbitListener注釋。
* 與新的@RabbitHandler批注一起,這使單個偵聽器可以根據傳入消息的有效負載類型調用不同的方法。
*
* @RabbitListener(id="multi", queues = "someQueue")
* @SendTo("my.reply.queue") public class MultiListenerBean {
* @RabbitHandler public String thing2(Thing2 thing2) {
* ...
* }
* @RabbitHandler public String cat(Cat cat) {
* ...
* }
* @RabbitHandler public String hat(@Header("amqp_receivedRoutingKey") String rk, @Payload Hat hat) {
* ...
* }
* @RabbitHandler(isDefault = true)
* public String defaultMethod(Object object) {
* ...
* }
* }
* 在這種情況下,如果轉換后的有效負載是Thing2,Cat或Hat,則會調用各個@RabbitHandler方法。
* 您應該了解,系統必須能夠根據有效負載類型識別唯一方法。
* 檢查該類型是否可分配給沒有注釋或帶有@Payload注釋的單個參數。
* 請注意,如方法級別@RabbitListener(前面所述)中所述,應用了相同的方法簽名。
*/
//隊列 exchange 綁定 沒有 自動創建
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-1",
durable = "true"),
exchange = @Exchange(value = "exchange-1",
durable = "true",
type = ExchangeTypes.TOPIC,
ignoreDeclarationExceptions = "true"),
key = "springboot.*" //routing key
)
)
@RabbitHandler
//@RabbitListener 提供了很多靈活的簽名 如Message Channel @Payload @Header 等 具體可查看源碼
// org.springframework.amqp.core.Message
// org.springframework.messaging.Message
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消費端Payload: " + new String(message.getBody()));
System.err.println("消費端MessageProperties.: " + message.getMessageProperties());
//AmqpHeaders header屬性封裝
//手工ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
/**
* spring.rabbitmq.listener.order.queue.name=queue-2
* spring.rabbitmq.listener.order.queue.durable=true
* spring.rabbitmq.listener.order.exchange.name=exchange-1
* spring.rabbitmq.listener.order.exchange.durable=true
* spring.rabbitmq.listener.order.exchange.type=topic
* spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
* spring.rabbitmq.listener.order.key=springboot.*
*
* @param order order
* @param channel channel
* @param headers headers
* @throws Exception Exception
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
durable = "${spring.rabbitmq.listener.order.queue.durable}"),
exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
durable = "${spring.rabbitmq.listener.order.exchange.durable}",
type = "${spring.rabbitmq.listener.order.exchange.type}",
ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
key = "${spring.rabbitmq.listener.order.key}"
)
)
@RabbitHandler
//@Headers 必須通過Map接收
//@Header("amqp_receivedRoutingKey") String rk 直接獲取header中某一個key
//默認前綴為amqp_
/**
* {amqp_receivedDeliveryMode=PERSISTENT,
* amqp_receivedExchange=exchange-2,
* amqp_deliveryTag=1,
* amqp_consumerQueue=queue-2,
* amqp_redelivered=false,
amqp_receivedRoutingKey=springboot.def,
spring_listener_return_correlation=175a21c4-ffd5-4a3e-ac3a-2f63d60c18a5,
spring_returned_message_correlation=0987654321,
id=53443ced-0b23-3079-71c2-09997897a553,
amqp_consumerTag=amq.ctag-V0hqyVObrHXJeC60MwPSVQ,
contentType=application/x-java-serialized-object,
timestamp=1591240122842}
*/
public void onOrderMessage(@Payload Order order,
Channel channel,
@Headers Map<String, Object> headers) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消費端order: " + order.getId());
System.err.println("消費端headers: " + headers);
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
}