TP5.1 下載安裝Redis
配置redis信息
<?php
namespace app\common\redis;
class RedisHandler
{
public $provider;
//創建實例子
private static $_instance = null;
//創建redis實例子
private function __construct()
{
$this->provider = new \Redis();
$this->provider->connect(config('redis.redis_host'),config('redis.redis_port'));
}
final private function __clone()
{
}
public static function getInstance(){
if(!self::$_instance){
self::$_instance = new RedisHandler();
}
return self::$_instance;
}
/**
* @param string $key 有序集key
* @param number $score 排序值
* @param string $value 格式化的數據
* @return int
*/
public function zAdd($key,$score,$value){
return $this->provider->zAdd($key,$score,$value);
}
/**
* 獲取有序集合數據
* @param $key
* @param $start
* @param $end
* @param null $withsscores
* @param array
*/
public function zRange($key,$start,$end,$withsscores = null){
return $this->provider->zRange($key,$start,$end,$withsscores);
}
/**
* 刪除有序集合
* @param $key
* @param $member
* @param int
*/
public function zRem($key,$member){
return $this->provider->zRem($key,$member);
}
}
創建一個命令
目錄為
namespace app\command;
<?php
namespace app\command;
use app\common\delayqueue\DelayQueue;
use think\console\Command;
use think\console\Input;
use think\console\Output;
class DelayQueueWorker extends Command
{
const COMMAND_ARGV_1 = 'queue';
protected function configure()
{
$this->setName('delay-queue')->setDescription('延遲隊列任務進程');
$this->addArgument(self::COMMAND_ARGV_1);
}
protected function execute(Input $input, Output $output)
{
$queue = $input->getArgument(self::COMMAND_ARGV_1);
//參數1 延遲隊列表名,對應與redis的有序集key名
//這邊是使用while死循環 來監聽
while (true) {
echo $queue."===".time()."\n";
DelayQueue::getInstance($queue)->perform();
usleep(1000000);
}
}
}
隊列創建方法
<?php
namespace app\common\delayqueue;
use app\common\redis\RedisHandler;
class DelayQueue
{
//默認參數
private $prefix = "delay_queue:";
private $queue;
//創建一個實例
private static $_instance = null;
//初始化
private function __construct($queue)
{
$this->queue = $queue;
}
//最終類不允許被克隆
final private function __clone()
{
}
//創建工廠方法
public static function getInstance($queue=''){
if(!self::$_instance){
self::$_instance = new DelayQueue($queue);
}
return self::$_instance;
}
/**
*@Description:
*@MethodAuthor: lijian
*@Date: 2021-06-12 11:44:12
*@param:
*@return:
*/
public function addTask($jobClass,$runTime,$args = null){
$key = $this->prefix.$this->queue;
//導入數據到redis中
$params = [
'class'=> $jobClass,
'args' => $args,
'runtime'=>$runTime
];
RedisHandler::getInstance()->zAdd(
$key,
$runTime,
serialize($params)
);
}
/**
*@Description: 執行job
*@MethodAuthor: lijian
*@Date: 2021-06-12 11:49:21
*/
public function perform(){
$key = $this->prefix.$this->queue;
//取出有序集合第一個元素
$result = RedisHandler::getInstance()->zRange($key,0,0);
if(!$result) return false;
//序列化
$jobInfo = unserialize($result['0']);
print_r('job: '.$jobInfo['class'].' will run at: '. date('Y-m-d H:i:s',$jobInfo['runtime']).PHP_EOL);
$jobClass = $jobInfo['class'];
if(!@class_exists($jobClass)){
print_r($jobClass.'undefined',PHP_EOL);
RedisHandler::getInstance()->zRem($key,$result[0]);
return false;
}
//到時間執行
if(time() >= $jobInfo['runtime']){
$job = new $jobClass;
//獲取訂單號
$job->setPayload($jobInfo['args']);
$jobResult = $job->perform();
if($jobResult){
//將任務移除
RedisHandler::getInstance()->zRem($key,$result[0]);
return true;
}
}
return false;
}
}
檢驗數據
DelayQueue::getInstance('delay_job')->addTask(
'app\common\delayqueue\CloseOrder', // 自己實現的job
time()+50, // 訂單失效時間
['order_id'=>123456] // 傳遞給job的參數
);