隊列適用與多個用戶同時執行一個操作,鎖適用與單個用戶多次執行同一個操作
缺點:
一旦需處理數據加入到任務內就不能刪除,如果刪除可以使用redis
隊列配置:
Tp5.1 config/queue.php 配置文件 ,啟動reids 服務
/** * 消息隊列配置 * 內置驅動:redis、database、topthink、sync */ return [ //sync驅動表示取消消息隊列還原為同步執行 //'connector' => 'Sync', //Redis驅動 'connector' => 'redis', "expire"=>60,//任務過期時間默認為秒,禁用為null "default"=>"default",//默認隊列名稱 "host"=>Env::get("redis.host", "127.0.0.1"),//Redis主機IP地址 "port"=>Env::get("redis.port", 6379),//Redis端口 "password"=>Env::get("redis.password", "123456"),//Redis密碼 "select"=>5,//Redis數據庫索引 "timeout"=>0,//Redis連接超時時間 "persistent"=>false,//是否長連接 //Database驅動 //"connector"=>"Database",//數據庫驅動 //"expire"=>60,//任務過期時間,單位為秒,禁用為null //"default"=>"default",//默認隊列名稱 //"table"=>"jobs",//存儲消息的表明,不帶前綴 //"dsn"=>[], //Topthink驅動 ThinkPHP內部的隊列通知服務平台 //"connector"=>"Topthink", //"token"=>"", //"project_id"=>"", //"protocol"=>"https", //"host"=>"qns.topthink.com", //"port"=>443, //"api_version"=>1, //"max_retries"=>3, //"default"=>"default" ]; 代碼示例: /*1.先在控制器里添加下面兩個方法*/ namespace app\index\controller; use think\Controller; use think\Queue; use think\Db; class Index extends Controller { public function queueTest(){ $data = [ 'order_no' =>rand(100000,999999), ]; $this->add($data['order_no']); // 一次執行一個任務 // 類的命名空間\類名 // 類的命名空間\類名@方法 $res =Queue::push('app\queue\controller\Execjob', $data, $queue = null); // 創建發布任務 var_dump($res); // database 驅動時,返回值為 1|false ; redis 驅動時,返回值為 隨機字符串|false } public function add($orderNo){ $data =[ 'order_no'=>$orderNo, 'msg'=>$orderNo, 'create_time'=>date('Y-m-d H:i:s'), ]; Db::name('tp5_test')->insert($data); } } /*2.在 app/queue/controller/Execjob 創建文件,要執行的任務內容*/ namespace app\queue\controller; use think\Controller; use think\facade\Cache; use think\queue\Job; class Execjob extends Controller { public function fire(Job $job, $data) { //....這里執行具體的任務 if($this->jobDone($data)) { $job->delete(); print("<info>Hello Job has been done and deleted"."</info>\n"); }else{ $job->release(3); //$delay為延遲時間 } if ($job->attempts() > 3) { //通過這個方法可以檢查這個任務已經重試了幾次了 } //如果任務執行成功后 記得刪除任務,不然這個任務會重復執行,直到達到最大重試次數后失敗后,執行failed方法 // $job->delete(); // 也可以重新發布這個任務 // $job->release($delay); //$delay為延遲時間 } public function failed($data) { // ...任務達到最大重試次數后,失敗了 } public function jobDone($data) { print("<info>Job is Done status!"."</info> \n"); return Db::name('tp5_test')->where('order_no',$data['order_no'])->update(['status'=>2]); } }
監聽隊列
寫完之后再控制器切換到當前目錄下執行,任務的消費與刪除
// --queue 隊列名稱,不指定監聽所有隊列 php think queue:listen --queue send_mail
測試訪問頁面
http://tp.com/pub/test/index,任務的推送
<?php namespace app\pub\controller; use app\queue\controller\Jobqueue; use think\Controller; class Test extends Controller { public function index(){ $job_queue=new Jobqueue(); $job_queue->index('send_mail',['1938450598@qq.com','sa','測試']); } }
隊列執行完成后,處於等待下次調用
隊列流程:創建 ->監聽-> 推送 -> 消費 -> 刪除
相關文章:tp5.1_隊列queue學習