yls
2020/5/10
Spring Boot整合rabbitmq
rabbitmq的基本概念和其它相關知識請自主去官網學習
rabbitmq官網,
本文只介紹rabbitmq在springboot中如何使用
添加依賴包
<!--rabbitmq客戶端 start-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
<!--rabbitmq客戶端 end-->
添加配置文件 application.yml
spring:
rabbitmq: #rabbit連接配置信息
host: 39.97.234.52
port: 5672
username: admin
password: admin
virtual-host: /vhost_1
rabbitmq五種模式的使用
1. 簡單隊列
- 創建消費者
@Component
public class MQ {
/**
* 簡單隊列
* autoDelete = "true" 表示沒有生產者和消費者連接時自動刪除
* durable = "true" 表示隊列持久化,默認就是 true
* @param msg
*/
@RabbitListener(queuesToDeclare = @Queue(value = "simpleQueue",autoDelete = "true", durable = "true"))
public void simpleQueue(String msg) {
System.out.println("接收 " + msg);
}
}
- 創建生產者
@SpringBootTest
public class RabbitTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void simpleQueue() {
rabbitTemplate.convertAndSend("simpleQueue", "this is simpleQueue")
System.out.println("simple success");
}
}
2. 工作隊列,實現了能者多勞
- 創建消費者
@Component
public class MQ {
/**
* 工作隊列,多個消費者消費一個隊列
* <p>
* AMQP默認實現消費者確認模式,原文如下
* It's a common mistake to miss the basicAck and Spring AMQP helps to avoid this through its default configuration.
* The consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery),
* but RabbitMQ will eat more and more memory as it won't be able to release any unacked messages.
* <p>
* Fair dispatch vs Round-robin dispatching
* 官網說: AMQP默認實現消費者fair轉發,也就是能者多勞,原文如下(應該是說反了,默認的是250,但是是Round-robin dispatching)
* However, "Fair dispatch" is the default configuration for Spring AMQP.
* The AbstractMessageListenerContainer defines the value for DEFAULT_PREFETCH_COUNT to be 250.
* If the DEFAULT_PREFETCH_COUNT were set to 1 the behavior would be the round robin delivery as described above.
*/
//設置消費者的確認機制,並達到能者多勞的效果
@Bean("workListenerFactory")
public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory containerFactory =
new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory);
//自動ack,沒有異常的情況下自動發送ack
//auto 自動確認,默認是auto
//MANUAL 手動確認
//none 不確認,發完自動丟棄
containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
//拒絕策略,true回到隊列 false丟棄,默認是true
containerFactory.setDefaultRequeueRejected(true);
//默認的PrefetchCount是250,采用Round-robin dispatching,效率低
//setPrefetchCount 為 1,即可啟用fair 轉發
containerFactory.setPrefetchCount(1);
return containerFactory;
}
/**
* 若不使用自定義containerFactory = "workListenerFactory",默認的輪詢消費效率低
*
* @param s
*/
@RabbitListener(queuesToDeclare = @Queue("workQueue"), containerFactory = "workListenerFactory")
public void workQueue1(String s) {
System.out.println("workQueue 1 " + s);
}
@RabbitListener(queuesToDeclare = @Queue("workQueue"), containerFactory = "workListenerFactory")
public void workQueue2(String s) throws InterruptedException {
Thread.sleep(1000);
System.out.println("workQueue 2 " + s);
}
}
- 創建生產者
@SpringBootTest
public class RabbitTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void workQueue() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("workQueue", i);
}
System.out.println("workQueue success");
}
}
3. 訂閱模式
- 創建消費者
@Component
public class MQ {
/**
* 訂閱模式 fanout
*/
@RabbitListener(bindings = {
@QueueBinding(value = @Queue, //臨時路由
exchange = @Exchange(value = "exchange1", type = ExchangeTypes.FANOUT))
})
public void fanout(String s) {
System.out.println("訂閱模式1 " + s);
}
@RabbitListener(bindings = {
@QueueBinding(value = @Queue, exchange = @Exchange(value = "exchange1", type = ExchangeTypes.FANOUT))
})
public void fanout2(String s) {
System.out.println("訂閱模式2 " + s);
}
}
- 創建生產者
@SpringBootTest
public class RabbitTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void fanOut() {
rabbitTemplate.convertAndSend("exchange1", "", "fan out......");
}
}
4. 路由模式
- 創建消費者
@Component
public class MQ {
/**
* 路由模式 DIRECT
*/
@RabbitListener(bindings = {
@QueueBinding(value = @Queue, //臨時路由
exchange = @Exchange(value = "exchange2", type = ExchangeTypes.DIRECT),
key = {"error", "info"} //路由鍵
)
})
public void router(String s) {
System.out.println("路由模式1 " + s);
}
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = "exchange2", type = ExchangeTypes.DIRECT),
key = {"error"} //路由鍵
)
})
public void router2(String s) {
System.out.println("路由模式2 " + s);
}
}
- 創建生產者
@SpringBootTest
public class RabbitTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void router() {
rabbitTemplate.convertAndSend("exchange2", "info", "router");
System.out.println("router");
}
}
5. 主題模式
- 創建消費者
@Component
public class MQ {
/**
* topic topics
*/
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = "exchange3", type = ExchangeTypes.TOPIC),
key = {"user.#"} //路由鍵
)
})
public void topic2(String s) {
System.out.println("topic2 " + s);
}
}
- 創建生產者
@SpringBootTest
public class RabbitTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void topic() {
rabbitTemplate.convertAndSend("exchange3", "user.name", "hhh");
System.out.println("topic");
}
}
默認消息是持久化的,也可以設置不持久化,以簡單隊列示例
@SpringBootTest
public class RabbitTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* AMQP 默認消息是持久化的,但只有在隊列也是持久化時才有作用,原文如下:
* Messages are persistent by default with Spring AMQP.
* Note the queue the message will end up in needs to be durable as well,
* otherwise the message will not survive a broker restart as a non-durable queue does not itself survive a restart.
* <p>
* MessageProperties類中源碼如下:
* static {
* DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
* DEFAULT_PRIORITY = 0;
* }
* <p>
* 如何設置消息不持久化?
* 設置消息不持久化,默認是持久化的,這里只為記錄如何設置消息不持久化,一般不設置
* 發送消息時,添加 MessagePostProcessor即可,這里使用 lambda 表達式
* (message) -> {
* message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
* return message;
* }
* <p>
* 完整示例如下:
* rabbitTemplate.convertAndSend("simpleQueue", "this is simpleQueue",
* (message) -> {
* message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
* return message;
* });
*/
@Test
public void simpleQueue() {
rabbitTemplate.convertAndSend("simpleQueue", "this is simpleQueue",
(message) -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
return message;
});
System.out.println("simple success");
}
}
如何設置生產者消息確認,避免消息發送失敗而丟失(確認分為兩步,一是確認是否到達交換器,二是確認是否到達隊列。)
- 在配置文件中添加
spring:
rabbitmq: #rabbit連接配置信息
publisher-returns: true #開啟消息從 交換機----》隊列發送失敗的回調
publisher-confirm-type: correlated #開啟消息從 生產者----》交換機的回調
- 添加配置類
@Component
public class ProducerConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void pre() {
/**
*
* 消息發送到交換機的回調
*
* public void confirm(CorrelationData correlationData, boolean b, String s) {
*
* System.out.println("消息唯一標識:"+correlationData);
* System.out.println("確認結果:"+ b);
* System.out.println("失敗原因:"+ s);
* }
*
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("setConfirmCallback-------------------");
System.out.println("correlationData: " + correlationData);
System.out.println(ack);
System.out.println(cause);
if (ack) {
System.out.println("發送成功");
} else {
System.out.println("發送失敗");
// 可以記錄下來,也可以重新發送消息。。。
}
});
/**
*
* 消息從交換機發送到隊列的回調,只有發送失敗時才會回調
* public void returnedMessage(Message message, int i, String s, String s1, String s2) {
* System.out.println("消息主體 message : "+message);
* System.out.println("消息主體 message : "+ i);
* System.out.println("描述:"+ s);
* System.out.println("消息使用的交換器 exchange : "+ s1);
* System.out.println("消息使用的路由鍵 routing : "+ s2);
* }
*/
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
System.out.println("setReturnCallback---------------------");
System.out.println("消息主體 message : " + message);
System.out.println("響應碼 replyCode: " + replyCode);
System.out.println("響應內容 replyText:" + replyText);
System.out.println("消息使用的交換器 exchange : " + exchange);
System.out.println("消息使用的路由鍵 routeKey : " + routingKey);
//也可以重新發送消息
rabbitTemplate.convertAndSend(exchange, routingKey, new String(message.getBody()));
System.out.println("重新發送消息: -----" + new String(message.getBody()));
});
/**
* 網上都說必須設置rabbitTemplate.setMandatory(true),才能觸發ReturnCallback回調,
* 我嘗試了一下,並不需要設置為true,交換機發送消息給隊列失敗時,也能觸發回調
*/
//rabbitTemplate.setMandatory(true);
}
}
代碼github地址:https://github.com/1612480331/Spring-Boot-rabbitmq