RabbitMQ消息中間件的用法


1.什么是RabbitMQ

RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現。AMQP 的出現其實也是應了廣大人民群眾的需求,雖然在同步消息通訊的世界里有很多公開標准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在異步消息處理中卻不是這樣,只有大企業有一些商業實現(如微軟的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等聯合制定了 AMQP 的公開標准。 RabbitMQ是由RabbitMQ Technologies Ltd開發並且提供商業支持的。該公司在2010年4月被SpringSource(VMWare的一個部門)收購。在2013年5月被並入Pivotal。其實VMWare,Pivotal和EMC本質上是一家的。不同的是VMWare是獨立上市子公司,而Pivotal是整合了EMC的某些資源,現在並沒有上市。 RabbitMQ的官網是http://www.rabbitmq.com 百度百科amqp協議介紹https://baike.baidu.com/item/AMQP/8354716?fr=aladdin 注意:RabbitMQ是采用erlang語言開發的,所以必須有erlang環境才可以運行

2.為什么要使用MQ

 

3.常用消息中間件的對比

4.消息隊列RabbitMq的五種形式隊列

4.1.點對點(簡單)的隊列

點對點模式:一對一消費,一個生產者投遞消息給隊列,只能允許有一個消費者進行消費。

注意:如果消費集群的話,會進行均攤消費。前提是服務器的配置相同。

均攤消費的弊端:假如有2台服務器分別為A、B。如果每個消費處理消息的業務時間不相同的情況下,可能對消費者處理慢的服務器不公平(服務器壓力大),A處理比B處理時間快,應該A處理的消息多一些,B處理的消息少一些才合理。

隊列以先進先出原則進行存放消息集合。生產者投遞消息到隊列中。

當消費者啟動的時候,會與隊列服務器建立長連接,當生產者有消息投遞到隊列的時候,隊列會立刻將消息通知給消費者進行消費。

長連接的好處:如果是短鏈接的話,每次訪問都需要建立連接,比較占內存。建立長連接會減少三次握手,提高傳輸速度。

取消息隊列與推送消息隊列的區別:

取消息:生產者投遞消息到隊列中,隊列服務器緩存消息。這時候當消費者啟動的時候,消費者會去向隊列服務器中獲取消息。

推消息:當生產者和消費者都啟動的時候,生產者向隊列投遞消息,這時候隊列會將消息推送給消費者。

4.2.工作(公平性)隊列模式

公平隊列的原理:隊列服務器向消費者發送消息的時候,消費者采用手動應答模式,隊列服務器必須要收到消費者發送ack結果通知之后,才會繼續發送下一個消息。

4.3.發布訂閱模式

Direct exchange(直連交換機)是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的。

發布訂閱實現流程:生產者投遞消息給交換機,交換機根據路由策略(routignKey)轉發到不同的隊列服務器中緩存,然后隊列服務器在推送消息給消費者進行消費或者消費者從隊列服務器中拉取消息進行消費。

發布訂閱實現原理:一對多。

這個隊列模式是消息隊列中最重要的隊列了,其他的都是在它的基礎上進行了擴展。 功能實現:一個生產者發送消息,多個消費者獲取消息(同樣的消息),包括一個生產者,一個交換機,多個隊列,多個消費者。

思路解讀(重點理解): 

1. 一個生產者,多個消費者

2. 每一個消費者都有自己的一個隊列

3. 生產者沒有直接發消息到隊列中,而是發送到交換機 

4. 每個消費者的隊列都綁定到交換機上

5. 消息通過交換機到達每個消費者的隊列 該模式就是Fanout Exchange(扇型交換機)將消息路由給綁定到它身上的所有隊列 以用戶發郵件案例講解

注意:交換機沒有存儲消息功能,如果消息發送沒有綁定消費隊列的交換機,消息則丟失。在消費者沒有啟動的情況下,生產者投遞消息到交換機,這時候交換機不知道把消息轉發給哪個消費者,所以消息會消失。因為交換機沒有緩存功能,只做轉發的功能。

使用場景:用戶注冊→發送郵件→發送短信。

 

4.4.路由模式RoutingKey

Direct exchange(直連交換機)是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的。

生產者發送消息到交換機並指定一個路由key,消費者隊列綁定到交換機時要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)。

例如:我們可以把路由key設置為insert ,那么消費者隊列key指定包含insert才可以接收消息,消費者隊列key定義為update或者delete就不能接收消息。很好的控制了更新,插入和刪除的操作。 采用交換機direct模式

流程說明:如果生產者投遞消息到交換機(exchange),郵件隊列和短信隊列也都綁定了交換機(exchange)。但是當交換機的類型(type=direct)的時候,交換機的轉發(路由)由routingKey決定轉發給誰。如下如圖所示,當交換機的rontingKey=email的時候,消息將轉發到郵件隊列服務然后由郵件消費者進行消費。而短信隊列是都收不到消息的,因為短信的路由routingKey=msg。如果短信隊列也想收到消息就需要修改routingKey=email才可以收到消息。

這就是交換機類型type=direct的用法及特性。

 

4.5.通配符模式Topics

說明:此模式實在路由key模式的基礎上,使用了通配符來管理消費者接收消息。生產者P發送消息到交換機X,type=topic,交換機根據綁定隊列的routing key的值進行通配符匹配;

符號#匹配一個或者多個詞lazy.# 可以匹配lazy.irs或者lazy.irs.cor

符號*只能匹配一個詞lazy.* 可以匹配lazy.irs或者lazy.cor

 

 

消息隊列RabbitMQ應答模式

為了確保消息不會丟失,RabbitMQ支持消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收並且處理完畢了。RabbitMQ就可以刪除它了。 如果一個消費者掛掉卻沒有發送應答,RabbitMQ會理解為這個消息沒有處理完全,然后交給另一個消費者去重新處理。這樣,你就可以確認即使消費者偶爾掛掉也不會丟失任何消息了。 沒有任何消息超時限制;只有當消費者掛掉時,RabbitMQ才會重新投遞。即使處理一條消息會花費很長的時間。 消息應答是默認打開的。我們通過顯示的設置autoAsk=true關閉這種機制。現即自動應答開,一旦我們完成任務,消費者會自動發送應答。通知RabbitMQ消息已被處理,可以從內存刪除。如果消費者因宕機或鏈接失敗等原因沒有發送ACK(不同於ActiveMQ,在RabbitMQ里,消息沒有過期的概念),則RabbitMQ會將消息重新發送給其他監聽在隊列的下一個消費者。

RabbitMQ的公平轉發

目前消息轉發機制是平均分配,這樣就會出現倆個消費者,奇數的任務很耗時,偶數的任何工作量很小,造成的原因就是近當消息到達隊列進行轉發消息。並不在乎有多少任務消費者並未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。 為了解決這樣的問題,我們可以使用basicQos方法,傳遞參數為prefetchCount= 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條消息。 換句話說,只有在消費者空閑的時候會發送下一條信息。調度分發消息的方式,也就是告訴RabbitMQ每次只給消費者處理一條消息,也就是等待消費者處理完畢並自己對剛剛處理的消息進行確認之后,才發送下一條消息,防止消費者太過於忙碌,也防止它太過去清閑。 通過 設置channel.basicQos(1);

消息隊列RabbitMQ應答模式

案例: 生產者端代碼不變,消費者端代碼這部分就是用於開啟手動應答模式的。 channel.basicConsume(QUEUE_NAME, false, defaultConsumer); 注:第二個參數值為false代表關閉RabbitMQ的自動應答機制,改為手動應答。 在處理完消息時,返回應答狀態,true表示為自動應答模式。 channel.basicAck(envelope.getDeliveryTag(), false);

傳統簡單隊列是如何實現的?

生產者生產消息直接投遞給隊列服務器,隊列服務器在以推送消息到消費者或者消費者從隊列服務器拉取消息進行消費。消費者啟動的時候會與隊列服務器建立長連接。

RabbitMQ關鍵名詞

AMQP(高級消息隊列協議)是一個異步消息傳遞所使用應用層協議規范,為面向消息中間件設計,基於此協議的客戶端與消息中間件可以無視消息來源傳遞消息,不受客戶端、消息中間件、不同的開發語言環境等條件的限制;

涉及概念解釋: 

 Server(Broker):接收客戶端連接,實現AMQP協議的消息隊列和路由功能的進程;

 Virtual Host:虛擬主機的概念,類似權限控制組,一個Virtual Host里可以有多個Exchange和Queue。     

Exchange:交換機,接收生產者發送的消息,並根據Routing Key將消息路由到服務器中的隊列Queue。

 ExchangeType:交換機類型決定了路由消息行為,RabbitMQ中有三種類型Exchange,分別是fanout、direct、topic;  Message Queue:消息隊列,用於存儲還未被消費者消費的消息;

 Message:由Header和body組成,Header是由生產者添加的各種屬性的集合,包括Message是否被持久化、優先級是多少、由哪個Message Queue接收等;

body是真正需要發送的數據內容;

BindingKey:綁定關鍵字,將一個特定的Exchange和一個特定的Queue綁定起來。

 

RabbitMQ交換機的作用

生產者發送消息不會像傳統方式直接將消息投遞到隊列中,而是先將消息投遞到交換機中,在由交換機轉發到具體的隊列,隊列在將消息以推送或者拉取方式給消費者進行消費,這和我們之前學習Nginx有點類似。 交換機的作用根據具體的路由策略分發到不同的隊列中。

交換機有四種類型:

Direct exchange(直連交換機):是根據消息攜帶的路由鍵;

routing key:將消息投遞給對應隊列的 Fanout exchange(扇型交換機)將消息路由給綁定到它身上的所有隊列 ;

Topic exchange(主題交換機):隊列通過路由鍵綁定到交換機上,然后,交換機根據消息里的路由值,將消息路由給一個或多個綁定隊列;

Headers exchange(頭交換機):類似主題交換機,但是頭交換機使用多個消息屬性來代替路由鍵建立路由規則。通過判斷消息頭的值能否與指定的綁定相匹配來確立路由規則。

 

RabbitMQ消息確認機制

問題產生背景: 生產者發送消息出去之后,不知道到底有沒有發送到RabbitMQ服務器, 默認是不知道的。而且有的時候我們在發送消息之后,后面的邏輯出問題了,我們不想要發送之前的消息了,需要撤回該怎么做。

如果RabbitMQ服務器宕機了,消息會丟失嗎?

  答案:RabbitMQ服務器支持消息持久化機制,會把消息持久化在硬盤上保存。代碼設置  channel.queueDeclare(EMAIL_QUEUE, true, false, false, null); 方法第二個參數,默認情況下我們應該設置為true。

解決方案:

1.AMQP 事務機制

2.Confirm 模式

事務模式::

  txSelect:將當前channel設置為transaction模式

  txCommit :提交當前事務

  txRollback:事務回滾

 

 生產者   消費者   隊列服務器  

消費者如何確保消息一定能夠消費成功?

通過應答模式,默認為應答模式,可以修改為手動應答。設置方法:channel.basicConsume(QUEUE_NAME, false, defaultConsumer); 第二個參數。

設置應答模式 :第一個參數 隊列名稱、第二個參數 應答模式 如果為true 自動應答,false 為手動應答、第三個參數 監聽器
自動應答(true):不在乎消費者對這個消息處理是否成功,都會告訴隊列刪除該消息。如果處理消息失敗的情況下,應該實現自動補償。
手動應答(false):當隊列把消息推送給消費者,消費者處理完業務邏輯之后,手動返回ack(通知)告訴給隊列服務器是否要刪除該消息、如果失敗,隊列服務器做補償,而不會直接刪除該消息、

 

springboot整合rabbitmq項目

springboot整合rabbitmq分為2個項目,一個是生產者服務,一個是消息服務平台項目。消息服務平台項目中包括郵件消費者和短信消費者。沒有必要每一個消費者都創建一個項目,那樣會浪費資源。

在一個項目中,可以有多個生產者和消費者。

 

RabbitMQ消息重試機制

消費者在消費消息的時候,如果消費者業務邏輯出現程序異常,這時候應該如何處理?

答案:使用消息重試機制。

如何合適選擇重試機制:

情況1: 消費者獲取到消息后,調用第三方接口,但接口暫時無法訪問,是否需要重試?

答案:需要重試機制。

情況2: 消費者獲取到消息后,拋出數據轉換異常,是否需要重試? 

答案:不需要重試機制,需要發布版本進行解決。

如何實現重試機制

總結:對於情況2,如果消費者代碼拋出異常是需要發布新版本才能解決的問題,那么不需要重試,重試也無濟於事。應該采用日志記錄+定時任務job健康檢查+人工進行補償。

消費者如果保證消息冪等性,不被重復消費

產生原因:網絡延遲傳輸中,消費出現異常或者是消費延遲消費,會造成MQ進行重試補償,在重試過程中,可能會造成重復消費。

消費者如何保證消息冪等性,不被重復消費

解決辦法:

①使用全局MessageID判斷消費方是否是同一個,解決冪等性。

②或者使用業務邏輯id保證唯一(比如訂單號碼)

RabbitMQ信隊列

死信隊列 聽上去像 消息“死”了     其實也有點這個意思,死信隊列  是 當消息在一個隊列 因為下列原因:

消息被拒絕(basic.reject/ basic.nack)並且不再重新投遞 requeue=false

消息超期 (rabbitmq  Time-To-Live -> messageProperties.setExpiration())

隊列超載

變成了 “死信” 后    被重新投遞(publish)到另一個Exchange   該Exchange 就是DLX     然后該Exchange 根據綁定規則 轉發到對應的 隊列上  監聽該隊列  就可以重新消費     說白了 就是  沒有被消費的消息  換個地方重新被消費

生產者   -->  消息 --> 交換機  --> 隊列  --> 變成死信  --> DLX交換機 -->隊列 --> 消費者

什么是死信呢?什么樣的消息會變成死信呢?

消息被拒絕(basic.reject或basic.nack)並且requeue=false.

消息TTL過期

隊列達到最大長度(隊列滿了,無法再添加數據到mq中)

應用場景分析

在定義業務隊列的時候,可以考慮指定一個死信交換機,並綁定一個死信隊列,當消息變成死信時,該消息就會被發送到該死信隊列上,這樣就方便我們查看消息失敗的原因了

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丟棄消息

如何使用死信交換機呢?

定義業務(普通)隊列的時候指定參數

x-dead-letter-exchange: 用來設置死信后發送的交換機

x-dead-letter-routing-key:用來設置死信的routingKey

/**
* 定義死信隊列相關信息
*/
public final static String deadQueueName = "dead_queue";
public final static String deadRoutingKey = "dead_routing_key";
public final static String deadExchangeName = "dead_exchange";
/**
* 死信隊列 交換機標識符
*/
public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信隊列交換機綁定鍵標識符
*/
public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";

/**
* 定義短信隊列 包括死信隊列
*
* @return
*/
@Bean
public Queue fanoutMsgQueue() {
//return new Queue(MSG_QUEUE_FANOUT);
// 將普通隊列綁定到死信隊列交換機上
Map<String, Object> args = new HashMap<>(2);
args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
Queue queue = new Queue(MSG_QUEUE_FANOUT, true, false, false, args);
return queue;
}
/**
* 配置死信隊列
*
* @return
*/
@Bean
public Queue deadQueue() {
Queue queue = new Queue(deadQueueName, true);
return queue;
}

/**
* 創建死信交換機
*/
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(deadExchangeName);
}

/**
* 死信交換機綁定私信隊列
* @param deadQueue
* @param deadExchange
* @return
*/
@Bean
public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
}

RabbitMq 的配置文件

spring:
rabbitmq:
#### 連接地址
host: 127.0.0.1
####端口號
port: 5672
#### 用戶名 自己在rabbitmq服務器上新建的 默認的用戶名和密碼為guest
username: ming
#### 密碼
password: ming
### 虛擬主機
virtual-host: /member
listener:
simple:
retry:
####開啟消費者(程序出現異常的情況下)進行重試機制
enabled: true
### 最大重試次數, 默認情況下 一直重試
max-attempts: 5
#### 重試間隔時間 單位:毫秒
initial-interval: 3000
##### 開啟手動應答 ack
acknowledge-mode: manual

### 服務端口號
server:
port: 8081

rabbitmq地址:http://www.rabbitmq.com/getstarted.html

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM