Hyperf-消息隊列


Hyperf-消息隊列

     官方文檔里面有詳細說明,安裝和配置不再贅述,這里只是記錄下實際中Redis異步隊列的具體使用。

     注意:異步隊列區別於 RabbitMQ Kafka 等消息隊列,它只提供一種 異步處理 和 異步延時處理 的能力,並不能嚴格地保證消息的持久化不支持完備的ACK 應答機制

     工作原理

     ConsumerProcess 是異步消費進程,會根據用戶創建的 Job 或者使用 @AsyncQueueMessage 的代碼塊,執行消費邏輯。 Job 和 @AsyncQueueMessage 都是需要投遞和執行的任務,即數據、消費邏輯都會在任務中定義

     1)Job 類中成員變量即為待消費的數據,handle() 方法則為消費邏輯。
     2)@AsyncQueueMessage 注解的方法,構造函數傳入的數據即為待消費的數據,方法體則為消費邏輯。

 

    1、配置異步消費進程

    config/autoload/processes.php

1 <?php
2 
3   return [
4 
5        Hyperf\AsyncQueue\Process\ConsumerProcess::class,
6   ];

    2、定義Job

    Job 類中成員變量即為待消費的數據,handle() 方法則為消費邏輯

    app/Job/AnalyseJob.php

 1 <?php
 2 
 3 declare(strict_types=1);
 4 
 5 namespace App\Job;
 6 
 7 use App\Log;
 8 use App\Service\V1\Order\OrderService;
 9 use Hyperf\AsyncQueue\Job;
10 
11 class AnalyseJob extends Job
12 {
13     public $params;
14 
15     /**
16      * 任務執行失敗后的重試次數,即最大執行次數為 $maxAttempts+1 次
17      * @var int
18      */
19     protected $maxAttempts = 2;
20 
21     public function __construct($params)
22     {
23         $this->params = $params;
24     }
25 
26     public function handle()
27     {
28         // 這里的邏輯會在 ConsumerProcess 進程中執行
29         try {
30             var_dump(">>> 監聽到隊列消息(曬單數據):");
31             var_dump($this->params);
32             // 分析曬單數據
33             $orderService = new OrderService();
34             $res = $orderService->analyseOne($this->params);
35             Log::get('AnalyseJob')->debug('監聽到隊列消息(曬單數據),進行分析:', $res);
36         } catch (\Exception $e) {
37             Log::get('AnalyseJob')->debug('監聽到隊列消息(曬單數據),進行分析:' . $e->getMessage());
38         }
39     }
40 }

    3、寫一個專門投遞消息的service

     QueueService.php

 1 <?php
 2 
 3 declare(strict_types=1);
 4 
 5 namespace App\Service\Producer;
 6 
 7 use App\Job\AnalyseJob;
 8 use Hyperf\AsyncQueue\Driver\DriverFactory;
 9 use Hyperf\AsyncQueue\Driver\DriverInterface;
10 
11 class QueueService
12 {
13     /**
14      * @var DriverInterface
15      */
16     protected $driver;
17 
18     public function __construct(DriverFactory $driverFactory)
19     {
20         $this->driver = $driverFactory->get('default');
21     }
22 
23     /**
24      * 生產消息
25      * @param $params
26      * @param int $delay
27      * @return bool
28      */
29     public function push($params, int $delay = 0): bool
30     {
31         return $this->driver->push(new AnalyseJob($params), $delay);
32     }
33 }

    4、投遞消息

    業務中調用service,來投遞消息

 1 <?php
 2 
 3 declare(strict_types=1);
 4 
 5 namespace App\Controller;
 6 
 7 use App\Service\QueueService;
 8 use Hyperf\Di\Annotation\Inject;
 9 use Hyperf\HttpServer\Annotation\AutoController;
10 
11 /**
12  * @AutoController
13  */
14 class UserOrderController extends AbstractController
15 {
16     /**
17      * @Inject
18      * @var QueueService
19      */
20     protected $service;
21 
22     /**
23      * 注解模式投遞消息
24      */
25     public function example()
26     {
27         // 創建/編輯成功后,推送隊列消息,消費曬單消息用於比對原始訂單
28         if ($ret['code'] == 0) {
29             $userOrderRes = $this->orderService->getUserOrderNativeInfo($ret['data']);
30             $this->queueService->push($userOrderRes['data']);
31         }
32 
33         // ...
34 
35     }
36 }

 

參考鏈接:

https://hyperf.wiki/2.1/#/zh-cn/async-queue


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM