1、SpringAMQP用戶管理組件RabbitAdmin。
RabbitAdmin類可以很好的操作RabbitMQ,在Spring中直接進行注入即可。注意,autoStartup必須設置為true,否則Spring容器不會加載RabbitAdmin類。RabbitAdmin底層實現就是從Spring容器中獲取Exchange交換機、Binding綁定、RoutingKey路由鍵以及Queue隊列的@Bean聲明。
然后使用RabbitTemplate的execute方法執行對應的聲明、修改、刪除等一系列RabbitMQ基礎功能操作。例如,添加一個交換機、刪除一個綁定、清空一個隊列里面的消息等待操作。
2、由於使用的maven工程配合了Springboot整合Spring與RabbitMQ的知識。所以先引入依賴包,如下所示:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 5 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 6 <modelVersion>4.0.0</modelVersion> 7 <parent> 8 <groupId>org.springframework.boot</groupId> 9 <artifactId>spring-boot-starter-parent</artifactId> 10 <version>2.2.1.RELEASE</version> 11 <relativePath /> <!-- lookup parent from repository --> 12 </parent> 13 <groupId>com.bie</groupId> 14 <artifactId>rabbitmq-spring</artifactId> 15 <version>0.0.1-SNAPSHOT</version> 16 <name>rabbitmq-spring</name> 17 <description>Demo project for Spring Boot</description> 18 19 <properties> 20 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 21 <project.reporting.outputEncoding>UTF-8 22 </project.reporting.outputEncoding> 23 <java.version>1.8</java.version> 24 <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version> 25 </properties> 26 27 <dependencies> 28 <!-- Spring與RabbitMQ整合的包 --> 29 <dependency> 30 <groupId>org.springframework.boot</groupId> 31 <artifactId>spring-boot-starter-amqp</artifactId> 32 </dependency> 33 <dependency> 34 <groupId>org.springframework.boot</groupId> 35 <artifactId>spring-boot-starter-web</artifactId> 36 </dependency> 37 38 <dependency> 39 <groupId>org.springframework.boot</groupId> 40 <artifactId>spring-boot-starter-test</artifactId> 41 <scope>test</scope> 42 <exclusions> 43 <exclusion> 44 <groupId>org.junit.vintage</groupId> 45 <artifactId>junit-vintage-engine</artifactId> 46 </exclusion> 47 </exclusions> 48 </dependency> 49 <dependency> 50 <groupId>org.springframework.amqp</groupId> 51 <artifactId>spring-rabbit-test</artifactId> 52 <scope>test</scope> 53 </dependency> 54 <!-- RabbitMQ基礎核心包 --> 55 <dependency> 56 <groupId>com.rabbitmq</groupId> 57 <artifactId>amqp-client</artifactId> 58 </dependency> 59 </dependencies> 60 61 <build> 62 <plugins> 63 <plugin> 64 <groupId>org.springframework.boot</groupId> 65 <artifactId>spring-boot-maven-plugin</artifactId> 66 </plugin> 67 </plugins> 68 </build> 69 70 </project>
由於使用的是Springboot項目配合RabbitMQ來做的,所以配置文件這里使用了注解來替換,所以啟動的時候,加載如下所示配置類,如下所示:
1 package com.bie; 2 3 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 4 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 5 import org.springframework.amqp.rabbit.core.RabbitAdmin; 6 import org.springframework.context.annotation.Bean; 7 import org.springframework.context.annotation.ComponentScan; 8 import org.springframework.context.annotation.Configuration; 9 10 /** 11 * 12 * @author biehl 13 * 14 */ 15 @Configuration 16 @ComponentScan(basePackages = "com.bie") 17 public class RabbitMQConfig { 18 19 /** 20 * 將ConnectionFactory注入到bean容器中 21 * 22 * @return 23 */ 24 @Bean 25 public ConnectionFactory connectionFactory() { 26 CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); 27 connectionFactory.setAddresses("192.168.110.133:5672"); 28 connectionFactory.setUsername("guest"); 29 connectionFactory.setPassword("guest"); 30 connectionFactory.setVirtualHost("/"); 31 return connectionFactory; 32 } 33 34 /** 35 * 參數依賴上面注入的ConnectionFactory類,所以保持參數名稱和注入的ConnectionFactory一致 36 * 37 * @param connectionFactory 38 * @return 39 */ 40 @Bean 41 public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { 42 RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); 43 // 注意:autoStartup必須設置為true,否則Spring容器不會加載RabbitAdmin類。 44 rabbitAdmin.setAutoStartup(true); 45 return rabbitAdmin; 46 } 47 48 }
項目主啟動類,如下所示:
1 package com.bie; 2 3 import org.springframework.boot.SpringApplication; 4 import org.springframework.boot.autoconfigure.SpringBootApplication; 5 6 @SpringBootApplication 7 public class RabbitmqSpringApplication { 8 9 public static void main(String[] args) { 10 SpringApplication.run(RabbitmqSpringApplication.class, args); 11 } 12 13 }
下面演示一下,Spring整合RabbitMQ,創建交換機,創建隊列,將交換機和隊列綁定的示例代碼,如下所示:
1 package com.bie; 2 3 import java.util.HashMap; 4 5 import org.junit.Test; 6 import org.junit.runner.RunWith; 7 import org.springframework.amqp.core.Binding; 8 import org.springframework.amqp.core.BindingBuilder; 9 import org.springframework.amqp.core.DirectExchange; 10 import org.springframework.amqp.core.FanoutExchange; 11 import org.springframework.amqp.core.Queue; 12 import org.springframework.amqp.core.TopicExchange; 13 import org.springframework.amqp.rabbit.core.RabbitAdmin; 14 import org.springframework.beans.factory.annotation.Autowired; 15 import org.springframework.boot.test.context.SpringBootTest; 16 import org.springframework.test.context.junit4.SpringRunner; 17 18 /** 19 * 20 * @author biehl 21 * 22 */ 23 @SpringBootTest 24 @RunWith(SpringRunner.class) 25 public class RabbitmqSpringApplicationTests { 26 27 @Autowired 28 private RabbitAdmin rabbitAdmin; 29 30 @Test 31 public void rabbitmqAdmin() { 32 // 參數1交換機名稱, 參數1是否持久化durable, 參數3是否自動刪除 autoDelete 33 // 創建direct類型的交換機 34 DirectExchange directExchange = new DirectExchange("test.directExchange", false, false); 35 rabbitAdmin.declareExchange(directExchange); 36 37 // 創建topic類型的交換機 38 TopicExchange topicExchange = new TopicExchange("test.topicExchange", false, false); 39 rabbitAdmin.declareExchange(topicExchange); 40 41 // 創建fanout類型的交換機 42 FanoutExchange fanoutExchange = new FanoutExchange("test.fanoutExchange", false, false); 43 rabbitAdmin.declareExchange(fanoutExchange); 44 45 // 創建direct類型的隊列 46 Queue directQueue = new Queue("test.direct.queue", false); 47 rabbitAdmin.declareQueue(directQueue); 48 49 // 創建topic類型的交隊列 50 Queue topicQueue = new Queue("test.topic.queue", false); 51 rabbitAdmin.declareQueue(topicQueue); 52 53 // 創建fanout類型的隊列 54 Queue fanoutQueue = new Queue("test.fanout.queue", false); 55 rabbitAdmin.declareQueue(fanoutQueue); 56 57 // 聲明綁定 58 // 參數1 String destination,可以認為是具體的隊列。 59 // 參數2 DestinationType destinationType,綁定的類型。 60 // 參數3 String exchange,交換機的名稱。 61 // 參數4 String routingKey,路由鍵的名稱。 62 // 參數5 Map<String, Object> arguments可以傳入的參數。 63 // 將test.directExchange交換機和test.direct.queue隊列進行綁定 64 rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, 65 "test.directExchange", "direct", new HashMap<>())); 66 67 // 將test.topicExchange交換機和test.topic.queue隊列進行綁定 68 // rabbitAdmin.declareBinding(new Binding("test.topic.queue", 69 // Binding.DestinationType.QUEUE, "test.topicExchange", 70 // "topic", new HashMap<>())); 71 rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.topic.queue", false)) // 直接創建隊列 72 .to(new TopicExchange("test.topicExchange", false, false)) // 直接創建交換機 建立關聯關系 73 .with("user.#")); // 指定路由Key 74 75 // 將test.fanoutExchange交換機和test.fanout.queue隊列進行綁定 76 // rabbitAdmin.declareBinding(new Binding("test.fanout.queue", 77 // Binding.DestinationType.QUEUE, 78 // "test.fanoutExchange", "", new HashMap<>())); 79 80 rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.fanout.queue", false)) // 直接創建隊列 81 .to(new FanoutExchange("test.fanoutExchange", false, false))); // 直接創建交換機 建立關聯關系 82 83 // 清空隊列數據 84 rabbitAdmin.purgeQueue("test.direct.queue", false); 85 rabbitAdmin.purgeQueue("test.topic.queue", false); 86 rabbitAdmin.purgeQueue("test.fanout.queue", false); 87 } 88 89 }
執行代碼,完畢,可以在RabbitMQ的管控台查詢效果,效果如下所示:
3、使用SpringAMQP去聲明,就需要使用SpringAMQP的如下模式,即聲明@Bean方式。SpringAMQP-RabbitMQ聲明式配置使用。可以在初始化加載配置文件中創建好交換機,隊列,以及交換機和隊列的綁定關系,啟動項目即可將交換機,隊列,以及交換機和隊列的綁定創建,如下所示:
1 package com.bie; 2 3 import org.springframework.amqp.core.Binding; 4 import org.springframework.amqp.core.BindingBuilder; 5 import org.springframework.amqp.core.Queue; 6 import org.springframework.amqp.core.TopicExchange; 7 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 8 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 9 import org.springframework.amqp.rabbit.core.RabbitAdmin; 10 import org.springframework.context.annotation.Bean; 11 import org.springframework.context.annotation.ComponentScan; 12 import org.springframework.context.annotation.Configuration; 13 14 /** 15 * 16 * @author biehl 17 * 18 */ 19 @Configuration 20 @ComponentScan(basePackages = "com.bie") 21 public class RabbitMQConfig { 22 23 /** 24 * 將ConnectionFactory注入到bean容器中 25 * 26 * @return 27 */ 28 @Bean 29 public ConnectionFactory connectionFactory() { 30 CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); 31 connectionFactory.setAddresses("192.168.110.133:5672"); 32 connectionFactory.setUsername("guest"); 33 connectionFactory.setPassword("guest"); 34 connectionFactory.setVirtualHost("/"); 35 return connectionFactory; 36 } 37 38 /** 39 * 參數依賴上面注入的ConnectionFactory類,所以保持參數名稱和注入的ConnectionFactory一致 40 * 41 * @param connectionFactory 42 * @return 43 */ 44 @Bean 45 public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { 46 RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); 47 // 注意:autoStartup必須設置為true,否則Spring容器不會加載RabbitAdmin類。 48 rabbitAdmin.setAutoStartup(true); 49 return rabbitAdmin; 50 } 51 52 // 使用SpringAMQP去聲明,就需要使用SpringAMQP的如下模式,即聲明@Bean方式。 53 // SpringAMQP-RabbitMQ聲明式配置使用 54 55 /** 56 * 針對消費者配置: 57 * 58 * 1. 設置交換機類型。 59 * 60 * 2. 將隊列綁定到交換機。 61 * 62 * FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念。 63 * 64 * HeadersExchange :通過添加屬性key-value匹配。 65 * 66 * DirectExchange:按照routingkey分發到指定隊列。 67 * 68 * TopicExchange:多關鍵字匹配。 69 */ 70 @Bean 71 public TopicExchange topicExchange001() { 72 return new TopicExchange("topic001", true, false); 73 } 74 75 @Bean 76 public Queue queue001() { 77 return new Queue("queue001", true);// 隊列持久化 78 } 79 80 @Bean 81 public Binding bingding001() { 82 return BindingBuilder.bind(queue001()).to(topicExchange001()).with("spring.*"); 83 } 84 85 // 第二個交換機通過路由鍵綁定到隊列上面。 86 @Bean 87 public TopicExchange topicExchange002() { 88 return new TopicExchange("topic002", true, false); 89 } 90 91 @Bean 92 public Queue queue002() { 93 return new Queue("queue002", true);// 隊列持久化 94 } 95 96 @Bean 97 public Binding bingding002() { 98 return BindingBuilder.bind(queue002()).to(topicExchange002()).with("rabbit.*"); 99 } 100 101 // 第三個,隊列通過路由鍵綁定到第一個隊列上面,即第一個交換機綁定了兩個隊列。 102 @Bean 103 public Queue queue003() { 104 return new Queue("queue003", true);// 隊列持久化 105 } 106 107 @Bean 108 public Binding bingding003() { 109 return BindingBuilder.bind(queue003()).to(topicExchange001()).with("mq.*"); 110 } 111 112 @Bean 113 public Queue queue_image() { 114 return new Queue("image_queue", true); // 隊列持久 115 } 116 117 @Bean 118 public Queue queue_pdf() { 119 return new Queue("pdf_queue", true); // 隊列持久 120 } 121 122 }
4、RabbitTemplate,即消息模板,我們在與SpringAMQP整合的時候進行發送消息的關鍵類。該類提供了豐富的發送消息方法,包括可靠性投遞消息方法,回調監聽消息接口ConfirmCallback,返回值確認ReturnCallBack等等。同樣我們需要進行注入到Spring容器中,然后直接使用。在與Spring整合的時候需要實例化,但是在與SpringBoot整合的時候,在配置文件里面添加配置即可。
1 package com.bie; 2 3 import org.springframework.amqp.core.Binding; 4 import org.springframework.amqp.core.BindingBuilder; 5 import org.springframework.amqp.core.Queue; 6 import org.springframework.amqp.core.TopicExchange; 7 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 8 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 9 import org.springframework.amqp.rabbit.core.RabbitAdmin; 10 import org.springframework.amqp.rabbit.core.RabbitTemplate; 11 import org.springframework.context.annotation.Bean; 12 import org.springframework.context.annotation.ComponentScan; 13 import org.springframework.context.annotation.Configuration; 14 15 /** 16 * 17 * @author biehl 18 * 19 */ 20 @Configuration 21 @ComponentScan(basePackages = "com.bie") 22 public class RabbitMQConfig { 23 24 /** 25 * 將ConnectionFactory注入到bean容器中 26 * 27 * @return 28 */ 29 @Bean 30 public ConnectionFactory connectionFactory() { 31 CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); 32 connectionFactory.setAddresses("192.168.110.133:5672"); 33 connectionFactory.setUsername("guest"); 34 connectionFactory.setPassword("guest"); 35 connectionFactory.setVirtualHost("/"); 36 return connectionFactory; 37 } 38 39 /** 40 * 參數依賴上面注入的ConnectionFactory類,所以保持參數名稱和注入的ConnectionFactory一致 41 * 42 * @param connectionFactory 43 * @return 44 */ 45 @Bean 46 public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { 47 RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); 48 // 注意:autoStartup必須設置為true,否則Spring容器不會加載RabbitAdmin類。 49 rabbitAdmin.setAutoStartup(true); 50 return rabbitAdmin; 51 } 52 53 // 使用SpringAMQP去聲明,就需要使用SpringAMQP的如下模式,即聲明@Bean方式。 54 // SpringAMQP-RabbitMQ聲明式配置使用 55 56 /** 57 * 針對消費者配置: 58 * 59 * 1. 設置交換機類型。 60 * 61 * 2. 將隊列綁定到交換機。 62 * 63 * FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念。 64 * 65 * HeadersExchange :通過添加屬性key-value匹配。 66 * 67 * DirectExchange:按照routingkey分發到指定隊列。 68 * 69 * TopicExchange:多關鍵字匹配。 70 */ 71 @Bean 72 public TopicExchange topicExchange001() { 73 return new TopicExchange("topic001", true, false); 74 } 75 76 @Bean 77 public Queue queue001() { 78 return new Queue("queue001", true);// 隊列持久化 79 } 80 81 @Bean 82 public Binding bingding001() { 83 return BindingBuilder.bind(queue001()).to(topicExchange001()).with("spring.*"); 84 } 85 86 // 第二個交換機通過路由鍵綁定到隊列上面。 87 @Bean 88 public TopicExchange topicExchange002() { 89 return new TopicExchange("topic002", true, false); 90 } 91 92 @Bean 93 public Queue queue002() { 94 return new Queue("queue002", true);// 隊列持久化 95 } 96 97 @Bean 98 public Binding bingding002() { 99 return BindingBuilder.bind(queue002()).to(topicExchange002()).with("rabbit.*"); 100 } 101 102 // 第三個,隊列通過路由鍵綁定到第一個隊列上面,即第一個交換機綁定了兩個隊列。 103 @Bean 104 public Queue queue003() { 105 return new Queue("queue003", true);// 隊列持久化 106 } 107 108 @Bean 109 public Binding bingding003() { 110 return BindingBuilder.bind(queue003()).to(topicExchange001()).with("mq.*"); 111 } 112 113 @Bean 114 public Queue queue_image() { 115 return new Queue("image_queue", true); // 隊列持久 116 } 117 118 @Bean 119 public Queue queue_pdf() { 120 return new Queue("pdf_queue", true); // 隊列持久 121 } 122 123 // RabbitTemplate,即消息模板,我們在與SpringAMQP整合的時候進行發送消息的關鍵類。 124 // 該類提供了豐富的發送消息方法,包括可靠性投遞消息方法,回調監聽消息接口ConfirmCallback, 125 // 返回值確認ReturnCallBack等等。同樣我們需要進行注入到Spring容器中,然后直接使用。 126 // 將RabbitTemplate加入到Spring容器中 127 public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { 128 RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 129 return rabbitTemplate; 130 } 131 132 }
使用RabbittEmplate發送消息的案例,由於結合初始化配置文件創建的交換機,隊列以及交換機和隊列的綁定,將消息發送到自己創建的交換機,隊列上面,所以效果請自己仔細查看,如下所示:
1 package com.bie; 2 3 import java.util.HashMap; 4 5 import org.junit.Test; 6 import org.junit.runner.RunWith; 7 import org.springframework.amqp.AmqpException; 8 import org.springframework.amqp.core.Binding; 9 import org.springframework.amqp.core.BindingBuilder; 10 import org.springframework.amqp.core.DirectExchange; 11 import org.springframework.amqp.core.FanoutExchange; 12 import org.springframework.amqp.core.Message; 13 import org.springframework.amqp.core.MessagePostProcessor; 14 import org.springframework.amqp.core.MessageProperties; 15 import org.springframework.amqp.core.Queue; 16 import org.springframework.amqp.core.TopicExchange; 17 import org.springframework.amqp.rabbit.core.RabbitAdmin; 18 import org.springframework.amqp.rabbit.core.RabbitTemplate; 19 import org.springframework.beans.factory.annotation.Autowired; 20 import org.springframework.boot.test.context.SpringBootTest; 21 import org.springframework.test.context.junit4.SpringRunner; 22 23 /** 24 * 25 * @author biehl 26 * 27 */ 28 @SpringBootTest 29 @RunWith(SpringRunner.class) 30 public class RabbitmqSpringApplicationTests { 31 32 // 發送消息 33 @Autowired 34 private RabbitTemplate rabbitTemplate; 35 36 @Test 37 public void sendMessage() { 38 // 1、創建消息 39 MessageProperties messageProperties = new MessageProperties(); 40 messageProperties.getHeaders().put("desc", "消息描述"); 41 messageProperties.getHeaders().put("type", "消息類型"); 42 Message message = new Message("hello RabbitMQ".getBytes(), messageProperties); 43 44 // 2、發送消息 45 String exchange = "topic001"; 46 String routingKey = "spring.amqp"; 47 rabbitTemplate.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() { 48 49 @Override 50 public Message postProcessMessage(Message message) throws AmqpException { 51 System.out.println("======添加額外的設置======"); 52 message.getMessageProperties().getHeaders().put("desc", "額外的消息描述"); 53 message.getMessageProperties().getHeaders().put("attr", "額外的屬性"); 54 return message; 55 } 56 }); 57 } 58 59 @Test 60 public void sendMessage2() throws Exception { 61 // 1 創建消息 62 MessageProperties messageProperties = new MessageProperties(); 63 messageProperties.setContentType("text/plain"); 64 Message message = new Message("RabbitMQ的消息.......".getBytes(), messageProperties); 65 66 // 2、發送消息 67 rabbitTemplate.send("topic001", "spring.abc", message); 68 69 rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!"); 70 rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!"); 71 } 72 73 }
運行效果如下所示:
5、SpringAMQP消息容器SimpleMessageListenerContainer。
1)、簡單消息監聽器,這個類非常的強大,我們可以對他進行很多設置,對於消費者的設置項,這個類都可以滿足。可以監聽隊列(多個隊列),自動啟動,自動聲明功能。可以設置事務特性、事務管理器、事務屬性、事務容量(並發)、是否開啟事務、回滾消息等等。可以設置消費者數量、最小最大數量、批量消費等等。可以設置消息確認和自動確認模式,是否重回隊列、異常捕獲handler函數。可以設置消費者標簽生成策略,是否獨占模式,消費者屬性等等。可以設置具體的監聽器、消息轉換器等等。
2)、注意,SpringAMQP消息容器SimpleMessageListenerContainer可以進行動態設置,比如在運行中的應用可以動態的修改其消費者數量的大小,接收消息的模式等等。很多基於RabbitMQ的自制定化后端控制台在進行動態設置的時候,也是根據這一特性實現的。
1 package com.bie; 2 3 import java.util.UUID; 4 5 import org.springframework.amqp.core.AcknowledgeMode; 6 import org.springframework.amqp.core.Binding; 7 import org.springframework.amqp.core.BindingBuilder; 8 import org.springframework.amqp.core.Message; 9 import org.springframework.amqp.core.Queue; 10 import org.springframework.amqp.core.TopicExchange; 11 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 12 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 13 import org.springframework.amqp.rabbit.core.RabbitAdmin; 14 import org.springframework.amqp.rabbit.core.RabbitTemplate; 15 import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; 16 import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; 17 import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; 18 import org.springframework.amqp.support.ConsumerTagStrategy; 19 import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter; 20 import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; 21 import org.springframework.context.annotation.Bean; 22 import org.springframework.context.annotation.ComponentScan; 23 import org.springframework.context.annotation.Configuration; 24 25 import com.bie.adapter.MessageDelegate; 26 import com.bie.convert.ImageMessageConverter; 27 import com.bie.convert.PDFMessageConverter; 28 import com.bie.convert.TextMessageConverter; 29 import com.rabbitmq.client.Channel; 30 31 /** 32 * 33 * @author biehl 34 * 35 */ 36 @Configuration 37 @ComponentScan(basePackages = "com.bie") 38 public class RabbitMQConfig { 39 40 /** 41 * 將ConnectionFactory注入到bean容器中 42 * 43 * @return 44 */ 45 @Bean 46 public ConnectionFactory connectionFactory() { 47 CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); 48 connectionFactory.setAddresses("192.168.110.133:5672"); 49 connectionFactory.setUsername("guest"); 50 connectionFactory.setPassword("guest"); 51 connectionFactory.setVirtualHost("/"); 52 return connectionFactory; 53 } 54 55 /** 56 * 參數依賴上面注入的ConnectionFactory類,所以保持參數名稱和注入的ConnectionFactory一致 57 * 58 * @param connectionFactory 59 * @return 60 */ 61 @Bean 62 public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { 63 RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); 64 // 注意:autoStartup必須設置為true,否則Spring容器不會加載RabbitAdmin類。 65 rabbitAdmin.setAutoStartup(true); 66 return rabbitAdmin; 67 } 68 69 // 使用SpringAMQP去聲明,就需要使用SpringAMQP的如下模式,即聲明@Bean方式。 70 // SpringAMQP-RabbitMQ聲明式配置使用 71 72 /** 73 * 針對消費者配置: 74 * 75 * 1. 設置交換機類型。 76 * 77 * 2. 將隊列綁定到交換機。 78 * 79 * FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念。 80 * 81 * HeadersExchange :通過添加屬性key-value匹配。 82 * 83 * DirectExchange:按照routingkey分發到指定隊列。 84 * 85 * TopicExchange:多關鍵字匹配。 86 */ 87 @Bean 88 public TopicExchange topicExchange001() { 89 return new TopicExchange("topic001", true, false); 90 } 91 92 @Bean 93 public Queue queue001() { 94 return new Queue("queue001", true);// 隊列持久化 95 } 96 97 @Bean 98 public Binding bingding001() { 99 return BindingBuilder.bind(queue001()).to(topicExchange001()).with("spring.*"); 100 } 101 102 // 第二個交換機通過路由鍵綁定到隊列上面。 103 @Bean 104 public TopicExchange topicExchange002() { 105 return new TopicExchange("topic002", true, false); 106 } 107 108 @Bean 109 public Queue queue002() { 110 return new Queue("queue002", true);// 隊列持久化 111 } 112 113 @Bean 114 public Binding bingding002() { 115 return BindingBuilder.bind(queue002()).to(topicExchange002()).with("rabbit.*"); 116 } 117 118 // 第三個,隊列通過路由鍵綁定到第一個隊列上面,即第一個交換機綁定了兩個隊列。 119 @Bean 120 public Queue queue003() { 121 return new Queue("queue003", true);// 隊列持久化 122 } 123 124 @Bean 125 public Binding bingding003() { 126 return BindingBuilder.bind(queue003()).to(topicExchange001()).with("mq.*"); 127 } 128 129 @Bean 130 public Queue queue_image() { 131 return new Queue("image_queue", true); // 隊列持久 132 } 133 134 @Bean 135 public Queue queue_pdf() { 136 return new Queue("pdf_queue", true); // 隊列持久 137 } 138 139 // RabbitTemplate,即消息模板,我們在與SpringAMQP整合的時候進行發送消息的關鍵類。 140 // 該類提供了豐富的發送消息方法,包括可靠性投遞消息方法,回調監聽消息接口ConfirmCallback, 141 // 返回值確認ReturnCallBack等等。同樣我們需要進行注入到Spring容器中,然后直接使用。 142 // 將RabbitTemplate加入到Spring容器中 143 public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { 144 RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 145 return rabbitTemplate; 146 } 147 148 /** 149 * 150 * @param connectionFactory 151 * @return 152 */ 153 @Bean 154 public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { 155 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); 156 // 監聽的隊列,可變參數,可以添加多個隊列。 157 container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf()); 158 // 設置消費者個數。 159 container.setConcurrentConsumers(1); 160 // 設置最大消費者個數。 161 container.setMaxConcurrentConsumers(5); 162 // 設置默認是否重回隊列。 163 container.setDefaultRequeueRejected(false); 164 // 設置簽收模式,自動簽收。 165 container.setAcknowledgeMode(AcknowledgeMode.AUTO); 166 // 設置是否外露 167 container.setExposeListenerChannel(true); 168 // 設置標簽策略。 169 container.setConsumerTagStrategy(new ConsumerTagStrategy() { 170 @Override 171 public String createConsumerTag(String queue) { 172 return queue + "_" + UUID.randomUUID().toString(); 173 } 174 }); 175 176 // 監聽消息 177 container.setMessageListener(new ChannelAwareMessageListener() { 178 179 @Override 180 public void onMessage(Message message, Channel channel) throws Exception { 181 String msg = new String(message.getBody()); 182 System.out.println("===========================消費者消息msg : " + msg); 183 } 184 }); 185 186 return container; 187 } 188 189 }
可以直接在測試類里面進行啟動,直接啟動測試方法sendMessage2,如下所示:
1 package com.bie; 2 3 import java.util.HashMap; 4 5 import org.junit.Test; 6 import org.junit.runner.RunWith; 7 import org.springframework.amqp.AmqpException; 8 import org.springframework.amqp.core.Binding; 9 import org.springframework.amqp.core.BindingBuilder; 10 import org.springframework.amqp.core.DirectExchange; 11 import org.springframework.amqp.core.FanoutExchange; 12 import org.springframework.amqp.core.Message; 13 import org.springframework.amqp.core.MessagePostProcessor; 14 import org.springframework.amqp.core.MessageProperties; 15 import org.springframework.amqp.core.Queue; 16 import org.springframework.amqp.core.TopicExchange; 17 import org.springframework.amqp.rabbit.core.RabbitAdmin; 18 import org.springframework.amqp.rabbit.core.RabbitTemplate; 19 import org.springframework.beans.factory.annotation.Autowired; 20 import org.springframework.boot.test.context.SpringBootTest; 21 import org.springframework.test.context.junit4.SpringRunner; 22 23 /** 24 * 25 * @author biehl 26 * 27 */ 28 @SpringBootTest 29 @RunWith(SpringRunner.class) 30 public class RabbitmqSpringApplicationTests { 31 32 @Autowired 33 private RabbitAdmin rabbitAdmin; 34 35 @Test 36 public void rabbitmqAdmin() { 37 // 參數1交換機名稱, 參數1是否持久化durable, 參數3是否自動刪除 autoDelete 38 // 創建direct類型的交換機 39 DirectExchange directExchange = new DirectExchange("test.directExchange", false, false); 40 rabbitAdmin.declareExchange(directExchange); 41 42 // 創建topic類型的交換機 43 TopicExchange topicExchange = new TopicExchange("test.topicExchange", false, false); 44 rabbitAdmin.declareExchange(topicExchange); 45 46 // 創建fanout類型的交換機 47 FanoutExchange fanoutExchange = new FanoutExchange("test.fanoutExchange", false, false); 48 rabbitAdmin.declareExchange(fanoutExchange); 49 50 // 創建direct類型的隊列 51 Queue directQueue = new Queue("test.direct.queue", false); 52 rabbitAdmin.declareQueue(directQueue); 53 54 // 創建topic類型的交隊列 55 Queue topicQueue = new Queue("test.topic.queue", false); 56 rabbitAdmin.declareQueue(topicQueue); 57 58 // 創建fanout類型的隊列 59 Queue fanoutQueue = new Queue("test.fanout.queue", false); 60 rabbitAdmin.declareQueue(fanoutQueue); 61 62 // 聲明綁定 63 // 參數1 String destination,可以認為是具體的隊列。 64 // 參數2 DestinationType destinationType,綁定的類型。 65 // 參數3 String exchange,交換機的名稱。 66 // 參數4 String routingKey,路由鍵的名稱。 67 // 參數5 Map<String, Object> arguments可以傳入的參數。 68 // 將test.directExchange交換機和test.direct.queue隊列進行綁定 69 rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, 70 "test.directExchange", "direct", new HashMap<>())); 71 72 // 將test.topicExchange交換機和test.topic.queue隊列進行綁定 73 // rabbitAdmin.declareBinding(new Binding("test.topic.queue", 74 // Binding.DestinationType.QUEUE, "test.topicExchange", 75 // "topic", new HashMap<>())); 76 rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.topic.queue", false)) // 直接創建隊列 77 .to(new TopicExchange("test.topicExchange", false, false)) // 直接創建交換機 建立關聯關系 78 .with("user.#")); // 指定路由Key 79 80 // 將test.fanoutExchange交換機和test.fanout.queue隊列進行綁定 81 // rabbitAdmin.declareBinding(new Binding("test.fanout.queue", 82 // Binding.DestinationType.QUEUE, 83 // "test.fanoutExchange", "", new HashMap<>())); 84 85 rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.fanout.queue", false)) // 直接創建隊列 86 .to(new FanoutExchange("test.fanoutExchange", false, false))); // 直接創建交換機 建立關聯關系 87 88 // 清空隊列數據 89 rabbitAdmin.purgeQueue("test.direct.queue", false); 90 rabbitAdmin.purgeQueue("test.topic.queue", false); 91 rabbitAdmin.purgeQueue("test.fanout.queue", false); 92 } 93 94 // 發送消息 95 @Autowired 96 private RabbitTemplate rabbitTemplate; 97 98 @Test 99 public void sendMessage() { 100 // 1、創建消息 101 MessageProperties messageProperties = new MessageProperties(); 102 messageProperties.getHeaders().put("desc", "消息描述"); 103 messageProperties.getHeaders().put("type", "消息類型"); 104 Message message = new Message("hello RabbitMQ".getBytes(), messageProperties); 105 106 // 2、發送消息 107 String exchange = "topic001"; 108 String routingKey = "spring.amqp"; 109 rabbitTemplate.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() { 110 111 @Override 112 public Message postProcessMessage(Message message) throws AmqpException { 113 System.out.println("======添加額外的設置======"); 114 message.getMessageProperties().getHeaders().put("desc", "額外的消息描述"); 115 message.getMessageProperties().getHeaders().put("attr", "額外的屬性"); 116 return message; 117 } 118 }); 119 } 120 121 @Test 122 public void sendMessage2() throws Exception { 123 // 1 創建消息 124 MessageProperties messageProperties = new MessageProperties(); 125 messageProperties.setContentType("text/plain"); 126 Message message = new Message("RabbitMQ的消息.......".getBytes(), messageProperties); 127 128 // 2、發送消息 129 rabbitTemplate.send("topic001", "spring.abc", message); 130 131 rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!"); 132 rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!"); 133 } 134 135 }
SpringAMQP消息容器SimpleMessageListenerContainer,簡單消息監聽器,效果如下所示:
作者:別先生
博客園:https://www.cnblogs.com/biehongli/
如果您想及時得到個人撰寫文章以及著作的消息推送,可以掃描上方二維碼,關注個人公眾號哦。