借助Redis完成延時任務


背景

相信我們或多或少的會遇到類似下面這樣的需求:

第三方給了一批數據給我們處理,我們處理好之后就通知他們處理結果。

大概就是下面這個圖說的。

本來在處理完數據之后,我們就會馬上把處理結果返回給對方,但是對方要求我們處理速度不能過快,要有一種人為處理的效果。

換句話就是說,就算是處理好了,也要晚一點再執行通知操作。

這就是一個典型的延時任務。

延時,那還不簡單,執行完之后,讓它Sleep一下就好了,這樣就達到目標了。

Sleep一下確定是最容易實現的一種方案,但是試想一下,數據的數量不斷的增加,這樣Sleep真的好嗎?答案是否定的。

延時隊列,是處理這個場景最為妥當的方案。

RabbitMQ,RocketMQ,Cmq等都可以直接或間接的達到相應的效果。

如果不具備隊列條件,又要怎么處理呢?還可以借助Redis來完成這項工作。

MQ不一定每個公司都會用,但Redis應該80%以上的都會用吧。

處理方案

Redis這邊,可用的方案有兩種,下面分別來介紹一下。

#1 鍵的過期時間

在設置緩存的時候,我們比較多情況下都會設置一個緩存的過期時間,這個時間過期后,會重新去數據源拿數據回來。

可以基於這個過期時間結合Redis的keyspace notifications共同完成。

keyspace notifications里面包含了非常多的事件,這里只關注EXPIRE,這個是和過期有關的。

只要訂閱了__keyevent@0__:expired這個主題,當有key過期的時候,就會收到對應的信息。

注:主題@后面的0,指的是db 0.

要想使用這個特性,必不可少的一步是修改Redis默認的配置,把notify-keyspace-events設置成Ex

############################# Event notification ##############################  
 
# Redis can notify Pub/Sub clients about events happening in the key space.  
# This feature is documented at http://redis.io/topics/notifications  
#  
# .........  
#  
#  By default all notifications are disabled because most users don't need  
#  this feature and the feature has some overhead. Note that if you don't  
#  specify at least one of K or E, no events will be delivered.  
notify-keyspace-events "Ex"  

其中 E 指的是鍵事件通知,x 指的是過期事件。

根據這個特性,重新調整一下流程圖:

應該也比較好懂,下面通過簡單的代碼來實現一下這種方案。

首先是處理完數據及往Redis寫數據。

public async Task DoTaskAsync()
{
    // 數據處理
    // ...

    // 后續操作要延時,把Id記錄下來
    var taskId = new Random().Next(1, 10000);
    // 要延遲的時間
    int sec = new Random().Next(1, 5);

    // 可以加個重試機制,預防單次執行失敗。
    await RedisHelper.SetAsync($"task:{taskId}", "1", sec);
}

還需要回傳結果的后台任務,這個任務就是去訂閱上面說的鍵過期事件,然后回傳結果。

這里可以借助BackgroundService來訂閱處理。

public class SubscribeTaskBgTask : BackgroundService
{
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        stoppingToken.ThrowIfCancellationRequested();
        var keyPrefix = "task:";
        RedisHelper.Subscribe(
            ("__keyevent@0__:expired", arg =>
                {
                    var msg = arg.Body;
                    Console.WriteLine($"recive {msg}");
                    if (msg.StartsWith(keyPrefix))
                    {
                        // 取到任務Id
                        var val = msg.Substring(keyPrefix.Length);
                        Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");
                        
                        // 回傳處理結果給第三方,這里可以考慮這個並發鎖,避免多實例都處理了這個任務。
                        // ....
                    }
                }
            ));

        return Task.CompletedTask;
    }
}

這里有一個要注意的地方,要在key里面包含任務的Id,因為訂閱處理的時候,只能拿到一個key,后續能做的操作也只是基於這個key。

上面的例子,是用了task:任務Id的形式,所以在訂閱處理的時候,只處理以task:開頭的那些key。

效果如下:

這種方案,直觀上是非常簡單的,不過這種方案會遇到一個小問題。

當一個key過期后,並不一定會馬上收到通知,這個也是會有一定的延時的,取決於Redis的內部機制。

Redis Keyspace Notifications文檔的最后一段也提到了這個問題。

所以用這種方案的時候,要考慮一下,你的延時是不是要及時~~

#2 有序集合

有序集合是Redis中一種十分有用的數據結構,它的本質其實就是集合加了一個排序的功能,每個集合里面的元素還會有一個分值的屬性。

它提供了一個可以獲取指定分值范圍內的元素,這個也就是我們的出發點。

在這個場景下,什么東西可能作為這個分值呢?現在只有一個處理任務的Id還有一個延遲的時間,Id肯定不行,那么也只能是延遲時間來作這個分值了。

延遲1秒,5秒,1分鍾,這個都是比較大粒度的時間,這里要轉化一下,用時間戳來代替這些延遲的時間。

假設現在的時間戳是 1584171520, 要延遲5秒執行,那么執行任務的時間就是 1584171525,在當前時間戳的基礎上加個5秒,就是最終要執行的了。

到時有序集合中存的元素就會是這樣的

任務Id-1 1584171525
任務Id-2 1584171528
任務Id-3 1584171530

接下來就是要怎么取出這些任務的問題了!

把當前時間戳當成是取數的最大分值,0作為最小分值,這個時候取出的元素就是應該要執行回傳的任務了。

根據這個方案,重新調整一下流程圖:

交代清楚了思路,再來點代碼,加深一下理解。

首先還是處理完數據后往Redis寫數據。

public async Task DoTaskAsync()
{
    // 數據處理
    // ...

    // 后續操作要延時,把Id記錄下來
    var taskId = new Random().Next(1, 10000);
    
    var cacheKey = "task:delay";
    int sec = new Random().Next(1, 5);
    
    // 要執行這個任務的時間戳
    var time = DateTimeOffset.Now.AddSeconds(sec).ToUnixTimeSeconds();
    
    await RedisHelper.ZAddAsync(cacheKey, (time, taskId));
    Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} done {taskId} here - {sec}");
}

后面就是輪訓有序集合里面的元素了,這里同樣是借助BackgroundService來處理。

public class SubscribeTaskBgTask : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        stoppingToken.ThrowIfCancellationRequested();
        var cacheKey = "task:delay";
        while (true)
        {
            // 先取,后刪,不具備原子性,可考慮用lua腳本來保證原子性。
            var vals = await RedisHelper.ZRangeByScoreAsync(cacheKey, -1, DateTimeOffset.Now.ToUnixTimeSeconds(), 1, 0);

            if (vals != null && vals.Length > 0)
            {
                var val = vals[0];

                var rmCount = await RedisHelper.ZRemAsync(cacheKey, vals);

                if (rmCount > 0)
                {
                    // 要把這個元素先刪除成功了,再執行任務,不然會重復
                    Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");
                    
                    // 回傳處理結果給第三方,這里可以考慮這個並發鎖,避免多實例都處理了這個任務。
                    // ....
                }
            }
            else
            {
                // 沒有數據,休眠500ms,避免CPU空轉
                await Task.Delay(500);
            }
        }
    }
}

效果如下:

參考文章

https://redis.io/topics/notifications

https://zhuanlan.zhihu.com/p/87113913

本文首發於我的個人公眾號,歡迎大家關注

借助Redis完成延時任務


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM