解決RabbitMQ消息丟失與重復消費問題


1. 背景

最近用戶反饋提交的SQL查詢一直處於長時間等待狀態,經過排查觀察,發現部分查詢請求丟失,導致用戶提交的查詢未被正常接收,繼而長時間無響應。

現象:集市SQL控制台提交10個簡單SQL查詢 -> 消息發送方:發送10條消息至消息隊列 -> 消息消費方:只消費了7條消息

2. 現狀

2.1. 當前SQL查詢的整體流程

1

  • 生產者:PHP
    • 將用戶的SQL查詢記錄在DB表,標識查詢任務狀態(f_status)為運行中;
    • 將DB表中的任務id、提交人等信息發送到RabbitMQ;
  • 消息隊列:RabbitMQ
    • PHP消息提交到了交換機;
    • 交換機再把消息分發給指定的消息隊列;
  • 消費者:Python
    • 主進程監聽消息隊列,一旦有消息就不停拉取;
    • 拉取一條消息,就從進程池調起一個空閑進程來處理消息;
    • 隨后反饋ACK給消息隊列,將消息從消息隊列中移除;

2.2. 消息發送方:Web端

結論:消息發送正常
排查步驟:查看log

2.3. 消息隊列

結論:消息數量正常
診斷步驟:
執行機安裝rabbitmq-dump-queue插件,用於dump隊列的消息;
1. 執行機:停止服務;
2. 用戶:提交10個SQL查詢:
3. 發送方:查看Web服務端的輸出日志,確定10個消息已經往消息隊列寫;
4. 執行機:通過rabbitmq-dump-queue查看隊列的消息,確認是正常10個消息寫入;

watch -n 1 '$GOPATH/src/rabbitmq-dump-queue/rabbitmq-dump-queue -uri="amqp://guest:guest@xxxxx:5672" -queue ph_open_task'
  • 1

3
5. 執行機:啟動服務,消息隊列中的消息全部被接收;

2.4. 消息接收方

代碼邏輯:

try: pool = Pool(processes=40) def callback(ch, method, properties, body): try: doSomething... pool.apply_async(process) except Exception as e: print traceback.format_exc() logger_msg.info(traceback.format_exc()) finally: // 這里會有問題,即使消息未被處理也會反饋ACK給RabbitMQ ch.basic_ack(delivery_tag=method.delivery_tag) while True: try: connection = pika.BlockingConnection( pika.ConnectionParameters(host='xxxxxxxx')) channel = connection.channel() channel.queue_declare(queue=queue_name, durable=True) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue=queue_name, no_ack=False) channel.start_consuming() except pika.exceptions.ConnectionClosed as e: continue except Exception as e: logger_msg.info(traceback.format_exc()) finally: channel.basic_ack(delivery_tag=method.delivery_tag) pool.close() pool.join()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

結論:本例中消費者主進程將持續監聽MQ,一旦MQ有消息將會拉取,隨后從進程池中啟動子進程來處理消息,但是從進程池啟動子進程的過程並不一定成功(若當前進程池沒有空閑子進程),而主進程不管任何情況下都給MQ發送ACK狀態碼,從而MQ將未處理的消息移除掉,導致消息丟失

3. 方案

問題是在消費者環節產生,因此對消費者做改動,需要調整消費者的架構:
* 原來邏輯:使用進程池技術,主進程負責監聽、接收MQ的消息,子進程負責執行MQ的消息,缺點是單一的主進程無法簡單處理ACK狀態碼,不易維護;
* 現有邏輯:使用RabbitMQ自身特性(work_queue),消費者不再維護進程池,是單進程,負責監聽、接收、處理MQ的消息,處理完了以后再反饋ACK狀態碼,進程與進程之間互不干擾,易維護,並發量大時可隨時增加消費者進程;

目前方案的問題以及解決方案:

  • 問題1:消息重復消費
    描述:用戶在頁面停止查詢時,會導致消費者進程被殺死,因此ACK狀態碼未反饋至MQ,從而消息一直存留在MQ中,當新的消費者啟動時會重新消費;
    解決方案:消費者每次執行查詢前,首先在DB上查詢任務的執行狀態,若處於「取消/失敗/成功」則表示已經由其它消費者消費過,那么直接返回ACK狀態碼給MQ,將消息從MQ中移除;

  • 問題2:進程池如何維護?
    描述:用戶在頁面停止查詢時,會導致消費者進程被殺死,導致消費者數量減少;
    解決方案:維護一個監控腳本,每分鍾輪詢消費者進程數,若少於40個進程,則新啟動一個消費者,直到數量足夠;

4


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM