Lumen 隊列


 

隊列

 

簡介

{tip} 現在,Laravel 為你的 Redis 隊列提供了 Horizon,一個擁有漂亮的儀表盤的配置系統。 查看完整的 Horizon 文檔 Horizon documentation 了解更多信息。

Laravel 隊列為不同的隊列后台服務提供了統一的 API,比如 Beanstalk, Amazon SQS, Redis, 甚至是關系型數據庫。隊列可以使你延遲處理一些耗時的任務,比如發送郵件。延遲這些耗時的任務會大幅提高你的應用對 web 請求的處理速度。

隊列配置文件存儲在 config/queue.php。在這個文件中,你可以找到框架包含的所有隊列驅動的連接配置: database, BeanstalkdAmazon SQSRedis,和一個直接執行任務的同步驅動(本地使用)。 還包括了一個 null 隊列驅動用於直接丟棄隊列任務。

 

連接 Vs. 隊列

在使用 Laravel 的隊列之前,搞懂「隊列」和「連接」的差別是很重要的。 在你的 config/queue.php 配置中,有一個 connections 配置選項。 這個選項定義了一些特定的后台服務,例如 Amazon SQS, Beanstalk 或 Redis 。任何一個連接都可能會有多個「隊列」用於提供不同的任務堆棧。

注意「隊列」配置文件 queue 中的每一個連接配置中都包含一個 queue 屬性。這是一個默認的隊列,當任務被分發到選定的連接時就會進入這個隊列。也就是說,如果你沒有明確定義一個任務將會被分發到哪個隊列,它就會被分發到對應連接的 queue 屬性配置中定義的隊列。

// 這個任務將會被分發到默認隊列...
Job::dispatch();

  

 //這個任務將會被分發到「emails」隊列...
Job::dispatch()->onQueue('emails');

  

一些應用也許不需要將任務推到多個隊列,而是發到一個簡單的隊列就可以了。然而,一些將任務分發到多個隊列對於想優先處理或是對其進行分類的任務是非常有用的,因為 Laravel 隊列處理器允許你指定隊列的優先級。例如,如果將任務分發到 high 隊列,你就可以讓隊列處理器優先處理這些任務了。

php artisan queue:work --queue=high,default

  

 

驅動的必要設置

 

Database

為了使用 database 隊列驅動,你需要一張數據表來存儲任務。運行 queue:table Artisan 命令來創建這張表的遷移文件。當遷移文件創建好后,你就可以使用 migrate 命令來進行遷移:

php artisan queue:table

php artisan migrate

  

Redis

為了使用 redis 隊列驅動,你需要在 config/database.php 配置文件中配置 Redis 的數據庫連接。

Redis 集群

如果你的 Redis 隊列驅動使用了 Redis 集群,你的隊列名必須包含一個 key hash tag 。這是為了確保所有的 Redis 鍵對於一個隊列都被放在同一哈希中。

'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => '{default}',
    'retry_after' => 90,
],

  

阻塞

當使用 Redis 隊列時,你可以用 block_for 配置項來具體說明驅動應該在將任務重新放入 Redis 數據庫以及處理器輪詢之前阻塞多久。

基於你的隊列加載來調整這個值比把新任務放入 Redis 數據庫輪詢要更有效率的多。例如,你可以將這個值設置為 5來表明這個驅動應該在等待任務可用時阻塞5秒。

'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => 'default',
    'retry_after' => 90,
    'block_for' => 5,
],

  

{note} 阻塞是一個實驗性功能。如果在任務被取回的同時,Redis 服務或隊列處理器崩潰的話,有很小的可能導致一個隊列任務丟失。

 

其它隊列驅動的依賴擴展包

在使用列表里的隊列服務前,必須安裝以下依賴擴展包:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~3.0
  • Redis: predis/predis ~1.0

 

創建任務

 

生成任務類

在你的應用程序中,隊列的任務類都默認放在 app/Jobs 目錄下。如果這個目錄不存在,那當你運行 make:jobArtisan 命令時目錄就會被自動創建。你可以用以下的 Artisan 命令來生成一個新的隊列任務:

php artisan make:job ProcessPodcast

  

生成的類實現了 Illuminate\Contracts\Queue\ShouldQueue 接口,這意味着這個任務將會被推送到隊列中,而不是同步執行。

 

任務類結構

任務類的結構很簡單,一般來說只會包含一個讓隊列用來調用此任務的 handle 方法。我們來看一個示例的任務類。這個示例里,假設我們管理着一個播客發布服務,在發布之前需要處理上傳播客文件:

<?php

namespace App\Jobs;

use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * 創建一個新的任務實例。
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * 運行任務。
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // Process uploaded podcast...
    }
}

  

注意,在這個例子中,我們在任務類的構造器中直接傳遞了一個 Eloquent 模型 。因為我們在任務類里引用了 SerializesModels 這個 trait,使得 Eloquent 模型在處理任務時可以被優雅地序列化和反序列化。如果你的隊列任務類在構造器中接收了一個 Eloquent 模型,那么只有可識別出該模型的屬性會被序列化到隊列里。當任務被實際運行時,隊列系統便會自動從數據庫中重新取回完整的模型。這整個過程對你的應用程序來說是完全透明的,這樣可以避免在序列化完整的 Eloquent 模式實例時所帶來的一些問題。

在隊列處理任務時,會調用 handle 方法,而這里我們也可以通過 handle 方法的參數類型提示,讓 Laravel 的 服務容器 自動注入依賴對象。

{note} 像圖片內容這種二進制數據,在放入隊列任務之前必須使用 base64_encode 方法轉換一下。否則,當這項任務放置到隊列中時,可能無法正確序列化為 JSON。

 

分發任務

一旦你寫完了你的任務類你就可以使用它自帶的 dispatch 方法分發它。傳遞給 dispatch 方法的參數將會被傳遞給任務的構造函數:

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存儲一個新的播客節目。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 創建播客...

        ProcessPodcast::dispatch($podcast);
    }
}

  

 

延遲分發

如果你想延遲你的隊列任務的執行,你可以在分發任務的時候使用 delay 方法。例如,讓我們詳細說明一個十分鍾之后才會執行的任務:

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存儲一個新的播客節目。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 創建播客...

        ProcessPodcast::dispatch($podcast)
                ->delay(now()->addMinutes(10));
    }
}

  

{note} Amazon SQS 隊列服務最大延遲 15 分鍾的時間。

 

工作鏈

工作鏈允許你具體定義一個按序列執行的隊列任務的列表。一旦序列中的任務失敗了,剩余的工作將不會執行。要運行一個工作鏈,你可以對可分發的任務使用 withChain 方法:

ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
])->dispatch();

  

 

工作鏈連接&隊列

如果你想定義用於工作鏈的默認連接和隊列,你可以使用 allOnConnection 和 allOnQueue 方法。 這些方法指定了所需隊列的連接和隊列——除非隊列任務被明確指定給了不同的連接/隊列:

ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');

  

 

自定義連接&隊列

 

分發任務到指定隊列

通過將任務分發到不同隊列,你可以將你的隊列任務「分類」,甚至指定給不同隊列分配的任務數量。記住,這不是推送任務到你定義的隊列配置文件的不同的連接里,而是一個單一的連接。要指定隊列,在分發任務時使用 onQueue 方法:

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存儲一個新的播客節目。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 創建播客...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');
    }
}

  

 

分發任務到指定連接

如果你在多隊列連接中工作,你可以指定將任務分發到哪個連接。要指定連接,在分發任務時使用 onConnection 方法:

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 存儲一個新播客節目。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 創建播客...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');
    }
}

  

當然,你可以鏈式調用 onConnection 和 onQueue 方法來指定連接和隊列。

ProcessPodcast::dispatch($podcast) ->onConnection('sqs') ->onQueue('processing');

 

指定最大任務嘗試次數/超時值

 

最大嘗試次數

在一個任務重指定最大嘗試次數可以通過 Artisan 命令的 --tries 選項 指定:

php artisan queue:work --tries=3

你可能想通過任務類自身對最大任務嘗試次數進行一個更顆粒化的處理。如果最大嘗試次數是在任務類中定義的,它將優先於命令行中的值提供。

<?php namespace App\Jobs; class ProcessPodcast implements ShouldQueue { /** * 任務可以嘗試的最大次數。 * * @var int */ public $tries = 5; }

 

基於時間的嘗試

作為另外一個選擇來定義任務在失敗前會嘗試多少次,你可以定義一個任務超時時間。這樣的話,在給定的時間范圍內,任務可以無限次嘗試。要定義一個任務的超時時間,在你的任務類中新增一個 retryUntil 方法:

/** * 定義任務超時時間 * * @return \DateTime */ public function retryUntil() { return now()->addSeconds(5); }

{tip} 你也可以在你的隊列事件監聽器中使用 retryUntil 方法。

 

超時

{note} timeout 特性對於 PHP 7.1+ 和 pcntl PHP 擴展進行了優化.

同樣的,任務執行最大秒數的數值可以通過 Artisan 命令行的 --timeout 選項指定。

php artisan queue:work --timeout=30

然而,你可能也想在任務類自身定義一個超時時間。如果在任務類中指定,優先級將會高於命令行:

<?php namespace App\Jobs; class ProcessPodcast implements ShouldQueue { /** * 超時時間。 * * @var int */ public $timeout = 120; }

 

頻率限制

{note} 這個特性要求你的應用可以使用 Redis 服務器.

如果你的應用使用了 Redis ,你可以通過時間或並發限制你的隊列任務。當你的隊列任務通過同樣有速率限制的 API 使用時,這個特性將很有幫助。例如,使用 throttle 方法,你可以限制一個給定類型的任務每 60 秒只執行 10 次。如果沒有獲得鎖,一般情況下你應該將任務放回隊列以使其可以被稍后重試。

Redis::throttle('key')->allow(10)->every(60)->then(function () { // 任務邏輯... }, function () { // 無法獲得鎖... return $this->release(10); });

{tip}在上述的例子里, key 可以是任何你想要限制頻率的任務類型的唯一識別字符串。例如,你也許想使構件基於任務類名的 key 或它操作的一系列 Eloquent 模型的 ID。

或者,你想具體說明一個任務可以同時執行的最大數量。在如下情況時這會很有用處:當一個隊列中的任務正在修改資源時,一次只能被一個任務修改。例如,使用 funnel 方法,你可以限制一個給定類型的任務一次只能執行一個處理器。

Redis::funnel('key')->limit(1)->then(function () { // 任務邏輯... }, function () { // 無法獲得鎖... return $this->release(10); });

{tip} 當使用頻率限制時,任務執行成功的嘗試的次數可能會難以確定。所以,將頻率限制與 時間限制 組合是很有作用的。

 

錯誤處理

如果在任務執行的時候出現異常,任務會被自動釋放到隊列中以再次嘗試。任務將會一直被釋放直到達到應用允許的最大重試次數。最大重試的數值由 queue:work Artisan 命令的 --tries 選項定義,或者在任務類中定義。更多執行隊列處理器的信息可以 在以下找到 。

 

運行隊列處理器

Laravel 包含了一個隊列處理器以將推送到隊列中的任務執行。你可以使用 queue:work Artisan 命令運行處理器。 注意一旦 queue:work 命令開始執行,它會一直運行直到它被手動停止或終端被關閉。

php artisan queue:work

{tip} 要使 queue:work 進程一直在后台運行,你應該使用進程管理器比如 Supervisor 來確保隊列處理器不會停止運行

記住,隊列處理器是一個常駐的進程並且在內存中保存着已經啟動的應用狀態。因此,它們並不會在啟動后注意到你代碼的更改。所以,在你的重新部署過程中,請記得 重啟你的隊列處理器.

 

執行單一任務

--once 選項用於使隊列處理器只處理隊列中的單一任務。

php artisan queue:work --once
 

指定連接&隊列

你也可以具體說明隊列處理器應該使用哪個隊列連接。 傳遞給 work 的連接名應該與你的 config/queue.php 配置文件中定義的連接之一相符。

php artisan queue:work redis

你甚至可以自定義你的隊列處理器使其只執行連接中指定的隊列。例如,如果你的所有郵件都由 redis 連接的 emails 隊列處理,你可以使用如下的命令啟動一個僅執行此隊列的處理器:

php artisan queue:work redis --queue=emails
 

資源注意事項

后台駐留的隊列處理器不會在執行完每個任務后「重啟」框架。因此,你應該在每個任務完成后釋放任何占用過大的資源。例如,如果你正在用 GD 庫執行圖像處理,你應該在完成后使用 imagedestroy 釋放內存。

 

隊列優先級

有時你可能想確定隊列執行的優先順序。例如在 config/queue.php 中你可以將 redis 連接的 queue 隊列的優先級從 default 設置為 low。然而, 偶爾你也想像如下方式將一個任務推送到 high 隊列:

dispatch((new Job)->onQueue('high'));

要運行一個處理器來確認 low 隊列中的任務在全部的 high 隊列任務完成后才繼續執行,你可以傳遞一個逗號分隔的隊列名列表作為 work 命令的參數。

php artisan queue:work --queue=high,low

 

隊列處理器&部署

因為隊列處理器是常駐進程,他們在重啟前不會應用你代碼的更改。因此,部署使用隊列處理器的應用最簡單的方法是在部署進程中重啟隊列處理器。你可以平滑地重啟所有隊列處理器通過使用 queue:restart 方法:

php artisan queue:restart

這個命令將會引導所有的隊列處理器在完成當前任務后平滑「中止」,這樣不會有丟失的任務。由於在執行 queue:restart 后隊列處理器將會中止,所以你應該運行一個進程管理器例如 Supervisor 來自動重啟隊列處理器。

{tip} 隊列使用 緩存 存儲重啟信號,所以你應該確定在使用這個功能之前配置好緩存驅動。

 

任務過期&超時

 

任務過期

在你的 config/queue.php 配置文件中,每個隊列連接都定義了一個 retry_after 選項。這個選項指定了隊列連接在重試一個任務前應該等它執行多久。例如,如果 retry_after 的值設置為 90 ,那么任務在執行了 90 秒后將會被放回隊列而不是刪除它。一般情況下,你應該將 retry_after 的值設置為你認為你的任務可能會執行需要最長時間的值。

{note} 只有在 Amazon SQS 中不存在 retry_after 這個值。 SQS將會以 AWS 控制台配置的 默認可見超時值 作為重試任務的依據。

 

處理器超時

queue:work Artisan 命令包含一個 --timeout 選項。 --timeout 選項指定了 Laravel 的隊列主進程在中止一個執行任務的子進程之前需要等到多久。有時一個子進程可能會因為各種原因「凍結」,比如一個外部的 HTTP 請求失去響應。 --timeout 選項會移除那些超過指定時間被凍結的進程。

php artisan queue:work --timeout=60

retry_after 配置項和 --timeout 命令行配置並不同,但將它們同時使用可以確保任務不會丟失並且任務只會成功執行一次。

{note} --timeout 的值應該比你在 retry_after 中配置的值至少短幾秒。這會確保處理器永遠會在一個任務被重試之前中止。如果你的 --timeout 值比 retry_after 的值長的話,你的任務可能會被執行兩次。

 

隊列進程睡眠時間

當任務在隊列中可用時,處理器將會一直無間隔地處理任務。 然而, sleep 選項定義了如果沒有新任務的時候處理器將會「睡眠」多長時間。在處理器睡眠時,它不會處理任何新任務 —— 任務將會在隊列處理器再次啟動后執行。

php artisan queue:work --sleep=3

 

Supervisor 配置

 

安裝 Supervisor

Supervisor 是一個 Linux 下的進程管理器,它會在 queue:work 進程關閉后自動重啟。要在 Ubuntu 下安裝 Supervisor ,你可以使用以下命令:

sudo apt-get install supervisor

{tip} 如果自己配置 Supervisor 很難的話,你可以考慮使用 Laravel Forge ,它將會為你的 Laravel 項目自動安裝配置 Supervisor

 

配置 Supervisor

一般 Supervisor 的配置文件存儲在 /etc/supervisor/conf.d 目錄。 在這個目錄下,你可以創建任意數量的配置文件控制 Supervisor 對於進進程的管理。例如,讓我們創建一個 laravel-worker.conf 文件開始管理 queue:work 進程:

[program:laravel-worker] process_name=%(program_name)s_%(process_num)02d command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 autostart=true autorestart=true user=forge numprocs=8 redirect_stderr=true stdout_logfile=/home/forge/app.com/worker.log

在這個示例中, numprocs 會告訴 Supervisor 運行 8 個 queue:work 進程並且管理它們,當它們關閉時會將其自動重啟。當然,你應該將 command 選項中的 queue:work sqs 部分修改為你的隊列連接。

 

啟動 Supervisor

當配置文件創建好后,你可以用下面的命令更新 Supervisor 配置文件並且啟動進程:

sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start laravel-worker:*

要了解更多關於 Supervisor 的信息,查詢 Supervisor 文檔 。

 

處理失敗的任務

有時你的隊列任務會失敗。別擔心,凡事無完美! Laravel 包含了一個便捷的方式指定任務會被最大嘗試的次數。在一個任務達到了它最大嘗試次數后,它會被放入 failed_jobs 表。要創建 failed_jobs 表你可以使用 queue:failed-table 命令:

php artisan queue:failed-table php artisan migrate

然后,運行你的 隊列處理器 ,你應該在 queue:work 命令上使用 --tries 選項。如果你沒有指定 --tries 的值,任務將會被無限次嘗試。

php artisan queue:work redis --tries=3

 

清除失敗的任務

你可以在你的任務類中定義一個 failed 方法,它可以允許你在一個任務失敗后清除它。這是一個提醒你的用戶或撤回任何任務所做出的修改的絕佳時機。導致任務失敗的 Exception 會被傳入到 failed 方法:

<?php namespace App\Jobs; use Exception; use App\Podcast; use App\AudioProcessor; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; class ProcessPodcast implements ShouldQueue { use InteractsWithQueue, Queueable, SerializesModels; protected $podcast; /** * 創建一個新的任務實例。 * * @param Podcast $podcast * @return void */ public function __construct(Podcast $podcast) { $this->podcast = $podcast; } /** * 執行任務。 * * @param AudioProcessor $processor * @return void */ public function handle(AudioProcessor $processor) { // 執行上傳播客... } /** * 執行失敗的任務。 * * @param Exception $exception * @return void */ public function failed(Exception $exception) { // 給用戶發送失敗的通知等等... } }

 

任務失敗事件

如果你想注冊一個在任務失敗時調用的事件,你可以使用 Queue::failing 方法。這是通過 email 或 Stride 通知你的團隊的絕佳時機。例如,我們可以從 Laravel 中的 AppServiceProvider 注冊一個此事件的回調。

<?php namespace App\Providers; use Illuminate\Support\Facades\Queue; use Illuminate\Queue\Events\JobFailed; use Illuminate\Support\ServiceProvider; class AppServiceProvider extends ServiceProvider { /** * 啟動任意服務。 * * @return void */ public function boot() { Queue::failing(function (JobFailed $event) { // $event->connectionName // $event->job // $event->exception }); } /** * 注冊服務提供者。 * * @return void */ public function register() { // } }

 

重試失敗的任務

想要查看所有被放入 failed_jobs 數據表中的任務,你可以使用 queue:failed Artisan 命令:

php artisan queue:failed

queue:failed 命令會列出任務 ID ,隊列,以及失敗的事件。任務 ID 可能會被用於重試失敗的任務。例如,要重試一個任務 ID 為 5 的任務,使用如下命令:

php artisan queue:retry 5

要重試所有失敗的任務,執行 queue:retry 命令,將 all 作為 ID 傳入:

php artisan queue:retry all

如果你想刪除一個失敗的任務,使用 queue:forget 命令:

php artisan queue:forget 5

要清空所有失敗的任務,使用 queue:flush 命令:

php artisan queue:flush

 

任務事件

通過在隊列 facade 中使用 before 和 after 方法,你可以指定一個隊列任務被執行前后的回調。這些回調是添加額外的日志或增加統計的絕好時機。通常,你應該在 服務提供者 中調用這些方法。例如,我們可以使用 Laravel 的 AppServiceProvider :

<?php namespace App\Providers; use Illuminate\Support\Facades\Queue; use Illuminate\Support\ServiceProvider; use Illuminate\Queue\Events\JobProcessed; use Illuminate\Queue\Events\JobProcessing; class AppServiceProvider extends ServiceProvider { /** * 引導啟動任意應用服務 * * @return void */ public function boot() { Queue::before(function (JobProcessing $event) { // $event->connectionName // $event->job // $event->job->payload() }); Queue::after(function (JobProcessed $event) { // $event->connectionName // $event->job // $event->job->payload() }); } /** * 注冊服務提供者 * * @return void */ public function register() { // } }

在隊列 facade 使用 looping 方法可以在處理器嘗試獲取任務之前執行回調。例如,你也許想用一個閉包來回滾之前失敗的任務尚未關閉的事務。

Queue::looping(function () { while (DB::transactionLevel() > 0) { DB::rollBack(); } });

 

轉載自:https://laravel-china.org/docs/laravel/5.6/queues/1395


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM