異步消息隊列
說道消息隊列,你肯定會想到Kafka
、Rabbitmq
等消息中間件,這些專業的消息中間件提供了很多功能特性,當然他的部署使用維護都是比較麻煩的。如果你對消息隊列沒那么高要求,想要輕量級的,使用Redis就沒錯啦。
Redis通過list
數據結構來實現消息隊列.主要使用到如下命令:
-
lpush和rpush入隊列
-
lpop和rpop出隊列
-
blpop和brpop阻塞式出隊列
廢話補不多說上代碼:
$redis = new Redis(); $redis->connect('127.0.0.1', 6379); //發送消息 $redis->lPush($list, $value); //消費消息 while (true) { try { $msg = $redis->rPop($list); if (!$msg) { sleep(1); } //業務處理 } catch (Exception $e) { echo $e->getMessage(); } }
上面代碼會有個問題如果隊列長時間是空的,那pop就不會不斷的循環,這樣會導致redis的QPS升高,影響性能。所以我們使用sleep
來解決,當沒有消息的時候阻塞一段時間。但其實這樣還會帶來另一個問題,就是sleep
會導致消息的處理延遲增加。這個問題我們可以通過blpop/brpop
來阻塞讀取隊列。
blpop/brpop
在隊列沒有數據的時候,會立即進入休眠狀態,一旦數據到來,則立刻醒過來。消息的延遲幾乎為零。用blpop/brpop替代前面的lpop/rpop,就完美解決了上面的問題。
還有一個需要注意的點是我們需要是用try/catch
來進行異常捕獲,如果一直阻塞在那里,Redis服務器一般會主動斷開掉空鏈接,來減少閑置資源的占用。
延遲隊列
你是否在做電商項目的時候會遇到如下場景:
-
訂單下單后超過一小時用戶未支付,需要關閉訂單
-
訂單的評論如果7天未評價,系統需要自動產生一條評論
這個時候我們就需要用到延時隊列了,顧名思義就是需要延遲一段時間后執行。Redis可通過zset
來實現。我們可以將有序集合的value設置為我們的消息任務,把value的score設置為消息的到期時間,然后輪詢獲取有序集合的中的到期消息進行處理。
實現代碼如下:
$redis = new Redis(); $redis->connect('127.0.0.1', 6379); $redis->zAdd($delayQueue,$tts, $value); while(true) { try{ $msg = $redis->zRangeByScore($delayQueue,0,time(),0,1); if($msg){ continue; } //刪除消息 $ok = $redis.zrem($delayQueue,$msg); if($ok){ //業務處理 } } catch(\Exception $e) { } }
這里又產生了一個問題,同一個任務可能會被多個進程取到之后再使用 zrem 進行爭搶,那些沒搶到的進程都是白取了一次任務,這是浪費。解決辦法:將 zrangebyscore
和zrem
使用lua腳本進行原子化操作,這樣多個進程之間爭搶任務時就不會出現這種浪費了。