1. 背景
最近用戶反饋提交的SQL查詢一直處於長時間等待狀態,經過排查觀察,發現部分查詢請求丟失,導致用戶提交的查詢未被正常接收,繼而長時間無響應。
現象:集市SQL控制台提交10個簡單SQL查詢 -> 消息發送方:發送10條消息至消息隊列 -> 消息消費方:只消費了7條消息
2. 現狀
2.1. 當前SQL查詢的整體流程
- 生產者:PHP:
- 將用戶的SQL查詢記錄在DB表,標識查詢任務狀態(f_status)為運行中;
- 將DB表中的任務id、提交人等信息發送到RabbitMQ;
- 消息隊列:RabbitMQ:
- PHP消息提交到了交換機;
- 交換機再把消息分發給指定的消息隊列;
- 消費者:Python:
- 主進程監聽消息隊列,一旦有消息就不停拉取;
- 拉取一條消息,就從進程池調起一個空閑進程來處理消息;
- 隨后反饋ACK給消息隊列,將消息從消息隊列中移除;
2.2. 消息發送方:Web端
結論:消息發送正常
排查步驟:查看log
2.3. 消息隊列
結論:消息數量正常
診斷步驟:
執行機安裝rabbitmq-dump-queue插件,用於dump隊列的消息;
1. 執行機:停止服務;
2. 用戶:提交10個SQL查詢:
3. 發送方:查看Web服務端的輸出日志,確定10個消息已經往消息隊列寫;
4. 執行機:通過rabbitmq-dump-queue查看隊列的消息,確認是正常10個消息寫入;
watch -n 1 '$GOPATH/src/rabbitmq-dump-queue/rabbitmq-dump-queue -uri="amqp://guest:guest@xxxxx:5672" -queue ph_open_task'
- 1
5. 執行機:啟動服務,消息隊列中的消息全部被接收;
2.4. 消息接收方
代碼邏輯:
try: pool = Pool(processes=40) def callback(ch, method, properties, body): try: doSomething... pool.apply_async(process) except Exception as e: print traceback.format_exc() logger_msg.info(traceback.format_exc()) finally: // 這里會有問題,即使消息未被處理也會反饋ACK給RabbitMQ ch.basic_ack(delivery_tag=method.delivery_tag) while True: try: connection = pika.BlockingConnection( pika.ConnectionParameters(host='xxxxxxxx')) channel = connection.channel() channel.queue_declare(queue=queue_name, durable=True) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue=queue_name, no_ack=False) channel.start_consuming() except pika.exceptions.ConnectionClosed as e: continue except Exception as e: logger_msg.info(traceback.format_exc()) finally: channel.basic_ack(delivery_tag=method.delivery_tag) pool.close() pool.join()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
結論:本例中消費者主進程將持續監聽MQ,一旦MQ有消息將會拉取,隨后從進程池中啟動子進程來處理消息,但是從進程池啟動子進程的過程並不一定成功(若當前進程池沒有空閑子進程),而主進程不管任何情況下都給MQ發送ACK狀態碼,從而MQ將未處理的消息移除掉,導致消息丟失
3. 方案
問題是在消費者環節產生,因此對消費者做改動,需要調整消費者的架構:
* 原來邏輯:使用進程池技術,主進程負責監聽、接收MQ的消息,子進程負責執行MQ的消息,缺點是單一的主進程無法簡單處理ACK狀態碼,不易維護;
* 現有邏輯:使用RabbitMQ自身特性(work_queue),消費者不再維護進程池,是單進程,負責監聽、接收、處理MQ的消息,處理完了以后再反饋ACK狀態碼,進程與進程之間互不干擾,易維護,並發量大時可隨時增加消費者進程;
目前方案的問題以及解決方案:
-
問題1:消息重復消費
描述:用戶在頁面停止查詢時,會導致消費者進程被殺死,因此ACK狀態碼未反饋至MQ,從而消息一直存留在MQ中,當新的消費者啟動時會重新消費;
解決方案:消費者每次執行查詢前,首先在DB上查詢任務的執行狀態,若處於「取消/失敗/成功」則表示已經由其它消費者消費過,那么直接返回ACK狀態碼給MQ,將消息從MQ中移除; -
問題2:進程池如何維護?
描述:用戶在頁面停止查詢時,會導致消費者進程被殺死,導致消費者數量減少;
解決方案:維護一個監控腳本,每分鍾輪詢消費者進程數,若少於40個進程,則新啟動一個消費者,直到數量足夠;