redis 隊列及ACK代碼實現


概述

使用PHP+Redis簡單實現一下隊列以及ACK,確保服務的可靠性。

流程圖

流程圖

代碼

    

RedisQueueAbstract
<?php

namespace Redis\Queue;

use Redis\Libs\RedisClient;
use Redis\Queue\QueueTrait;

abstract class RedisQueueAbstract
{
use QueueTrait;

// redis
protected $redis;

// 隊列
protected $queue;

// 隊列值
protected $queueVal;

// 自動 Ack
protected $autoAck = true;

// 隊列阻塞時間
protected $queueTimeout = 20;

// 無數據休眠時間
protected $sleepTimeout = 5;

// 處理超時時間,最小10秒
protected $execTimeout = 60;

public function __construct()
{
$this->redis = (new RedisClient)->connect();
}

/**
* 處理取出的隊列數據
*
* @param array $data
* @return mixed
*/
abstract protected function handle(array $data);

/**
* 設置隊列
*
* @param string $queue
* @return self
*/
public function setQueue(string $queue): self
{
$this->queue = $queue;
return $this;
}

/**
* 加入隊列
*
* @param array $data
* @return integer
*/
public function dispatch(array $data): bool
{
return (bool) $this->redis->lPush($this->queue, $this->handleData($data));
}

/**
* 啟動消費者
*
* @return void
*/
public function run(): void
{
$this->start();
}

/**
* ACK操作,刪除ack隊列的消息
*
* @return void
*/
public function ack(): void
{
$this->redis->lRem($this->getACKQueue(), $this->queueVal, 1);
}

/**
* 開始
*
* @return void
*/
private function start(): void
{
while (true) {
try {
$queueVal = $this->redis->bRPopLPush($this->queue, $this->getACKQueue(), $this->queueTimeout);
if (! $queueVal) {
echo sprintf("sleep %d秒" . PHP_EOL, $this->sleepTimeout);
sleep($this->sleepTimeout);
continue;
}
$this->queueVal = $queueVal;
$this->alterAckQueueVal();
$this->handle($this->decodeData($this->queueVal)['data']);
if ($this->autoAck) {
$this->ack();
}
} catch (\Exception $e) {
echo $e->getMessage();
continue;
}
}
}

/**
* 處理加入隊列的數據
*
* @param array $data
* @return string
*/
private function handleData(array $data): string
{
return $this->encodeData([
'queue' => $this->queue,
'data' => $data,
'join_date' => date('Y-m-d H:i:s'),
'timeout' => $this->execTimeout
]);
}

/**
* 修改剛加入ack隊列的值
*
* @return void
*/
private function alterAckQueueVal(): void
{
$val = $this->decodeData($this->queueVal);
$val['pop_date'] = date('Y-m-d H:i:s');
$val['timeout_date'] = date('Y-m-d H:i:s', (time() + min($this->execTimeout, 10)));
$val['timeout'] = $this->execTimeout;
$newQueueVal = $this->encodeData($val);
$this->redis->multi()
->lPush($this->getACKQueue(), $newQueueVal)
->lRem($this->getACKQueue(), $this->queueVal, 1)
->exec();
$this->queueVal = $newQueueVal;
}
}


AckQueue
<?php

namespace Redis\Queue;

use Redis\Libs\RedisClient;
use Redis\Queue\QueueTrait;

class AckQueue
{

use QueueTrait;

protected $redis;

protected $ackQueue = 'list:ack';

protected $queueValList = [];

public function __construct()
{
$this->redis = (new RedisClient)->connect();
}

public function run(): void
{
try{
if (($len = $this->redis->lLen($this->ackQueue)) == 0) {
echo '無數據,跳過';
return;
}
for ($i=0; $i < $len; $i++) {
$queueVal = $this->redis->lIndex($this->getACKQueue(), $i);
if (! $queueVal) {
break;
}
$queueValData = $this->decodeData($queueVal);
$originQueue = $queueValData['queue'];
if (! array_key_exists('timeout_date', $queueValData)) {
// 防止誤處理剛加入隊列的數據
if ((strtotime($queueValData['join_date']) + $queueValData['timeout']) > time()) {
echo '保險起見,暫不處理';
continue;
}
if (in_array($queueVal, $this->queueValList)) {
echo '第二次處理';
$this->makeValToOriginQueue($originQueue, $queueVal);
} else {
$this->queueValList[] = $queueVal;
echo '第一次不處理';
sleep(5);
}
$i--;
continue;
}
if (strtotime($queueValData['timeout_date']) < time()) {
$this->makeValToOriginQueue($originQueue, $queueVal);
$i--;
continue;
}
}
} catch(\Exception $e) {
echo $e->getMessage();
}
}

/**
* 還原隊列數據到原隊列
*
* @param string $originQueue
* @param string $queueVal
* @return void
*/
public function makeValToOriginQueue(string $originQueue, string $queueVal): void
{
$queueValData = $this->decodeData($queueVal);
$queueValData['timeout_date'] = date('Y-m-d H:i:s', (time() + min($queueValData['timeout'], 10)));
$nowQueueVal = $this->encodeData($queueValData);
$this->redis->multi()
->rPush($originQueue, $nowQueueVal)
->lRem($this->getACKQueue(), $queueVal, 1)
->exec();
}
}


QueueTrait
<?php

namespace Redis\Queue;

trait QueueTrait
{

/**
* 數據編碼
*
* @param array $data
* @return string
*/
private function encodeData(array $data): string
{
return json_encode($data);
return base64_encode(json_encode($data));
}

/**
* 數據解碼
*
* @return array
*/
private function decodeData(string $data): array
{
return json_decode($data, true);
return json_decode(base64_decode($data), true);
}

/**
* 獲取ACK隊列
*
* @return string
*/
private function getACKQueue(): string
{
return 'list:ack';
}
}

使用
消費者
<?php

namespace Example;

use Redis\Queue\RedisQueueAbstract;;

class ExampleQueue extends RedisQueueAbstract
{

protected $queue;

protected $autoAck = false;

protected $execTimeout = 30;


protected function handle(array $data)
{
print_r($data);
}
}

<?php

namespace Example;

require_once __DIR__ . '/../vendor/autoload.php';

use Example\ExampleQueue;

(new ExampleQueue())->run();


ACK
定時任務執行

<?php

namespace Example;

require_once __DIR__ . '/../vendor/autoload.php';

use Redis\Queue\AckQueue;

(new AckQueue())->run();

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM