Laravel中的隊列處理
隊列介紹
為什么要有消息隊?這里先對其進行一個簡單的介紹,方便還不了解的同學理解.在面向對象里,有一個很簡單的概念--消息傳遞,而消息隊列就可以在它上面擴展一下,把它說的更通俗些:從執行的角度去看,消息隊列把原
來可直接調用的一個函數(一段程序或一個對象)放到另一個進程中了,所以它們之間的消息傳遞就從直接傳遞參數變成了以隊列為載體來傳遞所需參數的一種方式.更加詳細的介紹可以參考這篇文章
眾所周知,laravel是個優雅的框架,它的隊列處理也不例外,可以先看看手冊
Laravel 5.2 文檔 服務 —— 隊列(這篇文章是英文版手冊翻譯,內容詳盡).在基本了解它的使用之后,我們就可以來分析分析相關源代碼,學習它的原理了(該篇文章只探討基於Redis的隊列服務).
隊列Service
laravel把隊列相關的服務全封裝在一個Service里面,通過Queue Service Provider 注冊到IOC中.在Queue Serivce里提供了多種服務,關於它做了什么請看以下代碼中的注釋(代碼有點多,就不全部貼出來了)
public function register()
{
// 注冊Manager, 而Manager為隊列服務的統一入口
$this->registerManager();
// 注冊隊列的各種命令
$this->registerWorker();
$this->registerListener();
// 注冊任務執行失敗后的記錄
$this->registerFailedJobServices();
// 未知
$this->registerQueueClosure();
}
講配置文件
在Queue Service注入到IOC后,我們就可以使用隊列了。
一個隊列服務最基本的就是把任務寫入隊列,再將其拿出來執行, 很簡單.所以隊列服務最基本的要素有四點:任務,進隊列,出隊列,執行.在這里,我們就跟着這四個要素的步伐,看看它們在laravel中是如何實現的.這里,先以官方的示例來分析,有了一個具體概念之后再舉一反三,學會它的本質。
任務
隊列服務就是圍繞着任務進行的.在手冊上,通過它的實例SendReminderEmail
,我們可以很清楚地知道,laravel可以對一個任務做很多事,比如:可設置重新執行的次數,說明該任務(若失敗)可以被執多次(針對的是單個Job);可設置是否可以延遲執行;對該Job設置處理的隊列名稱,等等.這些功能都是\Illuminate\Bus\Queueable
提供的,當然,實例中還有一個\Illuminate\Queue\InteractsWithQueue
,而它則是針對Job所用(稍后再說).一個任務建立完成后,就需要使其進入隊列了。當然了,除了以上幾個特點,還有任務的執行邏輯等等,要全面地了解任務,就需要清楚它的數據結構,其在隊列中的數據結構會在進入隊列中講到.
任務進隊列
示例中,在定義了任務之后,就將其用Controller中的方法使其進入了隊列,那么這一點是如何實現的?
代碼在:\Illuminate\Foundation\Bus\DispatchesJobs
protected function dispatch($job)
{
return app(Dispatcher::class)->dispatch($job);
}
/**
* Dispatch a command to its appropriate handler in the current process.
*
* @param mixed $job
* @return mixed
*/
public function dispatchNow($job)
{
return app(Dispatcher::class)->dispatchNow($job);
}
這段代碼的意圖得了解app(Dispather::class)
, 這個則在\Illuminate\Bus\BusServiceProvider
中表現的很明確了(為什么是這里就不分析了),app(Dispather::class)
就是\Illuminate\Bus\Dispatcher
.現在,上面代碼中的dispatch
與 dispatchNow
方法就會逐漸清晰起來.簡言之,該Dispatcher
類做了兩件事,執行該任務或把該任務放入隊列,也就是將隊列任務分為了兩種執行方式,立即執行或以消息隊列執行,與隊列相關的代碼如下:
/**
* 把任務分發到隊列中
* @param string $command 任務類
*/
public function dispatchToQueue($command)
{
$connection = isset($command->connection) ? $command->connection : null;
// laravel里內置了多種隊列服務,這里則解析出來
$queue = call_user_func($this->queueResolver, $connection);
// 隊列服務解析不成功則拋出異常
if (! $queue instanceof Queue) {
throw new RuntimeException('Queue resolver did not return a Queue implementation.');
}
// 在任務類中可自定義queue方法進入隊列
if (method_exists($command, 'queue')) {
return $command->queue($queue, $command);
} else {
// 系統提供的一種進入隊列方式
return $this->pushCommandToQueue($queue, $command);
}
}
/**
* 根據不同的任務屬性選擇不同的進入隊列方式
* 這里所提到的方式在手冊中有提到
* @param Queue $queue 隊列服務
* @param $command 任務類
*/
protected function pushCommandToQueue($queue, $command)
{
// 該推任務設置了延遲,且設置隊列名稱
if (isset($command->queue, $command->delay)) {
return $queue->laterOn($command->queue, $command->delay, $command);
}
//設置隊列名稱
if (isset($command->queue)) {
return $queue->pushOn($command->queue, $command);
}
//設置延遲
if (isset($command->delay)) {
return $queue->later($command->delay, $command);
}
// default
return $queue->push($command);
}
到現在為止,Controller已經展示了一種進入隊列的方法,很明顯它是經過封裝提供的接口,雖然很好用,但有些操作是我們所不必須的,比如:是否立即執行,進入隊列就需設置不同的任務參數等等,需要更好的為我們所用,就再深入一點,找出它進入隊列的關鍵點(與Redis交互的地方).上面已經提到call_user_func($this->queueResolver, $connection);
會得到一個隊列服務,那么$this->queueResolver
是什么?在\Illuminate\Bus\BusServiceProvider:23
就可以看到:
// 理解這個回調,則需要了解Illuminate\Contracts\Queue\Factory
// 在vendor/laravel/framework/src/Illuminate/Foundation/Application.php:1051 可以看到它與Illuminate\Queue\QueueManager的關系了
$this->app->singleton('Illuminate\Bus\Dispatcher', function ($app) {
return new Dispatcher($app, function ($connection = null) use ($app) {
return $app['Illuminate\Contracts\Queue\Factory']->connection($connection);
});
});
QueueManager
在laravel中,Service對外的統一接口都是其Manager,其中與所需服務交互的基本上是通過__call
方法提供,這種方式有兩個優點,一,提供統一的接口,二,分層明確(將實際的處理由__call
轉發,與配置相關的則由manager自己解決).
現在為了使任務進入隊列的過程更清晰,一步一步找到了QueueManager
,這個類設置了很多事件接口,和其他連接相關方法.其中connection
方法就展示了一個隊列服務是如何解析出來的了.
其實這段解析的代碼唯一的難點中於:
protected function getConnector($driver)
{
if (isset($this->connectors[$driver])) {
return call_user_func($this->connectors[$driver]);
}
throw new InvalidArgumentException("No connector for [$driver]");
}
為什么這么一段簡單的代碼就能解析隊列服務?查看QueueServiceProvider就一目了然了.其中就注冊了很多隊列服務.redis
的隊列服務處理則是\Illuminate\Queue\RedisQueue
.
QueueManager
的功能現在很清晰了.1,解析隊列服務 2,轉發(__call
)處理到相應的隊列服務中 3,提供隊列相關接口 .既然QueueManager
有這么多隊列相關的功能,那么我們完全可以把它作為一個隊列處理的入口(直接獲取隊列服務再進行操作是並不是明智的選擇),巧的是laravel
也是這么做的.所以現在有兩種方式進入隊列,1,使用\Illuminate\Foundation\Bus\DispatchesJobs
間接與隊列服務通信 2,使用QueueManager
間接與隊列服務通信.當然這些方法都是在\Illuminate\Queue\RedisQueue
(隊列服務的接口)上擴展的.所以掌握該類,就能明白隊列的各種行為了.
RedisQueue
隊列服務在lavavel
中提供了多種,這里只對以Redis
隊列服務進行分析學習.所以有關隊列的處理都集中在\Illuminate\Queue\RedisQueue
.上面也說到了,有兩種方式進入隊列, 分別使用,看它們產生的任務數據結構有什么區別?(數據結構便於分析,在后面會提到)
-
在Controller使用
$this->dispatch((new SendReminderEmail()))
;即以任務類進入{ "job": "Illuminate\\Queue\\CallQueuedHandler@call", "data": { "command": "O:26:\"App\\Jobs\\SendReminderEmail\":4:{s:5:\"queue\";s:5:\"email\";s:10:\"connection\";N;s:5:\"delay \";N;s:6:\"\u0000*\u0000job\";N;}" }, "id": "7u00jImd8CAns0fQO8jedqkQmnbQsfsr", "attempts": 1 }
-
直接使用
Queue::push(SendReminderEmail::class , ['email'=>'123456789@qq.com'],'email')
;{ "job": "App\\Jobs\\SendReminderEmail", "data": { "email": "123456789@qq.com" }, "id": "I0OeBIQjJjisQrZ7STX3zexrBLF7Uilx", "attempts": 1 }
上面講到,構成消息隊列需要兩個進程,所以上面的進入隊列是一個進程,現在的出隊列及執行任務則在另一個進程中執行。lavarel提供了兩個命令來啟動該進程,quque:work ,queue:litsen 當然,再理解了如何完成這些操作后完全可以自己寫一個命令,現在看看它是如何出隊列和如何執行任務?
任務出隊列
在手冊中,對於一個任務可以指定多種屬性,比如,延遲,失敗次數,隊列名稱等等,當然,所有可執行操作或功能都得依賴數據結構,數據結構的制定也是為了實現相應的行為.所以,RedisQueue的代碼對應上面的數據結構來理解就比較容易了。
RedisQueue是所有隊列服務(Redis)的基礎接口,所以任務出隊列的操作也能在這找到。假設現在已經對RedisQueue的代碼已經有點熟悉了,不難發現,有一個稍復雜的pop方法(出隊列)。那么,問題出現了,出隊列是如何實現的?解決了這個問題,任務出隊列就可算是完成了.
隊列應有的功能
查看php artisan queue:work --help
命令的使用方法,整理有關隊列所需的功能或服務:
- 指定隊列名稱
- 任務的執行邏輯
- 任務執行延遲
- 任務中失敗的最大次數
當然還有其他關於該命令的功能,比如:是否以守護進程執行,是否強制執行,限制進程執行的memory,無任務時的等待時間.這些與命令相關的因不同的命令而異,與隊列任務無關.這樣,在理清隊列任務需要的功能后,我們就可以分析它的數據結構,理解代碼了.
隊列數據結構
數據結構都是依據行為而建立.所以在查看pop方法時,可考慮以上幾個點.上面的數據結構中,已經可以看到隊列的執行邏輯,所需參數,失敗次數,這些一目了然,就不啰嗦了.在整個pop方法中,有這么幾個隊列,queue:delayed,queue:reserved,queue.本來取出一個任務用lpop就可完成,為什么要多用兩個集合(注意,是有序集合不是隊列)來完成pop操作呢?因為要實現任務延遲和失敗處理.
其執行過程如圖:
過程解析:
(1). 取任務,因為要實現延遲的功能,所以在有序集合里的score是過期時間,過期時間的含義則是在此時間之前不執行,也就達到了延遲執行的效果.延遲的含義在這里指的並不是在多少秒后執行,而是在多少秒內不執行.對於過期的任務,就將其rpush到隊列中,直到lpop操作將其拿走.
(1).為什么在存在queue:reserved集合並且把lpop的任務zadd進支?因為只要lpop了job就可以將其記錄下來,若此時任務還未開始執行進程就非正常終止了,該任務就不會丟失,再次執行時,依據上面的步驟就可以將其取出,防止意外使job丟失.
(2).隊列的執行都是依據json中的類來完成,這部分較簡單,略.
(3).當任務執行成功時,要手動刪除queue:reserved中的任務;當任務執行失敗,刪除queue:reserved中的任務,再將其記錄下來,記錄方式是zadd queue:delayed
, 並且將該任務的執行次數加一,這個過程RedisQueue已經封裝(RedisQueue::release)好了.
這一系列的過程就完成了讓隊列任務延遲的功能.所以這么復雜的操作都是為了實現延遲的功能,當然,有更好的點子可以考慮自己實現.
執行任務
到此,任務的執行在json數據結構中表現的很明確,整個處理過程也很清晰了.需要注意的是當任務執行成功后要刪除任務.對於如何執行出隊列,以及如何執行隊列任務,可以詳細看看queue:work
命令(\Illuminate\Queue\Console\WorkCommand::fire
), 它是最好的示例;
隊列處理命令的自定義
在使用queue:work
之后,會發現它並不有處理所有的情況.所以在本文中一直提到過,自寫一個處理命令是可行的.當面臨queue:work
所不能解決的問題時,可以好好考慮下自己編寫.在實際開發中,任務的種類繁多,對於不同的任務應該有不同的處理方案.所以,有以下幾個問題是經常遇到的:
比如:
- 調用服務發生錯誤且由服務提供方造成,需另作記錄,而這樣的錯誤不算作job的執行錯誤
- 營銷短信只能在9:00到20:00之間發送, 所以在該時間段內沒有執行的必要
- 與數據庫交互時,數據庫連接是有時間限制的,而以守護進程的方式執行則無時間限制,這樣就會報錯
所以,面臨laravel所提供命令的局限性,有自定義處理命令的能力是很有必要的.
小結
之於框架(優秀開源產品),只不過是有着作者個人風格的一些封裝,要真正的學會使用它,則需要把這種風格化的表象移除, 看到這層’皮’下到底是什么,這樣才能學習到框架的本質.希望這篇文章能給同學們帶來一點幫助.
學習資料
laravel學院的中文文檔給力