消息監聽適配器:具體消息的消費邏輯,適配成 MessageListenerAdapter
定義的格式
默認的方法簽名為:void handleMessage(byte[] messageBody)
/**
 * Custom delegate that receives the messages consumed by the listener adapter.
 */
public class MessageDelegate {

    /**
     * Prints the payload of a message delivered as raw bytes to standard error.
     *
     * <p>The method name {@code handleMessage} has to match the default value of
     * {@code MessageListenerAdapter.ORIGINAL_DEFAULT_LISTENER_METHOD} in package
     * {@code org.springframework.amqp.rabbit.listener.adapter}.
     *
     * @param messageBody raw message payload
     */
    public void handleMessage(byte[] messageBody) {
        final String text = new String(messageBody);
        System.err.println("默認方法, 消息內容:" + text);
    }
}
handleMessage 方法名要根據 org.springframework.amqp.rabbit.listener.adapter
包下 MessageListenerAdapter.ORIGINAL_DEFAULT_LISTENER_METHOD
的默認值來確定
/**
 * Creates the listener container that dispatches messages to a {@code MessageDelegate}
 * through a {@code MessageListenerAdapter}.
 *
 * <p>No listener method name is configured here, so the adapter's default method name applies.
 *
 * @param connectionFactory connection factory handed to the container
 * @return the configured listener container
 */
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

    // Queues to consume from, as provided by topicQueue() and topicQueueAgain().
    container.setQueues(topicQueue(), topicQueueAgain());

    // Concurrent consumers: 1, maximum: 5.
    container.setConcurrentConsumers(1);
    container.setMaxConcurrentConsumers(5);
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);

    // Consumer tag = strategy argument + "_" + a random UUID.
    container.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID());

    MessageDelegate delegate = new MessageDelegate();
    container.setMessageListener(new MessageListenerAdapter(delegate));
    return container;
}
測試
/**
 * Sends "hello message" with two custom headers ("desc" and "type") to exchange
 * {@code topic.exchange} using routing key {@code user.abc}.
 */
@Test
public void sendMessage() {
    byte[] payload = "hello message".getBytes();

    MessageProperties props = new MessageProperties();
    props.getHeaders().put("desc", "自定義描述");
    props.getHeaders().put("type", "自定義類型");

    rabbitTemplate.convertAndSend("topic.exchange", "user.abc", new Message(payload, props));
}
自定義消費方法
/**
 * Delegate offering two byte-array handlers: the default-named {@code handleMessage}
 * and the custom-named {@code consumerMessage}.
 */
@Slf4j
public class MessageDelegate {

    /**
     * Logs the payload decoded as a string.
     *
     * @param messageBody raw message payload
     */
    public void handleMessage(byte[] messageBody) {
        final String text = new String(messageBody);
        log.info("默認方法 消息內容:" + text);
    }

    /**
     * Logs the payload decoded as a string.
     *
     * @param messageBody raw message payload
     */
    public void consumerMessage(byte[] messageBody) {
        final String text = new String(messageBody);
        log.info("字節數據方法 消息的內容 " + text);
    }
}
@Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); messageListenerContainer.setQueues(topicQueue(), topicQueueAgain()); messageListenerContainer.setConcurrentConsumers(1); messageListenerContainer.setMaxConcurrentConsumers(5); messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); messageListenerContainer.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID().toString()); MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MessageDelegate()); listenerAdapter.setDefaultListenerMethod("consumerMessage"); messageListenerContainer.setMessageListener(listenerAdapter); return messageListenerContainer;
byte[] 和 String 間的轉換器
/**
 * Converter between {@code byte[]} payloads and {@code String}.
 *
 * <p>Text is always encoded and decoded as UTF-8 rather than the platform default charset,
 * so producer and consumer agree regardless of the JVM they run on.
 */
public class TextMessageConverter implements MessageConverter {

    /** Charset used for both directions of the conversion. */
    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Wraps the string form of {@code obj} into a message.
     *
     * @param obj object whose {@code toString()} becomes the payload
     * @param messageProperties properties attached to the created message
     * @return message carrying the UTF-8 bytes of {@code obj.toString()}
     */
    @Override
    public Message toMessage(Object obj, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(obj.toString().getBytes(CHARSET), messageProperties);
    }

    /**
     * Returns the body as a {@code String} when the content type contains "text",
     * otherwise returns the raw {@code byte[]} body.
     *
     * @param message incoming message
     * @return decoded text or the untouched byte array
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        String contentType = message.getMessageProperties().getContentType();
        if (contentType != null && contentType.contains("text")) {
            return new String(message.getBody(), CHARSET);
        }
        return message.getBody();
    }
}
添加 String 接收方法
/**
 * Logs a message that has already been converted to a string.
 *
 * @param messageBody message payload as text
 */
public void consumerMessage(String messageBody) {
    final String line = "字符串方法 消息內容:" + messageBody;
    log.info(line);
}
/**
 * Creates the listener container whose adapter converts payloads with
 * {@code TextMessageConverter} and invokes the delegate's {@code consumerMessage} method.
 *
 * @param connectionFactory connection factory handed to the container
 * @return the configured listener container
 */
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
    // Adapter: custom method name plus the byte[] <-> String converter.
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumerMessage");
    adapter.setMessageConverter(new TextMessageConverter());

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(topicQueue(), topicQueueAgain());
    container.setConcurrentConsumers(1);
    container.setMaxConcurrentConsumers(5);
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    // Consumer tag = strategy argument + "_" + a random UUID.
    container.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID());
    container.setMessageListener(adapter);
    return container;
}
測試
/**
 * Sends "hello message" marked as {@code text/plain}, with two custom headers, to exchange
 * {@code topic.exchange} using routing key {@code user.abc}.
 */
@Test
public void sendMessage() {
    MessageProperties props = new MessageProperties();
    props.getHeaders().put("desc", "自定義描述");
    props.getHeaders().put("type", "自定義類型");
    props.setContentType("text/plain");

    byte[] payload = "hello message".getBytes();
    Message outgoing = new Message(payload, props);
    rabbitTemplate.convertAndSend("topic.exchange", "user.abc", outgoing);
}
不同隊列由不同方法消費
/**
 * Delegate with one handler per queue; each handler logs the text it received.
 */
@Slf4j
public class MessageDelegate {

    /**
     * Logs a message routed to {@code method1}.
     *
     * @param messageBody message payload as text
     */
    public void method1(String messageBody) {
        final String line = "method1收到的消息:" + messageBody;
        log.info(line);
    }

    /**
     * Logs a message routed to {@code method2}.
     *
     * @param messageBody message payload as text
     */
    public void method2(String messageBody) {
        final String line = "method2收到的消息:" + messageBody;
        log.info(line);
    }
}
/**
 * Creates the listener container whose adapter picks the delegate method from a
 * queue-or-tag to method-name map: {@code topic.queue -> method1}, {@code topic.again -> method2}.
 *
 * @param connectionFactory connection factory handed to the container
 * @return the configured listener container
 */
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
    // Mapping handed to the adapter via setQueueOrTagToMethodName.
    Map<String, String> methodByQueue = new HashMap<>();
    methodByQueue.put("topic.queue", "method1");
    methodByQueue.put("topic.again", "method2");

    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setMessageConverter(new TextMessageConverter());
    adapter.setQueueOrTagToMethodName(methodByQueue);

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(topicQueue(), topicQueueAgain());
    container.setConcurrentConsumers(1);
    container.setMaxConcurrentConsumers(5);
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    // Consumer tag = strategy argument + "_" + a random UUID.
    container.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID());
    container.setMessageListener(adapter);
    return container;
}
發送 JSON,使用 Map 接收消息
/**
 * Delegate that receives a message already converted into a {@code Map}.
 */
@Slf4j
public class MessageDelegate {

    /**
     * Logs the converted message content.
     *
     * <p>The parameter is declared as {@code Map<?, ?>} instead of the raw type; the erased
     * signature is unchanged, so lookup of the method by name and argument type is unaffected.
     *
     * @param messageBody message content as a map
     */
    public void consumerMessage(Map<?, ?> messageBody) {
        log.info("消息的內容:{}", messageBody);
    }
}
添加消息轉換器
/**
 * Creates the listener container whose adapter uses a {@code Jackson2JsonMessageConverter}
 * and invokes the delegate's {@code consumerMessage} method.
 *
 * @param connectionFactory connection factory handed to the container
 * @return the configured listener container
 */
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(topicQueue(), topicQueueAgain());
    container.setConcurrentConsumers(1);
    container.setMaxConcurrentConsumers(5);
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    // Consumer tag = strategy argument + "_" + a random UUID.
    container.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID());

    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumerMessage");
    // JSON converter installed on the adapter.
    adapter.setMessageConverter(new Jackson2JsonMessageConverter());

    container.setMessageListener(adapter);
    return container;
}
測試
/**
 * Serializes an {@code Order} to JSON and sends it as {@code application/json} to exchange
 * {@code topic.exchange} using routing key {@code user.abc}.
 *
 * <p>The JSON text contains non-ASCII characters, so it is encoded explicitly as UTF-8
 * instead of relying on the platform default charset.
 */
@Test
public void sendJSONMessage() {
    Order order = new Order();
    order.setId("0001");
    order.setName("放飛夢想");
    order.setContent("不服就干");
    String json = JSON.toJSONString(order);

    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("application/json");

    // Explicit charset: a platform default such as GBK would produce bytes that are not valid UTF-8 JSON.
    byte[] body = json.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    Message message = new Message(body, messageProperties);
    rabbitTemplate.convertAndSend("topic.exchange", "user.abc", message);
}
JSON轉換成JAVA對象
處理方法
/**
 * Logs the id, name and content of the received order.
 *
 * @param order order built from the incoming message
 */
public void consumerMessage(Order order) {
    // Pieces are built in the same left-to-right order as the getters are read.
    String idPart = "order對象,消息的內容,id:" + order.getId();
    String namePart = "name:" + order.getName();
    String contentPart = " content:" + order.getContent();
    log.info(idPart + namePart + contentPart);
}
配置DefaultJackson2JavaTypeMapper將JSON轉為JAVA對象
/**
 * Creates the listener container whose adapter uses a {@code Jackson2JsonMessageConverter}
 * equipped with a {@code DefaultJackson2JavaTypeMapper} that trusts package
 * {@code com.smart.domain}, and invokes the delegate's {@code consumerMessage} method.
 *
 * @param connectionFactory connection factory handed to the container
 * @return the configured listener container
 */
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
    // JSON converter with a Java type mapper restricted to the trusted package.
    DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
    typeMapper.setTrustedPackages("com.smart.domain");
    Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
    jsonConverter.setJavaTypeMapper(typeMapper);

    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumerMessage");
    adapter.setMessageConverter(jsonConverter);

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(topicQueue(), topicQueueAgain());
    container.setConcurrentConsumers(1);
    container.setMaxConcurrentConsumers(5);
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    // Consumer tag = strategy argument + "_" + a random UUID.
    container.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID());
    container.setMessageListener(adapter);
    return container;
}
PDF 轉換器
@Slf4j public class PDFMessageConverter implements MessageConverter { @Override public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException { throw new MessageConversionException(" convert error ! "); } @Override public Object fromMessage(Message message) throws MessageConversionException { log.info("-----------pdfMessageConverter---------------"); byte[] body = message.getBody(); String fileName = UUID.randomUUID().toString(); String path = "d:/test/" + fileName + ".pdf"; File file = new File(path); try { Files.copy(new ByteArrayInputStream(body), file.toPath()); } catch (IOException e) { e.printStackTrace(); } return file; } }
圖片轉換器
@Slf4j public class ImageMessageConverter implements MessageConverter { @Override public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException { throw new MessageConversionException(" convert error ! "); } @Override public Object fromMessage(Message message) throws MessageConversionException { log.info("------------imageMessageConverter-------------------"); Object _extName = message.getMessageProperties().getHeaders().get("extName"); String extName = _extName == null ? "png" : _extName.toString(); byte[] messageBody = message.getBody(); String fileName = UUID.randomUUID().toString(); String filePrefix = "d:/test/"; File fileDir = new File(filePrefix); if (!fileDir.exists()) { fileDir.mkdirs(); } String path = filePrefix + fileName + "." + extName; File file = new File(path); try { Files.copy(new ByteArrayInputStream(messageBody), file.toPath()); } catch (IOException e) { e.printStackTrace(); } return file; } }
整合多個轉換器
/**
 * Creates the listener container whose adapter selects a converter by content type:
 * {@code image/png} and {@code image} use {@code ImageMessageConverter},
 * {@code application/pdf} uses {@code PDFMessageConverter}.
 *
 * <p>The listener method name is {@code consumeMessage}, matching the file-handling
 * delegate method {@code MessageDelegate.consumeMessage(File)}.
 *
 * @param connectionFactory connection factory handed to the container
 * @return the configured listener container
 */
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
    messageListenerContainer.setQueues(topicQueue(), topicQueueAgain());
    messageListenerContainer.setConcurrentConsumers(1);
    messageListenerContainer.setMaxConcurrentConsumers(5);
    messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
    // Consumer tag = strategy argument + "_" + a random UUID.
    messageListenerContainer.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID().toString());

    MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MessageDelegate());
    // Must equal the delegate's method name; the file delegate declares consumeMessage(File).
    listenerAdapter.setDefaultListenerMethod("consumeMessage");

    ContentTypeDelegatingMessageConverter messageConverter = new ContentTypeDelegatingMessageConverter();
    ImageMessageConverter imageMessageConverter = new ImageMessageConverter();
    messageConverter.addDelegate("image/png", imageMessageConverter);
    messageConverter.addDelegate("image", imageMessageConverter);
    PDFMessageConverter pdfMessageConverter = new PDFMessageConverter();
    messageConverter.addDelegate("application/pdf", pdfMessageConverter);
    listenerAdapter.setMessageConverter(messageConverter);

    messageListenerContainer.setMessageListener(listenerAdapter);
    return messageListenerContainer;
}
// NOTE(review): trailing brace kept from the original; presumably closes the enclosing configuration class — confirm.
}
定義文件的消費方法
/**
 * Delegate that receives messages already converted into files.
 */
public class MessageDelegate {

    /**
     * Prints the name of the received file to standard error.
     *
     * @param file file produced from the incoming message
     */
    public void consumeMessage(File file) {
        final String fileName = file.getName();
        System.err.println("文件對象 方法, 消息內容:" + fileName);
    }
}
測試
/**
 * Reads {@code d:/cc.jpg} and sends its bytes as an {@code image/png} message, with header
 * {@code extName=png}, to exchange {@code topic.exchange} using routing key {@code user.ddd}.
 *
 * @throws IOException if the file cannot be read
 */
@Test
public void sendExtMessage() throws IOException {
    // NOTE(review): the source file is a .jpg while content type and extName say png — confirm this is intended.
    byte[] imageBytes = Files.readAllBytes(Paths.get("d:/", "cc.jpg"));

    MessageProperties props = new MessageProperties();
    props.setContentType("image/png");
    props.getHeaders().put("extName", "png");

    Message outgoing = new Message(imageBytes, props);
    rabbitTemplate.send("topic.exchange", "user.ddd", outgoing);
}