延遲任務應用場景
場景一:物聯網系統經常會遇到向終端下發命令,如果命令一段時間沒有應答,就需要設置成超時。
場景二:訂單下單之后30分鍾后,如果用戶沒有付錢,則系統自動取消訂單。
場景三:過1分鍾給新注冊會員的用戶,發送注冊郵件等。
php 使用rabbitmq-delayed-message-exchange插件實現延遲功能
1.安裝
下載后解壓,並將其拷貝至(使用Linux Debian/RPM部署)rabbitmq服務器目錄:/usr/local/rabbitmq/plugins
中( windows安裝目錄\rabbitmq_server-version\plugins
).
2.啟用插件
使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang
啟用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchang
輸出如下:
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
通過rabbitmq-plugins list
查看已安裝列表,如下:
...
[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x ...
3.機制解釋
安裝插件后會生成新的Exchange類型x-delayed-message
,該類型消息支持延遲投遞機制,接收到消息后並未立即將消息投遞至目標隊列中,而是存儲在mnesia
(一個分布式數據系統)表中,檢測消息延遲時間,如達到可投遞時間時並將其通過x-delayed-type
類型標記的交換機類型投遞至目標隊列。
4.php實現過程
消費者 delay_consumer2.php:
<?php //header('Content-Type:text/html;charset=utf8;'); $params = array( 'exchangeName' => 'delayed_exchange_test', 'queueName' => 'delayed_queue_test', 'routeKey' => 'delayed_route_test', ); $connectConfig = array( 'host' => 'localhost', 'port' => 5672, 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ); //var_dump(extension_loaded('amqp')); //exit(); try { $conn = new AMQPConnection($connectConfig); $conn->connect(); if (!$conn->isConnected()) { //die('Conexiune esuata'); //TODO 記錄日志 echo 'rabbit-mq 連接錯誤:', json_encode($connectConfig); exit(); } $channel = new AMQPChannel($conn); if (!$channel->isConnected()) { // die('Connection through channel failed'); //TODO 記錄日志 echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig); exit(); } $exchange = new AMQPExchange($channel); //$exchange->setFlags(AMQP_DURABLE);//聲明一個已存在的交換器的,如果不存在將拋出異常,這個一般用在consume端 $exchange->setName($params['exchangeName']); $exchange->setType('x-delayed-message'); //x-delayed-message類型 /*RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。 fanout:把所有發送到該Exchange的消息投遞到所有與它綁定的隊列中。 direct:把消息投遞到那些binding key與routing key完全匹配的隊列中。 topic:將消息路由到binding key與routing key模式匹配的隊列中。*/ $exchange->setArgument('x-delayed-type','direct'); $exchange->declareExchange(); //$channel->startTransaction(); $queue = new AMQPQueue($channel); $queue->setName($params['queueName']); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); //綁定 $queue->bind($params['exchangeName'], $params['routeKey']); } catch(Exception $e) { echo $e->getMessage(); exit(); } function callback(AMQPEnvelope $message) { global $queue; if ($message) { $body = $message->getBody(); echo '接收時間:'.date("Y-m-d H:i:s", time()). PHP_EOL; echo '接收內容:'.$body . PHP_EOL; //為了防止接收端在處理消息時down掉,只有在消息處理完成后才發送ack消息 $queue->ack($message->getDeliveryTag()); } else { echo 'no message' . PHP_EOL; } } //$queue->consume('callback'); 第一種消費方式,但是會阻塞,程序一直會卡在此處 //第二種消費方式,非阻塞 /*$start = time(); while(true) { $message = $queue->get(); if(!empty($message)) { echo $message->getBody(); $queue->ack($message->getDeliveryTag()); //應答,代表該消息已經消費 $end = time(); echo '<br>' . ($end - $start); exit(); } else { //echo 'message not found' . PHP_EOL; } }*/ //注意:這里需要注意的是這個方法:$queue->consume,queue對象有兩個方法可用於取消息:consume和get。前者是阻塞的,無消息時會被掛起,適合循環中使用;后者則是非阻塞的,取消息時有則取,無則返回false。 //就是說用了consume之后,會同步阻塞,該程序常駐內存,不能用nginx,apache調用。 $action = '2'; if($action == '1'){ $queue->consume('callback'); //第一種消費方式,但是會阻塞,程序一直會卡在此處 }else{ //第二種消費方式,非阻塞 $start = time(); while(true) { $message = $queue->get(); if(!empty($message)) { echo '接收時間:'.date("Y-m-d H:i:s", time()). PHP_EOL; echo '接收內容:'.$message->getBody().PHP_EOL; $queue->ack($message->getDeliveryTag()); //應答,代表該消息已經消費 $end = time(); echo '運行時間:'.($end - $start).'秒'.PHP_EOL; //exit(); } else { //echo 'message not found' . PHP_EOL; } } }
生產者delay_publisher2.php:
<?php //header('Content-Type:text/html;charset=utf-8;'); $params = array( 'exchangeName' => 'delayed_exchange_test', 'queueName' => 'delayed_queue_test', 'routeKey' => 'delayed_route_test', ); $connectConfig = array( 'host' => 'localhost', 'port' => 5672, 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ); //var_dump(extension_loaded('amqp')); 判斷是否加載amqp擴展 //exit(); try { $conn = new AMQPConnection($connectConfig); $conn->connect(); if (!$conn->isConnected()) { //die('Conexiune esuata'); //TODO 記錄日志 echo 'rabbit-mq 連接錯誤:', json_encode($connectConfig); exit(); } $channel = new AMQPChannel($conn); if (!$channel->isConnected()) { // die('Connection through channel failed'); //TODO 記錄日志 echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig); exit(); } $exchange = new AMQPExchange($channel); $exchange->setName($params['exchangeName']); $exchange->setType('x-delayed-message'); //x-delayed-message類型 /*RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。 fanout:把所有發送到該Exchange的消息投遞到所有與它綁定的隊列中。 direct:把消息投遞到那些binding key與routing key完全匹配的隊列中。 topic:將消息路由到binding key與routing key模式匹配的隊列中。*/ $exchange->setArgument('x-delayed-type','direct'); $exchange->declareExchange(); //$channel->startTransaction(); //RabbitMQ不容許聲明2個相同名稱、配置不同的Queue,否則報錯 $queue = new AMQPQueue($channel); $queue->setName($params['queueName']); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); //綁定隊列和交換機 $queue->bind($params['exchangeName'], $params['routeKey']); //$channel->commitTransaction(); } catch(Exception $e) { } for($i=5;$i>0;$i--){ //生成消息 echo '發送時間:'.date("Y-m-d H:i:s", time()).PHP_EOL; echo 'i='.$i.',延遲'.$i.'秒'.PHP_EOL; $message = json_encode(['order_id'=>time(),'i'=>$i]); $exchange->publish($message, $params['routeKey'], AMQP_NOPARAM, ['headers'=>['x-delay'=> 1000*$i]]); sleep(2); } $conn->disconnect();
對於代碼來講,首先對於消費者核心代碼
$exchange->setType('x-delayed-message'); //x-delayed-message類型 $exchange->setArgument('x-delayed-type','direct');
生產者核心代碼
$exchange = new AMQPExchange($channel); $exchange->setName($params['exchangeName']); $exchange->setType('x-delayed-message'); //x-delayed-message類型 $exchange->setArgument('x-delayed-type','direct'); $exchange->declareExchange();
使用方法:先運行delay_consumer1.php,再運行delay_publisher1.php
運行效果: