TP5系列 | Queue消息隊列


消費信息如下ThinkPHP5 Queue消息隊列

優點

1、Queue內置了 Redis,Database,Topthink ,Sync這四種驅動,本文使用Redis驅動

2、Queue消息隊列適用於大並發或者返回結果 時間有點長並需要批量操作的第三方接口,可用於短信發送、郵件發送、APP推送

3、Queue消息消息可進行發布,獲取,執行,刪除,重發,失敗處理,延遲執行,超時控制等操作

流程圖

  

 

 

 

創建隊列

文件路徑:application\common\queue\TestQueue.php

TestQueue.php 參考代碼

  

<?php
namespace app\common\queue;

use think\facade\Log;
use think\queue\Job;

class TestQueue
{
    public function fire(Job $job, $data)
    {
        $isJobDone = $this->testJob($data);
        // 如果任務執行成功后 記得刪除任務,不然這個任務會重復執行,直到達到最大重試次數后失敗后,執行failed方法
        if ($isJobDone) {
            $job->delete();
        } else {
            //通過這個方法可以檢查這個任務已經重試了幾次了
            $attempts = $job->attempts();
            echo $attempts;
            if ($attempts == 0 || $attempts == 1) {
                // 重新發布這個任務
                $job->release(2); //$delay為延遲時間,延遲2S后繼續執行
            } elseif ($attempts == 2) {
                $job->release(5); // 延遲5S后繼續執行
            }
        }
    }

    /**
     * @Desc: 任務執行失敗后自動執行方法
     * @param $data
     */
    public function failed($data)
    {
        // ...任務達到最大重試次數后,失敗了
        Log::error('任務達到最大重試次數后,失敗了 '.json_encode($data));
    }

    /**
     * @Desc: 自定義需要加入的隊列任務
     */
    private function testJob($data)
    {
        $jsonData = json_encode($data);
        echo "1、具體執行任務接受到的參數:{$jsonData} \r\n";
        if($data){
            echo "2、恭喜你!{$data['email']} 郵件發送成功了 \r\n";
            return true;
        }else{
            echo "2、很遺憾,{$data['email']} 郵件發送失敗了 \r\n";
            return false;
        }
    }
}

  

配置隊列

1、這里使用Redis驅動來存儲隊列消息

2、隊列配置文件路徑:application\config\queue

配置參考代碼

  

return [
    'connector'  => 'Redis',
    'expire'     => 3600,
    'default'    => 'REDIS_QUEUE',
    'host'       => 'dnmp-redis',
    'port'       => 6379,
    'password'   => '',
    'select'     => 0,
    'timeout'    => 0,
    'persistent' => false,
];

  生產者參考代碼

    

/**
* @Desc: 生產者生產消息
*/
public function productMsg()
{
    // 當前任務所需的業務數據,不能為 resource 類型,其他類型最終將轉化為json形式的字符串
    $data = [
        'email' => rand(11,99).'@qq.com',
        'username' => 'Tinywan'
    ];

    // 當前任務歸屬的隊列名稱,如果為新隊列,會自動創建
    $queueName = 'testQueue';

    // 將該任務推送到消息隊列,等待對應的消費者去執行
    $isPushed = Queue::push(TestQueue::class, $data, $queueName);

    // database 驅動時,返回值為 1|false; redis驅動時,返回值為 隨機字符串|false
    if ($isPushed !== false) {
        echo '['.$data['email'].']'." 隊列加入成功 \r\n";
    } else {
        echo "隊列加入失敗 \r\n";
    }
}

  

為了方便演示,這里使用cli模式。

執行生產者:php product_msg.php

# php product_msg.php
[27@qq.com] 隊列加入成功
# php product_msg.php
[77@qq.com] 隊列加入成功

  

1、這時候消息已經被持久化到Redis中去了(通過列表去存儲)

2、推送成功,雖然我們這時候已經寫好了消費者,但是我們並沒有開始消費。但是推送消息依然是成功的。這個就是中間件的優勢。他連接兩個系統,但是又不會互相耦合,生產者並不會因為消費者的異常而影響到自己。

3、消息推送成功之后,如果沒有消費者,消息會堆積在隊列中。不過別怕,消息堆積很正常,並且一般的中間件堆積能力是非常強的。比如阿里就宣傳自己mq可以堆積上億條數據。

查看Redis消息與隊列

> docker exec -it dnmp-redis redis-cli
127.0.0.1:6379> keys *
127.0.0.1:6379> keys *
1) "queues:testQueue"
127.0.0.1:6379> TYPE queues:testQueue
list
127.0.0.1:6379> LRANGE queues:testQueue 0 -1
1) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"27@qq.com\",\"username\":\"Tinywan\"},\"id\":\"MLgNb4LFALhtmp7HZtfXMFPRUT0r94Bi\",\"attempts\":1}"
2) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"77@qq.com\",\"username\":\"Tinywan\"},\"id\":\"JM16vvjMylfJDnOpldJaHda8xMwuYYzP\",\"attempts\":1}"
127.0.0.1:6379>

消費者

開始消費消息。執行cli 命令 php think queue:work--queue隊列名稱

# php think queue:work --queue testQueue
1、具體執行任務接受到的參數: {"email":"27@qq.com","username":"Tinywan"}
2、恭喜你!27@qq.com 郵件發送成功了
Processed: app\common\queue\TestQueue

  

這里每消費掉一條消息,Redis數據庫中將會減少一條消息

查看Redis隊列消息

127.0.0.1:6379> LRANGE queues:testQueue 0 -1
1) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"77@qq.com\",\"username\":\"Tinywan\"},\"id\":\"JM16vvjMylfJDnOpldJaHda8xMwuYYzP\",\"attempts\":1}"
127.0.0.1:6379>

  命令行掛起守護進程執行

/usr/bin/php /var/www/tp5/think queue:work --daemon --queue testQueue --memory 256

  

--daemon 是否循環執行,如果不加該參數則該命令處理完下一個消息就退出 --queue 要處理的隊列的名稱 --delay 0 如果本次任務執行拋出異常且任務未被刪除時,設置其下次執行前延遲多少秒,默認為0。 --memory 該進程允許使用的內存上限,以M為單位。

流程圖

 

 

 

 

  

 

消費信息如下

 php think queue:work --daemon --queue testQueue
1、具體執行任務接受到的參數: {"email":"77@qq.com","username":"Tinywan"}
2、恭喜你!77@qq.com 郵件發送成功了
Processed: app\common\queue\TestQueue
1、具體執行任務接受到的參數: {"email":"80@qq.com","username":"Tinywan"}
2、恭喜你!80@qq.com 郵件發送成功了
Processed: app\common\queue\TestQueue
1、具體執行任務接受到的參數: {"email":"34@qq.com","username":"Tinywan"}
2、恭喜你!34@qq.com 郵件發送成功了
Processed: app\common\queue\TestQueue

  

1、命令行模式可以常駐內存不停的執行php代碼。這樣就可以達到類似於靜態語言的java的效果。

2、一開始監聽隊列。剛剛在隊列中堆積的消息立刻就被獲取到,開始執行了代碼。最后執行完成,刪除了消息。

3、在 queue:work--daemon 單進程循環消費的時候,改了代碼是不會生效的。這時腳本語言有點類似於靜態語言在執行。所以需要我們用 queue:restart 重啟 work 進程

命令行掛起守護進程執行

/usr/local/php/bin/php    /data/wwwroot/default/thinkphp_5/think    queue:work --daemon --queue testQueue --memory 256

  查看進程是否在運行

# ps
PID   USER     TIME  COMMAND
    1 root      0:00 php-fpm: master process (/usr/local/etc/php-fpm.conf)
    6 www-data  0:00 php-fpm: pool www
    7 www-data  0:00 php-fpm: pool www
   16 root      0:00 sh
   56 root      0:00 sh
  113 root      0:00 php think queue:work --daemon --queue testQueue

  你再也不用守在終端了,以后只負責生產消息就可以了。Redis隊列也不會積累消息了

 

其他(中間件)

中間件系統的定義是兩個獨立的不同的系統在中間構建起傳遞消息的工具。但是同一個系統也可以通過中間件來榨取性能,大家肯定項目中遇到過性能瓶頸。

比如發送郵件,發送短信,轉換視頻格式等等。這些業務都是比較耗性能,又對先后順序不敏感的業務。這種業務就非常適合使用消息隊列系統來異步處理,使性能提升。

 

重啟隊列和生成隊列

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM