一個隊列最基本的功能應該是入站和出站。一邊把任務放到隊列中,一邊從隊列中讀取處理任務。
我們看一下laravel中對隊列的設計,首先是接口的設計在\Illuminate\Contracts\Queue\Queue
相關的接口被我用虛線分成了3類,第一類是push,也就是入站。第二類是pop也就是出戰,第三類是讀取隊列大小,獲取和設置ConnectionName。
其中入站的方法中,pushOn 和 laterOn 分別是 push 和 later 的別名。 bulk 是批量的 push 他們都是把job放到隊列中,pushRaw指的是把原始數據放入到隊列中。
舉個例子來說明下這兩類方法,用戶123登錄了,我需要進行數據統計可以有兩種做法,
1、我們把123放入到隊列中那么可以通過pushRaw這個方法把123放入隊列,等處理程序拿到123再去處理,進行數據統計。但是在laravel中我們往往不這么處理,參考第二中做法
2、我們定義一個Job,用123去初始這個Job,然后把Job放入到隊列中。這兩種方式的差異在於一個放的是數據,一個放的是Job。laravel對隊列的封裝都是用的這種方式。在實際處理的時候,是把Job中的數據通過pushRaw放入到隊列中的。
class Job { /** * Create a new job instance. * * @return void */ public function __construct($userId) { $this->userId = $userId; } /** * Execute the job. * * @return void */ public function handle() { //處理相關數據 統計數據加1 } }
接下里我們以Redis隊列為例看下具體的處理過程
一、首先看一下抽象類的封裝
抽象類的代碼主要分3分布,圖中用實線進行了分割,對應接口的3分布,一部分是關於隊列的。接口中對入站的操作有pushOn laterOn push later bulk pushRaw 6個方法的定義。具體的如站和出站的方法,應該會每個驅動都不一樣,所以在抽象類中只實現了 pushOn laterOn 和 bulk,而沒有實現 push later pushRaw
/** * Push a new job onto the queue. * * @param string $queue * @param string $job * @param mixed $data * @return mixed */ public function pushOn($queue, $job, $data = '') { return $this->push($job, $data, $queue); } /** * Push a new job onto the queue after a delay. * * @param string $queue * @param \DateTimeInterface|\DateInterval|int $delay * @param string $job * @param mixed $data * @return mixed */ public function laterOn($queue, $delay, $job, $data = '') { return $this->later($delay, $job, $data, $queue); } /** * Push an array of jobs onto the queue. * * @param array $jobs * @param mixed $data * @param string $queue * @return mixed */ public function bulk($jobs, $data = '', $queue = null) { foreach ((array) $jobs as $job) { $this->push($job, $data, $queue); } }
第二部分是createPayload,我們上面提到當把一個job放入隊列的時候,實際是從job中提取信息,然后把信息通過pushRaw放入隊列的。createPayload就是從job中讀取信息以供pushRaw使用的。這一塊的方法稍多一些,主要是createPayload通過createPayloadArray創建配置;createPayloadArray根據job的類型調用createObjectPayload或者createStringPayload。getDisplayName和getJobExpiration,而后者又需要相關的時間函數
第三部分是 getConnectionName 和 setConnectionName 的實現。size方法應該是每個驅動都不一樣的。此外還提供了setContainer方法。
即成抽象類之后,隊列必須實現的方法只剩下了push later pushRaw pop size。
二、RedisQueue中的具體實現
1、 push pushRaw
public function push($job, $data = '', $queue = null) { return $this->pushRaw($this->createPayload($job, $data), $queue); }
push是通過createPayload整理數據之后,通過pushRaw寫入隊列。pushRaw的代碼也非常簡單,獲取redis連接,通過getQueue獲取隊列,也就是redis中的key,然后用rpush命令插入到列表的最右邊。
public function pushRaw($payload, $queue = null, array $options = []) { $this->getConnection()->rpush($this->getQueue($queue), $payload); return json_decode($payload, true)['id'] ?? null; }
2、later laterRaw
和上面的方法對應later也是通過createPayload整理數據之后,通過laterRaw寫入隊列。
protected function laterRaw($delay, $payload, $queue = null) { $this->getConnection()->zadd( $this->getQueue($queue).':delayed', $this->availableAt($delay), $payload ); return json_decode($payload, true)['id'] ?? null; }
對比pushRaw的方法,看到laterRaw在在原有隊列的名稱后補充了':delayed'后綴,以有序集合的方式存儲。也就是說他們存儲的不是一個地方。那么出棧的時候是怎么讀取的呢?我們看下pop
3、pop
public function pop($queue = null) { $this->migrate($prefixed = $this->getQueue($queue)); list($job, $reserved) = $this->retrieveNextJob($prefixed); if ($reserved) { return new RedisJob( $this->container, $this, $job, $reserved, $this->connectionName, $queue ?: $this->default ); } }
這個的處理相對比較麻煩一些。$this->migrate($prefixed = $this->getQueue($queue));對隊列進行了合並,然后從隊列中讀取任務,返回RedisJob
3.1 隊列的合並
/** * Migrate any delayed or expired jobs onto the primary queue. * * @param string $queue * @return void */ protected function migrate($queue) { $this->migrateExpiredJobs($queue.':delayed', $queue); if (! is_null($this->retryAfter)) { $this->migrateExpiredJobs($queue.':reserved', $queue); } } /** * Migrate the delayed jobs that are ready to the regular queue. * * @param string $from * @param string $to * @return array */ public function migrateExpiredJobs($from, $to) { return $this->getConnection()->eval( LuaScripts::migrateExpiredJobs(), 2, $from, $to, $this->currentTime() ); }
從代碼中看到,首先是$queue和$queue.':delayed'的合並,然后對於設置了retryAfter的,再次用 $queue和$queue.':reserved'進行合並。合並的Lua腳本:
public static function migrateExpiredJobs() { return <<<'LUA' -- Get all of the jobs with an expired "score"... local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1]) -- If we have values in the array, we will remove them from the first queue -- and add them onto the destination queue in chunks of 100, which moves -- all of the appropriate jobs onto the destination queue very safely. if(next(val) ~= nil) then redis.call('zremrangebyrank', KEYS[1], 0, #val - 1) for i = 1, #val, 100 do redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val))) end end return val LUA; }
每次對不超過100個的超時數據,從from,rpush到了to,也就是從$queue.':delayed'和 $queue.':reserved'插入了 $queue。從這里能看到redis隊列對順序的保證並不是特別准確。比如:
放入第1個消息a到隊列,
延遲1s放入第2個消息b到隊列,
然后過了1s再放入第3個消息c到隊列。
如果在放入第3個消息c之前開啟了隊列的處理,輸出的是abc,但是如果在放入第三個消息后開始的隊列處理,那輸出的就是acb了。
3、size
public function size($queue = null) { $queue = $this->getQueue($queue); return $this->getConnection()->eval( LuaScripts::size(), 3, $queue, $queue.':delayed', $queue.':reserved' ); }
size的方法就相對簡單了,對上面提到的3類消息$queue,$queue.':delayed'和 $queue.':reserved'求和,其中$queue是隊列,$queue.':delayed'是上面提到的延遲放入隊列的消息, $queue.':reserved'是真正處理中的消息。
Redis隊列用了很多的Lua腳本,來保證操作的原子性。
4、deleteReserved deleteAndRelease
在處理成功之后需要通過deleteReserved刪除保持的消息,在處理失敗后通過deleteAndRelease刪除保存的消息,並放回隊列。
測試代碼:
$redis = app(\Illuminate\Contracts\Queue\Factory::class)->connection('redis'); //$redis->pushRaw("hi time is ".time()); $redis->push(new \App\Jobs\Test(123 dispatch((new \App\Jobs\Test(1));