rabbitmq 延時隊列 插件方式實現 每條消息都延時自己時間


 

上篇文章的延時是加到隊列上的 通過死信過時推送 ,缺點就是不能每條消息定義自己的過時時間而且每次有新的過時時間,要新建一個交換機和隊列 

https://www.cnblogs.com/brady-wang/p/13335104.html

 

rabbitmq還有種方式 要安裝一個插件  rabbitmq-delayed-message-exchange 

參考  https://www.cnblogs.com/brady-wang/p/13335243.html 

實現是安裝插件后交換機會多出一種 不過這種插件要安裝 好像mq版本至少3.7

 

 

最終生產者生產時候 頭部加上延時時間,那么他會存儲在交換機里面,到時了才投遞到對應隊列 

$exchange->setType('x-delayed-message'); //x-delayed-message類型

$exchange->publish($message, $params['routeKey'], AMQP_NOPARAM, ['headers'=>['x-delay'=> 1000*$i]]);

 

[root@localhost mq]# php delay_publish.php 
發送時間:2020-07-18 14:05:25
i=3600,延遲3600秒
[root@localhost mq]# php delay_publish.php 
發送時間:2020-07-18 14:05:28
i=3600,延遲3600秒
[root@localhost mq]# php delay_publish.php 
發送時間:2020-07-18 14:05:30
i=3600,延遲3600秒
[root@localhost mq]# php delay_publish.php 
發送時間:2020-07-18 18:52:21
i=3600,延遲3600秒
[root@localhost mq]# php delay_publish.php 
發送時間:2020-07-18 18:52:48
i=3600,延遲3600秒
^[[A
[root@localhost mq]# php delay_publish.php 
發送時間:2020-07-18 18:52:51
i=3600,延遲3600秒
^[[A[root@localhost mq]# php delay_publish.php 
發送時間:2020-07-18 18:52:54
i=3600,延遲3600秒

  

[root@localhost mq]# php delay_comsumer.php 
接收時間:2020-07-18 13:58:11
接收內容:{"order_id":1595051842,"i":5,"date":"2020-07-18 13:57:22"}
接收時間:2020-07-18 13:58:11
接收內容:{"order_id":1595051844,"i":4,"date":"2020-07-18 13:57:24"}
接收時間:2020-07-18 13:58:11
接收內容:{"order_id":1595051846,"i":3,"date":"2020-07-18 13:57:26"}
接收時間:2020-07-18 13:58:11
接收內容:{"order_id":1595051848,"i":2,"date":"2020-07-18 13:57:28"}
接收時間:2020-07-18 13:58:11
接收內容:{"order_id":1595051850,"i":1,"date":"2020-07-18 13:57:30"}

 

 

 

可以看到里面有4條消息延時了

代碼 如下

生產者 delay_publish.php

<?php

//header('Content-Type:text/html;charset=utf-8;');

$params = array(
	'exchangeName' => 'delayed_exchange_test',
	'queueName' => 'delayed_queue_test',
	'routeKey' => 'delayed_route_test',
);

$connectConfig = array(
	'host' => '127.0.0.1',
	'port' => 5672,
	'login' => 'admin',
	'password' => 'password',
	'vhost' => '/'
);

//var_dump(extension_loaded('amqp')); 判斷是否加載amqp擴展
//exit();
try {
	$conn = new AMQPConnection($connectConfig);
	$conn->connect();
	if (!$conn->isConnected()) {
		//die('Conexiune esuata');
		//TODO 記錄日志
		echo 'rabbit-mq 連接錯誤:', json_encode($connectConfig);
		exit();
	}
	$channel = new AMQPChannel($conn);
	if (!$channel->isConnected()) {
		// die('Connection through channel failed');
		//TODO 記錄日志
		echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
		exit();
	}
	$exchange = new AMQPExchange($channel);
	$exchange->setName($params['exchangeName']);
	$exchange->setType('x-delayed-message'); //x-delayed-message類型
	/*RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。

      fanout:把所有發送到該Exchange的消息投遞到所有與它綁定的隊列中。

      direct:把消息投遞到那些binding key與routing key完全匹配的隊列中。

      topic:將消息路由到binding key與routing key模式匹配的隊列中。*/
	$exchange->setArgument('x-delayed-type','direct');
	$exchange->declareExchange();

	//$channel->startTransaction();
	//RabbitMQ不容許聲明2個相同名稱、配置不同的Queue,否則報錯
	$queue = new AMQPQueue($channel);
	$queue->setName($params['queueName']);
	$queue->setFlags(AMQP_DURABLE);
	$queue->declareQueue();

	//綁定隊列和交換機
	$queue->bind($params['exchangeName'], $params['routeKey']);
	//$channel->commitTransaction();
} catch(Exception $e) {

}

//for($i=5;$i>0;$i--){
$i = 3600;
	//生成消息
	echo '發送時間:'.date("Y-m-d H:i:s", time()).PHP_EOL;
	echo 'i='.$i.',延遲'.$i.'秒'.PHP_EOL;
	$message = json_encode(['order_id'=>time(),'i'=>$i,'date'=>date("Y-m-d H:i:s")]);
	$exchange->publish($message, $params['routeKey'], AMQP_NOPARAM, ['headers'=>['x-delay'=> 1000*$i]]);
	sleep(2);
//}
$conn->disconnect();

  

消費者 delay_consumer.php

<?php

//header('Content-Type:text/html;charset=utf8;');

$params = array(
	'exchangeName' => 'delayed_exchange_test',
	'queueName' => 'delayed_queue_test',
	'routeKey' => 'delayed_route_test',
);
$connectConfig = array(
	'host' => '127.0.0.1',
	'port' => 5672,
	'login' => 'admin',
	'password' => 'password',
	'vhost' => '/'
);

//var_dump(extension_loaded('amqp'));

//exit();

try {
	$conn = new AMQPConnection($connectConfig);
	$conn->connect();
	if (!$conn->isConnected()) {
		//die('Conexiune esuata');
		//TODO 記錄日志
		echo 'rabbit-mq 連接錯誤:', json_encode($connectConfig);
		exit();
	}
	$channel = new AMQPChannel($conn);
	if (!$channel->isConnected()) {
		// die('Connection through channel failed');
		//TODO 記錄日志
		echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
		exit();
	}
	$exchange = new AMQPExchange($channel);
	//$exchange->setFlags(AMQP_DURABLE);//聲明一個已存在的交換器的,如果不存在將拋出異常,這個一般用在consume端
	$exchange->setName($params['exchangeName']);
	$exchange->setType('x-delayed-message'); //x-delayed-message類型
	/*RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。

      fanout:把所有發送到該Exchange的消息投遞到所有與它綁定的隊列中。

      direct:把消息投遞到那些binding key與routing key完全匹配的隊列中。

      topic:將消息路由到binding key與routing key模式匹配的隊列中。*/
	$exchange->setArgument('x-delayed-type','direct');
	$exchange->declareExchange();

	//$channel->startTransaction();

	$queue = new AMQPQueue($channel);
	$queue->setName($params['queueName']);
	$queue->setFlags(AMQP_DURABLE);
	$queue->declareQueue();

	//綁定
	$queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {
	echo $e->getMessage();
	exit();
}

function callback(AMQPEnvelope $message) {
	global $queue;
	if ($message) {
		$body = $message->getBody();
		echo '接收時間:'.date("Y-m-d H:i:s", time()). PHP_EOL;
		echo '接收內容:'.$body . PHP_EOL;
		//為了防止接收端在處理消息時down掉,只有在消息處理完成后才發送ack消息
		$queue->ack($message->getDeliveryTag());
	} else {
		echo 'no message' . PHP_EOL;
	}
}

//$queue->consume('callback');  第一種消費方式,但是會阻塞,程序一直會卡在此處

//第二種消費方式,非阻塞
/*$start = time();
while(true)
{
    $message = $queue->get();
    if(!empty($message))
    {
        echo $message->getBody();
        $queue->ack($message->getDeliveryTag());    //應答,代表該消息已經消費
        $end = time();
        echo '<br>' . ($end - $start);
        exit();
    }
    else
    {
        //echo 'message not found' . PHP_EOL;
    }
}*/

//注意:這里需要注意的是這個方法:$queue->consume,queue對象有兩個方法可用於取消息:consume和get。前者是阻塞的,無消息時會被掛起,適合循環中使用;后者則是非阻塞的,取消息時有則取,無則返回false。
//就是說用了consume之后,會同步阻塞,該程序常駐內存,不能用nginx,apache調用。
$action = '2';

if($action == '1'){
	$queue->consume('callback');  //第一種消費方式,但是會阻塞,程序一直會卡在此處
}else{
	//第二種消費方式,非阻塞
	$start = time();
	while(true)
	{
		$message = $queue->get();
		if(!empty($message))
		{
			echo '接收時間:'.date("Y-m-d H:i:s", time()). PHP_EOL;
			echo '接收內容:'.$message->getBody().PHP_EOL;
			$queue->ack($message->getDeliveryTag());    //應答,代表該消息已經消費
			$end = time();
			//echo '運行時間:'.($end - $start).'秒'.PHP_EOL;
			//exit();
		}
		else
		{
			//echo 'message not found' . PHP_EOL;
		}
	}
}

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM