1.概述
消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。原理都类似,本文主要基于rabbitmq
2.创建生产者
php bin/hyperf.php gen:amqp-producer DemoProducer
在 DemoProducer 文件中,我们可以修改 @Producer 注解对应的字段来替换对应的 exchange 和 routingKey。 其中 payload 就是最终投递到消息队列中的数据,所以我们可以随意改写 __construct 方法,只要最后赋值 payload 即可。 示例如下。
<?php
declare(strict_types=1);
namespace App\Amqp\Producers;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;
use App\Models\User;
/**
* DemoProducer
* @Producer(exchange="hyperf", routingKey="hyperf")
*/
class DemoProducer extends ProducerMessage
{
public function __construct($id)
{
// 设置不同 pool
$this->poolName = 'pool2';
$user = User::where('id', $id)->first();
$this->payload = [
'id' => $id,
'data' => $user->toArray()
];
}
}
通过di容器创建实例投递消息
$message = new DemoProducer(1); $producer = ApplicationContext::getContainer()->get(Producer::class); $result = $producer->produce($message);
3.创建消费者
php bin/hyperf.php gen:amqp-consumer DemoConsumer
在 DemoConsumer 文件中,我们可以修改 @Consumer 注解对应的字段来替换对应的 exchange、routingKey 和 queue。 其中 $data 就是解析后的消息数据。 示例如下
<?php
declare(strict_types=1);
namespace App\Amqp\Consumers;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
/**
* @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
*/
class DemoConsumer extends ConsumerMessage
{
public function consume($data): string
{
print_r($data);
return Result::ACK;
}
}
更多细节请查阅hyperf官方文档
