RabbitMQ是一個開源的消息中間件,自帶管理界面友好、開發語言支持廣泛、沒有對其它中間件的依賴,而且社區非常活躍,特別適合中小型企業拿來就用。這篇文章主要探討提升RabbitMQ消費速度的一些方法和實踐,比如增加消費者、提高Prefetch count、多線程處理、批量Ack等。
增加消費者

這個道理比較容易理解,多個人搬磚的速度肯定比一個人要快很多。
不過實際情況中還需要面對一些技術挑戰,比如后端處理能力、並發沖突,以及處理順序。
后端處理能力:比如多個消費者都要操作數據庫,那么數據庫連接的並發數和讀寫吞吐量就是后端處理能力,如果達到了數據庫的最大處理能力,增加再多的消費者也沒有用,甚至會因為數據庫擁塞導致整體消費速度的下降。這個問題還存在另一種情況,就是消費者是否真正的發揮了后端服務的處理能力,比如使用Redis時是否采用了多線程、IO復用等方式來進一步提升吞吐量。
並發沖突:比如兩個消費者都要去修改用戶的積分,單個消費者的做法可能就是取出來、改下字段的值、最后再update到數據庫,多個消費者時如果同時取出了相同的數據,還這樣處理的話就會出問題了。這時候可能需要修改下SQL語句,直接在SQL語句中修改積分,由數據庫寫入事務來處理並發沖突;或者搞一個分布式鎖,對於具體的某個用戶同時只能有一個消費者來處理其積分。
處理順序:如果消息需要被順序處理,那么各個消費者之間還需要增加一個同步機制。比如基於GPS定位的電子圍欄,在出圍欄的某個時段,先產生了圍欄內定位消息、然后產生了圍欄外定位消息;如果圍欄外定位消息先被一個消費者處理,則判定為出圍欄,這沒有問題;然后圍欄內定位消息被另一個消費者處理,則會被判定為入圍欄,這個就屬於誤判了。這時候可能要同步一個已處理定位時間,早於這個時間的定位就拋棄掉;或者同一個設備的定位消息通過某種算法控制只能由某個消費者進行處理。
解決后邊兩個問題的方法不可避免的要引入多個消費者之間的協商機制,如果這些協商機制設計不好會對處理速度帶來很大影響。因此多人搬磚速度快的前提是多個人搬磚時不需要大家頻繁的坐下來協商誰搬哪塊磚,否則就會浪費很多時間在相互協調上,反而不能提升搬磚的速度。
所以通過增加消費者提升消費速度得以成立的前提是消費者業務並發處理能力要足夠,消費者依賴的后端服務處理能力也要足夠。這是此種方式的關鍵點。
提高Prefetch count

消息消費速度主要受到發送消息時間、消費者處理時間、消息Ack時間這幾個時間的影響,如果一個消息走完這個流程再發送另一個的話,效率將會非常低。可以讓消息在這幾個時間內恰當的分配,讓消息總是連續不斷的被消費者接收處理,就可以提升消費者的消費速度。
根據如上描述,有些消息可能正在被消費者處理,有些可能在等待消費者處理,有的消息可能還在網絡傳輸中,而如果不限制傳輸的數量,消費者端可能因處理能力補足會堆積大量的消息,首先內存使用將不可控制,其次此時也無法將這些消息再分配給別的消費者。因此才有了Prefetch count,用於控制消息發送給消費者的速度;這個方案需要配合Ack使用,消費者回復消息Ack后,RabbitMQ才會繼續發送同等數量的消息到消費者。提高Prefetch count到一個合適的值可以提升消息的消費速度。這個值的設定可能還要實時參考上邊提到的三個時間,這有點類似TCP的流控措施。這個值的計算方法請看下文:
RabbitMQ關於吞吐量,延遲和帶寬的一些理論
參考文檔:https://blog.csdn.net/gbbqrglvir3dyi82/article/details/78663828
多線程處理

多線程處理和增加消費者有異曲同工之妙。多線程處理不需要建立多個到RabbitMQ的連接,它在收到隊列消息后將其放入不同的線程中進行處理,這樣進程中就會有多個消息同時處理,增加了消費吞吐量,從而提升了消費速度。
來看一個例子:
consumer.Received += (o, e) => { ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessSingleContextMessage), e); };
在這個例子中波斯碼將收到的消息放入線程池隊列進行處理,注意這里需要配合上一節提到的Prefetch count,設置一個合適的值,消費者就可以同時處理多條消息了。
多線程處理也存在多消費者處理時的問題,只不過在一個進程中處理並發沖突和消息順序的成本可能更低一些。下邊的代碼片段展示了一個解決消息順序處理問題的方案:
// 接收消息存入列表,當接收數量達到prefetchCount/2時就加入處理隊列; // 1/2是考慮了消息從RabbitMQ到消費者的傳輸時間,不需要等所有的消息都到達了才開始處理。 consumer.Received += (o, e) => { lock(receiveLocker){ basicDeliverEventArgsList.Add(e); if (basicDeliverEventArgsList.Count >= prefetchCount/2) { var deliverEventArgs = basicDeliverEventArgsList.ToArray(); basicDeliverEventArgsList.Clear(); EnProcessQueue(deliverEventArgs); } } }; // 此處省略數據出隊列的代碼,請自行腦補 .... // 然后這個方法是用來處理消息的,將消息根據數據Key分成若干組,放到多個任務中並行處理; // 相同數據Key的消息將分配到一個組中,在這個組中數據被順序處理 private void Process(BasicDeliverEventArgs[] args) { if (args.Length <= 0) { return; } try { var tasks = CreateParallelProcessTasksByDataKey(args); Task.WaitAll(tasks); } catch (Exception ex) { ToLog("處理任務發生異常", ex); } } // 創建並行處理多條消息的任務 private Task[] CreateParallelProcessTasksByDataKey(BasicDeliverEventArgs[] args) { // 根據dataKey進行分組,dataKey可以放到消息的header中進行傳輸,這里就不給出具體的分組方法了 Dictionary<string, List<DeliverObject>> eDic = GetMessgeGroupByDataKey(args); // 任務數量 var paralleTaskNum = this.parallelNum; if (paralleTaskNum > eDic.Count) { paralleTaskNum = eDic.Count; } // 每個任務處理的消息數量 var perTaskNum = (int)Math.Ceiling(args.Length / (double)paralleTaskNum); // 任務數組 List<Task> tasks = new List<Task>(); var taskArgs = new List<DeliverObject>(); for (int j = eDic.Count - 1; j >= 0; j--) { var currentElement = eDic.ElementAt(j); taskArgs.AddRange(currentElement.Value); eDic.Remove(currentElement.Key); if (taskArgs.Count >= perTaskNum || j == 0) { // 創建任務,並處理分配的消息 var taskList = taskArgs.Select(d => d).ToList(); taskArgs.Clear(); var task = Task.Factory.StartNew(() => { // 這這里處理分組中的消息 ... }); tasks.Add(task); } } return tasks.ToArray(); }
上邊這段代碼中解決問題的關鍵就是將消息進行分組,同組內的消息順序處理,分組間並行處理,既通過多線程提升了消息整體的處理速度,又能支持消息的順序處理。
批量Ack

這種方式有效的原理是:每條消息分別Ack的情況下,RabbitMQ收到一個Ack才發送一條消息,這中間就會有很多的時間在等待Ack回來,通過批量Ack的方式,減少了很多Ack傳輸的時間。注意這里隱含的方式是RabbitMQ通過設置的Prefetch count連續向消費者發送多條消息,否則這個批量就沒意義了。
下邊的代碼片段給出其使用方式:
channel.BasicAck(e.DeliveryTag, true);
第2個參數為true就是指示采用批量Ack的方式,凡是delivery-tag比第1個參數小的消息都會被Ack。
這里需要注意:如果消費者在處理某條消息時失敗了,業務上又要求不能丟失任何消息,這時就不能對所有的消息進行批量Ack,否則RabbitMQ就不會再次投遞這條消息了,這需要根據自己的實際情況進行取舍。解決此問題的一個簡單方法是,跟蹤所有消息的處理結果,如果全部成功則使用批量Ack,如果部分成功則有兩個選擇:如果不關注順序則退化為每個消息發送Ack或Reject的方式;如果關注順序則本次接收到Prefetch count數量的消息全部nack,否則reject的消息再次投遞時順序就不對了,這時候業務還要做好處理重復數據的邏輯。
總結
通過分析上邊的這些方法,在使用RabbitMQ消費時可以遵循這樣一個路徑:
- 啟用Prefetch count設置;
- 先1個消費者,1次只接收1條,處理完畢后再傳輸下一條,這樣可以避免並發沖突和消息順序問題;
- 如果消費速度不滿足要求,則1次接收多條,按接收順序處理;
- 如果消費速度還是不滿足要求,則1次接收多條,並行處理;
- 如果消費速度還是不滿足要求,則啟動多個消費者,並行處理。
- 如果消費速度還是不滿足要求,改需求,或者換別的中間件。
在這個過程中需要始終關注優化消費者及后端程序處理能力,比如優化SQL語句、使用緩存、使用負載均衡等等,加快處理速度就能提升消費速度,而且很多時候就是程序處理太耗時了。
關於重復數據、並發沖突、順序處理問題的處理:
- 隨時做好處理重復數據的准備,因為不只消費者端可能會觸發消息的重復投遞,發送端也可能重復發送消息,這個很難避免。
- 對於並發沖突問題,消費者進程內可以使用鎖,跨消費者引入第三方機制來處理,比如使用Redis原子操作、數據庫原子操作或者分布式鎖。
- 對於順序處理問題,最好沒有這個需求;在同一個消費者內可以分組處理;在多個消費者時使用隊列分組,每個隊列綁定不同的Route key,不同Route key代表的消息之間沒有順序關聯。波斯碼再次提醒還要注意處理失敗時的邏輯,避免重新投遞消息的順序問題。