原文:http://blog.nosqlfan.com/html/3223.html
RabbitMQ是當成應用比較廣泛的隊列服務系統,其配套的客戶端和監控運維方案也比較成熟。BoxedIce的隊列服務從今年四月開始從RabbitMQ切換到了MongoDB上,並一直穩定運行至今,下面是BoxedIce對隊列系統的一個討論PPT及相關敘述。分享給大家。
為什么要使用隊列系統?
- 因為一些任務需要在后台執行,讓調用者不需要等待其完成就能返回。比如給用戶發送郵件,短信之類的操作。
- 還有就是一個大系統內部的通信,可能會采用隊列的方式傳遞消息。
對隊列系統有哪些要求?
- 應對任務處理者:通常會有一些進程從隊列里獲取消息進行處理,而且通常這些進程都會啟動很多個。所以隊列需要能夠處理並發的數據請求操作。
- 原子性:隊列中的元素只能被取出一次,必須保證每次讀取隊列中元素進行操作和刪除這個元素是原子性的。
- 快速:隊列系統要能夠快速地處理元素的寫入和讀取操作。
- 垃圾回收能力:如果一個任務處理到一半死掉了,那么必須能有方法監測到並且將這個任務重新放入隊列中。
關於任務處理
任務處理進程的工作,就是從隊列中讀出消息,並且處理這個消息。所以它需要一種能夠從隊列中取出元素進行處理的能力。RabbitMQ提供了AMQP協議,目前已經有許多基於此協議的客戶端了,相對的,MongoDB基於其Mongo Wire Protocol協議也擁有豐富的客戶端支持。
BoxedIce在使用RabbitMQ時是用的pika客戶端,轉為MongoDB后使用的是pymongo。這兩個協議相對比,pymongo的開銷要小很多。
對於原子性,RabbitMQ通過對consume/ack協議的支持來實現。而由於MongoDB只支持對單個文檔的原子性個性,所以你可以使用其findAndModify 命令,簡單語法如下:
db.runCommand( { findAndModify : collection, { options } } )
這里的options是一個數組,其包含下面一些元素:
- where:是一個查詢條件,比如在我們的例子里,這個查詢條件是{‘where’ { ‘inProg’: false, ‘done’: false } }操作會針對查詢到的第一個元素進行。
- sort:是對查詢結構的排序,可以設置查詢返回結果是按哪種條件排序的。比如你可以設置一個消息優先級,然后按優先級的倒序排序,讓優先級高的消息優先進行處理。
- update:這是標明你需要如何個性這個元素,在我們的例子里,我們設置其inProg標識為true,標 明正在處理中,設置處理時間t為當前時間,這個時間會用在垃圾回收中。如下:{‘update’ : {‘$set’ : { ‘inProg’ : true, ‘t’ : new Date() } } }
垃圾回收
上面我們在處理消息的時候會將其inProg設置為true表示正在處理,當成功處理完成后,再將done設置為true,但如果處理過程中出現問 題,就會導致一個inProg為true但是done永遠為false的任務。這時候就需要垃圾回收策略通過檢查處理時間t來進行垃圾回收處理了。
now = datetime.datetime.now()
difference = datetime.timedelta(seconds=10)
timeout = now - difference
queue.find({'inProg' : True, 'start' : {'$lte' : timeout} })
比如上面的代碼,我們通過判斷t是否是在當前時間10秒以前來判斷是否過期(10秒都未處理完,我們認為任務處理失敗),這些失敗的消息我們可以進行相關的處理,讓它重新加入到消息隊列中。
其它的一些考慮
除了上面說到的速度,原子性等特性,對於一個隊列系統,還是有一些其它方面需要考慮的。
- 容錯性:MongoDB的replica sets架構提供了整體的高可用性。當其被用作隊列時,也同樣繼承了這一我。而RabbitMQ並沒有內置的支持。目前在RabbitMQ 2.6.0中有相關的支持
- 一致性:MongoDB默認會一分鍾將數據flush到磁盤,但其同時提供一個默認100ms的操作日志可以增強其單機的可靠性。可以緩解宕機時數據丟失導致不一至的情況。如果你對一致性要求非常高,你也可以使用MongoDB的getLastError命令來保證你的每次操作都寫入操作日志或者磁盤上才返回成功。
- 擴展性:我們使用capped collection來做消息隊列,所以老數據的清除是自動的。在MongoDB中可以通過sharding方式來實現數據的橫向擴展,但是sharding並不支持用於capped collection。你可以自己選擇自己需要的應用方式。
