RabbitMQ實戰應用技巧


1. RabbitMQ實戰應用技巧

1.1. 前言

由於項目原因,之后會和RabbitMQ比較多的打交道,所以讓我們來好好整理下RabbitMQ的應用實戰技巧,盡量避免日后的采坑

1.2. 概述

RabbitMQ有幾個重要的概念:虛擬主機,交換機,隊列和綁定

  • 虛擬主機:一個虛擬主機持有一組交換機、隊列和綁定,我們可以從虛擬主機層面的顆粒度進行權限控制
  • 交換機:Exchange用於轉發消息,它並不存儲消息,如果沒有Queue隊列綁定到Exchange,它會直接丟棄掉生產者發來的數據。
    交換機還有個關聯的重要概念:路由鍵,消息轉發到哪個隊列根據路由鍵決定
  • 綁定:就是綁定交換機和隊列,它是多對多的關系,也就是說多個交換機可以綁同一個隊列,也可以一個交換機綁多個隊列

1.3. 交換機

交換機有四種類型的模式Direct, topic, Headers and Fanout

1.3.1. Direct Exchage

Direct模式使用的是RabbitMQ的默認交換機,也是最簡單的模式,適合比較簡單的場景

如下圖所示,使用Direct模式,我們需要創建不同的隊列,而默認交換機則通過Routing key路由鍵的值來決定轉發到哪個隊列,可以看到,路由鍵綁定隊列是可以指定多個的

1.3.2. Topic Exchange

Topic模式主要是根據通配符匹配,也就類似於模糊匹配,當這種匹配模式和路由鍵匹配后交換機就能轉發消息到指定隊列

  • 路由鍵為一串字符串,由句號(.)隔開,比如a.b.c
  • *)代表指定位置一個單詞,(#)代表零個或者多個單詞,比如a.*.b.#,表示a和b中間隨意填個單詞,b后面可以跟n個單詞,比如a.x.b.c.d.e

Topic模式和Direct模式的區別在於交換機需要自己指定,路由鍵支持模糊匹配,例如:

rabbitTemplate.convertAndSend("topicExchange","a.x.b.d", " hello world!");

1.3.3. Headers Exchage

Headers也是根據規則匹配,但它不是根據路由鍵了,headers有個自定義匹配規則,它將匹配鍵值設在了消息的headers屬性上,當這些鍵值對有一對或者全部匹配時,消息才會被投遞到對應隊列,這種模式效率相對較低,一般不推薦使用

1.3.4. Fanout Exchange

Fanout即為大名鼎鼎的廣播模式了,它不需要管路由鍵,會把消息發給綁定它的全部隊列,就算配置了路由鍵也會被忽略

1.4. 復雜情況

  1. 首先我們Direct模式,一個生產者一個消費者的情況,也就對應了一個發送者和一個隊列A接收,這是沒有疑問的,發送什么接收什么
  2. 當Direct模式,一個生產者發消息,開啟多個消費者也就是多個相同queue,此時消息由多個消費者均勻分攤,不會重復消費(前提ack正常)
  3. 當Topic模式,一個交換機綁定兩個隊列,路由鍵有重疊關系,如下代碼,此時指定路由鍵topic.message發送消息,隊列queueMessagequeueMessages都能接收到相同消息,也就是說,topic模式可以實現類似於廣播模式的形式,甚至更加靈活,它能否轉發到消息由路由鍵決定。
  4. 相比於Fanout模式,我們如果要對消費者隊列分組發送,我們需要指定不同的路由鍵;而Fanout模式則需要指定不同的交換機和隊列綁定,實際使用結合實際情況
@Configuration
public class TopicRabbitConfig {

    final static String message = "topic.message";
    final static String messages = "topic.messages";

    @Bean
    public Queue queueMessage() {
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages() {
        return new Queue(TopicRabbitConfig.messages);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}

1.5. springboot配置

我們的常用配置如下

spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=1000
##設置監聽限制:最大10,默認5
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.listener.simple.acknowledge-mode=manual

其中最后四條配置需要着重解釋:

  • spring.rabbitmq.publisher-confirms為true,表示生產者消息發出后,MQ的broker接收到了消息,發送回執表示確認接收,不設置則可能導致消息丟失
  • spring.rabbitmq.publisher-returns為true,表示當消息不能到達MQ的Broker端,,則使用監聽器對不可達的消息做后續處理,這種一般是路由鍵沒配好,或MQ宕機才可能發生
  • spring.rabbitmq.template.mandatory當上面兩個為true時,這個一定要配true,否則上面兩個不起作用
  • spring.rabbitmq.listener.simple.acknowledge-mode這個為manual表示手工確認,實際生產應該設為手工,才能保證你的業務是處理完成的,注意業務的冪等性,可重復調用,手工確認代碼如下例子
@Component
public class RabbitReceiver {

	
	@RabbitListener(bindings = @QueueBinding(
			value = @Queue(value = "queue-1", 
			durable="true"),
			exchange = @Exchange(value = "exchange-1", 
			durable="true", 
			type= "topic", 
			ignoreDeclarationExceptions = "true"),
			key = "springboot.*"
			)
	)
	@RabbitHandler
	public void onMessage(Message message, Channel channel) throws Exception {
		System.err.println("--------------------------------------");
		System.err.println("消費端Payload: " + message.getPayload());
		Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
		//手工ACK,獲取deliveryTag
		channel.basicAck(deliveryTag, false);
	}
}

1.6. 隊列屬性

  1. queue : 隊列的名字

  2. durable : 為true表示隊列中數據持久化到磁盤,可以防止mq宕機重啟數據丟失

  3. exclusive : 為true表示排他性,只允許一個當前連接訪問該隊列,當前已連接就不允許新的連接進入否則報錯,當連接斷開當前隊列會銷毀

  4. autoDelete : 為true表示自動刪除,當沒有Connection連接到隊列的時候,會自動刪除

  5. arguments : 這個參數用來添加一些額外參數的,如下圖片

    • 比如添加x-message-ttl為5000,則表示消息超過5秒沒被處理就會超時過期;
    • x-expires設置120000表示隊列在2分鍾內沒被消費則被刪除;
    • x-max-length,x-max-length-bytes表示傳送數據的最大長度和字節數
    • x-dead-letter-exchangex-dead-letter-routing-key表示死信交換機和死信路由,放在需要過期或處理失敗的隊列屬性中,這些數據會轉發到死信隊列存儲起來,創建普通的交換機和隊列綁定,把交換機名填到x-dead-letter-exchange的值,填寫路由鍵要符合死信隊列的路由鍵
    • x-max-priority,表示設置優先級,范圍為0~255,只有當消息堆積的時候,這個優先級才有意義,數字越大優先級越高
    • x-queue-mode當為lazy,表示惰性隊列,3.6.0之后才被引入的概念,相比默認的模式,惰性隊列模式會將生產者產生的消息直接存到磁盤中,這當然會增加IO開銷,但適合應對大量消息堆積的情況;因為當大量消息堆積時,內存也不夠存放,會將消息轉存到磁盤,這個過程也是比較耗時且過程中不能接收新的消息。如果需要將普通隊列轉換成惰性隊列需要將原來的隊列刪除,重新創建個惰性隊列綁定。

1.7. 交換機屬性

  1. exchange : 交換機名稱

  2. type : 交換機類型

  3. durable : 持久化,同隊列

  4. autoDelete : 是否自動刪除,同隊列

  5. internal : 若為true,表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定。

  6. arguments : 額外參數,目前只有個alternate-exchange,表示當生產者發送消息到這個交換機,路由不到該交換機的隊列,則會嘗試這個參數指定的交換機進行路由,若路由鍵匹配,則路由到alternate-exchange指定的隊列,相當於轉發了,剛好和上一個參數internal配合,若不想本交換機起到路由隊列的作用,可以設置internal為true,把消息都轉發到alternate-exchange指定的交換機,由該交換機來路由指定隊列,

    • 如下圖:exchange0設置了alternate-exchange交換機為exchange1,生產者發送數據到exchange0路由鍵為test1,在exchange0路由不到,則轉發到exchange1判斷路由符合,發送到隊列queue1

1.8. 磁盤和內存

在RabbitMQ的管理界面,當我們集群部署時可以看到Nodes節點中Info字段可能為disc也可能ram,表示了磁盤存儲或內存儲存。事實上,在集群部署的時候,我們至少要一個磁盤儲存,它代表了將交換機,隊列,綁定,用戶等元數據持久化保存到磁盤,一遍重啟RabbitMQ也能恢復到原先的狀態,當只有一個節點時,必定是磁盤存儲;而內存儲存也有它的優勢,就是效率更高速度更快

1.9. 報錯案例

  • 當報下列錯誤,表示你一定存在排他性隊列,也就是設置了exclusive屬性的隊列,由於同一個連接創建的不同通道可以訪問同一個隊列,此時由於這個排他屬性會得到資源被鎖定錯誤,也就是下列的錯誤。
  • 由此我們可以知道,若你把隊列設置成了exclusive屬性的,那么就別創建新的連接去訪問同一個隊列
ESOURCE_LOCKED - cannot obtain exclusive access to locked queue xxxxxx

今日教學視頻:RabbitMQ消息隊列從入門到精通,長按圖片到百度雲

RabbitMQ消息隊列從入門到精通

老梁講Java

歡迎關注公眾號,一起學習進步


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM