2020年10月19日15:57:24
個人一點學習和使用rabbitmq,先理解其中概念,不然使用起來十分混亂
php使用rabbitmq的相關博客還是相對較少的,java的偏多一些,我也是參考一些java博客才算是搞清楚
環境php7.3 laravel 8.0 一部分原因也是測試一下 laravel 8.0的改變
安裝參考
composer require php-amqplib/php-amqplib
https://www.cnblogs.com/zx-admin/p/13825182.html
先貼代碼
BaseRabbitmqService

<?php namespace App\Service; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Wire\AMQPTable; class BaseRabbitmqService { //死信隊列和交換機 public static $dlxQueue = 'dlx.queue'; public static $dlxExchange = 'dlx.exchange'; public static $dlxKey = 'dlxKey'; //死信之后的隊列和交換機 public static $normalQueue = 'normal.queue'; public static $normalExchange = 'normal.exchange'; public static $normalKey = 'normalKey'; //消息發布者的routing_key public static $msgKey = 'msgkey'; private static function getConfig() { $isOnline = config('system.is_online'); if ($isOnline) { return config('system.online'); } else { return config('system.offline'); } } public static function getConnection() { $config = self::getConfig(); $connection = new AMQPStreamConnection($config['host'], $config['port'], $config['name'], $config['password']); self::init($connection); return $connection; } //初始化一些隊列信息 private static function init(&$connection) { $channel = $connection->channel(); //定義交換機 $channel->exchange_declare(self::$dlxExchange, AMQPExchangeType::DIRECT, false, true); $channel->exchange_declare(self::$normalExchange, AMQPExchangeType::FANOUT, false, true); //定義隊列,在正常隊列超時之后就送去死信隊列 $args = new AMQPTable(); // 消息過期方式:設置 queue.normal 隊列中的消息5s之后過期,毫秒單位 $args->set('x-message-ttl', 5000); // 設置隊列最大長度方式: x-max-length //$args->set('x-max-length', 1); $args->set('x-dead-letter-exchange', self::$dlxExchange); $args->set('x-dead-letter-routing-key', self::$msgKey); $channel->queue_declare(self::$normalQueue, false, true, false, false, false, $args); $channel->queue_declare(self::$dlxQueue, false, true, false, false); $channel->queue_bind(self::$normalQueue, self::$normalExchange); $channel->queue_bind(self::$dlxQueue, self::$dlxExchange, self::$msgKey); } }
生產者代碼 ProducerService

<?php namespace App\Service; use App\Service\BaseRabbitmqService; use App\Models\Order; use PhpAmqpLib\Message\AMQPMessage; class ProducerService extends BaseRabbitmqService { public static function doTask() { // echo 'ProducerService'; $connection = self::getConnection(); $channel = $connection->channel(); $data = []; //生成5條數數據 for ($i = 0; $i < 5; $i++) { $data['user_id'] = mt_rand(1, 100); $data['order_amount'] = mt_rand(10000, 99999); $data['order_number'] = mt_rand(100, 999); // $msg = new AMQPMessage(json_encode($data), // array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) # 使消息持久化 // ); $msg = new AMQPMessage(json_encode($data)); echo " [x] Send ", date('Y-m-d H:i:s') . '--' . json_encode($data), "\n"; $channel->basic_publish($msg, self::$normalExchange); } $channel->close(); $connection->close(); } }
消費者代碼

<?php namespace App\Service; use App\Service\BaseRabbitmqService; use Illuminate\Support\Facades\Redis; use Illuminate\Support\Facades\DB; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class ConsumerService extends BaseRabbitmqService { public static function doTask() { // echo 'ConsumerService'; $connection = self::getConnection(); $channel = $connection->channel(); $callback = function($msg) { echo " [x] Received ", date('Y-m-d H:i:s') . '--' . $msg->body, "\n"; //主動確認信息處理完 // $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); //沒有確認就手動丟給死信隊列 sleep(10); $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']); }; //發送一個未處理完就不發送下一個 // $channel->basic_qos(null, 1, null); $channel->basic_consume(self::$normalQueue, 'ConsumerService', false, false, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } } }
消費者代碼t

<?php namespace App\Service; use App\Service\BaseRabbitmqService; use Illuminate\Support\Facades\Redis; use Illuminate\Support\Facades\DB; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class ConsumerServicet extends BaseRabbitmqService { public static function doTask() { // echo 'ConsumerServicet'; $connection = self::getConnection(); $channel = $connection->channel(); $callback = function($msg) { echo " [x] Received ", date('Y-m-d H:i:s') . '--' . $msg->body, "\n"; //主動確認信息處理完 // $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; //發送一個未處理完就不發送下一個 // $channel->basic_qos(null, 1, null); $channel->basic_consume(self::$dlxQueue, 'ConsumerServicet', false, false, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } } }
注意點:
1,AMQPExchangeType::DIRECT和AMQPExchangeType::FANOUT 交換機類型的區別,也就是訂閱分發布的關系
2,x-dead-letter-routing-key 死信key也就是死信訂閱交換機需要關注的key,不然交換不過去,在綁定死信交換機和死信隊列的時候綁定同一個key
3,注意如何手動確認消息到達,和手動拒絕消息,這個再處理業務邏輯的時候,就需要
//主動確認信息處理完 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); //沒有確認就手動丟給死信隊列$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);
4,對於死信隊列的里面什么情況下才會丟給死信交換機,
1,消息被拒絕(Basic.Reject/Basic.Nack) ,井且設置requeue 參數為false
2,消息過期
3,隊列達到最大長度
4.當消息在一個隊列中變成了死信消息后,可以被發送到另一個交換機,這個交換機就是DLX,綁定DLX的隊列成為死信隊列。當這個隊列中存在死信時, RabbitMQ 就會立即自動地將這個消息重新發布到設置的DLX 上去,進而被路由到綁定該DLX的死信隊列上。可以監聽這個隊列中的消息、以進行相應的處理,這個特性與將消息的TTL 設置為0 配合使用可以彌補imrnediate 參數的功能
這里需要注意的是,你在監聽正常消費的設置死信的隊列的時候,即使設置的時間到了也是不會丟給死信隊列的,如果你不開啟正常消費隊列的監聽,這個設置了死信的隊列就成了延遲隊列的效果,再次強調 理解概念
5,手動丟給死信隊列
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);
為啥在此說這個問題,因為4,5你需要多次嘗試之后才能理解,所以在你想實現 延時消費隊列的時候就可以不去監聽正常消費隊列,直接去監聽死信隊列,就可以實現延時效果,
你也可以通過延遲插件來實現,但是在代碼里就需要非常注意,不然就容易出現邏輯混亂的問題了
6,Consumer必須在cli模式下執行,但是Producer就不必要
7,邏輯梳理
發布消息->正常交換機->設置了死信屬性的隊列->超時,拒絕,無人監聽->死信交換機—>死信隊列
根據邏輯處理不同可以分為死信隊列,也可以是延遲隊列
參考資料
https://www.bbsmax.com/A/QV5Z36WZdy/
https://www.cnblogs.com/wudequn/p/11198427.html
https://xiaoxiami.gitbook.io/rabbitmq_into_chinese_php/ying-yong-jiao-cheng/php-ban/1-hello_world