Hyperf-消息隊列
官方文檔里面有詳細說明,安裝和配置不再贅述,這里只是記錄下實際中Redis異步隊列的具體使用。
注意:異步隊列區別於 RabbitMQ Kafka 等消息隊列,它只提供一種 異步處理 和 異步延時處理 的能力,並不能嚴格地保證消息的持久化和不支持完備的ACK 應答機制。
工作原理
ConsumerProcess 是異步消費進程,會根據用戶創建的 Job 或者使用 @AsyncQueueMessage 的代碼塊,執行消費邏輯。 Job 和 @AsyncQueueMessage 都是需要投遞和執行的任務,即數據、消費邏輯都會在任務中定義。
1)Job 類中成員變量即為待消費的數據,handle() 方法則為消費邏輯。
2)@AsyncQueueMessage 注解的方法,構造函數傳入的數據即為待消費的數據,方法體則為消費邏輯。
1、配置異步消費進程
config/autoload/processes.php
1 <?php 2 3 return [ 4 5 Hyperf\AsyncQueue\Process\ConsumerProcess::class, 6 ];
2、定義Job
Job 類中成員變量即為待消費的數據,handle() 方法則為消費邏輯
app/Job/AnalyseJob.php
1 <?php 2 3 declare(strict_types=1); 4 5 namespace App\Job; 6 7 use App\Log; 8 use App\Service\V1\Order\OrderService; 9 use Hyperf\AsyncQueue\Job; 10 11 class AnalyseJob extends Job 12 { 13 public $params; 14 15 /** 16 * 任務執行失敗后的重試次數,即最大執行次數為 $maxAttempts+1 次 17 * @var int 18 */ 19 protected $maxAttempts = 2; 20 21 public function __construct($params) 22 { 23 $this->params = $params; 24 } 25 26 public function handle() 27 { 28 // 這里的邏輯會在 ConsumerProcess 進程中執行 29 try { 30 var_dump(">>> 監聽到隊列消息(曬單數據):"); 31 var_dump($this->params); 32 // 分析曬單數據 33 $orderService = new OrderService(); 34 $res = $orderService->analyseOne($this->params); 35 Log::get('AnalyseJob')->debug('監聽到隊列消息(曬單數據),進行分析:', $res); 36 } catch (\Exception $e) { 37 Log::get('AnalyseJob')->debug('監聽到隊列消息(曬單數據),進行分析:' . $e->getMessage()); 38 } 39 } 40 }
3、寫一個專門投遞消息的service
QueueService.php
1 <?php 2 3 declare(strict_types=1); 4 5 namespace App\Service\Producer; 6 7 use App\Job\AnalyseJob; 8 use Hyperf\AsyncQueue\Driver\DriverFactory; 9 use Hyperf\AsyncQueue\Driver\DriverInterface; 10 11 class QueueService 12 { 13 /** 14 * @var DriverInterface 15 */ 16 protected $driver; 17 18 public function __construct(DriverFactory $driverFactory) 19 { 20 $this->driver = $driverFactory->get('default'); 21 } 22 23 /** 24 * 生產消息 25 * @param $params 26 * @param int $delay 27 * @return bool 28 */ 29 public function push($params, int $delay = 0): bool 30 { 31 return $this->driver->push(new AnalyseJob($params), $delay); 32 } 33 }
4、投遞消息
業務中調用service,來投遞消息
1 <?php 2 3 declare(strict_types=1); 4 5 namespace App\Controller; 6 7 use App\Service\QueueService; 8 use Hyperf\Di\Annotation\Inject; 9 use Hyperf\HttpServer\Annotation\AutoController; 10 11 /** 12 * @AutoController 13 */ 14 class UserOrderController extends AbstractController 15 { 16 /** 17 * @Inject 18 * @var QueueService 19 */ 20 protected $service; 21 22 /** 23 * 注解模式投遞消息 24 */ 25 public function example() 26 { 27 // 創建/編輯成功后,推送隊列消息,消費曬單消息用於比對原始訂單 28 if ($ret['code'] == 0) { 29 $userOrderRes = $this->orderService->getUserOrderNativeInfo($ret['data']); 30 $this->queueService->push($userOrderRes['data']); 31 } 32 33 // ... 34 35 } 36 }
參考鏈接:
https://hyperf.wiki/2.1/#/zh-cn/async-queue