laravel中的隊列一


一個隊列最基本的功能應該是入站和出站。一邊把任務放到隊列中,一邊從隊列中讀取處理任務。

我們看一下laravel中對隊列的設計,首先是接口的設計在\Illuminate\Contracts\Queue\Queue

相關的接口被我用虛線分成了3類,第一類是push,也就是入站。第二類是pop也就是出戰,第三類是讀取隊列大小,獲取和設置ConnectionName。

其中入站的方法中,pushOn 和 laterOn 分別是 push 和 later 的別名。 bulk 是批量的 push  他們都是把job放到隊列中,pushRaw指的是把原始數據放入到隊列中。

舉個例子來說明下這兩類方法,用戶123登錄了,我需要進行數據統計可以有兩種做法,

1、我們把123放入到隊列中那么可以通過pushRaw這個方法把123放入隊列,等處理程序拿到123再去處理,進行數據統計。但是在laravel中我們往往不這么處理,參考第二中做法

2、我們定義一個Job,用123去初始這個Job,然后把Job放入到隊列中。這兩種方式的差異在於一個放的是數據,一個放的是Job。laravel對隊列的封裝都是用的這種方式。在實際處理的時候,是把Job中的數據通過pushRaw放入到隊列中的。

class Job 
{
   

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct($userId)
    {
        $this->userId = $userId;

    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
       //處理相關數據  統計數據加1
    }
}

 

接下里我們以Redis隊列為例看下具體的處理過程

 

一、首先看一下抽象類的封裝

抽象類的代碼主要分3分布,圖中用實線進行了分割,對應接口的3分布,一部分是關於隊列的。接口中對入站的操作有pushOn laterOn push  later   bulk pushRaw 6個方法的定義。具體的如站和出站的方法,應該會每個驅動都不一樣,所以在抽象類中只實現了 pushOn laterOn 和 bulk,而沒有實現 push later pushRaw

/**
     * Push a new job onto the queue.
     *
     * @param  string  $queue
     * @param  string  $job
     * @param  mixed   $data
     * @return mixed
     */
    public function pushOn($queue, $job, $data = '')
    {
        return $this->push($job, $data, $queue);
    }

    /**
     * Push a new job onto the queue after a delay.
     *
     * @param  string  $queue
     * @param  \DateTimeInterface|\DateInterval|int  $delay
     * @param  string  $job
     * @param  mixed   $data
     * @return mixed
     */
    public function laterOn($queue, $delay, $job, $data = '')
    {
        return $this->later($delay, $job, $data, $queue);
    }

    /**
     * Push an array of jobs onto the queue.
     *
     * @param  array   $jobs
     * @param  mixed   $data
     * @param  string  $queue
     * @return mixed
     */
    public function bulk($jobs, $data = '', $queue = null)
    {
        foreach ((array) $jobs as $job) {
            $this->push($job, $data, $queue);
        }
    }

 

第二部分是createPayload,我們上面提到當把一個job放入隊列的時候,實際是從job中提取信息,然后把信息通過pushRaw放入隊列的。createPayload就是從job中讀取信息以供pushRaw使用的。這一塊的方法稍多一些,主要是createPayload通過createPayloadArray創建配置;createPayloadArray根據job的類型調用createObjectPayload或者createStringPayload。getDisplayName和getJobExpiration,而后者又需要相關的時間函數

第三部分是 getConnectionName 和 setConnectionName 的實現。size方法應該是每個驅動都不一樣的。此外還提供了setContainer方法。

即成抽象類之后,隊列必須實現的方法只剩下了push later pushRaw pop size。

二、RedisQueue中的具體實現

1、 push  pushRaw

public function push($job, $data = '', $queue = null)
    {
        return $this->pushRaw($this->createPayload($job, $data), $queue);
    }

push是通過createPayload整理數據之后,通過pushRaw寫入隊列。pushRaw的代碼也非常簡單,獲取redis連接,通過getQueue獲取隊列,也就是redis中的key,然后用rpush命令插入到列表的最右邊。

public function pushRaw($payload, $queue = null, array $options = []) { $this->getConnection()->rpush($this->getQueue($queue), $payload); return json_decode($payload, true)['id'] ?? null; }

 

2、later  laterRaw

和上面的方法對應later也是通過createPayload整理數據之后,通過laterRaw寫入隊列。

protected function laterRaw($delay, $payload, $queue = null)
    {
        $this->getConnection()->zadd(
            $this->getQueue($queue).':delayed', $this->availableAt($delay), $payload
        );

        return json_decode($payload, true)['id'] ?? null;
    }

對比pushRaw的方法,看到laterRaw在在原有隊列的名稱后補充了':delayed'后綴,以有序集合的方式存儲。也就是說他們存儲的不是一個地方。那么出棧的時候是怎么讀取的呢?我們看下pop

3、pop

public function pop($queue = null)
    {
        $this->migrate($prefixed = $this->getQueue($queue));

        list($job, $reserved) = $this->retrieveNextJob($prefixed);

        if ($reserved) {
            return new RedisJob(
                $this->container, $this, $job,
                $reserved, $this->connectionName, $queue ?: $this->default
            );
        }
    }

這個的處理相對比較麻煩一些。$this->migrate($prefixed = $this->getQueue($queue));對隊列進行了合並,然后從隊列中讀取任務,返回RedisJob

3.1 隊列的合並

 

/**
     * Migrate any delayed or expired jobs onto the primary queue.
     *
     * @param  string  $queue
     * @return void
     */
    protected function migrate($queue)
    {
        $this->migrateExpiredJobs($queue.':delayed', $queue);

        if (! is_null($this->retryAfter)) {
            $this->migrateExpiredJobs($queue.':reserved', $queue);
        }
    }

    /**
     * Migrate the delayed jobs that are ready to the regular queue.
     *
     * @param  string  $from
     * @param  string  $to
     * @return array
     */
    public function migrateExpiredJobs($from, $to)
    {
        return $this->getConnection()->eval(
            LuaScripts::migrateExpiredJobs(), 2, $from, $to, $this->currentTime()
        );
    }

 

從代碼中看到,首先是$queue和$queue.':delayed'的合並,然后對於設置了retryAfter的,再次用 $queue$queue.':reserved'進行合並。合並的Lua腳本:

public static function migrateExpiredJobs()
    {
        return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])

-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)

    for i = 1, #val, 100 do
        redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
    end
end

return val
LUA;
    }

每次對不超過100個的超時數據,從from,rpush到了to,也就是從$queue.':delayed'和 $queue.':reserved'插入了 $queue。從這里能看到redis隊列對順序的保證並不是特別准確。比如:

放入第1個消息a到隊列,

延遲1s放入第2個消息b到隊列,

然后過了1s再放入第3個消息c到隊列。

如果在放入第3個消息c之前開啟了隊列的處理,輸出的是abc,但是如果在放入第三個消息后開始的隊列處理,那輸出的就是acb了。

3、size

public function size($queue = null)
    {
        $queue = $this->getQueue($queue);

        return $this->getConnection()->eval(
            LuaScripts::size(), 3, $queue, $queue.':delayed', $queue.':reserved'
        );
    }

size的方法就相對簡單了,對上面提到的3類消息$queue$queue.':delayed'和 $queue.':reserved'求和,其中$queue是隊列,$queue.':delayed'是上面提到的延遲放入隊列的消息, $queue.':reserved'是真正處理中的消息。

Redis隊列用了很多的Lua腳本,來保證操作的原子性。

 

4、deleteReserved  deleteAndRelease

在處理成功之后需要通過deleteReserved刪除保持的消息,在處理失敗后通過deleteAndRelease刪除保存的消息,並放回隊列。

測試代碼:

$redis = app(\Illuminate\Contracts\Queue\Factory::class)->connection('redis');
 //$redis->pushRaw("hi time is ".time());
$redis->push(new \App\Jobs\Test(123

dispatch((new \App\Jobs\Test(1));

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM