Lumen開發:結合Redis實現消息隊列(2)


上一篇講了Lumen配置Redis,現在來講一下,如何實現消息隊列

2、編寫任務類

2.1  任務類結構

默認情況下,應用的所有隊列任務都存放在app/Jobs目錄。任務類非常簡單,正常情況下只包含一個當隊列處理該任務時被執行的handle方法,讓我們看一個任務類的例子:、

<?php

namespace App\Jobs;

use App\User;
use App\Jobs\Job;
//use Illuminate\Contracts\Mail\Mailer;//略過郵箱操作
use Illuminate\Queue\SerializesModels;
//use Illuminate\Queue\InteractsWithQueue;//會引起jobs的沖突
//use Illuminate\Contracts\Bus\SelfHandling;//已自動實現
use Illuminate\Contracts\Queue\ShouldQueue;

//class SendReminderEmail extends Job implements SelfHandling, ShouldQueue
class SendReminderEmail extends Job implements ShouldQueue
{
    //use InteractsWithQueue, SerializesModels;
    use SerializesModels;

    protected $user;

    /**
     * 創建一個新的任務實例
     *
     * @param  User  $user
     * @return void
     */
    public function __construct(User $user)
    {
        $this->user = $user;
    }

    /**
     * 執行任務
     *
     * @param  Mailer  $mailer
     * @return void
     */
    public function handle(Mailer $mailer)
    {
        sleep(5);
        //TODO 這里可以往數據庫表插入一條記錄,可以看出異步效果
    }
}

 

在本例中,注意我們能夠直接將Eloquent模型傳遞到對列任務的構造函數中。由於該任務使用了SerializesModels trait,Eloquent模型將會在任務被執行是優雅地序列化和反序列化。如果你的隊列任務在構造函數中接收Eloquent模型,只有模型的主鍵會被序列化到隊列,當任務真正被執行的時候,隊列系統會自動從數據庫中獲取整個模型實例。這對應用而言是完全透明的,從而避免序列化整個Eloquent模型實例引起的問題。

handle方法在任務被隊列處理的時候被調用,注意我們可以在任務的handle方法中對依賴進行類型提示。Lumen服務容器會自動注入這些依賴。

出錯

如果任務被處理的時候拋出異常,則該任務將會被自動釋放回隊列以便再次嘗試執行。任務會持續被釋放知道嘗試次數達到應用允許的最大次數。最大嘗試次數通過Artisan任務queue:listenqueue:work上的--tries開關來定義。關於運行隊列監聽器的更多信息可以在下面看到。

手動釋放任務

如果你想要手動釋放任務,生成的任務類中自帶的InteractsWithQueue trait提供了釋放隊列任務的release方法,該方法接收一個參數——同一個任務兩次運行之間的等待時間:

public function handle(Mailer $mailer){
    if (condition) {
        $this->release(10);
    }
}

檢查嘗試運行次數

正如上面提到的,如果在任務處理期間發生異常,任務會自動釋放回隊列中,你可以通過attempts方法來檢查該任務已經嘗試運行次數:

public function handle(Mailer $mailer){
    if ($this->attempts() > 3) {
        //
    }
}

  

3、推送任務到隊列

默認的Lumen控制器位於app/Http/Controllers/Controller.php並使用了DispatchesJobs trait。該trait提供了一些允許你方便推送任務到隊列的方法,例如dispatch方法:

?php

namespace App\Http\Controllers;

use App\User;
use Illuminate\Http\Request;
use App\Jobs\SendReminderEmail;
use App\Http\Controllers\Controller;

class UserController extends Controller{
    /**
     * 發送提醒郵件到指定用戶
     *
     * @param  Request  $request
     * @param  int  $id
     * @return Response
     */
    public function sendReminderEmail(Request $request, $id)
    {
        $user = User::findOrFail($id);

        $this->dispatch(new SendReminderEmail($user));
    }
}

為任務指定隊列

你還可以指定任務被發送到的隊列。

通過推送任務到不同隊列,你可以對隊列任務進行“分類”,甚至優先考慮分配給多個隊列的worker數目。這並不會如隊列配置文件中定義的那樣將任務推送到不同隊列“連接”,而只是在單個連接中發送給特定隊列。要指定該隊列,使用任務實例上的onQueue方法,該方法有Lumen自帶的基類App\Jobs\Job提供:

<?php

namespace App\Http\Controllers;

use App\User;
use Illuminate\Http\Request;
use App\Jobs\SendReminderEmail;
use App\Http\Controllers\Controller;

class UserController extends Controller{
    /**
     * 發送提醒郵件到指定用戶
     *
     * @param  Request  $request
     * @param  int  $id
     * @return Response
     */
    public function sendReminderEmail(Request $request, $id)
    {
        $user = User::findOrFail($id);
        $job = (new SendReminderEmail($user))->onQueue('emails');
        $this->dispatch($job);
    }
}

3.1 延遲任務

有時候你可能想要延遲隊列任務的執行。例如,你可能想要將一個注冊15分鍾后給消費者發送提醒郵件的任務放到隊列中,可以通過使用任務類上的delay方法來實現,該方法由Illuminate\Bus\Queueable trait提供:

<?php

namespace App\Http\Controllers;

use App\User;
use Illuminate\Http\Request;
use App\Jobs\SendReminderEmail;
use App\Http\Controllers\Controller;

class UserController extends Controller{
    /**
     * 發送提醒郵件到指定用戶
     *
     * @param  Request  $request
     * @param  int  $id
     * @return Response
     */
    public function sendReminderEmail(Request $request, $id)
    {
        $user = User::findOrFail($id);
        $job = (new SendReminderEmail($user))->delay(60);
        $this->dispatch($job);
    }
}

在本例中,我們指定任務在隊列中開始執行前延遲60秒。

注意:Amazon SQS服務最大延遲時間是15分鍾。

 

3.2 從請求中分發任務

映射HTTP請求變量到任務中很常見,Lumen提供了一些幫助函數讓這種實現變得簡單,而不用每次請求時手動執行映射。讓我么看一下 DispatchesJobs trait上的dispatchFrom方法。默認情況下,該trait包含在Lumen控制器基類中:

<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class CommerceController extends Controller{
    /**
     * 處理指定訂單
     *
     * @param  Request  $request
     * @param  int  $id
     * @return Response
     */
    public function processOrder(Request $request, $id)
    {
        // 處理請求...
        $this->dispatchFrom('App\Jobs\ProcessOrder', $request);
    }
}

  

該方法檢查給定任務類的構造函數並從HTTP請求(或者其它ArrayAccess對象)中解析變量來填充任務需要的構造函數參數。所以,如果我們的任務類在構造函數中接收一個productId變量,該任務將會嘗試從HTTP請求中獲取productId參數。

你還可以傳遞一個數組作為dispatchFrom方法的第三個參數。該數組用於填充所有請求中不存在的構造函數參數:

$this->dispatchFrom('App\Jobs\ProcessOrder', $request, [
    'taxPercentage' => 20,
]);

  

4、運行隊列監聽器

開啟任務監聽器

Lumen包含了一個Artisan命令用來運行推送到隊列的新任務。你可以使用queue:listen命令運行監聽器:

php artisan queue:listen

  

每次執行控制器對應方法,執行任務寫入,然后再異步執行對應任務,后面兩次前后都相隔了5秒,因為加了sleep(5),不過執行sleep前就返回了結果!

下一篇再詳細分析具體的操作方法和處理方法

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM