1. RabbitMQ實戰應用技巧
1.1. 前言
由於項目原因,之后會和RabbitMQ比較多的打交道,所以讓我們來好好整理下RabbitMQ的應用實戰技巧,盡量避免日后的采坑
1.2. 概述
RabbitMQ有幾個重要的概念:虛擬主機,交換機,隊列和綁定
- 虛擬主機:一個虛擬主機持有一組交換機、隊列和綁定,我們可以從虛擬主機層面的顆粒度進行權限控制
- 交換機:Exchange用於轉發消息,它並不存儲消息,如果沒有Queue隊列綁定到Exchange,它會直接丟棄掉生產者發來的數據。
交換機還有個關聯的重要概念:路由鍵,消息轉發到哪個隊列根據路由鍵決定 - 綁定:就是綁定交換機和隊列,它是多對多的關系,也就是說多個交換機可以綁同一個隊列,也可以一個交換機綁多個隊列
1.3. 交換機
交換機有四種類型的模式Direct, topic, Headers and Fanout
1.3.1. Direct Exchage
Direct模式使用的是RabbitMQ的默認交換機,也是最簡單的模式,適合比較簡單的場景
如下圖所示,使用Direct模式,我們需要創建不同的隊列,而默認交換機則通過Routing key
路由鍵的值來決定轉發到哪個隊列,可以看到,路由鍵綁定隊列是可以指定多個的
1.3.2. Topic Exchange
Topic模式主要是根據通配符匹配,也就類似於模糊匹配,當這種匹配模式和路由鍵匹配后交換機就能轉發消息到指定隊列
- 路由鍵為一串字符串,由句號(
.
)隔開,比如a.b.c
- (
*
)代表指定位置一個單詞,(#
)代表零個或者多個單詞,比如a.*.b.#
,表示a和b中間隨意填個單詞,b后面可以跟n個單詞,比如a.x.b.c.d.e
Topic模式和Direct模式的區別在於交換機需要自己指定,路由鍵支持模糊匹配,例如:
rabbitTemplate.convertAndSend("topicExchange","a.x.b.d", " hello world!");
1.3.3. Headers Exchage
Headers也是根據規則匹配,但它不是根據路由鍵了,headers有個自定義匹配規則,它將匹配鍵值設在了消息的headers屬性上,當這些鍵值對有一對或者全部匹配時,消息才會被投遞到對應隊列,這種模式效率相對較低,一般不推薦使用
1.3.4. Fanout Exchange
Fanout即為大名鼎鼎的廣播模式了,它不需要管路由鍵,會把消息發給綁定它的全部隊列,就算配置了路由鍵也會被忽略
1.4. 復雜情況
- 首先我們Direct模式,一個生產者一個消費者的情況,也就對應了一個發送者和一個隊列A接收,這是沒有疑問的,發送什么接收什么
- 當Direct模式,一個生產者發消息,開啟多個消費者也就是多個相同queue,此時消息由多個消費者均勻分攤,不會重復消費(前提ack正常)
- 當Topic模式,一個交換機綁定兩個隊列,路由鍵有重疊關系,如下代碼,此時指定路由鍵
topic.message
發送消息,隊列queueMessage
和queueMessages
都能接收到相同消息,也就是說,topic模式可以實現類似於廣播模式的形式,甚至更加靈活,它能否轉發到消息由路由鍵決定。 - 相比於Fanout模式,我們如果要對消費者隊列分組發送,我們需要指定不同的路由鍵;而Fanout模式則需要指定不同的交換機和隊列綁定,實際使用結合實際情況
@Configuration
public class TopicRabbitConfig {
final static String message = "topic.message";
final static String messages = "topic.messages";
@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}
@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("exchange");
}
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}
1.5. springboot配置
我們的常用配置如下
spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=1000
##設置監聽限制:最大10,默認5
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.listener.simple.acknowledge-mode=manual
其中最后四條配置需要着重解釋:
spring.rabbitmq.publisher-confirms
為true,表示生產者消息發出后,MQ的broker接收到了消息,發送回執表示確認接收,不設置則可能導致消息丟失spring.rabbitmq.publisher-returns
為true,表示當消息不能到達MQ的Broker端,,則使用監聽器對不可達的消息做后續處理,這種一般是路由鍵沒配好,或MQ宕機才可能發生spring.rabbitmq.template.mandatory
當上面兩個為true時,這個一定要配true,否則上面兩個不起作用spring.rabbitmq.listener.simple.acknowledge-mode
這個為manual
表示手工確認,實際生產應該設為手工,才能保證你的業務是處理完成的,注意業務的冪等性,可重復調用,手工確認代碼如下例子
@Component
public class RabbitReceiver {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-1",
durable="true"),
exchange = @Exchange(value = "exchange-1",
durable="true",
type= "topic",
ignoreDeclarationExceptions = "true"),
key = "springboot.*"
)
)
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消費端Payload: " + message.getPayload());
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK,獲取deliveryTag
channel.basicAck(deliveryTag, false);
}
}
1.6. 隊列屬性
-
queue : 隊列的名字
-
durable : 為true表示隊列中數據持久化到磁盤,可以防止mq宕機重啟數據丟失
-
exclusive : 為true表示排他性,只允許一個當前連接訪問該隊列,當前已連接就不允許新的連接進入否則報錯,當連接斷開當前隊列會銷毀
-
autoDelete : 為true表示自動刪除,當沒有Connection連接到隊列的時候,會自動刪除
-
arguments : 這個參數用來添加一些額外參數的,如下圖片
- 比如添加
x-message-ttl
為5000,則表示消息超過5秒沒被處理就會超時過期; x-expires
設置120000表示隊列在2分鍾內沒被消費則被刪除;x-max-length
,x-max-length-bytes
表示傳送數據的最大長度和字節數x-dead-letter-exchange
,x-dead-letter-routing-key
表示死信交換機和死信路由,放在需要過期或處理失敗的隊列屬性中,這些數據會轉發到死信隊列存儲起來,創建普通的交換機和隊列綁定,把交換機名填到x-dead-letter-exchange
的值,填寫路由鍵要符合死信隊列的路由鍵x-max-priority
,表示設置優先級,范圍為0~255,只有當消息堆積的時候,這個優先級才有意義,數字越大優先級越高x-queue-mode
當為lazy
,表示惰性隊列,3.6.0之后才被引入的概念,相比默認的模式,惰性隊列模式會將生產者產生的消息直接存到磁盤中,這當然會增加IO開銷,但適合應對大量消息堆積的情況;因為當大量消息堆積時,內存也不夠存放,會將消息轉存到磁盤,這個過程也是比較耗時且過程中不能接收新的消息。如果需要將普通隊列轉換成惰性隊列需要將原來的隊列刪除,重新創建個惰性隊列綁定。
- 比如添加
1.7. 交換機屬性
-
exchange : 交換機名稱
-
type : 交換機類型
-
durable : 持久化,同隊列
-
autoDelete : 是否自動刪除,同隊列
-
internal : 若為true,表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定。
-
arguments : 額外參數,目前只有個
alternate-exchange
,表示當生產者發送消息到這個交換機,路由不到該交換機的隊列,則會嘗試這個參數指定的交換機進行路由,若路由鍵匹配,則路由到alternate-exchange
指定的隊列,相當於轉發了,剛好和上一個參數internal
配合,若不想本交換機起到路由隊列的作用,可以設置internal
為true,把消息都轉發到alternate-exchange
指定的交換機,由該交換機來路由指定隊列,- 如下圖:
exchange0
設置了alternate-exchange
交換機為exchange1
,生產者發送數據到exchange0
路由鍵為test1
,在exchange0
路由不到,則轉發到exchange1
判斷路由符合,發送到隊列queue1
- 如下圖:
1.8. 磁盤和內存
在RabbitMQ的管理界面,當我們集群部署時可以看到Nodes節點中Info字段可能為disc
也可能ram
,表示了磁盤存儲或內存儲存。事實上,在集群部署的時候,我們至少要一個磁盤儲存,它代表了將交換機,隊列,綁定,用戶等元數據持久化保存到磁盤,一遍重啟RabbitMQ也能恢復到原先的狀態,當只有一個節點時,必定是磁盤存儲;而內存儲存也有它的優勢,就是效率更高速度更快
1.9. 報錯案例
- 當報下列錯誤,表示你一定存在排他性隊列,也就是設置了
exclusive
屬性的隊列,由於同一個連接創建的不同通道可以訪問同一個隊列,此時由於這個排他屬性會得到資源被鎖定錯誤,也就是下列的錯誤。 - 由此我們可以知道,若你把隊列設置成了
exclusive
屬性的,那么就別創建新的連接去訪問同一個隊列
ESOURCE_LOCKED - cannot obtain exclusive access to locked queue xxxxxx
今日教學視頻:RabbitMQ消息隊列從入門到精通,長按圖片到百度雲
歡迎關注公眾號,一起學習進步