消息隊列的使用和注意事項


rabbitmq、kafka、activemq、rocketmq之間區別?

activemq比較成熟,有較低概率丟數據吞吐量低,用異步和解耦可以用下。

rabbitmq吞吐量萬級,只比kafka低些,消息時效性最低,微秒級別,基本不丟數據。

rocketmq吞吐量10萬級,比rabbitmq高些,消息時效性ms級別

kafka吞吐量10萬級,吞吐量最高,常用於大數據,日志分析

 

使用消息隊列我們要搞清楚一下幾個問題:

  1. 為什么使用消息隊列?
  2. 使用消息隊列有什么缺點?
  3. 消息隊列如何選型?
  4. 如何保證消息隊列是高可用的?
  5. 如何保證消息不被重復消費?
  6. 如何保證消費的可靠性傳輸?
  7. 如何保證消息的順序性?

1.使用消息隊列可以達到幾個目的:解耦、異步、削峰

解耦:系統間耦合性太強,系統A在代碼中直接調用系統B和系統C的代碼,如果將來D系統接入,系統A還需要修改代碼,過於麻煩!將消息寫入消息隊列,需要消息的系統自己從消息隊列中訂閱,從而系統A不需要做任何修改。

異步:一些非必要的業務邏輯以同步的方式運行,太耗費時間。將消息寫入消息隊列,非必要的業務邏輯以異步的方式運行,加快相應速度

削峰:

傳統模式的缺點:

  • 並發量大的時間,所有的請求直接懟到數據庫,造成數據庫連接異常

中間件模式的優點:

  • 系統A慢慢的按照數據庫能處理的並發量,從消息隊列中慢慢拉取消息。在生產中,這個短暫的高峰期積壓是允許的。

2.使用消息隊列會有的缺點

  • 系統可用性降低:你想呀,本來其他系統只要運行好好的,那你的系統就是正常的。現在你非要加入個消息隊列進去,那消息隊列掛了,你的系統不是呵呵了。因此,系統可用性會降低
  • 系統復雜性增加:加入了消息隊列,要多考慮很多方面的問題,比如:一致性問題、如何保證消息不被重復消費、如何保證消息可靠性傳輸等。因此,需要考慮的東西更多,刺痛復雜性增大。

3.消息隊列的選擇:

 

 

4.如何保證消息隊列的高可用

5.如何保證消息不被重復消費

如何保證消息不被重復消費的前提是我們必須要知道是什么原因導致了消息被重復消費:網絡傳輸等等故障,確認信息沒有傳送到消息隊列(消息隊列不清楚是否被消費再次將消費發給消費者)

如何解決這個問題呢?

(1)比如,你拿到這個消息做數據庫的insert操作,那就容易了,給這個消息做一個唯一的主鍵,那么就算出現重復消費的情況,就會導致主鍵沖突,避免數據庫出現臟數據。

(2)再比如,你拿到這個消息做redis的set的操作,那就容易了,不用解決,因為你無論set幾次結果都是一樣的,set操作本來就算冪等操作。

(3)如果上面兩種情況還不行,上大招。准備一個第三方介質,來做消費記錄。以redis為例,給消息分配一個全局id,只要消費過該消息,將<id,message>以K-V形式寫入redis.那消費者開始消費前,先去redis中查詢有沒有消費記錄即可。

6.如何保證消費的可靠性傳輸?

  • 生產者弄丟數據
  • 消息隊列弄丟數據
  • 消費者弄丟數據

 生產者丟數據

從生產者弄丟數據這個角度來看,RabbitMQ提供transaction和confirm模式來確保生產者不丟消息。

transaction機制就是說,發送消息前,開啟事務(channel.txSelect()),然后發送消息,如果發送過程中出現什么異常,事務就會回滾(channel.txRollback()),如果發送成功則提交事務(channel.txCommit())。

然而,這種方式有個缺點:吞吐量下降。因為,按照經驗,生產上用confirm模式的居多。一旦channel進入confirm模式,所有在該信道上發布的消息都將會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之后,rabbitMQ就會發送一個ACK給生產者(包含消息的唯一ID),這就使得生產者知道消息已經正確到達目的隊列了。如果rabbitMQ沒能處理該消息,則會發送一個Nack消息給你,你可以進行重試操作。處理Ack和Nack的代碼如下所示:

channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("nack: deliveryTag = "+deliveryTag+" multiple: "+multiple); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("ack: deliveryTag = "+deliveryTag+" multiple: "+multiple); } }); 

消息隊列丟數據

處理消息隊列丟數據的情況,一般是開啟持久化磁盤的配置。這個持久化配置可以和confirm機制配合使用,你可以在消息持久化磁盤后,再給生產者發送一個Ack信號。這樣,如果消息持久化磁盤之前,rabbitMQ陣亡了,那么生產者收不到Ack信號,生產者會自動重發。

那么如何持久化呢,這里順便說一下吧,其實也很容易,就下面兩步

  1. 將queue的持久化標識durable設置為true,則代表是一個持久的隊列
  2. 發送消息的時候將deliveryMode=2

這樣設置以后,即使rabbitMQ掛了,重啟后也能恢復數據

 

消費者丟數據

消費者丟數據一般是因為采用了自動確認消息模式。這種模式下,消費者會自動確認收到信息。這時rabbitMQ會立即將消息刪除,這種情況下,如果消費者出現異常而未能處理消息,就會丟失該消息。

至於解決方案,采用手動確認消息即可

 

7.如何保證消息的順序性

分析:其實並非所有的公司都有這種業務需求,但是還是對這個問題要有所復習。

回答:針對這個問題,通過某種算法,將需要保持先后順序的消息放到同一個消息隊列中(kafka中就是partition,rabbitMq中就是queue)。然后只用一個消費者去消費該隊列。
有的人會問:那如果為了吞吐量,有多個消費者去消費怎么辦?

這個問題,沒有固定回答的套路。比如我們有一個微博的操作,發微博、寫評論、刪除微博,這三個異步操作。如果是這樣一個業務場景,那只要重試就行。比如你一個消費者先執行了寫評論的操作,但是這時候,微博都還沒發,寫評論一定是失敗的,等一段時間。等另一個消費者,先執行寫評論的操作后,再執行,就可以成功。

總之,針對這個問題,我的觀點是保證入隊有序就行,出隊以后的順序交給消費者自己去保證,沒有固定套路。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM