ThinkPHP 實現隊列


官方文檔: top-think/think-queue

應用場景: 

   隊列適用與多個用戶同時執行一個操作,鎖適用與單個用戶多次執行同一個操作

  1. 消息隊列,發送郵件、短信

缺點:

   一旦需處理數據加入到任務內就不能刪除,如果刪除可以使用redis 

隊列配置:

Tp5.1  config/queue.php 配置文件 ,啟動reids 服務

/**
 * 消息隊列配置
 * 內置驅動:redis、database、topthink、sync
*/
return [
    //sync驅動表示取消消息隊列還原為同步執行
    //'connector' => 'Sync',

    //Redis驅動
    'connector' => 'redis',
    "expire"=>60,//任務過期時間默認為秒,禁用為null
    "default"=>"default",//默認隊列名稱
    "host"=>Env::get("redis.host", "127.0.0.1"),//Redis主機IP地址
    "port"=>Env::get("redis.port", 6379),//Redis端口
    "password"=>Env::get("redis.password", "123456"),//Redis密碼
    "select"=>5,//Redis數據庫索引
    "timeout"=>0,//Redis連接超時時間
    "persistent"=>false,//是否長連接

    //Database驅動
    //"connector"=>"Database",//數據庫驅動
    //"expire"=>60,//任務過期時間,單位為秒,禁用為null
    //"default"=>"default",//默認隊列名稱
    //"table"=>"jobs",//存儲消息的表明,不帶前綴
    //"dsn"=>[],

    //Topthink驅動 ThinkPHP內部的隊列通知服務平台
    //"connector"=>"Topthink",
    //"token"=>"",
    //"project_id"=>"",
    //"protocol"=>"https",
    //"host"=>"qns.topthink.com",
    //"port"=>443,
    //"api_version"=>1,
    //"max_retries"=>3,
    //"default"=>"default"
];
 

代碼示例: 

/*1.先在控制器里添加下面兩個方法*/
namespace app\index\controller;
use think\Controller;
use think\Queue;
use think\Db;
class Index extends Controller
{
    public function queueTest(){
        $data = [
            'order_no' =>rand(100000,999999),
        ];
        $this->add($data['order_no']);
        // 一次執行一個任務  
        // 類的命名空間\類名
        // 類的命名空間\類名@方法
        $res =Queue::push('app\queue\controller\Execjob', $data, $queue = null); // 創建發布任務
        var_dump($res); // database 驅動時,返回值為 1|false  ;   redis 驅動時,返回值為 隨機字符串|false
  }

    public function add($orderNo){
        $data =[
            'order_no'=>$orderNo,
            'msg'=>$orderNo,
            'create_time'=>date('Y-m-d H:i:s'),
        ];
        Db::name('tp5_test')->insert($data);
    }
}

/*2.在 app/queue/controller/Execjob 創建文件,要執行的任務內容*/
namespace app\queue\controller;

use think\Controller;
use think\facade\Cache;
use think\queue\Job;

class Execjob extends Controller
{
public function fire(Job $job, $data)
    {
        //....這里執行具體的任務
      if($this->jobDone($data))
      {
          $job->delete();
          print("<info>Hello Job has been done and deleted"."</info>\n");
      }else{
          $job->release(3); //$delay為延遲時間
      }
      if ($job->attempts() > 3) {
          //通過這個方法可以檢查這個任務已經重試了幾次了
      }
        //如果任務執行成功后 記得刪除任務,不然這個任務會重復執行,直到達到最大重試次數后失敗后,執行failed方法
        // $job->delete();
        // 也可以重新發布這個任務
        // $job->release($delay); //$delay為延遲時間
    }
    public function failed($data)
    {
        // ...任務達到最大重試次數后,失敗了
    }
    public function jobDone($data)
    {
       print("<info>Job is Done status!"."</info> \n");
       return Db::name('tp5_test')->where('order_no',$data['order_no'])->update(['status'=>2]);
    }
}

  

監聽隊列

寫完之后再控制器切換到當前目錄下執行,任務的消費與刪除

//  --queue 隊列名稱,不指定監聽所有隊列
php think queue:listen --queue send_mail

 

測試訪問頁面

http://tp.com/pub/test/index,任務的推送

<?php
namespace app\pub\controller;

use app\queue\controller\Jobqueue;
use think\Controller;

class Test extends Controller
{
    public function index(){
      $job_queue=new Jobqueue();
      $job_queue->index('send_mail',['1938450598@qq.com','sa','測試']);
    }
}

 

隊列執行完成后,處於等待下次調用

隊列流程:創建 ->監聽-> 推送 -> 消費 -> 刪除

 

相關文章:tp5.1_隊列queue學習

 thinkphp-queue 筆記

      think-queue使用教程-用戶注冊場景異步發送郵件

think-queue


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM