消息隊列的常見問題


消息隊列的常見問題

1.消息丟失怎么辦?(消息的可靠性傳輸)

消息的丟失可能會出現在三個地方:

RabbitMQ消息中間件:

(1)生產者弄丟數據

生產者將數據發送到RabbitMQ的時候,可能數據就在半路給搞丟了,因為網絡啥的問題,都有可能。怎么解決?

事務:生產者發送數據之前開啟RabbitMQ事務(channel.txSelect),然后發送消息,如果消息沒有成功被RabbitMQ接收到,那么生產者會收到異常報錯,此時就可以回滾事務(channel.txRollback),然后重試發送消息;如果收到了消息,可以提交事務(channel.txCommit)。但是問題是,RabbitMQ事務機制一搞,基本上吞吐量會下來,因為太耗性能。

confirm模式:在生產者那里設置開啟confirm模式之后,你每次寫的消息都會分配一個唯一的id,然后如果寫入了RabbitMQ中,RabbitMQ會給你回傳一個ack消息,告訴你說這個消息ok了。如果RabbitMQ沒能處理這個消息,會回調你一個nack接口,告訴你這個消息接收失敗,你可以重試。而且你可以結合這個機制自己在內存里維護每個消息id的狀態,如果超過一定時間還沒接收到這個消息的回調,那么你可以重發

所以一般在生產者這塊避免數據丟失,都是用confirm機制的

(2)Mq弄丟數據

就是RabbitMQ自己弄丟了數據,這個你必須開啟RabbitMQ的持久化,就是消息寫入之后會持久化到磁盤,哪怕是RabbitMQ自己掛了,恢復之后會自動讀取之前存儲的數據,一般數據不會丟。

設置持久化有兩個步驟:

第一個是創建queue和交換器的時候將其設置為持久化,這樣就可以保證RabbitMQ持久化相關的元數據,但是不會持久化queue里的數據;

第二個是發送消息的時候將消息的deliveryMode設置為2,就是將消息設置為持久化的,此時RabbitMQ就會將消息持久化到磁盤上去

必須要同時設置這兩個持久化才行

持久化可以和生產者的confirm結合,當持久化成功后,再ack生產者。如果持久化之前RabbitMQ掛了,生產者沒收到ack,會重發。

哪怕是你給RabbitMQ開啟了持久化機制,也有一種可能,就是這個消息寫到了RabbitMQ中,但是還沒來得及持久化到磁盤上,結果不巧,此時RabbitMQ掛了,就會導致內存里的一點點數據會丟失。

(3)消費者弄丟數據

RabbitMQ如果沒有丟失了數據消費者丟失了數據,主要是因為你消費的時候,剛消費到,還沒處理,結果進程掛了比如重啟了,那么就尷尬了,RabbitMQ認為你都消費了,這數據就丟了

這個時候得用RabbitMQ提供的ack機制,簡單來說,就是你關閉RabbitMQ自動ack,可以通過一個api來調用就行,然后每次你自己代碼里確保處理完的時候,再在程序里用aip調用ack一把。這樣的話,如果你還沒處理完,不就沒有ack?那RabbitMQ就認為你還沒處理完,這個時候RabbitMQ會把這個消費分配給別的consumer去處理,消息是不會丟的。

Kafka消息中間件:

(1)消費端弄丟了數據

唯一可能導致消費者弄丟數據的情況,就是說,你那個消費到了這個消息,然后消費者那邊自動提交了offset,讓kafka以為你已經消費好了這個消息,其實你剛准備處理這個消息,你還沒處理,你自己就掛了,此時這條消息就丟咯。

大家都知道kafka會自動提交offset,那么只要關閉自動提交offset,在處理完之后自己手動提交offset,就可以保證數據不會丟。但是此時確實還是會重復消費,比如你剛處理完,還沒提交offset,結果自己掛了,此時肯定會重復消費一次,自己保證冪等性就好了。

生產環境碰到的一個問題,就是說我們的kafka消費者消費到了數據之后是寫到一個內存的queue里先緩沖一下,結果有的時候,你剛把消息寫入內存queue,然后消費者會自動提交offset。

然后此時我們重啟了系統,就會導致內存queue里還沒來得及處理的數據就丟失了

(2)kafka弄丟了數據

這塊比較常見的一個場景,就是kafka某個broker宕機,然后重新選舉partiton的leader時。大家想想,要是此時其他的follower剛好還有些數據沒有同步,結果此時leader掛了,然后選舉某個follower成leader之后,他不就少了一些數據?這就丟了一些數據啊。

生產環境也遇到過,我們也是,之前kafka的leader機器宕機了,將follower切換為leader之后,就會發現說這個數據就丟了。

所以此時一般是要求起碼設置如下4個參數:

給這個topic設置replication.factor參數:這個值必須大於1,要求每個partition必須有至少2個副本。
在kafka服務端設置min.insync.replicas參數:這個值必須大於1,這個是要求一個leader至少感知到有至少一個follower還跟自己保持聯系,沒掉隊,這樣才能確保leader掛了還有一個follower吧。
在producer端設置acks=all:這個是要求每條數據,必須是寫入所有replica之后,才能認為是寫成功了。
在producer端設置retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這里了。

(3)生產者會不會弄丟數據

如果按照上述的思路設置了ack=all,一定不會丟,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才認為本次寫成功了。如果沒滿足這個條件,生產者會自動不斷的重試,重試無限次。

 

 

2.消費者順序消費

從根本上說,異步消息是不應該有順序依賴的。在MQ上估計是沒法解決。要實現嚴格的順序消息,簡單且可行的辦法就是:保證生產者- MQServer -消費者是一對一對一的關系。

如果有順序依賴的消息,要保證消息有一個hashKey,類似於數據庫表分區的的分區key列。保證對同一個key的消息發送到相同的隊列。A用戶產生的消息(包括創建消息和刪除消息)都按A的hashKey分發到同一個隊列。只需要把強相關的兩條消息基於相同的路由就行了,也就是說經過m1和m2的在路由表里的路由是一樣的,那自然m1會優先於m2去投遞。而且一個queue只對應一個consumer。

 

3.消息的重復

分為兩大類情況:1、生產者消息重復發送; 2.MQ向消費者投遞時重復投遞

終極解決辦法:冪等性

 

什么是冪等性?對於消息接收端的情況,冪等的含義是采用同樣的輸入多次調用處理函數,得到同樣的結果

例如,一個 SQL 操作:

update stat_table set count= 10 where id =1

這個操作多次執行,id 等於 1 的記錄中的 count 字段的值都為 10 (每一次的count都不是上一次返回的count,都是原始的count),這個操作就是冪等的,我們不用擔心這個操作被重復。

 

再來看另外一個 SQL 操作:

update stat_table set count= count +1 where id= 1;

這樣的 SQL 操作就不是冪等的(每一次的count都是上一次返回的count,都不是原始的count),一旦重復,結果就會產生變化。

因此應對消息重復的辦法是使消息接收端的處理是一個冪等操作。這樣的做法降低了消息中間件的整體復雜性,不過也給使用消息中間件的消息接收端應用帶來了一定的限制和門檻。

                                                                     

1. MVCC:

多版本並發控制,樂觀鎖的一種實現,在生產者發送消息時進行數據更新時需要帶上數據的版本號,消費者去更新時需要去比較持有數據的版本號,版本號不一致的操作無法成功。

例如博客點贊次數自動+1的接口:

public boolean addCount(Long id, Long version);                                                          
    update blogTable set count= count+1,version=version+1 where id=321 and version=123

每一個version只有一次執行成功的機會,一旦失敗了生產者必須重新獲取數據的最新版本號再次發起更新。

2. 去重表:

利用數據庫表單的特性來實現冪等,常用的一個思路是在表上構建唯一性索引,保證某一類數據一旦執行完畢,后續同樣的請求不再重復處理了(利用一張日志表來記錄已經處理成功的消息的ID,如果新到的消息ID已經在日志表中,那么就不再處理這條消息。)

以電商平台為例子,電商平台上的訂單 id 就是最適合的 token當用戶下單時,會經歷多個環節,比如生成訂單,減庫存,減優惠券等等。每一個環節執行時都先檢測一下該訂單id是否已經執行過這一步驟,對未執行的請求,執行操作並緩存結果,而對已經執行過此 id 的請求,則直接返回之前的執行結果,不做任何操作。這樣可以在最大程度上避免操作的重復執行問題,緩存起來的執行結果也能用於事務的控制等。


 
 

如何保證消息的順序性

因為在某些情況下我們扔進MQ中的消息是要嚴格保證順序的,尤其涉及到訂單什么的業務需求,消費的時候也是要嚴格保證順序,不然會出大問題的。

先看看順序會錯亂的倆場景

rabbitmq:一個queue,多個consumer,這不明顯亂了
kafka:一個topic,一個partition,一個consumer,內部多線程,這不也明顯亂了
如何來保證消息的順序性呢?
rabbitmq:拆分多個queue,每個queue一個consumer,就是多一些queue而已,確實是麻煩點;或者就一個queue但是對應一個consumer,然后這個consumer內部用內存隊列做排隊,然后分發給底層不同的worker來處理。
kafka:一個topic,一個partition,一個consumer,內部單線程消費,寫N個內存queue,然后N個線程分別消費一個內存queue即可。

 


如何解決消息隊列的延時以及過期失效問題?消息隊列滿了以后該怎么處理?有幾百萬消息持續積壓幾小時怎么解決?

(一)、大量消息在mq里積壓了幾個小時了還沒解決

幾千萬條數據在MQ里積壓了七八個小時,從下午4點多,積壓到了晚上很晚,10點多,11點多
這個是我們真實遇到過的一個場景,確實是線上故障了,這個時候要不然就是修復consumer的問題,讓他恢復消費速度,然后傻傻的等待幾個小時消費完畢。這個肯定不能在面試的時候說吧。

一個消費者一秒是1000條,一秒3個消費者是3000條,一分鍾是18萬條,1000多萬條,所以如果你積壓了幾百萬到上千萬的數據,即使消費者恢復了,也需要大概1小時的時間才能恢復過來。

一般這個時候,只能操作臨時緊急擴容了,具體操作步驟和思路如下:

1,先修復consumer的問題,確保其恢復消費速度,然后將現有cnosumer都停掉。
2,新建一個topic,partition是原來的10倍,臨時建立好原先10倍或者20倍的queue數量。
3,然后寫一個臨時的分發數據的consumer程序,這個程序部署上去消費積壓的數據,消費之后不做耗時的處理,直接均勻輪詢寫入臨時建立好的10倍數量的queue。
4,接着臨時征用10倍的機器來部署consumer,每一批consumer消費一個臨時queue的數據。
5,這種做法相當於是臨時將queue資源和consumer資源擴大10倍,以正常的10倍速度來消費數據。
6,等快速消費完積壓數據之后,得恢復原先部署架構,重新用原先的consumer機器來消費消息。

 

(二)、消息隊列過期失效問題


假設你用的是rabbitmq,rabbitmq是可以設置過期時間的,就是TTL,如果消息在queue中積壓超過一定的時間就會被rabbitmq給清理掉,這個數據就沒了。那這就是第二個坑了。這就不是說數據會大量積壓在mq里,而是大量的數據會直接搞丟。

這個情況下,就不是說要增加consumer消費積壓的消息,因為實際上沒啥積壓,而是丟了大量的消息。我們可以采取一個方案,就是批量重導,這個我們之前線上也有類似的場景干過。就是大量積壓的時候,我們當時就直接丟棄數據了,然后等過了高峰期以后,比如大家一起喝咖啡熬夜到晚上12點以后,用戶都睡覺了。這個時候我們就開始寫程序,將丟失的那批數據,寫個臨時程序,一點一點的查出來,然后重新灌入mq里面去,把白天丟的數據給他補回來。也只能是這樣了。

假設1萬個訂單積壓在mq里面,沒有處理,其中1000個訂單都丟了,你只能手動寫程序把那1000個訂單給查出來,手動發到mq里去再補一次。

(三)、消息隊列滿了怎么搞?

如果走的方式是消息積壓在mq里,那么如果你很長時間都沒處理掉,此時導致mq都快寫滿了,咋辦?這個還有別的辦法嗎?沒有,誰讓你第一個方案執行的太慢了,你臨時寫程序,接入數據來消費,消費一個丟棄一個,都不要了,快速消費掉所有的消息。然后走第二個方案,到了晚上再補數據吧。

 

如何保證消息隊列的高可用性


由於筆者只使用和實踐過RabbitMQ和Kafka,RocketMQ和ActiveMQ了解的不深,所以分析一下RabbitMQ和Kafka的高可用。

(一)RabbitMQ

RabbitMQ有三種模式:單機模式,普通集群模式,鏡像集群模式

(1)單機模式

單機模式平常使用在開發或者本地測試場景,一般就是測試是不是能夠正確的處理消息,生產上基本沒人去用單機模式,風險很大。

(2)普通集群模式

普通集群模式就是啟動多個RabbitMQ實例。在你創建的queue,只會放在一個rabbtimq實例上,但是每個實例都同步queue的元數據。在消費的時候完了,上如果連接到了另外一個實例,那么那個實例會從queue所在實例上拉取數據過來。

這種方式確實很麻煩,也不怎么好,沒做到所謂的分布式,就是個普通集群。因為這導致你要么消費者每次隨機連接一個實例然后拉取數據,要么固定連接那個queue所在實例消費數據,前者有數據拉取的開銷,后者導致單實例性能瓶頸。

而且如果那個放queue的實例宕機了,會導致接下來其他實例就無法從那個實例拉取,如果你開啟了消息持久化,讓RabbitMQ落地存儲消息的話,消息不一定會丟,得等這個實例恢復了,然后才可以繼續從這個queue拉取數據。

這方案主要是提高吞吐量的,就是說讓集群中多個節點來服務某個queue的讀寫操作。

(3)鏡像集群模式

鏡像集群模式是所謂的RabbitMQ的高可用模式,跟普通集群模式不一樣的是,你創建的queue,無論元數據還是queue里的消息都會存在於多個實例上,然后每次你寫消息到queue的時候,都會自動把消息到多個實例的queue里進行消息同步。

優點在於你任何一個實例宕機了,沒事兒,別的實例都可以用。缺點在於性能開銷太大和擴展性很低,同步所有實例,這會導致網絡帶寬和壓力很重,而且擴展性很低,每增加一個實例都會去包含已有的queue的所有數據,並沒有辦法線性擴展queue。

開啟鏡像集群模式可以去RabbitMQ的管理控制台去增加一個策略,指定要求數據同步到所有節點的,也可以要求就同步到指定數量的節點,然后你再次創建queue的時候,應用這個策略,就會自動將數據同步到其他的節點上去了。

(二)Kafka

Kafka天生就是一個分布式的消息隊列,它可以由多個broker組成,每個broker是一個節點;你創建一個topic,這個topic可以划分為多個partition,每個partition可以存在於不同的broker上,每個partition就放一部分數據。

kafka 0.8以前,是沒有HA機制的,就是任何一個broker宕機了,那個broker上的partition就廢了,沒法寫也沒法讀,沒有什么高可用性可言。

kafka 0.8以后,提供了HA機制,就是replica副本機制。kafka會均勻的將一個partition的所有replica分布在不同的機器上,來提高容錯性。每個partition的數據都會同步到吉他機器上,形成自己的多個replica副本。然后所有replica會選舉一個leader出來,那么生產和消費都去leader,其他replica就是follower,leader會同步數據給follower。當leader掛了會自動去找replica,然后會再選舉一個leader出來,這樣就具有高可用性了。

寫數據的時候,生產者就寫leader,然后leader將數據落地寫本地磁盤,接着其他follower自己主動從leader來pull數據。一旦所有follower同步好數據了,就會發送ack給leader,leader收到所有follower的ack之后,就會返回寫成功的消息給生產者。(當然,這只是其中一種模式,還可以適當調整這個行為)

消費的時候,只會從leader去讀,但是只有一個消息已經被所有follower都同步成功返回ack的時候,這個消息才會被消費者讀到。

 

 


原文鏈接:https://blog.csdn.net/qq_36236890/article/details/81174504

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM