靈感來襲,基於Redis的分布式延遲隊列


延遲隊列

延遲隊列,也就是一定時間之后將消息體放入隊列,然后消費者才能正常消費。比如1分鍾之后發送短信,發送郵件,檢測數據狀態等。

Redisson Delayed Queue

如果你項目中使用了redisson,那么恭喜你,使用延遲隊列將非常的簡單。

 

基於Redis的Redisson分布式延遲隊列(Delayed Queue)結構的RDelayedQueue Java對象在實現了RQueue接口的基礎上提供了向隊列按要求延遲添加項目的功能。該功能可以用來實現消息傳送延遲按幾何增長或幾何衰減的發送策略。

RQueue<String> distinationQueue = ...
RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
// 10秒鍾以后將消息發送到指定隊列
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// 一分鍾以后將消息發送到指定隊列
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

在該對象不再需要的情況下,應該主動銷毀。僅在相關的Redisson對象也需要關閉的時候可以不用主動銷毀。

Java DelayQueue

DelayQueue它本質上是一個隊列,而這個隊列里也只有存放Delayed的子類才有意義。

延遲隊列demo

public class DelayTask implements Delayed {
    private long startDate;
    public DelayTask(Long delayMillions) {
        this.startDate = System.currentTimeMillis() + delayMillions;
    }


    @Override
    public int compareTo(Delayed o) {
        Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
    }


    @Override
    public long getDelay(TimeUnit unit) {
        return this.startDate - System.currentTimeMillis();
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<DelayTask> queue = new DelayQueue<>();
        DelayTask delayTask = new DelayTask(1000 * 5L);
        queue.put(delayTask);
        while (queue.size()>0){
            queue.take();
        }
    }
}

延遲隊列消費原理

源碼中出現了三次await字眼:
  • 第一次是當隊列為空時,等待;
  • 第二次等待是因為,發現有任務,沒有到執行時間,並且有准備執行的線程(leader),那不好意思,還得接續等待直到下一個可執行的任務。
  • 第三次是真正延時的地方了,available.awaitNanos(delay),此時也沒有別的線程要執行,也就是我將要執行,等待剩下的延遲時間即可。

延遲隊列生產原理

為保證消費者正常消費,如果優先隊列頭元素和當前放入元素相等,則說明當前元素消費的優先級高,重置准備消費的線程(leader)為null,喚醒消費者線程重新執行take方法邏輯。

手寫一個Redis延遲隊列

Redis延遲隊列設計

延遲消息體設計

延遲消息體Message實現了Delayed接口,這樣Java DelayQueue就知道什么時候取出消息體。

Redis延遲隊列實現

RedisDelayQueue構造函數依賴redis操作緩存服務對象目標隊列名稱(redis key)。

offer方法傳入member(具體消息),delay(延遲時間),timeUnit(時間單位),然后封裝成延遲消息體Message對象,放入Java DelayQueue中。

run方法是一個循環體,不斷的從Java DelayQueue對象中獲取消息體,然后放入redis對應的目標隊列里。

延遲隊列測試demo

控制台打印效果

思考

這種方案實現比較簡單,使用的時候一定要謹慎,應用於延遲小,消息量不大的場景是沒問題的,畢竟Java DelayQueue是占用內存的。另外也可以考慮利用Redis的sorted set 結構實現延遲隊列【靈感來襲,基於Redis的分布式延遲隊列(續)】,使用TimeStamp作為score,比如你的任務是要延遲5分鍾,那么就在當前時間上加5分鍾作為 score ,輪詢任務每秒只輪詢 score 小於等於 當前時間的 key即可,如果任務支持有誤差,那么當沒有掃描到有效數據的時候可以休眠對應時間再繼續輪詢。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM