最近有一個需求,A用戶充值積分到錢包,但是錢包只能在一分鍾之后做出響應,那么就需要異步執行查看錢包是否到賬的操作,本來打算用swoole異步,突然想到think-queue,那不妨就用對列來玩玩
本文參考 CSDN 鼠你有錢 tp5.1 + think-queue + supervisor博文 點此穿越
第一步 安裝 think-queue
composer require topthink/think-queue
think-queue包地址 需要注意框架版本問題,現版本默認tp6框架
第二步 配置
安裝好之后 默認會在 config文件夾下生成 queue.php配置文件
<?php // +---------------------------------------------------------------------- // | ThinkPHP [ WE CAN DO IT JUST THINK IT ] // +---------------------------------------------------------------------- // | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved. // +---------------------------------------------------------------------- // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 ) // +---------------------------------------------------------------------- // | Author: yunwuxin <448901948@qq.com> // +---------------------------------------------------------------------- use think\Env; return [ 'connector' => 'redis', "expire"=>60,//任務過期時間默認為秒,禁用為null "default"=>"default",//默認隊列名稱 "host"=>"127.0.0.1",//Redis主機IP地址 "port"=>6379,//Redis端口 "password"=>"******",//Redis密碼 "select"=>5,//Redis數據庫索引 "timeout"=>0,//Redis連接超時時間 "persistent"=>false,//是否長連接 ];
這里我使用的是 redis驅動 也可以根據官方文檔選擇數據庫驅動
第三步 編寫代碼
在application/index/controller下創建Jobtest.php
<?php
namespace app\index\controller;
use think\Queue;
class Jobtest{ public function actionWithHelloJob(){ $params = request()->param(); // 1.當前任務將由哪個類來負責處理。 // 當輪到該任務時,系統將生成一個該類的實例,並調用其 fire 方法 $jobHandlerClassName = 'app\index\job\Hello'; // 2.當前任務歸屬的隊列名稱,如果為新隊列,會自動創建 $jobQueueName = "send"; // 3.當前任務所需的業務數據 . 不能為 resource 類型,其他類型最終將轉化為json形式的字符串 $data = $this->add($params); // 4.將該任務推送到消息隊列,等待對應的消費者去執行 // $isPushed = Queue::push( $jobHandlerClassName , $data , $jobQueueName ); $isPushed = Queue::later(60,$jobHandlerClassName,$data,$jobQueueName); //把任務分配到隊列中,延遲60s后執行 // database 驅動時,返回值為 1|false ; redis 驅動時,返回值為 隨機字符串|false if( $isPushed !== false ){ echo '成功'; }else{ echo '錯誤'; } } public function add($params){ $data =[ 'order_no'=>$params['orderNo'], 'msg'=>$params['orderNo'], 'create_time'=>date('Y-m-d H:i:s'), ]; Db::name('test')->insert($data); } }
這里我創建了 test表來檢測,你也可以根據自己的需求來創建數據庫
CREATE TABLE `test` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `order_no` varchar(255), `msg` varchar(255), `create` varchar(255), PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8;
下來我們需要在 application/index 下在創建一個job目錄 在job目錄下創建Hello.php文件
<?php namespace app\index\job; use think\queue\Job; use think\Db; class Hello { public function fire(Job $job,$data) { // 有些消息在到達消費者時,可能已經不再需要執行了 $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data); if(!$isJobStillNeedToBeDone){ $job->delete(); return; } $isJobDone = $this->doHelloJob($data); if ($isJobDone) { // 如果任務執行成功, 記得刪除任務 $job->delete(); }else{ $job->release(3); //$delay為延遲時間 } if ($job->attempts() > 3) { //通過這個方法可以檢查這個任務已經重試了幾次了 print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n"); $job->delete(); // 也可以重新發布這個任務 //print("<info>Hello Job will be availabe again after 2s."."</info>\n"); //$job->release(2); //$delay為延遲時間,表示該任務延遲2秒后再執行 } } public function failed($data) { // ...任務達到最大重試次數后,失敗了 } /** * 有些消息在到達消費者時,可能已經不再需要執行了 * @param array|mixed $data 發布任務時自定義的數據 * @return boolean 任務執行的結果 */ private function checkDatabaseToSeeIfJobNeedToBeDone($data){ return true; } /** * 根據消息中的數據進行實際的業務處理... */ private function doHelloJob($data) { //根據你的業務需求寫邏輯即可 成功返回true失敗返回false即可 } }
到這里代碼基本就完成了,我們使用瀏覽器來訪問我們的 Jobtest下actionWithHelloJob方法
然后在終端執行
php think queue:work --queue send
這里的 send就是你的對列名稱
執行后我們可以在redis里看到具體的數據對列 如果你沒有安裝redis那就需要在安裝think-queue之前安裝redis擴展
第四步 使用supervisor 將queue進程常駐
以下內容均來自 CSDN 鼠你有錢 tp5.1 + think-queue + supervisor博文 點此穿越 如有侵權 可聯系本人第一時間刪除
1.安裝supervisor
# yum install epel-release # yum install supervisor //設置成開機自動啟動 # systemctl enable supervisord
2.配置
在這里我創建了一個命名為supervisor
的目錄用於存放supervisor
和隊列的日志文件以及include
的配置文件,其目錄結構為:
/var/supervisor/log/ #可以自定義 /run/ #可以自定義 /conf/ #可以自定義
然后找到/etc/supervisord.conf
配置文件,編輯如下信息:
; 將supervisor.sock 的路徑換成如下 [unix_http_server] file=/var/supervisor/run/supervisor.sock ; (the path to the socket file) ; 將supervisord.log 和 supervisord.pid 的路徑換成如下 [supervisord] logfile=/var/supervisor/log/supervisord.log ; (main log file;default $CWD/supervisord.log) pidfile=/var/supervisor/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid) ; 將supervisor.sock 的路徑換成如下 [supervisorctl] serverurl=unix:///var/supervisor/run/supervisor.sock ; use a unix:// URL for a unix socket ; 將最底部的files路徑換成如下 [include] files = /var/supervisor/conf/*.conf
在/var/supervisor/conf
目錄里創建一個.conf
文件,這里命名為queue_work.conf
,內容如下:
[program:queue_worker] ;項目名稱 directory = /opt/www/tp5.1 ; 程序的啟動目錄,項目根目錄的上一級 command = php think queue:work --queue queueName --daemon ; 啟動命令 queueName就是隊列名 process_name=%(program_name)s_%(process_num)02d numprocs = 3 ; 開啟的進程數量 autostart = true ; 在 supervisord 啟動的時候也自動啟動 startsecs = 5 ; 啟動 5 秒后沒有異常退出,就當作已經正常啟動了 autorestart = true ; 程序異常退出后自動重啟 startretries = 3 ; 啟動失敗自動重試次數,默認是 3 user = root ; 用哪個用戶啟動 redirect_stderr = true ; 把 stderr 重定向到 stdout,默認 false stdout_logfile_maxbytes = 50MB ; stdout 日志文件大小,默認 50MB stdout_logfile_backups = 20 ; stdout 日志文件備份數 ; stdout 日志文件,需要手動創建目錄(supervisord 會自動創建日志文件) stdout_logfile = /var/supervisor/log/queue_worker.log loglevel=info
對於index
這個單模塊而言,不同的業務邏輯為了區分可能會存在多個隊列名,這種情況將多個隊列名用逗號拼接起來:
command = php think queue:work --queue queueName1,queueName2 --daemon ;
重啟
# systemctl stop supervisord # systemctl start supervisord
或
# systemctl restart supervisord