背景
最近公司新啟動一個新平台的項目,需要配置多個RabbitMQ?下面就是在Spring Boot配置多個RabbitMQ的例子。是自己摸索搭建的,已經成功上線了,有其他好的實現方法的網友可以互相交流一下。
項目結構
引入maven依賴
<parent> <artifactId>spring-boot-starter-parent</artifactId> <groupId>org.springframework.boot</groupId> <version>2.3.10.RELEASE</version> </parent> <properties> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
常量類
public class RabbitaMqConstant { /**隊列屬性**/ public final static String HOST_ONE = "81.69.230.42"; public final static Integer PORT_ONE = 5673; public final static String USERNAME_ONE = "guest"; public final static String PASSWORD_ONE = "guest"; public final static String VIRTUALHOST_ONE = "/"; public final static String QUEUE_ONE_NAME = "queue_one_name"; public final static String QUEUE_ONE_KEY = "queueOneKey"; public final static String HOST_TWO = "120.53.104.163"; public final static Integer PORT_TWO = 5672; public final static String USERNAME_TWO = "admin"; public final static String PASSWORD_TWO = "admin"; public final static String VIRTUALHOST_TWO = "/"; public final static String QUEUE_TWO_NAME = "queue_two_name"; public final static String QUEUE_TWO_KEY = "queueTwoKey"; }
屬性類
@Data @NoArgsConstructor @AllArgsConstructor @Builder public class RabbitProperties { private String host; private Integer port; private String username; private String password; private String virtualHost; private String queueName; /** * 隊列屬性 */ public static Map<String, RabbitProperties> MULTI_MQPROPERTIES_MAP = new HashMap<String, RabbitProperties>() { { put(RabbitaMqConstant.QUEUE_ONE_KEY, RabbitProperties.builder() .host(RabbitaMqConstant.HOST_ONE) .port(RabbitaMqConstant.PORT_ONE) .username(RabbitaMqConstant.USERNAME_ONE) .password(RabbitaMqConstant.PASSWORD_ONE) .virtualHost(RabbitaMqConstant.VIRTUALHOST_ONE) .queueName(RabbitaMqConstant.QUEUE_ONE_NAME).build()); put(RabbitaMqConstant.QUEUE_TWO_KEY, RabbitProperties.builder() .host(RabbitaMqConstant.HOST_TWO) .port(RabbitaMqConstant.PORT_TWO) .username(RabbitaMqConstant.USERNAME_TWO) .password(RabbitaMqConstant.PASSWORD_TWO) .virtualHost(RabbitaMqConstant.VIRTUALHOST_TWO) .queueName(RabbitaMqConstant.QUEUE_TWO_NAME).build()); //如果需要配置更多, 參考上面 } }; }
rabbitMQ配置類
這個類的作用是:利用spring的工廠類在在容器的初始化不同服務器的Rabbit注入到容器
@Configuration @RequiredArgsConstructor @Slf4j public class MultiRabbitMqConfig { private final DefaultListableBeanFactory defaultListableBeanFactory; /** * 初始化消息 */ @PostConstruct public void initRabbitmq() { RabbitProperties.MULTI_MQPROPERTIES_MAP.forEach((key, rabbitProperties) -> { AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition(CachingConnectionFactory.class) .addPropertyValue("cacheMode", CachingConnectionFactory.CacheMode.CHANNEL) .addPropertyValue("host", rabbitProperties.getHost()) .addPropertyValue("port", rabbitProperties.getPort()) .addPropertyValue("username", rabbitProperties.getUsername()) .addPropertyValue("password", rabbitProperties.getPassword()) .addPropertyValue("virtualHost", rabbitProperties.getVirtualHost()) .getBeanDefinition(); String connectionFactoryName = String.format("%s%s", key, "ConnectionFactory"); //將連接工程注入容器 defaultListableBeanFactory.registerBeanDefinition(connectionFactoryName, beanDefinition); CachingConnectionFactory connectionFactory = defaultListableBeanFactory.getBean(connectionFactoryName, CachingConnectionFactory.class); String rabbitAdminName = String.format("%s%s", key, "RabbitAdmin"); AbstractBeanDefinition rabbitAdminBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(RabbitAdmin.class) .addConstructorArgValue(connectionFactory) .addPropertyValue("autoStartup", true) .getBeanDefinition(); //將 defaultListableBeanFactory.registerBeanDefinition(rabbitAdminName, rabbitAdminBeanDefinition); RabbitAdmin rabbitAdmin = defaultListableBeanFactory.getBean(rabbitAdminName, RabbitAdmin.class); log.info("rabbitAdmin:[{}]", rabbitAdmin); Queue queue = new Queue(rabbitProperties.getQueueName()); rabbitAdmin.declareQueue(queue); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //設置消息確認 connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) {// 如果發送交換機成功,但是沒有匹配路由到指定的隊列, 這個時候ack返回是true(這是一個坑) log.info("====>ack success connection:[{}]", cause); } else { // 失敗 log.info("====>message error success connection:[{}]", cause); } } }); defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "RabbitTemplate"), rabbitTemplate); SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); // 設置監聽的隊列 simpleMessageListenerContainer.setQueueNames(rabbitProperties.getQueueName()); // 指定要創建的並發使用者的數量,默認值是1,當並發高時可以增加這個的數值,同時下方max的數值也要增加 simpleMessageListenerContainer.setConcurrentConsumers(10); // 最大的並發消費者 simpleMessageListenerContainer.setMaxConcurrentConsumers(10); // 設置是否重回隊列 simpleMessageListenerContainer.setDefaultRequeueRejected(false); // 設置簽收模式 simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 設置非獨占模式 simpleMessageListenerContainer.setExclusive(false); // 設置consumer未被 ack 的消息個數 simpleMessageListenerContainer.setPrefetchCount(5); //消費mq消息的類 CrossborderReceiptListen crossborderReceiptListen = new CrossborderReceiptListen(); //onMessage消費mq消息的方法 crossborderReceiptListen.addQueueOrTagToMethodName(rabbitProperties.getQueueName(),"onMessage"); simpleMessageListenerContainer.setMessageListener(crossborderReceiptListen); defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "SimpleMessageListenerContainer"), simpleMessageListenerContainer); }); } }
controller類
@RestController public class IndexController { @Autowired private DefaultListableBeanFactory defaultListableBeanFactory; @RequestMapping("/send") public String send(){ RabbitTemplate rabbitTemplate = (RabbitTemplate) defaultListableBeanFactory.getBean(RabbitaMqConstant.QUEUE_ONE_KEY+"RabbitTemplate"); rabbitTemplate.convertAndSend(RabbitaMqConstant.QUEUE_ONE_NAME,"測試queue_one_name"); return "122432"; } @RequestMapping("/send2") public String send2(){ RabbitTemplate rabbitTemplate = (RabbitTemplate) defaultListableBeanFactory.getBean(RabbitaMqConstant.QUEUE_TWO_KEY+"RabbitTemplate"); rabbitTemplate.convertAndSend(RabbitaMqConstant.QUEUE_TWO_NAME,"測試QUEUE_TWO_NAME"); return "122432"; } }
監聽類
@Service @Slf4j public class CrossborderReceiptListen extends MessageListenerAdapter{ @Override public void onMessage(Message message, Channel channel) throws IOException { try { log.info("============> Thread:[{}] 接收到消息:[{}] ", Thread.currentThread().getName(), new String(message.getBody())); log.info("====>connection:[{}]", channel.getConnection()); String base64Str = new String(Base64.getEncoder().encode(message.getBody())); //處理業務邏輯 log.info("調用baohe 回調接口處理結果res:{}", base64Str); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { log.error("CrossborderReceiptListen onMessage error", e); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } }
啟動類
注意這里有個坑,因為spring boot自動配置已經加載了RabbitAutoConfiguration類,spring boot也會加載該類,就會與自己手動注冊Rabbitmq沖突,因此必須排除該類,如果不排除,就會發生如下錯誤:
Description: Parameter 1 of method rabbitTemplate in org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration$RabbitTemplateConfiguration required a single bean, but 3 were found: - rabbitConnectionFactory: defined by method 'rabbitConnectionFactory' in class path resource [org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration$RabbitConnectionFactoryCreator.class] - queueOneKeyConnectionFactory: defined in null - queueTwoKeyConnectionFactory: defined in null Action: Consider marking one of the beans as @Primary, updating the consumer to accept multiple beans, or using @Qualifier to identify the bean that should be consumed
@SpringBootApplication(exclude = RabbitAutoConfiguration.class) public class RabbtiMqBootstrap { public static void main(String[] args) { SpringApplication.run(RabbtiMqBootstrap.class); } }
啟動測試
訪問http://localhost:8080/send
效果如下
訪問http://localhost:8080/send2
效果如下:
測試成功,哈哈