使用Redis的List(列表)命令實現消息隊列,生產者使用lPush命令發布消息,消費者使用rpoplpush命令獲取消息,同時將消息放入監聽隊列,如果處理超時,監聽者將把消息彈回消息隊列
1.用到的List(列表)命令
命令 | 作用 |
---|---|
lPush | 將一個或多個值插入到列表頭部 |
rpoplpush | 彈出列表最后一個值,同時插入到另一個列表頭部,並返回該值 |
lRem | 刪除列表內的給定值 |
lIndex | 按索引獲取列表內的值 |
2.隊列的組成
名稱 | 職責 |
---|---|
生產者 | 發布消息 |
消費者 | 獲取並處理消息 |
監聽者 | 監聽超時的消息,彈回原消息隊列,確保消費者掛掉后或處理失敗后消息能被其他消費者處理 |
3.php實現代碼
生產者Producter.php
<?php try { //聲明消息隊列-list的鍵名 $queueKey = 'testQueueKey'; $redis = new Redis(); $redis->connect('ip', 6379); //向列表中push10條消息 for ($i = 0;$i < 10;$i++){ //為消息生成唯一標識 $uniqid = uniqid(mt_rand(10000, 99999).getmypid().memory_get_usage(), true); $ret = $redis->lPush($queueKey, json_encode(array('uniqid' => $uniqid, 'key' => 'key-'.$i, 'value' => 'data'))); var_dump($ret); } } catch (Exception $e){ echo $e->getMessage(); }
消費者Consumer.php
<?php try { //聲明消息隊列-list的鍵名 $queueKey = 'testQueueKey'; //聲明監聽者隊列-list的鍵名 $watchQueueKey = 'watchQueueKey'; $redis = new Redis(); $redis->connect('ip', 6379); //隊列先進先出,彈出最先加入的消息,同時放入監聽隊列 while (true){ $ret = $redis->rpoplpush($queueKey, $watchQueueKey); if ($ret === false){ sleep(1); } else { $retArray = json_decode($ret, true); //將唯一id寫入緩存設置有效期 $redis->setex($retArray['uniqid'], 60, 0); //模擬失敗 $rand = mt_rand(0,9); if ($rand < 3){ echo "failure:".$ret."\n"; } else { //todo //處理成功移除消息 $redis->lRem($watchQueueKey, $ret, 0); echo "success:".$ret."\n"; } } } } catch (Exception $e){ echo $e->getMessage(); }
監聽者Watcher.php
<?php try { //聲明消息隊列-list的鍵名 $queueKey = 'testQueueKey'; //聲明監聽者隊列-list的鍵名 $watchQueueKey = 'watchQueueKey'; $redis = new Redis(); $redis->connect('ip', 6379); while (true){ //取出列表尾部的一個值 $ret = $redis->lIndex($watchQueueKey, -1); //如果不存在則休眠1秒 if ($ret === false){ sleep(1); } else { $retArray = json_decode($ret, true); $idCache = $redis->get($retArray['uniqid']); if ($idCache === false){ //如果已過期,表示任務超時,彈回原隊列 $redis->rpoplpush($watchQueueKey, $queueKey); echo "rpoplpush:".$ret."\n"; } else { //處理中,繼續等待 sleep(1); } } } } catch (Exception $e){ echo $e->getMessage(); }
4.執行隊列
開啟監聽者php Watcher.php
開啟消費者php Consumer.php
執行生產者php Producter.php
生產者輸出
int(1) int(2) int(3) int(4) int(5) int(6) int(7) int(8) int(9) int(10)
監聽者輸出
rpoplpush:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"} rpoplpush:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"} rpoplpush:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"} rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"} rpoplpush:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"} rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
消費者輸出
success:{"uniqid":"47280267323557445c4bde640dbfb4.78962728","key":"key-0","value":"data"} failure:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"} success:{"uniqid":"39394267323642245c4bde640de992.34641654","key":"key-2","value":"data"} success:{"uniqid":"41335267323642245c4bde640df980.38466514","key":"key-3","value":"data"} failure:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"} failure:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"} failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"} success:{"uniqid":"43817267323642245c4bde640ec189.44008738","key":"key-7","value":"data"} success:{"uniqid":"69276267323642245c4bde640ecb91.04877522","key":"key-8","value":"data"} failure:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"} success:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"} success:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"} success:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"} failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"} success:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"} success:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
我們看到消費者第一次執行時失敗的消息,超時后又被彈回了消息隊列,消費者有了再次執行的機會,監聽者的職責就是確保消費者執行失敗或掛掉后消息還能再彈回原隊列得到再次執行
轉自:https://www.jmsite.cn/blog-615.html