.net core中使用redis 實現延遲隊列


一、項目場景:

添加任務並設定任務的執行時間,然后按時間順序由近到遠依次執行。

二、思路:

可以利用redis的有序集合(SortedSet),用時間戳排序實現,大概的流程如下。

三、關鍵思路&代碼段

  1. 寫入任務

使用任務下一次的執行時間按分鍾生成key,將同一分鍾待執行的任務放到一個key中,這一步主要思考的問題是:拆分隊列,設置各自的過期時間,如:過期時間 = 執行時間 + 5分鍾,保證過期的隊列自動刪除,不會造成后續因消費能力不足而導致redis持續膨脹。

IDictionary<double, string> dic = new Dictionary<double, string>()
{
    {
       interLineCacheModel.NextHandTimeSpan,
       interLineCacheModel.CacheRoute
    }
};
var taskKey = GetKey(interLineCacheModel.NextHandTime);
await _buildInterLineRepository.ZAddAsync(taskKey, dic);
await _buildInterLineRepository.ExpireAsync(taskKey, new DateTimeOffset(interLineCacheModel.NextHandTime.AddMinutes(5)).ToUnixTimeSeconds());
private string GetKey(DateTime dateTime)
{
    return $"IRTask{dateTime.ToString("yyyyMMddHHmm")}";
}
  1. 消費服務

因為是一個有序集合,所以隊列會自動按時間戳的大小來排序,這樣就自動實現了由近到遠依次執行,使用當前時間生成key,來獲取對應的task,每次可以獲取1條或者N條。

var taskKey = GetKey(DateTime.Now);
var routeList = await _buildInterLineRepository.ZRangeAsync(taskKey, 0, 0);

再拿到對應的執行時間戳,與當前時間做比較,如果還沒有到執行時間就跳出隊列。

var nextHandleTs = await _buildInterLineRepository.ZScoreAsync(taskKey, route);
if (long.TryParse(nextHandleTs, out var nextHandleTimeSpan))
{
    var nextHandleTime = DateTimeOffset.FromUnixTimeMilliseconds(nextHandleTimeSpan).ToBeiJingDateTime();
    if (nextHandleTime > DateTime.Now)
    {
       continue;
    }
}

最后一步,使用ZRemAsync,以確保誰刪除成功誰執行,解決多台機器同時拿到數據的問題

var success = await _buildInterLineRepository.ZRemAsync(taskKey, route) > 0;
if (success)
{
    //todo
}

注意事項:因為我們是按時間分鍾來生成key,所以到時間臨界點的時候,如果消費能力不足會導致key仍然有遺留任務,如果對此很敏感,可以在臨界點將時間回退1秒,再獲取出所有的任務。stop = -1 能拿出所有集合。

if second = 0
{
    addsecond(-1);
    var routeList = await _buildInterLineRepository.ZRangeAsync(taskKey, 0, -1);
}

四、注冊消費服務

方式很多,這里使用的是IHostedService實現后台任務,具體可以參考:https://docs.microsoft.com/zh-cn/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-3.0&tabs=visual-studio

public class InterLineLoopService : IHostedService
    {
        private readonly ILog _log;
        private readonly IInterLineTaskService _interLineTaskService;


        public InterLineLoopService(
            ILog log,
            IInterLineTaskService interLineTaskService)
        {
            _log = Guard.ArgumentNotNull(nameof(log), log);
            _interLineTaskService = Guard.ArgumentNotNull(nameof(interLineTaskService), interLineTaskService);
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _log.Info("LoopDelayMessage start....", "Domain", "InterLineLoopService", "StartAsync");
            return _interLineTaskService.LoopDelayMessage();
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }
    }

再到startup中注冊下

services.AddHostedService<InterLineLoopService>();


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM