Redis實現異步消息隊列與延時隊列


 

異步消息隊列

說道消息隊列,你肯定會想到KafkaRabbitmq等消息中間件,這些專業的消息中間件提供了很多功能特性,當然他的部署使用維護都是比較麻煩的。如果你對消息隊列沒那么高要求,想要輕量級的,使用Redis就沒錯啦。

Redis通過list數據結構來實現消息隊列.主要使用到如下命令:

  • lpush和rpush入隊列

  • lpop和rpop出隊列

  • blpop和brpop阻塞式出隊列

廢話補不多說上代碼:

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

//發送消息
$redis->lPush($list, $value);

//消費消息
while (true) {
    try {
        $msg = $redis->rPop($list);
        if (!$msg) {
            sleep(1);
        }
        //業務處理
     
    } catch (Exception $e) {
        echo $e->getMessage();
    }
}

上面代碼會有個問題如果隊列長時間是空的,那pop就不會不斷的循環,這樣會導致redis的QPS升高,影響性能。所以我們使用sleep來解決,當沒有消息的時候阻塞一段時間。但其實這樣還會帶來另一個問題,就是sleep會導致消息的處理延遲增加。這個問題我們可以通過blpop/brpop 來阻塞讀取隊列。

blpop/brpop在隊列沒有數據的時候,會立即進入休眠狀態,一旦數據到來,則立刻醒過來。消息的延遲幾乎為零。用blpop/brpop替代前面的lpop/rpop,就完美解決了上面的問題。

還有一個需要注意的點是我們需要是用try/catch來進行異常捕獲,如果一直阻塞在那里,Redis服務器一般會主動斷開掉空鏈接,來減少閑置資源的占用。

延遲隊列

你是否在做電商項目的時候會遇到如下場景:

  • 訂單下單后超過一小時用戶未支付,需要關閉訂單

  • 訂單的評論如果7天未評價,系統需要自動產生一條評論

這個時候我們就需要用到延時隊列了,顧名思義就是需要延遲一段時間后執行。Redis可通過zset來實現。我們可以將有序集合的value設置為我們的消息任務,把value的score設置為消息的到期時間,然后輪詢獲取有序集合的中的到期消息進行處理。

實現代碼如下:

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$redis->zAdd($delayQueue,$tts, $value);

while(true) {
    try{
        $msg = $redis->zRangeByScore($delayQueue,0,time(),0,1);
        if($msg){
            continue;
        }
        //刪除消息
        $ok = $redis.zrem($delayQueue,$msg);
        if($ok){
            //業務處理
        }
    } catch(\Exception $e) {

    }
}
 

這里又產生了一個問題,同一個任務可能會被多個進程取到之后再使用 zrem 進行爭搶,那些沒搶到的進程都是白取了一次任務,這是浪費。解決辦法:將 zrangebyscorezrem使用lua腳本進行原子化操作,這樣多個進程之間爭搶任務時就不會出現這種浪費了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM