延遲任務應用場景
場景一:物聯網系統經常會遇到向終端下發命令,如果命令一段時間沒有應答,就需要設置成超時。
場景二:訂單下單之后30分鍾后,如果用戶沒有付錢,則系統自動取消訂單。
場景三:過1分鍾給新注冊會員的用戶,發送注冊郵件等。
php 使用rabbitmq-delayed-message-exchange插件實現延遲功能
1.安裝
下載后解壓,並將其拷貝至(使用Linux Debian/RPM部署)rabbitmq服務器目錄:/usr/local/rabbitmq/plugins中( windows安裝目錄\rabbitmq_server-version\plugins ).
2.啟用插件
使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang啟用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchang
輸出如下:
The following plugins have been enabled: rabbitmq_delayed_message_exchange
通過rabbitmq-plugins list查看已安裝列表,如下:
... [ ] rabbitmq_delayed_message_exchange 20171215-3.6.x ...
3.機制解釋
安裝插件后會生成新的Exchange類型x-delayed-message,該類型消息支持延遲投遞機制,接收到消息后並未立即將消息投遞至目標隊列中,而是存儲在mnesia(一個分布式數據系統)表中,檢測消息延遲時間,如達到可投遞時間時並將其通過x-delayed-type類型標記的交換機類型投遞至目標隊列。
4.php實現過程
消費者 delay_consumer2.php:
<?php
//header('Content-Type:text/html;charset=utf8;');
$params = array(
'exchangeName' => 'delayed_exchange_test',
'queueName' => 'delayed_queue_test',
'routeKey' => 'delayed_route_test',
);
$connectConfig = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
//var_dump(extension_loaded('amqp'));
//exit();
try {
$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {
//die('Conexiune esuata');
//TODO 記錄日志
echo 'rabbit-mq 連接錯誤:', json_encode($connectConfig);
exit();
}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
// die('Connection through channel failed');
//TODO 記錄日志
echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
exit();
}
$exchange = new AMQPExchange($channel);
//$exchange->setFlags(AMQP_DURABLE);//聲明一個已存在的交換器的,如果不存在將拋出異常,這個一般用在consume端
$exchange->setName($params['exchangeName']);
$exchange->setType('x-delayed-message'); //x-delayed-message類型
/*RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。
fanout:把所有發送到該Exchange的消息投遞到所有與它綁定的隊列中。
direct:把消息投遞到那些binding key與routing key完全匹配的隊列中。
topic:將消息路由到binding key與routing key模式匹配的隊列中。*/
$exchange->setArgument('x-delayed-type','direct');
$exchange->declareExchange();
//$channel->startTransaction();
$queue = new AMQPQueue($channel);
$queue->setName($params['queueName']);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
//綁定
$queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {
echo $e->getMessage();
exit();
}
function callback(AMQPEnvelope $message) {
global $queue;
if ($message) {
$body = $message->getBody();
echo '接收時間:'.date("Y-m-d H:i:s", time()). PHP_EOL;
echo '接收內容:'.$body . PHP_EOL;
//為了防止接收端在處理消息時down掉,只有在消息處理完成后才發送ack消息
$queue->ack($message->getDeliveryTag());
} else {
echo 'no message' . PHP_EOL;
}
}
//$queue->consume('callback'); 第一種消費方式,但是會阻塞,程序一直會卡在此處
//第二種消費方式,非阻塞
/*$start = time();
while(true)
{
$message = $queue->get();
if(!empty($message))
{
echo $message->getBody();
$queue->ack($message->getDeliveryTag()); //應答,代表該消息已經消費
$end = time();
echo '<br>' . ($end - $start);
exit();
}
else
{
//echo 'message not found' . PHP_EOL;
}
}*/
//注意:這里需要注意的是這個方法:$queue->consume,queue對象有兩個方法可用於取消息:consume和get。前者是阻塞的,無消息時會被掛起,適合循環中使用;后者則是非阻塞的,取消息時有則取,無則返回false。
//就是說用了consume之后,會同步阻塞,該程序常駐內存,不能用nginx,apache調用。
$action = '2';
if($action == '1'){
$queue->consume('callback'); //第一種消費方式,但是會阻塞,程序一直會卡在此處
}else{
//第二種消費方式,非阻塞
$start = time();
while(true)
{
$message = $queue->get();
if(!empty($message))
{
echo '接收時間:'.date("Y-m-d H:i:s", time()). PHP_EOL;
echo '接收內容:'.$message->getBody().PHP_EOL;
$queue->ack($message->getDeliveryTag()); //應答,代表該消息已經消費
$end = time();
echo '運行時間:'.($end - $start).'秒'.PHP_EOL;
//exit();
}
else
{
//echo 'message not found' . PHP_EOL;
}
}
}
生產者delay_publisher2.php:
<?php
//header('Content-Type:text/html;charset=utf-8;');
$params = array(
'exchangeName' => 'delayed_exchange_test',
'queueName' => 'delayed_queue_test',
'routeKey' => 'delayed_route_test',
);
$connectConfig = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
//var_dump(extension_loaded('amqp')); 判斷是否加載amqp擴展
//exit();
try {
$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {
//die('Conexiune esuata');
//TODO 記錄日志
echo 'rabbit-mq 連接錯誤:', json_encode($connectConfig);
exit();
}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
// die('Connection through channel failed');
//TODO 記錄日志
echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
exit();
}
$exchange = new AMQPExchange($channel);
$exchange->setName($params['exchangeName']);
$exchange->setType('x-delayed-message'); //x-delayed-message類型
/*RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。
fanout:把所有發送到該Exchange的消息投遞到所有與它綁定的隊列中。
direct:把消息投遞到那些binding key與routing key完全匹配的隊列中。
topic:將消息路由到binding key與routing key模式匹配的隊列中。*/
$exchange->setArgument('x-delayed-type','direct');
$exchange->declareExchange();
//$channel->startTransaction();
//RabbitMQ不容許聲明2個相同名稱、配置不同的Queue,否則報錯
$queue = new AMQPQueue($channel);
$queue->setName($params['queueName']);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
//綁定隊列和交換機
$queue->bind($params['exchangeName'], $params['routeKey']);
//$channel->commitTransaction();
} catch(Exception $e) {
}
for($i=5;$i>0;$i--){
//生成消息
echo '發送時間:'.date("Y-m-d H:i:s", time()).PHP_EOL;
echo 'i='.$i.',延遲'.$i.'秒'.PHP_EOL;
$message = json_encode(['order_id'=>time(),'i'=>$i]);
$exchange->publish($message, $params['routeKey'], AMQP_NOPARAM, ['headers'=>['x-delay'=> 1000*$i]]);
sleep(2);
}
$conn->disconnect();
對於代碼來講,首先對於消費者核心代碼
$exchange->setType('x-delayed-message'); //x-delayed-message類型
$exchange->setArgument('x-delayed-type','direct');
生產者核心代碼
$exchange = new AMQPExchange($channel);
$exchange->setName($params['exchangeName']);
$exchange->setType('x-delayed-message'); //x-delayed-message類型
$exchange->setArgument('x-delayed-type','direct');
$exchange->declareExchange();
使用方法:先運行delay_consumer1.php,再運行delay_publisher1.php
運行效果:

原文 https://www.cnblogs.com/-mrl/p/11114116.html
