php-resque 隊列簡單使用


一、安裝 php-resque

進入項目根目錄,composer 安裝 php-resque

composer require chrisboulton/php-resque

 

二、常用方法

1、連接 redis

// setBackend($server, $database = 0)
Resque::setBackend('127.0.0.1:6379');

2、向隊列中添加工作

// enqueue($queue, $class, $args = null, $trackStatus = false)
$token = Resque::enqueue('default', 'My_Job', ['name'=>'test'], true);

3、查看工作狀態

$status = (new Resque_Job_Status($token))->get();

4、停止(移除)工作

(new Resque_Job_Status($token))->stop();

 

三、常駐任務處理隊列(示例:worker.php)

// 處理 default 隊列;也可以填 *,代表所有隊列
$worker = new Resque_Worker('default');
// LOG_NONE 不寫日志, LOG_NORMAL 普通,LOG_VERBOSE 詳細
$worker->logLevel = Resque_Worker::LOG_VERBOSE;
// 隊列處理時間間隔,單位:秒
$worker->work(5);

注:worker.php 要以命令行的方法執行,並長駐后台,/usr/local/php/bin/php /xxx/xxx/worker.php

 

四、處理工作的類

class My_Job
{
    /**
     * 前置操作
     * @return void
     */
    public function setUp()
    {
        // ... Set up environment for this job
    }

    /**
     * 消費隊列
     * @return void
     */
    public function perform()
    {
        // execute a job
    }

    /**
     * 后置操作
     * @return void
     */
    public function tearDown()
    {
        // ... Remove environment for this job
    }
}

 

五、完整例子

test.php

<?php
require 'vendor/autoload.php';

/**
 * php-resque 隊列服務
 */
class Queue
{
    private $jobStatusObject = [];

    function __construct($server, $database = 0)
    {
        // 連接 redis
        Resque::setBackend($server);
    }

    private function getJobStatusObject($token)
    {
        if (!isset($this->jobStatusObject[$token])) {
            $this->jobStatusObject[$token] = new Resque_Job_Status($token);
        }
        
        return $this->jobStatusObject[$token];
    }

    /**
     * 入隊並返回 token
     * @param  array  $args 參數
     * @return string
     */
    public function enqueue($args = [])
    {
        // 隊列名稱、指定消費類、參數
        return Resque::enqueue('default', 'My_Job', $args, true);
    }

    /**
     * 查詢 job 的狀態
     * @param  string $token 任務token
     * @return array         狀態信息
     */
    public function getJobStatus($token)
    {
        $status = $this->getJobStatusObject($token)->get();
        $statusOptions = [
            1 => 'STATUS_WAITING',
            2 => 'STATUS_RUNNING',
            3 => 'STATUS_FAILED',
            4 => 'STATUS_COMPLETE'
        ];
        return isset($statusOptions[$status])? $statusOptions[$status]:$status;
    }

    /**
     * 清除 job
     * @param  string $token 任務token
     * @return boolean         狀態信息
     */
    public function clearJob($token)
    {
        if ($this->getJobStatusObject($token) == 'STATUS_COMPLETE')
        {
            return $this->getJobStatusObject($token)->stop();
        }
        else
        {
            error_log("非已完成任務,不可清除。token: {$token}\n", 3, 'logs.txt');
            return false;
        }
    }
}

empty($_GET) && die('url參數或數據不能為空!!');
$token = isset($_GET['token'])? trim($_GET['token']):'';
$act   = isset($_GET['act'])? trim($_GET['act']):'';

$queue = new Queue('127.0.0.1:6379');
if (empty($act))
{
    // 任務入隊
    $token = $queue->enqueue($_GET);
    error_log("token: {$token}\n", 3, 'logs.txt');
    echo 'token: ', $token, '<hr>';
    exit;
}
elseif ($act == 'status' && !empty($token))
{
    // 查看任務狀態
    var_dump($queue->getJobStatus($token));
    exit;
}
elseif ($act == 'del' && !empty($token))
{
    // 移除任務
    var_dump($queue->clearJob($token));
    exit;
}
?>

測試url:

http://localhost/redis/queue/test.php?name=test
// token: 37efa5889472ef04108a55ba8cc448e4

http://localhost/redis/queue/test.php?act=status&token=37efa5889472ef04108a55ba8cc448e4
// string(14) "STATUS_WAITING"

http://localhost/redis/queue/test.php?act=del&token=37efa5889472ef04108a55ba8cc448e4
// bool(false)

 

worker.php

<?php
// 消費(處理)隊列

require 'vendor/autoload.php';

/**
 * 任務
 */
class My_Job
{
    /**
     * 前置操作
     * @return void
     */
    public function setUp()
    {
        // ... Set up environment for this job
    }

    /**
     * 消費隊列
     * @return void
     */
    public function perform()
    {
        // 按自己的業務處理 job
        error_log(var_export($this->args, true)."\n\n", 3, 'logs.txt');
        
        // 拋出錯誤則表示工作處理失敗,否則工作狀態將更新為完成(STATUS_COMPLETE)
        // 注:return false 不管用,一定要 throw Exception 才行
        // throw new Exception('Unable to run this job!');
    }

    /**
     * 后置操作
     * @return void
     */
    public function tearDown()
    {
        // ... Remove environment for this job
    }
}

// 處理 default 隊列;也可以填 *,代表所有隊列
$worker = new Resque_Worker('default');
// LOG_NONE 不寫日志, LOG_NORMAL 普通,LOG_VERBOSE 詳細
$worker->logLevel = Resque_Worker::LOG_VERBOSE;
// 隊列處理時間間隔,單位:秒
$worker->work(5);
?>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM