前言
本文主要介紹AMQP核心組件:
- RabbitAdmin
- SpringAMQP 聲明
- RabbitTemplate
- SimpleMessageListenerContainer
- MessageListenerAdapte
- MessageConverter
RabbitAdmin
RabbitAdmin類是針對RabbitMQ管理端進行封裝操作,比如:Exchange操作、Queue操作,Binding綁定操作等,操作起來簡單便捷!
- 用於聲明RabbitMQ相關配置、操作RabbitMQ
- autoStartup設為true:表示Spring容器啟動時自動初始化RabbitAdmin
- 底層實現:從Spring容器中獲取Exchange、Binding、RoutingKey以及Queue的@Bean聲明
- rabbitTemplate的execute方法執行對應的聲明等操作
@Configuration @ComponentScan({"com.orcas.spring"}) public class RabbitMQConfig { /** * 創建 RabbitMQ 連接工廠 */ @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("192.168.58.129:5672"); connectionFactory.setUsername("orcas"); connectionFactory.setPassword("1224"); connectionFactory.setVirtualHost("/"); return connectionFactory; } /** * 創建 RabbitAdmin 類,這個類封裝了對 RabbitMQ 管理端的操作! 比如:Exchange 操作,Queue 操作,Binding 綁定 等 */ @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); // 只有設置為 true,spring 才會加載 RabbitAdmin 這個類 rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } }
@RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests { @Autowired private RabbitAdmin rabbitAdmin; /** * 交換機操作 */ @Test public void testAdminExchange() { // 創建交換機, 類型為 direct,durable 參數表示是否持久化 rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false)); // 創建交換機,類型為 topic,durable 參數表示是否持久化 rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false)); // 創建交換機,類型為 fanout,durable 參數表示是否持久化 rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false)); } /** * 隊列操作 */ @Test public void testAdminQueue() { // 創建隊列 // durable 參數表示是否持久化 rabbitAdmin.declareQueue(new Queue("test.direct.queue", false)); // 創建隊列 // durable 參數表示是否持久化 rabbitAdmin.declareQueue(new Queue("test.topic.queue", false)); // 創建隊列 // durable 參數表示是否持久化 rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false)); } /** * 綁定操作 */ @Test public void testAdminBinding() { /** * 兩種寫法都可以,都選擇綁定 隊列 或者 交換機 */ /** * destination 需要綁定隊列的名字 * DestinationType 綁定類型, * Binding.DestinationType.QUEUE 表示是隊列綁定 * Binding.DestinationType.EXCHANGE 表示交換機綁定 * * exchange 交換機名稱 * routingKey 路由key * arguments 額外參數(比如綁定隊列,可以設置 死信隊列的參數) */ rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "routing_direct", new HashMap<>())); /** * 鏈式寫法 */ rabbitAdmin.declareBinding( BindingBuilder.bind(new Queue("test.topic.queue", false)) // 直接創建隊列 .to(new TopicExchange("test.topic", false, false)) // 直接創建交換機,並建立關聯關系 .with("routing_topic") // 指定路由 key ); /** * 鏈式寫法 * * FanoutExchange 交換機,和路由key沒有綁定關系,因為他是給交換機內所有的 queue 都發送消息! */ rabbitAdmin.declareBinding( BindingBuilder.bind(new Queue("test.fanout.queue", false)) // 直接創建隊列 .to(new FanoutExchange("test.fanout", false, false)) // 直接創建交換機,並建立關聯關系 ); } /** * 其他操作-清空隊列 */ @Test public void testAdminOther() {// noWait 參數是否需要等待: true 表示需要,false 表示不需要 // 也就是需要清空的時候,我需要等待一下,在清空(會自動等待幾秒鍾) rabbitAdmin.purgeQueue("test.fanout.queue", false); } }
Spring AMQP 聲明
- exchange
- TopicExchange
- FanoutExchange
- DirectExchange
- queue
- binding
- BindingBuilder
spring使用@bean聲明exchange queue binding 例子如下:
@Bean
public TopicExchange exchange001() { return new TopicExchange("topic001", true, false); } @Bean public Queue queue001() { return new Queue("queue001", true); //隊列持久 } @Bean public Binding binding001() { return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*"); }
RabbitTemplate
消息模板
- 與SpringAMQP整合的時候進行發送消息的關鍵類
-
該類提供了豐富的發送消息方法,包括可靠性投遞方法,回調監聽消息接口
ConfirmCallback
,返回值確認接口ReturnCallback
等等。同樣我們需要進行注入到spring容器中。然后進行使用。 - 與Spring整合時需要實例化,但是與Springboot整合時不需要,在配置文件添加配置即可
- rabbitTemplate.convertAndSend方法是主要的發送消息的方法
- MessageProperties 用於構造消息體
- MessagePostProcessor:消息發送之后對消息進行的設置
以下是RabbitTemplate實例化的例子:
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(//); rabbitTemplate.setReturnCallback(//); return rabbitTemplate; }
以下是發送消息例子:
@Test
public void testSendMessage() throws Exception { //1 創建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("desc", "信息描述.."); messageProperties.getHeaders().put("type", "自定義消息類型.."); Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties); rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { System.err.println("------添加額外的設置---------"); message.getMessageProperties().getHeaders().put("desc", "額外修改的信息描述"); message.getMessageProperties().getHeaders().put("attr", "額外新加的屬性"); return message; } }); }
SimpleMessageListenerContainer
簡單消息監聽容器 功能:
- 監聽多個隊列、自動啟動、自動聲明
- 設置事務特性、事務管理器、事務屬性、事務容器、是否開啟事務、回滾消息
- 設置消費者數量、最小最大數量、批量消費
- 設置消息確認和自動確認模式、是否重回隊列、異常捕獲函數
- 設置消費者標簽生成策略、是否獨占模式、消費者屬性等
- 設置具體的監聽器、消息轉換器 message convert
- SimpleMessageListenerContainer可以進行動態設置,比如在運行中的應用可以動態的修改其消費者的大小、接收消息的模式等,可以基於此開發rabbitmq自定義后台管控平台
- queues: 消費隊列
- concurrentConsumers:當前消費者數
- maxConcurrentConsumers:最大消費者並發
- defaultRequeueRejected: 是否重回隊列,默認: false
- acknowledgeMode:消息確認模型,默認:AUTO
- exposeListenerChannel:
- messageListener: 消息監聽
- consumerTagStrategy:consumerTag生成策略
/** * 簡單消息監聽容器 * 配置完成后。可以在管控台看到消息者信息。 以及消費者標簽信息 * * @param connectionFactory 鏈接工廠 * @return SimpleMessageListenerContainer */ @Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); //設置要監聽的隊列 simpleMessageListenerContainer.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf()); //初始化消費者數量 simpleMessageListenerContainer.setConcurrentConsumers(1); //最大消費者數量 simpleMessageListenerContainer.setMaxConcurrentConsumers(5); //設置是否重回隊列[一般為false] simpleMessageListenerContainer.setDefaultRequeueRejected(false); //設置自動ack simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); //設置channel 是否外露 simpleMessageListenerContainer.setExposeListenerChannel(true); //設置消費端標簽的策略 simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queueName) { return queueName + "_" + UUID.randomUUID().toString(); } }); //設置消息監聽 ChannelAwareMessageListener simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { String msg = new String(message.getBody()); System.out.println("----------消費者: " + msg); } }); return simpleMessageListenerContainer; }
MessageListenerAdaper
消息監聽適配器
- extends AbstractAdaptableMessageListener 消息listener
- queueOrTagToMethodName 隊列標識與方法名稱組成的集合
- defaultListenerMethod 默認監聽方法名稱
- Delegate 委托對象:實際真實的委托對象
- 可以一一進行隊列和方法名稱的匹配
- 隊列和方法名稱綁定,即指定隊列里的消息會被綁定的方法所接受處理
/** * 通過`simpleMessageListenerContainer` 配置消息監聽適配器。 指向這個類 */ public class MessageDelegate { /** * MessageListenerAdapter 默認指定接收消息的方法的名字就是 handleMessage .當然也可以手動設置 * * @param messageBody message信息 */ public void handleMessage(byte[] messageBody) { System.err.println("默認方法,消息內容: " + new String(messageBody)); } public void consumeMessage(byte[] messageBody) { System.err.println("字節數組方法, 消息內容:" + new String(messageBody)); } public void consumeMessage(String messageBody) { System.err.println("字符串方法, 消息內容:" + messageBody); } } /** * spring amqp 消息轉換器 * * @author yangHX * createTime 2019/4/6 12:28 */ public class TextMessageConverter implements MessageConverter { /** * 將數據轉化為 message 類 * * @param o 要發送的數據 * @param messageProperties 消息頭 * @return Message * @throws MessageConversionException ex */ @Override public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException { return new Message(o.toString().getBytes(), messageProperties); } /** * 將message轉換為想要的數據類型 * * @param message message * @return Object * @throws MessageConversionException ex */ @Override public Object fromMessage(Message message) throws MessageConversionException { String contentType = message.getMessageProperties().getContentType(); if (null != contentType && contentType.contains("text")) { return new String(message.getBody()); } return message.getBody(); } }
//消息監聽適配器 只截取了一小段 /* * 適配器方式。 默認是有自己的方法名字。 handleMessage * 可以自己指定一個方法的名稱。 consumerMessage * 也可以添加一個轉換器: 從字節數組轉換為String */ MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate); //設定queue使用哪個adapter方法處理 Map<String,String> queueOrTagToMethodName = new HashMap<>(); queueOrTagToMethodName.put("queue001","method1"); queueOrTagToMethodName.put("queue002","method2"); adapter.setQueueOrTagToMethodName(queueOrTagToMethodName); //設置默認處理方法,默認處理方法是handleMessage adapter.setDefaultListenerMethod("handleMessage"); //設置消息轉換方式 adapter.setMessageConverter(new TextMessageConvert()); //消息監聽 container.setMessageListener(adapter); /** * 發送消息。測試轉換器和適配器 * <p> * 轉換器判斷contentType 將字節數組轉化為字符串 * 適配器將數據交給 MessageDelegate 的 consumeMessage 方法進行處理 */ @Test public void testMessage4Text() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plan"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc", message); rabbitTemplate.send("topic002", "rabbit.abc", message); }
MessageConverter
-
我們在消息傳輸的時候,正常情況下消息體為二進制的數據方式進行傳輸。如果希望內部幫我們進行轉換,或者指定自定義的轉換器,就需要用到MessageConverter
-
自定義常用轉換器
MessageConverter
一般來講都需要實現這個接口 -
重寫下面兩個方法
toMessage
: java 對象轉換為MessagefromMessage
: Message對象轉換為java對象
- 轉換器類型
- json轉換器: jackson2JsonMessageConverter: 可以進行java對象的轉換功能
- DefaultJackson2JavaTypeMapper映射器: 可以進行java對象的映射關系
- 自定義二進制轉換器: 比如圖片類型、PDF,PPT, 流媒體
1、Json轉換器:Jackson2JsonMessageConverter
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); container.setMessageListener(jackson2JsonMessageConverter);
2、支持Java對象的轉換:DefaultJackson2JavaTypeMapper
&Jackson2JsonMessageConverter
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper defaultJackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper(); jackson2JsonMessageConverter.setJavaTypeMapper(defaultJackson2JavaTypeMapper); adapter.setMessageConverter(jackson2JsonMessageConverter);
3、支持java對象多映射轉換:DefaultJackson2JavaTypeMapper
& Jackson2JsonMessageConverter
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>(); // (標簽, 類的全路徑)將標簽和類進行綁定 idClassMapping.put("order", com.orcas.spring.entity.Order.class); idClassMapping.put("packaged", com.orcas.spring.entity.Packaged.class); javaTypeMapper.setIdClassMapping(idClassMapping); jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper); adapter.setMessageConverter(jackson2JsonMessageConverter);
4、全局的轉換器:ContentTypeDelegatingMessageConverter
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); //全局的轉換器: ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter(); TextMessageConverter textConvert = new TextMessageConverter(); convert.addDelegate("text", textConvert); convert.addDelegate("html/text", textConvert); convert.addDelegate("xml/text", textConvert); convert.addDelegate("text/plain", textConvert); Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter(); convert.addDelegate("json", jsonConvert); convert.addDelegate("application/json", jsonConvert); ImageMessageConverter imageConverter = new ImageMessageConverter(); convert.addDelegate("image/png", imageConverter); convert.addDelegate("image", imageConverter); PDFMessageConverter pdfConverter = new PDFMessageConverter(); convert.addDelegate("application/pdf", pdfConverter); adapter.setMessageConverter(convert); container.setMessageListener(adapter);
5、圖片轉換器:
public class ImageMessageConverter implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { throw new MessageConversionException(" convert error ! "); } @Override public Object fromMessage(Message message) throws MessageConversionException { System.err.println("-----------Image MessageConverter----------"); // 獲取消息擴展屬性中的"extName" Object _extName = message.getMessageProperties().getHeaders().get("extName"); // 若為空默認為png, 否則就獲取該擴展名 String extName = _extName == null ? "png" : _extName.toString(); byte[] body = message.getBody(); String fileName = UUID.randomUUID().toString(); // 該圖的路徑+圖片名 String path = "d:/010_test/" + fileName + "." + extName; File f = new File(path); try { Files.copy(new ByteArrayInputStream(body), f.toPath()); } catch (IOException e) { e.printStackTrace(); } return f; } }
六、PDF轉換器:
public class PDFMessageConverter implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { throw new MessageConversionException(" convert error ! "); } @Override public Object fromMessage(Message message) throws MessageConversionException { System.err.println("-----------PDF MessageConverter----------"); byte[] body = message.getBody(); String fileName = UUID.randomUUID().toString(); String path = "d:/010_test/" + fileName + ".pdf"; File f = new File(path); try { Files.copy(new ByteArrayInputStream(body), f.toPath()); } catch (IOException e) { e.printStackTrace(); } return f; } }
七、測試代碼:
@Test public void testSendExtConverterMessage() throws Exception { // byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "picture.png")); // MessageProperties messageProperties = new MessageProperties(); // messageProperties.setContentType("image/png"); // messageProperties.getHeaders().put("extName", "png"); // Message message = new Message(body, messageProperties); // rabbitTemplate.send("", "image_queue", message); byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "mysql.pdf")); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/pdf"); Message message = new Message(body, messageProperties); rabbitTemplate.send("", "pdf_queue", message); }
引用: