thinkphp5.1+think-queue


最近有一個需求,A用戶充值積分到錢包,但是錢包只能在一分鍾之后做出響應,那么就需要異步執行查看錢包是否到賬的操作,本來打算用swoole異步,突然想到think-queue,那不妨就用對列來玩玩

本文參考 CSDN 鼠你有錢 tp5.1 + think-queue + supervisor博文 點此穿越

第一步 安裝 think-queue

 

composer require topthink/think-queue

 

think-queue包地址 需要注意框架版本問題,現版本默認tp6框架

第二步 配置

 

安裝好之后 默認會在 config文件夾下生成 queue.php配置文件

<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
use think\Env;

return [
    'connector' => 'redis',
    "expire"=>60,//任務過期時間默認為秒,禁用為null
    "default"=>"default",//默認隊列名稱
    "host"=>"127.0.0.1",//Redis主機IP地址
    "port"=>6379,//Redis端口
    "password"=>"******",//Redis密碼
    "select"=>5,//Redis數據庫索引
    "timeout"=>0,//Redis連接超時時間
    "persistent"=>false,//是否長連接
];

這里我使用的是 redis驅動 也可以根據官方文檔選擇數據庫驅動

第三步 編寫代碼

在application/index/controller下創建Jobtest.php

 
         

<?php

 
         

namespace app\index\controller;

 
         

use think\Queue;

class Jobtest{
    public function actionWithHelloJob(){
        $params = request()->param();
        // 1.當前任務將由哪個類來負責處理。
        //   當輪到該任務時,系統將生成一個該類的實例,並調用其 fire 方法
        $jobHandlerClassName  = 'app\index\job\Hello';

        // 2.當前任務歸屬的隊列名稱,如果為新隊列,會自動創建
        $jobQueueName        = "send";

        // 3.當前任務所需的業務數據 . 不能為 resource 類型,其他類型最終將轉化為json形式的字符串
        $data = $this->add($params);

        // 4.將該任務推送到消息隊列,等待對應的消費者去執行

        // $isPushed = Queue::push( $jobHandlerClassName , $data , $jobQueueName );

        $isPushed = Queue::later(60,$jobHandlerClassName,$data,$jobQueueName); //把任務分配到隊列中,延遲60s后執行

        // database 驅動時,返回值為 1|false  ;   redis 驅動時,返回值為 隨機字符串|false
        if( $isPushed !== false ){
            echo '成功';
        }else{
            echo '錯誤';
        }
    }
    
    public function add($params){
        $data =[
            'order_no'=>$params['orderNo'],
            'msg'=>$params['orderNo'],
            'create_time'=>date('Y-m-d H:i:s'),
        ];
        Db::name('test')->insert($data);
    }
}

這里我創建了 test表來檢測,你也可以根據自己的需求來創建數據庫

CREATE TABLE `test` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `order_no` varchar(255),
  `msg` varchar(255),
  `create` varchar(255),  
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8;

下來我們需要在 application/index 下在創建一個job目錄 在job目錄下創建Hello.php文件

<?php
namespace app\index\job;
use think\queue\Job;
use think\Db;

class Hello {
    public function fire(Job $job,$data) {
        // 有些消息在到達消費者時,可能已經不再需要執行了
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if(!$isJobStillNeedToBeDone){
            $job->delete();
            return;
        }

        $isJobDone = $this->doHelloJob($data);

        if ($isJobDone) {
            // 如果任務執行成功, 記得刪除任務
            $job->delete();
        }else{
            $job->release(3); //$delay為延遲時間
        }
        if ($job->attempts() > 3) {
            //通過這個方法可以檢查這個任務已經重試了幾次了
            print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");

            $job->delete();

            // 也可以重新發布這個任務
            //print("<info>Hello Job will be availabe again after 2s."."</info>\n");
            //$job->release(2); //$delay為延遲時間,表示該任務延遲2秒后再執行
        }
    }
    
    public function failed($data)
    {
        // ...任務達到最大重試次數后,失敗了
    }

    /**
    * 有些消息在到達消費者時,可能已經不再需要執行了
    * @param array|mixed    $data     發布任務時自定義的數據
    * @return boolean                 任務執行的結果
    */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data){
        return true;
    }

    /**
    * 根據消息中的數據進行實際的業務處理...
    */
    private function doHelloJob($data)
    {
        //根據你的業務需求寫邏輯即可 成功返回true失敗返回false即可
    }
}

到這里代碼基本就完成了,我們使用瀏覽器來訪問我們的 Jobtest下actionWithHelloJob方法

然后在終端執行

php think queue:work --queue send 

這里的 send就是你的對列名稱

執行后我們可以在redis里看到具體的數據對列 如果你沒有安裝redis那就需要在安裝think-queue之前安裝redis擴展

第四步 使用supervisor 將queue進程常駐

以下內容均來自 CSDN 鼠你有錢 tp5.1 + think-queue + supervisor博文 點此穿越  如有侵權 可聯系本人第一時間刪除
1.安裝supervisor

# yum install epel-release
# yum install supervisor

//設置成開機自動啟動
# systemctl enable supervisord

2.配置

在這里我創建了一個命名為supervisor的目錄用於存放supervisor和隊列的日志文件以及include的配置文件,其目錄結構為:

/var/supervisor/log/    #可以自定義
               /run/    #可以自定義
               /conf/   #可以自定義

然后找到/etc/supervisord.conf配置文件,編輯如下信息:

; 將supervisor.sock 的路徑換成如下
[unix_http_server]
file=/var/supervisor/run/supervisor.sock   ; (the path to the socket file)

; 將supervisord.log 和 supervisord.pid 的路徑換成如下
[supervisord]
logfile=/var/supervisor/log/supervisord.log  ; (main log file;default $CWD/supervisord.log)
pidfile=/var/supervisor/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid)

; 將supervisor.sock 的路徑換成如下
[supervisorctl]
serverurl=unix:///var/supervisor/run/supervisor.sock ; use a unix:// URL  for a unix socket

; 將最底部的files路徑換成如下
[include]
files = /var/supervisor/conf/*.conf

/var/supervisor/conf目錄里創建一個.conf文件,這里命名為queue_work.conf,內容如下:

[program:queue_worker] ;項目名稱
directory = /opt/www/tp5.1 ; 程序的啟動目錄,項目根目錄的上一級
command = php think queue:work --queue queueName --daemon ; 啟動命令 queueName就是隊列名
process_name=%(program_name)s_%(process_num)02d
numprocs = 3         ; 開啟的進程數量
autostart = true     ; 在 supervisord 啟動的時候也自動啟動
startsecs = 5        ; 啟動 5 秒后沒有異常退出,就當作已經正常啟動了
autorestart = true   ; 程序異常退出后自動重啟
startretries = 3     ; 啟動失敗自動重試次數,默認是 3
user = root          ; 用哪個用戶啟動
redirect_stderr = true  ; 把 stderr 重定向到 stdout,默認 false
stdout_logfile_maxbytes = 50MB  ; stdout 日志文件大小,默認 50MB
stdout_logfile_backups = 20     ; stdout 日志文件備份數
; stdout 日志文件,需要手動創建目錄(supervisord 會自動創建日志文件)
stdout_logfile = /var/supervisor/log/queue_worker.log
loglevel=info

對於index這個單模塊而言,不同的業務邏輯為了區分可能會存在多個隊列名,這種情況將多個隊列名用逗號拼接起來:

command = php think queue:work --queue queueName1,queueName2 --daemon ;

重啟

# systemctl stop supervisord
# systemctl start supervisord

# systemctl restart supervisord

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM