Connecting Spring Boot to Multiple RabbitMQ Sources
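In real-world development, many scenarios call for asynchronous processing, which is where RabbitMQ comes in, and as those scenarios multiply an application may need to connect to more than one RabbitMQ instance. Spring Boot's default auto-configuration makes it quick to connect to RabbitMQ, but only to a single instance. Once several are needed, the defaults no longer fit and each connection has to be configured by hand.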
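In Spring Boot, the two classes we use most often are:
RabbitTemplate: used to produce and consume messages;
RabbitAdmin: used to declare and delete exchanges and queues, and to bind and unbind queues and exchanges.
So connecting to multiple RabbitMQ instances means creating each connection ourselves and defining a separate instance of both classes for every connection.
The code is as follows: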
Configuration
application.properties
The configuration file needs to define two connections:
server.port=8080
# rabbitmq
v2.spring.rabbitmq.host=host
v2.spring.rabbitmq.port=5672
v2.spring.rabbitmq.username=username
v2.spring.rabbitmq.password=password
v2.spring.rabbitmq.virtual-host=virtual-host
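# Consumer: manual ack
v2.spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 1. When the mandatory flag is true and the exchange cannot find a suitable queue
#    for the message based on its type and the routing key, the broker calls
#    basic.return to hand the message back to the producer.
# 2. When mandatory is false, the broker silently drops the message in that case.
#    Put simply, mandatory tells the broker to route the message to at least one
#    queue, or else return it to the sender.
v2.spring.rabbitmq.template.mandatory=true
# Publisher confirms (delivery acknowledgement from the broker)
v2.spring.rabbitmq.publisher-confirms=true
# Publisher returns: the return callback fires when a message reaches the exchange
# but cannot be routed to any queue. Note that the publisher confirm callback still
# reports ack=true in that case. A message that never reaches an exchange is
# reported through the confirm callback (ack=false), not the return callback.
v2.spring.rabbitmq.publisher-returns=true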
v2.spring.rabbitmq.listener.simple.prefetch=5
# rabbitmq
v1.spring.rabbitmq.host=host
v1.spring.rabbitmq.port=5672
v1.spring.rabbitmq.username=username
v1.spring.rabbitmq.password=password
v1.spring.rabbitmq.virtual-host=virtual-host
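# Consumer: manual ack
v1.spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 1. When the mandatory flag is true and the exchange cannot find a suitable queue
#    for the message based on its type and the routing key, the broker calls
#    basic.return to hand the message back to the producer.
# 2. When mandatory is false, the broker silently drops the message in that case.
#    Put simply, mandatory tells the broker to route the message to at least one
#    queue, or else return it to the sender.
v1.spring.rabbitmq.template.mandatory=true
# Publisher confirms (delivery acknowledgement from the broker)
v1.spring.rabbitmq.publisher-confirms=true
# Publisher returns: the return callback fires when a message reaches the exchange
# but cannot be routed to any queue. Note that the publisher confirm callback still
# reports ack=true in that case. A message that never reaches an exchange is
# reported through the confirm callback (ack=false), not the return callback.
v1.spring.rabbitmq.publisher-returns=true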
v1.spring.rabbitmq.listener.simple.prefetch=5
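To make the interplay between mandatory, publisher confirms, and publisher returns more concrete, here is a small plain-Java model of the outcomes. It is only an illustrative sketch: PublishOutcome and its fields are hypothetical names, not part of Spring AMQP, and the model assumes that publishing to a non-existent exchange surfaces as a negative confirm rather than a return.

```java
// Simplified model of what the producer observes for a single publish.
// PublishOutcome is a hypothetical class used only for illustration.
final class PublishOutcome {
    final boolean confirmAck; // value the confirm callback would receive
    final boolean returned;   // whether the return callback would fire

    private PublishOutcome(boolean confirmAck, boolean returned) {
        this.confirmAck = confirmAck;
        this.returned = returned;
    }

    static PublishOutcome of(boolean exchangeExists, boolean routedToQueue, boolean mandatory) {
        if (!exchangeExists) {
            // Assumption: the publish fails outright, so the confirm is negative
            // and nothing is returned.
            return new PublishOutcome(false, false);
        }
        if (!routedToQueue) {
            // Unroutable message: still confirmed, and returned only when
            // mandatory is true; otherwise the broker drops it.
            return new PublishOutcome(true, mandatory);
        }
        // Routed to at least one queue: confirmed, nothing to return.
        return new PublishOutcome(true, false);
    }

    public static void main(String[] args) {
        PublishOutcome unroutable = PublishOutcome.of(true, false, true);
        System.out.println("ack=" + unroutable.confirmAck + ", returned=" + unroutable.returned);
    }
}
```

The case worth remembering is the unroutable one: the confirm alone does not reveal that the message never reached a queue, which is why the return callback is configured alongside it.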
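Overriding the connection factory
Note that with multiple sources, one of the connections must be annotated with @Primary to mark it as the primary connection, which is used by default.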
package com.example.config.rabbitmq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * Created by shuai on 2019/4/23.
 */
@Configuration
public class MultipleRabbitMQConfig {

    @Bean(name = "v2ConnectionFactory")
    public CachingConnectionFactory v2ConnectionFactory(
            @Value("${v2.spring.rabbitmq.host}") String host,
            @Value("${v2.spring.rabbitmq.port}") int port,
            @Value("${v2.spring.rabbitmq.username}") String username,
            @Value("${v2.spring.rabbitmq.password}") String password,
            @Value("${v2.spring.rabbitmq.virtual-host}") String virtualHost,
            @Value("${v2.spring.rabbitmq.publisher-confirms}") boolean publisherConfirms,
            @Value("${v2.spring.rabbitmq.publisher-returns}") boolean publisherReturns) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(publisherConfirms);
        connectionFactory.setPublisherReturns(publisherReturns);
        return connectionFactory;
    }

    @Bean(name = "v2RabbitTemplate")
    public RabbitTemplate v2RabbitTemplate(
            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v2.spring.rabbitmq.template.mandatory}") boolean mandatory) {
        RabbitTemplate v2RabbitTemplate = new RabbitTemplate(connectionFactory);
        v2RabbitTemplate.setMandatory(mandatory);
        v2RabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                // LOGGER.warn("v2 publisher confirm failed: correlationData={}, cause={}", correlationData, cause);
            } else {
                // LOGGER.info("v2 publisher confirm succeeded: correlationData={}", correlationData);
            }
        });
        v2RabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // The message reached the exchange but could not be routed to any queue.
            // LOGGER.error("v2 message returned: message={}, replyCode={}, replyText={}, exchange={}, routingKey={}",
            //         message, replyCode, replyText, exchange, routingKey);
        });
        return v2RabbitTemplate;
    }

    @Bean(name = "v2ContainerFactory")
    public SimpleRabbitListenerContainerFactory v2ContainerFactory(
            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v2.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
            @Value("${v2.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
        factory.setPrefetchCount(prefetch);
        return factory;
    }

    @Bean(name = "v2RabbitAdmin")
    public RabbitAdmin v2RabbitAdmin(
            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    // Primary MQ connection
    @Bean(name = "v1ConnectionFactory")
    @Primary
    public CachingConnectionFactory v1ConnectionFactory(
            @Value("${v1.spring.rabbitmq.host}") String host,
            @Value("${v1.spring.rabbitmq.port}") int port,
            @Value("${v1.spring.rabbitmq.username}") String username,
            @Value("${v1.spring.rabbitmq.password}") String password,
            @Value("${v1.spring.rabbitmq.virtual-host}") String virtualHost,
            @Value("${v1.spring.rabbitmq.publisher-confirms}") boolean publisherConfirms,
            @Value("${v1.spring.rabbitmq.publisher-returns}") boolean publisherReturns) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(publisherConfirms);
        connectionFactory.setPublisherReturns(publisherReturns);
        return connectionFactory;
    }

    @Bean(name = "v1RabbitTemplate")
    @Primary
    public RabbitTemplate v1RabbitTemplate(
            @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v1.spring.rabbitmq.template.mandatory}") boolean mandatory) {
        RabbitTemplate v1RabbitTemplate = new RabbitTemplate(connectionFactory);
        v1RabbitTemplate.setMandatory(mandatory);
        v1RabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                // LOGGER.warn("v1 publisher confirm failed: correlationData={}, cause={}", correlationData, cause);
            } else {
                // LOGGER.info("v1 publisher confirm succeeded: correlationData={}", correlationData);
            }
        });
        v1RabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // The message reached the exchange but could not be routed to any queue.
            // LOGGER.error("v1 message returned: message={}, replyCode={}, replyText={}, exchange={}, routingKey={}",
            //         message, replyCode, replyText, exchange, routingKey);
        });
        return v1RabbitTemplate;
    }

    @Bean(name = "v1ContainerFactory")
    @Primary
    public SimpleRabbitListenerContainerFactory v1ContainerFactory(
            @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v1.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
            @Value("${v1.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
        factory.setPrefetchCount(prefetch);
        return factory;
    }

    @Bean(name = "v1RabbitAdmin")
    @Primary
    public RabbitAdmin v1RabbitAdmin(
            @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
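As a rough illustration of what @Primary changes, the sketch below injects a RabbitTemplate once without a qualifier and once with one. TemplateSelectionExample is a hypothetical class that is not part of the original project; it assumes the bean names defined in MultipleRabbitMQConfig above.

```java
package com.example.config.rabbitmq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

// Hypothetical component used only to illustrate how the two beans are selected.
@Component
public class TemplateSelectionExample {

    private final RabbitTemplate defaultTemplate;
    private final RabbitTemplate v2Template;

    public TemplateSelectionExample(
            // No qualifier: Spring picks the @Primary bean, i.e. v1RabbitTemplate.
            RabbitTemplate defaultTemplate,
            // Explicit qualifier: always the v2 bean.
            @Qualifier("v2RabbitTemplate") RabbitTemplate v2Template) {
        this.defaultTemplate = defaultTemplate;
        this.v2Template = v2Template;
    }

    // True when the two injection points resolved to different beans.
    public boolean usesDifferentTemplates() {
        return defaultTemplate != v2Template;
    }
}
```

Without @Primary on one of the beans, the unqualified injection point would be ambiguous between two RabbitTemplate candidates.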
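Creating the Exchange and Queue and binding them
Once the RabbitAdmin beans are defined, we use RabbitAdmin to create the corresponding exchange and queue and to establish the binding between them.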
package com.example.config.rabbitmq;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * Creates the queue and exchange and binds them together.
 * Created by shuai on 2019/5/16.
 */
@Configuration
public class MyRabbitMQCreateConfig {

    @Resource(name = "v2RabbitAdmin")
    private RabbitAdmin v2RabbitAdmin;

    @Resource(name = "v1RabbitAdmin")
    private RabbitAdmin v1RabbitAdmin;

    @PostConstruct
    public void rabbitInit() {
        // TopicProducer publishes through both templates, so the exchange
        // must exist on both brokers.
        declareTopology(v2RabbitAdmin);
        declareTopology(v1RabbitAdmin);
    }

    private void declareTopology(RabbitAdmin admin) {
        TopicExchange exchange = new TopicExchange("exchange.topic.example.new", true, false);
        Queue queue = new Queue("queue.example.topic.new", true);
        admin.declareExchange(exchange); // create the exchange
        admin.declareQueue(queue);       // create the queue
        admin.declareBinding(
                BindingBuilder
                        .bind(queue)
                        .to(exchange)
                        .with("routing.key.example.new")); // routing key for the binding
    }
}
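The binding above uses a literal routing key, but a topic exchange also accepts the wildcards * (exactly one word) and # (zero or more words) in the binding key. The plain-Java sketch below mimics that matching rule to show which routing keys a given binding would accept. TopicMatcher is a hypothetical helper written for illustration; it is not how the broker implements routing.

```java
// Hypothetical helper that mimics topic-exchange matching of a binding key
// against a routing key. Words are separated by dots.
final class TopicMatcher {

    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: it matches only if all words were consumed too.
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split.
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false;
        }
        // '*' matches exactly one word; anything else must match literally.
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("routing.key.*.new", "routing.key.example.new"));
        System.out.println(matches("routing.*", "routing.key.example.new"));
    }
}
```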
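Producer
To verify later that every connection is established and can publish messages, the producer sends one message through each of the newly created RabbitTemplate beans.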
package com.example.topic;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class TopicProducer {

    @Resource(name = "v1RabbitTemplate")
    private RabbitTemplate v1RabbitTemplate;

    @Resource(name = "v2RabbitTemplate")
    private RabbitTemplate v2RabbitTemplate;

    public void sendMessageByTopic() {
        String content1 = "This is a topic type of the RabbitMQ message example from v1RabbitTemplate";
        v1RabbitTemplate.convertAndSend(
                "exchange.topic.example.new",
                "routing.key.example.new",
                content1);

        String content2 = "This is a topic type of the RabbitMQ message example from v2RabbitTemplate";
        v2RabbitTemplate.convertAndSend(
                "exchange.topic.example.new",
                "routing.key.example.new",
                content2);
    }
}
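Because publisher confirms are enabled, it can help to attach a CorrelationData to each message so that the confirm callback can tell which send it refers to. The following is a minimal sketch, assuming the v2RabbitTemplate bean defined earlier. CorrelatedTopicProducer and the use of a random UUID as the id are illustrative choices, not part of the original code.

```java
package com.example.topic;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;

// Hypothetical variant of TopicProducer that tags each message with an id.
@Component
public class CorrelatedTopicProducer {

    @Resource(name = "v2RabbitTemplate")
    private RabbitTemplate v2RabbitTemplate;

    public String send(String content) {
        // The same id is available in the confirm callback via correlationData.getId().
        String id = UUID.randomUUID().toString();
        v2RabbitTemplate.convertAndSend(
                "exchange.topic.example.new",
                "routing.key.example.new",
                content,
                new CorrelationData(id));
        return id;
    }
}
```

The caller can keep the returned id next to the business record and mark it as delivered or failed when the confirm arrives.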
Consumer
Note that when configuring the consumer for a queue, you must specify the containerFactory.
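package com.example.topic;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@RabbitListener(queues = "queue.example.topic.new", containerFactory = "v2ContainerFactory")
public class TopicConsumer {

    @RabbitHandler
    public void consumer(String message, Channel channel,
                         @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        System.out.println(message);
        // v2ContainerFactory uses manual acknowledgement, so ack explicitly.
        channel.basicAck(deliveryTag, false);
    }
}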
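With acknowledge-mode=manual, a handler is expected to acknowledge or reject each delivery itself; otherwise unacknowledged messages accumulate until the prefetch limit is reached and the consumer stops receiving. The sketch below shows one way a handler might do that. AckingTopicConsumer is a hypothetical variant of TopicConsumer, and requeueing on failure is only one possible policy.

```java
package com.example.topic;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

// Hypothetical variant of TopicConsumer: ack on success, requeue on failure.
@Component
public class AckingTopicConsumer {

    @RabbitListener(queues = "queue.example.topic.new", containerFactory = "v2ContainerFactory")
    public void consume(String message, Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        try {
            System.out.println(message);
            // Acknowledge only this delivery (multiple = false).
            channel.basicAck(deliveryTag, false);
        } catch (RuntimeException e) {
            // Reject this delivery and ask the broker to requeue it.
            channel.basicNack(deliveryTag, false, true);
        }
    }
}
```

Requeueing a message that always fails leads to an endless redelivery loop, so a real application would usually cap retries or route failures to a dead-letter queue.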
That completes the example of connecting Spring Boot to multiple RabbitMQ sources. Next, a short test verifies it.
Test verification
package com.example.test;

import com.example.topic.TopicProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQMultipleTest {

    @Autowired
    private TopicProducer topicProducer;

    @Test
    public void topicProducerTest() {
        topicProducer.sendMessageByTopic();
    }
}
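One thing to watch for: the test method returns as soon as the messages are sent, while the listener consumes them on another thread, so the consumer's output may not appear before the test finishes. A common remedy is to wait on a CountDownLatch. The plain-Java sketch below shows the pattern with a simulated listener thread; AwaitConsumerSketch is hypothetical and stands in for the real consumer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of waiting for an asynchronous consumer before finishing.
final class AwaitConsumerSketch {

    // Starts a simulated listener thread and waits until it reports one
    // message or the timeout expires. Returns true if the message arrived in time.
    static boolean sendAndAwait(long timeoutMillis) {
        CountDownLatch received = new CountDownLatch(1);
        Thread listener = new Thread(() -> {
            // Stand-in for the real consumer handling one message.
            received.countDown();
        });
        listener.start();
        try {
            return received.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status and report that nothing was consumed.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("consumed in time: " + sendAndAwait(1000));
    }
}
```

In the real test, the latch would be counted down by the consumer and awaited in the test method after sendMessageByTopic().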
Run the test code. The result verifies that Spring Boot connects to multiple RabbitMQ sources successfully!