php調用rabbitmq實現訂單消費隊列,和延時消費隊列


2020年10月19日15:57:24

 個人一點學習和使用rabbitmq,先理解其中概念,不然使用起來十分混亂

php使用rabbitmq的相關博客還是相對較少的,java的偏多一些,我也是參考一些java博客才算是搞清楚

環境php7.3 laravel 8.0 一部分原因也是測試一下 laravel 8.0的改變

安裝參考

composer require php-amqplib/php-amqplib

https://www.cnblogs.com/zx-admin/p/13825182.html

先貼代碼

BaseRabbitmqService

<?php

namespace App\Service;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;

class BaseRabbitmqService {

    //死信隊列和交換機
    public static $dlxQueue = 'dlx.queue';
    public static $dlxExchange = 'dlx.exchange';
    public static $dlxKey = 'dlxKey';
    //死信之后的隊列和交換機
    public static $normalQueue = 'normal.queue';
    public static $normalExchange = 'normal.exchange';
    public static $normalKey = 'normalKey';
    //消息發布者的routing_key
    public static $msgKey = 'msgkey';

    private static function getConfig() {
        $isOnline = config('system.is_online');
        if ($isOnline) {
            return config('system.online');
        } else {
            return config('system.offline');
        }
    }

    public static function getConnection() {
        $config = self::getConfig();

        $connection = new AMQPStreamConnection($config['host'], $config['port'], $config['name'], $config['password']);
        self::init($connection);
        return $connection;
    }

    //初始化一些隊列信息
    private static function init(&$connection) {
        $channel = $connection->channel();

        //定義交換機
        $channel->exchange_declare(self::$dlxExchange, AMQPExchangeType::DIRECT, false, true);
        $channel->exchange_declare(self::$normalExchange, AMQPExchangeType::FANOUT, false, true);

        //定義隊列,在正常隊列超時之后就送去死信隊列
        $args = new AMQPTable();
        // 消息過期方式:設置 queue.normal 隊列中的消息5s之后過期,毫秒單位
        $args->set('x-message-ttl', 5000);
        // 設置隊列最大長度方式: x-max-length
        //$args->set('x-max-length', 1);
        $args->set('x-dead-letter-exchange', self::$dlxExchange);
        $args->set('x-dead-letter-routing-key', self::$msgKey);
        $channel->queue_declare(self::$normalQueue, false, true, false, false, false, $args);
        $channel->queue_declare(self::$dlxQueue, false, true, false, false);

        $channel->queue_bind(self::$normalQueue, self::$normalExchange);
        $channel->queue_bind(self::$dlxQueue, self::$dlxExchange, self::$msgKey);
    }

}
View Code

 

生產者代碼 ProducerService

<?php

namespace App\Service;

use App\Service\BaseRabbitmqService;
use App\Models\Order;
use PhpAmqpLib\Message\AMQPMessage;

class ProducerService extends BaseRabbitmqService {

    public static function doTask() {
//        echo 'ProducerService';
        $connection = self::getConnection();
        $channel = $connection->channel();

        $data = [];
        //生成5條數數據
        for ($i = 0; $i < 5; $i++) {

            $data['user_id'] = mt_rand(1, 100);
            $data['order_amount'] = mt_rand(10000, 99999);
            $data['order_number'] = mt_rand(100, 999);

//            $msg = new AMQPMessage(json_encode($data),
//                    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) # 使消息持久化
//            );
            $msg = new AMQPMessage(json_encode($data));

            echo " [x] Send  ", date('Y-m-d H:i:s') . '--' . json_encode($data), "\n";
            $channel->basic_publish($msg, self::$normalExchange);
        }

        $channel->close();
        $connection->close();
    }

}
View Code

 

消費者代碼

<?php

namespace App\Service;

use App\Service\BaseRabbitmqService;
use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Facades\DB;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class ConsumerService extends BaseRabbitmqService {

    public static function doTask() {
//        echo 'ConsumerService';
        $connection = self::getConnection();
        $channel = $connection->channel();

        $callback = function($msg) {
            echo " [x] Received ", date('Y-m-d H:i:s') . '--' . $msg->body, "\n";
            //主動確認信息處理完
//            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            //沒有確認就手動丟給死信隊列
            sleep(10);
            $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);
        };
        //發送一個未處理完就不發送下一個
//        $channel->basic_qos(null, 1, null);
        $channel->basic_consume(self::$normalQueue, 'ConsumerService', false, false, false, false, $callback);

        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

}
View Code

消費者代碼t

<?php

namespace App\Service;

use App\Service\BaseRabbitmqService;
use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Facades\DB;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class ConsumerServicet extends BaseRabbitmqService {

    public static function doTask() {
//        echo 'ConsumerServicet';
        $connection = self::getConnection();
        $channel = $connection->channel();

        $callback = function($msg) {
            echo " [x] Received ", date('Y-m-d H:i:s') . '--' . $msg->body, "\n";
            //主動確認信息處理完
//            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };
        //發送一個未處理完就不發送下一個
//        $channel->basic_qos(null, 1, null);
        $channel->basic_consume(self::$dlxQueue, 'ConsumerServicet', false, false, false, false, $callback);

        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

}
View Code

 

 

注意點:

1,AMQPExchangeType::DIRECT和AMQPExchangeType::FANOUT 交換機類型的區別,也就是訂閱分發布的關系

2,x-dead-letter-routing-key 死信key也就是死信訂閱交換機需要關注的key,不然交換不過去,在綁定死信交換機和死信隊列的時候綁定同一個key

3,注意如何手動確認消息到達,和手動拒絕消息,這個再處理業務邏輯的時候,就需要

 //主動確認信息處理完
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
//沒有確認就手動丟給死信隊列$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);

 

4,對於死信隊列的里面什么情況下才會丟給死信交換機,

1,消息被拒絕(Basic.Reject/Basic.Nack) ,井且設置requeue 參數為false
2,消息過期
3,隊列達到最大長度
4.當消息在一個隊列中變成了死信消息后,可以被發送到另一個交換機,這個交換機就是DLX,綁定DLX的隊列成為死信隊列。當這個隊列中存在死信時, RabbitMQ 就會立即自動地將這個消息重新發布到設置的DLX 上去,進而被路由到綁定該DLX的死信隊列上。可以監聽這個隊列中的消息、以進行相應的處理,這個特性與將消息的TTL 設置為0 配合使用可以彌補imrnediate 參數的功能

這里需要注意的是,你在監聽正常消費的設置死信的隊列的時候,即使設置的時間到了也是不會丟給死信隊列的,如果你不開啟正常消費隊列的監聽,這個設置了死信的隊列就成了延遲隊列的效果,再次強調 理解概念

5,手動丟給死信隊列

$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);

為啥在此說這個問題,因為4,5你需要多次嘗試之后才能理解,所以在你想實現 延時消費隊列的時候就可以不去監聽正常消費隊列,直接去監聽死信隊列,就可以實現延時效果,

你也可以通過延遲插件來實現,但是在代碼里就需要非常注意,不然就容易出現邏輯混亂的問題了

6,Consumer必須在cli模式下執行,但是Producer就不必要

7,邏輯梳理

發布消息->正常交換機->設置了死信屬性的隊列->超時,拒絕,無人監聽->死信交換機—>死信隊列

根據邏輯處理不同可以分為死信隊列,也可以是延遲隊列

 

 

參考資料

https://www.bbsmax.com/A/QV5Z36WZdy/

https://www.cnblogs.com/wudequn/p/11198427.html

https://xiaoxiami.gitbook.io/rabbitmq_into_chinese_php/ying-yong-jiao-cheng/php-ban/1-hello_world


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM