參考:《Redis入門指南》第4章進階
http://book.51cto.com/art/201305/395461.htm
4.4.2 使用Redis實現任務隊列
說到隊列很自然就能想到Redis的列表類型,3.4.2節介紹了使用LPUSH和RPOP命令實現隊列的概念。如果要實現任務隊列,只需要讓生產者將任務使用LPUSH命令加入到某個鍵中,另一邊讓消費者不斷地使用RPOP命令從該鍵中取出任務即可。
在小白的例子中,完成發郵件的任務需要知道收件地址、郵件主題和郵件正文。所以生產者需要將這三個信息組成對象並序列化成字符串,然后將其加入到任務隊列中。而消費者則循環從隊列中拉取任務,就像如下偽代碼:
- # 無限循環讀取任務隊列中的內容
- loop
- $task = RPOR queue
- if $task
- # 如果任務隊列中有任務則執行它
- execute($task)
- else
- # 如果沒有則等待1秒以免過於頻繁地請求數據
- wait 1 second
到此一個使用Redis實現的簡單的任務隊列就寫好了。不過還有一點不完美的地方:當任務隊列中沒有任務時消費者每秒都會調用一次RPOP命令查看是否有新任務。如果可以實現一旦有新任務加入任務隊列就通知消費者就好了。其實借助BRPOP命令就可以實現這樣的需求。
BRPOP命令和RPOP命令相似,唯一的區別是當列表中沒有元素時BRPOP命令會一直阻塞住連接,直到有新元素加入。如上段代碼可改寫為:
- loop
- # 如果任務隊列中沒有新任務,BRPOP命令會一直阻塞,不會執行execute()。
- $task = BRPOP queue, 0
- # 返回值是一個數組(見下介紹),數組第二個元素是我們需要的任務。
- execute($task[1])
BRPOP命令接收兩個參數,第一個是鍵名,第二個是超時時間,單位是秒。當超過了此時間仍然沒有獲得新元素的話就會返回nil。上例中超時時間為"0",表示不限制等待的時間,即如果沒有新元素加入列表就會永遠阻塞下去。
當獲得一個元素后BRPOP命令返回兩個值,分別是鍵名和元素值。為了測試BRPOP命令,我們可以打開兩個redis-cli實例,在實例A中:
- redis A> BRPOP queue 0
brpop:
返回值:
鍵入回車后實例1會處於阻塞狀態,這時在實例B中向queue中加入一個元素:
- redis B> LPUSH queue task
- (integer) 1
在LPUSH命令執行后實例A馬上就返回了結果:
- 1) "queue"
- 2) "task"
同時會發現queue中的元素已經被取走:
- redis> LLEN queue
- (integer) 0
除了BRPOP命令外,Redis還提供了BLPOP,和BRPOP的區別在與從隊列取元素時BLPOP會從隊列左邊取。具體可以參照LPOP理解,這里不再贅述。
4.4.3 優先級隊列
前面說到了小白博客需要在發布文章的時候向每個訂閱者發送郵件,這一步驟同樣可以使用任務隊列實現。由於要執行的任務和發送確認郵件一樣,所以二者可以共用一個消費者。然而設想這樣的情況:假設訂閱小白博客的用戶有1000人,那么當發布一篇新文章后博客就會向任務隊列中添加1000個發送通知郵件的任務。如果每發一封郵件需要10秒,全部完成這1000個任務就需要近3個小時。問題來了,假如這期間有新的用戶想要訂閱小白博客,當他提交完自己的郵箱並看到網頁提示他查收確認郵件時,他並不知道向自己發送確認郵件的任務被加入到了已經有1000個任務的隊列中。要收到確認郵件,他不得不等待近3個小時。多么糟糕的用戶體驗!而另一方面發布新文章后通知訂閱用戶的任務並不是很緊急,大多數用戶並不要求有新文章后馬上就能收到通知郵件,甚至延遲一天的時間在很多情況下也是可以接受的。
所以可以得出結論當發送確認郵件和發送通知郵件兩種任務同時存在時,應該優先執行前者。為了實現這一目的,我們需要實現一個優先級隊列。
BRPOP命令可以同時接收多個鍵,其完整的命令格式為BRPOP key [key …] timeout,
按參數 key 的先后順序依次檢查各個列表,彈出第一個非空列表的尾部元素
如BRPOP queue:1 queue:2 0。意義是同時檢測多個鍵,如果所有鍵都沒有元素則阻塞,如果其中有一個鍵有元素則會從該鍵中彈出元素。例如,打開兩個redis-cli實例,在實例A中:
- redis A> BRPOP queue:1 queue:2 queue:3 0
在實例B中:
- redis B> LPUSH queue:2 task
- (integer) 1
則實例A中會返回:
- 1) "queue:2"
- 2) "task"
如果多個鍵都有元素則按照從左到右的順序取第一個鍵中的一個元素。我們先在queue:2和queue:3中各加入一個元素:
- redis> LPUSH queue:2 task1
- 1) (integer) 1
- redis> LPUSH queue:3 task2
- 2) (integer) 1
然后執行BRPOP命令:
- redis> BRPOP queue:1 queue:2 queue:3 0
- 1) "queue:2"
- 2) "task1"
借此特性可以實現區分優先級的任務隊列。我們分別使用queue:confirmation. email和queue:notification.email兩個鍵存儲發送確認郵件和發送通知郵件兩種任務,然后將消費者的代碼改為:
- loop
- $task =
- BRPOP queue:confirmation.email,
- queue:notification.email,
- 0
- execute($task[1])
這時一旦發送確認郵件的任務被加入到queue:confirmation.email隊列中,無論queue: notification.email還有多少任務,消費者都會優先完成發送確認郵件的任務。
參考:http://book.51cto.com/art/201305/395463.htm
官網說法:
RPOPLPUSH source destination
命令 RPOPLPUSH 在一個原子時間內,執行以下兩個動作:
- 將列表 source 中的最后一個元素(尾元素)彈出,並返回給客戶端。
- 將 source 彈出的元素插入到列表 destination ,作為 destination 列表的的頭元素。
模式: 安全的隊列
Redis的列表經常被用作隊列(queue),用於在不同程序之間有序地交換消息(message)。一個客戶端通過 LPUSH 命令將消息放入隊列中,而另一個客戶端通過 RPOP 或者 BRPOP 命令取出隊列中等待時間最長的消息。
不幸的是,上面的隊列方法是『不安全』的,因為在這個過程中,一個客戶端可能在取出一個消息之后崩潰,而未處理完的消息也就因此丟失。
使用 RPOPLPUSH 命令(或者它的阻塞版本 BRPOPLPUSH )可以解決這個問題:因為它不僅返回一個消息,同時還將這個消息添加到另一個備份列表當中,如果一切正常的話,當一個客戶端完成某個消息的處理之后,可以用 LREM 命令將這個消息從備份表刪除。
最后,還可以添加一個客戶端專門用於監視備份表,它自動地將超過一定處理時限的消息重新放入隊列中去(負責處理該消息的客戶端可能已經崩潰),這樣就不會丟失任何消息了。
模式:循環列表 Circular list
通過使用相同的 key 作為 RPOPLPUSH 命令的兩個參數,客戶端可以用一個接一個地獲取列表元素的方式,取得列表的所有元素,而不必像 LRANGE 命令那樣一下子將所有列表元素都從服務器傳送到客戶端中(兩種方式的總復雜度都是 O(N))。
以上的模式甚至在以下的兩個情況下也能正常工作:
- 有多個客戶端同時對同一個列表進行旋轉(rotating),它們獲取不同的元素,直到所有元素都被讀取完,之后又從頭開始。
- 有客戶端在向列表尾部(右邊)添加新元素。
這個模式使得我們可以很容易實現這樣一類系統:有 N 個客戶端,需要連續不斷地對一些元素進行處理,而且處理的過程必須盡可能地快。一個典型的例子就是服務器的監控程序:它們需要在盡可能短的時間內,並行地檢查一組網站,確保它們的可訪問性。
注意,使用這個模式的客戶端是易於擴展(scala)且安全(reliable)的,因為就算接收到元素的客戶端失敗,元素還是保存在列表里面,不會丟失,等到下個迭代來臨的時候,別的客戶端又可以繼續處理這些元素了。
更多:
http://redis.io/commands/blpop
\http://redis.io/commands/rpoplpush