hyperf中使用消息中間件rabbitmq


1.概述

消息隊列已經逐漸成為企業IT系統內部通信的核心手段。它具有低耦合、可靠投遞、廣播、流量控制、最終一致性等一系列功能,成為異步RPC的主要手段之一。當今市面上有很多主流的消息中間件,如老牌的ActiveMQ、RabbitMQ,炙手可熱的Kafka,阿里巴巴自主開發RocketMQ等。原理都類似,本文主要基於rabbitmq

2.創建生產者

php bin/hyperf.php gen:amqp-producer DemoProducer

在 DemoProducer 文件中,我們可以修改 @Producer 注解對應的字段來替換對應的 exchange 和 routingKey。 其中 payload 就是最終投遞到消息隊列中的數據,所以我們可以隨意改寫 __construct 方法,只要最后賦值 payload 即可。 示例如下。

<?php

declare(strict_types=1);

namespace App\Amqp\Producers;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;
use App\Models\User;

/**
 * DemoProducer
 * @Producer(exchange="hyperf", routingKey="hyperf")
 */
class DemoProducer extends ProducerMessage
{
    public function __construct($id)
    {
        // 設置不同 pool
        $this->poolName = 'pool2';

        $user = User::where('id', $id)->first();
        $this->payload = [
            'id' => $id,
            'data' => $user->toArray()
        ];
    }
}

  通過di容器創建實例投遞消息

$message = new DemoProducer(1);
$producer = ApplicationContext::getContainer()->get(Producer::class);
$result = $producer->produce($message);

 3.創建消費者

php bin/hyperf.php gen:amqp-consumer DemoConsumer

  在 DemoConsumer 文件中,我們可以修改 @Consumer 注解對應的字段來替換對應的 exchangeroutingKey 和 queue。 其中 $data 就是解析后的消息數據。 示例如下

<?php

declare(strict_types=1);

namespace App\Amqp\Consumers;

use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;

/**
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
 */
class DemoConsumer extends ConsumerMessage
{
    public function consume($data): string
    {
        print_r($data);
        return Result::ACK;
    }
}

  更多細節請查閱hyperf官方文檔

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM