在上一篇我們提到了如何在Springboot項目中搭建單個Rabbitmq,但是當我們遇到多數據源Rabbitmq的時候,需要怎麼做呢?
我們首先看application.yml文件
# Connection settings for the two RabbitMQ brokers. RabbitmqConfig reads these keys
# explicitly as spring.rabbitmq.cart-order.* and spring.rabbitmq.order-adaptor.*,
# so both broker sections must sit directly under spring.rabbitmq.
spring:
  rabbitmq:
    cart-order:
      virtual-host: /
      host: localhost
      port: 5672
      username: guest
      password: guest
    order-adaptor:
      virtual-host: /
      host: localhost
      port: 5672
      username: test
      password: 123456
    # Shared consumer settings, read by the "multi" listener container factories.
    listener:
      simple:
        concurrency: 10
        max-concurrency: 20
        prefetch: 5

# Environment prefix substituted into every queue / exchange / routing-key name below.
mq:
  env: test

# Topology declared on the cart-order broker.
cart:
  place:
    order:
      queue: ${mq.env}.cart.place.order.queue
      exchange: ${mq.env}.cart.place.order.exchange
      routing:
        key: ${mq.env}.cart.place.order.routing.key

# Topology declared on the order-adaptor broker.
ticketing:
  place:
    order:
      queue: ${mq.env}.ticketing.place.order.queue
      exchange: ${mq.env}.ticketing.place.order.exchange
      routing:
        key: ${mq.env}.ticketing.place.order.routing.key
然後主要修改的部分還是在RabbitmqConfig:
@Configuration public class RabbitmqConfig { private static final Logger logger = LogManager.getLogger(RabbitmqConfig.class); @Autowired private Environment environment; @Autowired private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; @Autowired private ObjectMapper objectMapper; @Bean(name = "cartOrderCachingConnectionFactory") @Primary public CachingConnectionFactory cartOrderCachingConnectionFactory() { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setHost(environment.getProperty("spring.rabbitmq.cart-order.host")); cachingConnectionFactory.setPort(environment.getProperty("spring.rabbitmq.cart-order.port", int.class)); cachingConnectionFactory.setUsername(environment.getProperty("spring.rabbitmq.cart-order.username")); cachingConnectionFactory.setPassword(environment.getProperty("spring.rabbitmq.cart-order.password")); cachingConnectionFactory.setVirtualHost(environment.getProperty("spring.rabbitmq.cart-order.virtual-host")); cachingConnectionFactory.setPublisherReturns(true); cachingConnectionFactory.setPublisherConfirms(true); return cachingConnectionFactory; } @Bean(name = "orderAdaptorCachingConnectionFactory") public CachingConnectionFactory orderAdaptorCachingConnectionFactory() { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setHost(environment.getProperty("spring.rabbitmq.order-adaptor.host")); cachingConnectionFactory.setPort(environment.getProperty("spring.rabbitmq.order-adaptor.port", int.class)); cachingConnectionFactory.setUsername(environment.getProperty("spring.rabbitmq.order-adaptor.username")); cachingConnectionFactory.setPassword(environment.getProperty("spring.rabbitmq.order-adaptor.password")); cachingConnectionFactory.setVirtualHost(environment.getProperty("spring.rabbitmq.order-adaptor.virtual-host")); cachingConnectionFactory.setPublisherReturns(true); 
cachingConnectionFactory.setPublisherConfirms(true); return cachingConnectionFactory; } /** * singleton can't set multi times callback * @return */ @Bean(name = "cartOrderRabbitTemplate") @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Primary public RabbitTemplate cartOrderRabbitTemplate(@Qualifier("cartOrderCachingConnectionFactory")CachingConnectionFactory cachingConnectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { logger.info("cartOrder message send succeed:correlationData({}),ack({}),cause({})",correlationData, ack, cause); } else { logger.info("cartOrder message send failed:correlationData({}),ack({}),cause({})",correlationData, ack, cause); } }); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { logger.info("cartOrder message lose:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message); }); return rabbitTemplate; } /** * singleton can't set multi times callback * @return */ @Bean(name = "orderAdaptorRabbitTemplate") @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate orderAdaptorRabbitTemplate(@Qualifier("orderAdaptorCachingConnectionFactory")CachingConnectionFactory cachingConnectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setRoutingKey(""); rabbitTemplate.setDefaultReceiveQueue(""); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { logger.info("orderAdaptor message send succeed:correlationData({}),ack({}),cause({})",correlationData, ack, cause); } else { logger.info("orderAdaptor message send 
failed:correlationData({}),ack({}),cause({})",correlationData, ack, cause); } }); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { logger.info("orderAdaptor message lose:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message); }); return rabbitTemplate; } @Bean(name = "cartOrderSingleListenerContainer") @Primary public SimpleRabbitListenerContainerFactory cartOrderSingleListenerContainer(SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer, @Qualifier("cartOrderCachingConnectionFactory")CachingConnectionFactory cachingConnectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(cachingConnectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(1); factory.setMaxConcurrentConsumers(1); factory.setPrefetchCount(1); factory.setTxSize(1); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } @Bean(name = "cartOrderMultiListenerContainer") @Primary public SimpleRabbitListenerContainerFactory cartOrderMultiListenerContainer(SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer, @Qualifier("cartOrderCachingConnectionFactory")CachingConnectionFactory cachingConnectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factoryConfigurer.configure(factory, cachingConnectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(environment.getProperty("spring.rabbitmq.listener.simple.concurrency", int.class)); factory.setMaxConcurrentConsumers(environment.getProperty("spring.rabbitmq.listener.simple.max-concurrency", int.class)); factory.setPrefetchCount(environment.getProperty("spring.rabbitmq.listener.simple.prefetch", int.class)); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } 
@Bean(name = "orderAdaptorSingleListenerContainer") public SimpleRabbitListenerContainerFactory orderAdaptorSingleListenerContainer(@Qualifier("orderAdaptorCachingConnectionFactory")CachingConnectionFactory cachingConnectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(cachingConnectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(1); factory.setMaxConcurrentConsumers(1); factory.setPrefetchCount(1); factory.setTxSize(1); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } @Bean(name = "orderAdaptorMultiListenerContainer") public SimpleRabbitListenerContainerFactory orderAdaptorMultiListenerContainer(@Qualifier("orderAdaptorCachingConnectionFactory")CachingConnectionFactory cachingConnectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factoryConfigurer.configure(factory, cachingConnectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(environment.getProperty("spring.rabbitmq.listener.simple.concurrency", int.class)); factory.setMaxConcurrentConsumers(environment.getProperty("spring.rabbitmq.listener.simple.max-concurrency", int.class)); factory.setPrefetchCount(environment.getProperty("spring.rabbitmq.listener.simple.prefetch", int.class)); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } /** * Message persistent: Set the deliveryMode of the message to 2 and the consumer can continue to consume * the messages after persistence after restarting; * Use convertAndSend to send a message. The message is persistent by default. 
The following is the source code: * new MessageProperties() --> DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT --> deliveryMode = 2; * * @param exchangeName * @param routingKeyName * @param content * @param flag * @param messageId * @param <T> */ public <T> void sendMessage(RabbitTemplate rabbitTemplate, String exchangeName, String routingKeyName, T content, boolean flag, String messageId) { logger.info("message send :messageId({}), exchangeName({}), routingKeyName({}), content({}), flag({})", messageId, exchangeName, routingKeyName, content); CorrelationData correlationData = new CorrelationData(); MessageProperties properties = new MessageProperties(); properties.setContentType("application/json"); try { if (flag) { properties.setCorrelationId(messageId); correlationData.setId(messageId); rabbitTemplate.convertAndSend(exchangeName, routingKeyName, MessageBuilder.withBody(objectMapper.writeValueAsBytes(content)).andProperties(properties).build(), correlationData); } else { rabbitTemplate.convertAndSend(exchangeName, routingKeyName, MessageBuilder.withBody(objectMapper.writeValueAsBytes(content)).andProperties(properties).build(), correlationData); } } catch (Exception e) { logger.error("error message :e.getMessage({})", e.getMessage()); } } }
從代碼中就可以看到我們定義了多個連接,@Primary表示我們默認的連接。因為在單個數據源的情況下,我們可以使用默認的加載方式讀取到mq的配置,但是多數據源的情況下我們就必須明確的指出每個bean對應的配置詳情了。但是到這一步我們仍然會出現問題,那就是我們的queue和exchange的綁定仍然是默認的。單數據源的時候我們是默認綁定到單獨的mq上,現在我們需要動態的分配它們的綁定信息,所以QueueConfig方式就不能滿足我們的需求了,於是我采用了RabbitAdmin這個類動態的幫助我綁定queue和exchange到指定的mq上。
@Configuration public class CartOrderRabbitAdminConfig { private static final Logger logger = LogManager.getLogger(CartOrderRabbitAdminConfig.class); @Autowired private Environment environment; @Bean public RabbitAdmin cartOrderRabbitAdmin(@Qualifier("cartOrderCachingConnectionFactory")CachingConnectionFactory cachingConnectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(cachingConnectionFactory); rabbitAdmin.setAutoStartup(true);//place cart order Queue placeCartOrderQueue = RabbitmqUtil.createQueue(environment.getProperty("cart.place.order.queue")); DirectExchange placeCartOrderExchange = RabbitmqUtil.createExchange(environment.getProperty("cart.place.order.exchange")); Binding placeCartOrderBinding = RabbitmqUtil.createBinding(placeCartOrderQueue, placeCartOrderExchange, environment.getProperty("cart.place.order.routing.key")); RabbitmqUtil.createRabbitAdmin(placeCartOrderQueue, placeCartOrderExchange, placeCartOrderBinding, rabbitAdmin); // return rabbitAdmin; }
@Configuration public class TicketOrderRabbitAdminConfig { private static final Logger logger = LogManager.getLogger(TicketOrderRabbitAdminConfig.class); @Autowired private Environment environment; @Bean public RabbitAdmin ticketOrderRabbitAdmin(@Qualifier("orderAdaptorCachingConnectionFactory")CachingConnectionFactory cachingConnectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(cachingConnectionFactory); rabbitAdmin.setAutoStartup(true); //place ticket order Queue createCartOrderQueue = RabbitmqUtil.createQueue(environment.getProperty("ticketing.place.order.queue")); DirectExchange createCartOrderExchange = RabbitmqUtil.createExchange(environment.getProperty("ticketing.place.order.exchange")); Binding createCartOrderBinding = RabbitmqUtil.createBinding(createCartOrderQueue, createCartOrderExchange, environment.getProperty("ticketing.place.order.routing.key")); RabbitmqUtil.createRabbitAdmin(createCartOrderQueue, createCartOrderExchange, createCartOrderBinding, rabbitAdmin); return rabbitAdmin; } }
/**
 * Static helpers for building RabbitMQ topology objects and declaring them through a
 * specific {@link RabbitAdmin}.
 *
 * <p>The three factory methods return {@code null} when given blank input;
 * {@link #createRabbitAdmin} rejects such nulls before anything is declared.
 */
public class RabbitmqUtil {

    /**
     * Creates a durable, non-auto-delete direct exchange.
     *
     * @param exchangeName exchange name
     * @return the exchange, or {@code null} when {@code exchangeName} is blank
     */
    public static DirectExchange createExchange(String exchangeName) {
        if (StringUtils.isNotBlank(exchangeName)) {
            return new DirectExchange(exchangeName, true, false);
        }
        return null;
    }

    /**
     * Creates a durable queue.
     *
     * @param queueName queue name
     * @return the queue, or {@code null} when {@code queueName} is blank
     */
    public static Queue createQueue(String queueName) {
        if (StringUtils.isNotBlank(queueName)) {
            return new Queue(queueName, true);
        }
        return null;
    }

    /**
     * Binds a queue to a direct exchange with the given routing key.
     *
     * @param queueName      the queue to bind (a {@code Queue} object despite the name)
     * @param exchangeName   the exchange to bind to (a {@code DirectExchange} object)
     * @param routingKeyName routing key
     * @return the binding, or {@code null} when either object is null or the key is blank
     */
    public static Binding createBinding(Queue queueName, DirectExchange exchangeName, String routingKeyName) {
        if (Objects.nonNull(queueName) && Objects.nonNull(exchangeName) && StringUtils.isNotBlank(routingKeyName)) {
            return BindingBuilder.bind(queueName).to(exchangeName).with(routingKeyName);
        }
        return null;
    }

    /**
     * Declares the queue, the exchange and then the binding through {@code rabbitAdmin}.
     *
     * <p>All arguments are checked up front, so a null left behind by a blank property fails
     * with a descriptive message before any declaration is attempted, rather than partway
     * through the three declare calls.
     *
     * @param queue       queue to declare
     * @param exchange    exchange to declare
     * @param binding     binding to declare
     * @param rabbitAdmin admin bound to the target broker
     * @throws NullPointerException if any argument is {@code null}
     */
    public static void createRabbitAdmin(Queue queue, DirectExchange exchange, Binding binding, RabbitAdmin rabbitAdmin) {
        Objects.requireNonNull(rabbitAdmin, "rabbitAdmin must not be null");
        Objects.requireNonNull(queue, "queue must not be null (was the queue name blank or missing?)");
        Objects.requireNonNull(exchange, "exchange must not be null (was the exchange name blank or missing?)");
        Objects.requireNonNull(binding, "binding must not be null (was the routing key blank or missing?)");

        rabbitAdmin.declareQueue(queue);
        rabbitAdmin.declareExchange(exchange);
        rabbitAdmin.declareBinding(binding);
    }
}
如上,我們就可以動態綁定我們的queue和exchange到目標mq上了,生產者和消費者與單數據源的情況相比沒有很大的區別。