采用簡易的環形延時隊列處理秒級定時任務的解決方案


 業務背景

在稍微復雜點業務系統中,不可避免會碰到做定時任務的需求,比如淘寶的交易超時自動關閉訂單、超時自動確認收貨等等。對於一些定時作業比較多的系統,通常都會搭建專門的調度平台來管理,通過創建定時器來周期性執行任務。如剛才所說的場景,我們可以給訂單創建一個專門的任務來處理交易狀態,每秒輪詢一次訂單表,找出那些符合超時條件的訂單然后標記狀態。這是最簡單粗暴的做法,但明顯也很low,自己都下不去手寫這樣的代碼,所有必須要找個更好的方案。

回到真實項目中的場景,系統中某個活動上線后要給目標用戶發送短信通知,這些通知需要按時間點批量發送。雖然已經基於quartz.net給系統搭建了任務調度平台,但着實不想用上述方案來實現。在網上各種搜索和思考,找到一篇文章讓我眼前一亮,稍加分析發現里面的思路完全符合現在的場景,於是決定在自己項目中實現出來。

 

原理分析

 這種方案的核心就是構造一種數據結構,稱之為環形隊列,但實際上還是一個數組,加上對它的循環遍歷,達到一種環狀的假象。然后再配合定時器,就可以實現按需延時的效果。上面提到的文章中也介紹了實現思路,這里我采用我的理解再更加詳細的解釋一下。

我們先為這個數組分配一個固定大小的空間,比如60,每個數組的元素用來存放任務的集合。然后開啟一個定時器每隔一秒來掃描這個數組,掃完一圈剛好是一分鍾。如果提前設置好任務被掃描的圈數(CycleNum)和在數組中的位置(Slot),在剛好掃到數組的Slot位置時,集合里那些CycleNum為0的任務就是達到觸發條件的任務,拉出來做業務操作然后移除掉,其他的把圈數減掉一次,然后留到下次繼續掃描,這樣就實現了延時的效果。原理如下圖所示:

可以看出中間的重點是計算出每個任務所在的位置以及需要循環的圈數。假設當前時間為15:20:08,當前掃描位置是2,我的任務要在15:22:35這個時刻觸發,也就是147秒后。那么我需要循環的圈數就是147/60=2圈,需要被掃描的位置就是(147+2)%60=29的地方。計算好任務的坐標后塞到數組中屬於它的位置,然后靜靜等待被消費就好啦。

 

擼碼實現

光講原理不上代碼怎么能行呢,根據上面的思路,下面一步步在.net平台下實現出來。

先做一些基礎封裝。

首先構造任務參數的基類,用來記錄任務的位置信息和定義業務回調方法:

    public class DelayQueueParam
    {
        internal int Slot { get; set; }

        internal int CycleNum { get; set; }

        public Action<object> Callback { get; set; }
    }

接下來是核心地方。再構造隊列的泛型類,真實類型必須派生自上面的基類,用來擴展一些業務字段方便消費時使用。隊列的主要屬性有當前位置指針以及數組容器,主要的操作有插入、移除和消費。插入任務時需要傳入執行時間,用來計算這個任務的坐標。

    public class DelayQueue<T> where T : DelayQueueParam
    {
        private List<T>[] queue;

        private int currentIndex = 1;

        public DelayQueue(int length)
        {
            queue = new List<T>[length];
        }

        public void Insert(T item, DateTime time)
        {
            //根據消費時間計算消息應該放入的位置
            var second = (int)(time - DateTime.Now).TotalSeconds;
            item.CycleNum = second / queue.Length;
            item.Slot = (second + currentIndex) % queue.Length;
            //加入到延時隊列中
            if (queue[item.Slot] == null)
            {
                queue[item.Slot] = new List<T>();
            }
            queue[item.Slot].Add(item);
        }

        public void Remove(T item)
        {
            if (queue[item.Slot] != null)
            {
                queue[item.Slot].Remove(item);
            }
        }

        public void Read()
        {
            if (queue.Length >= currentIndex)
            {
                var list = queue[currentIndex - 1];
                if (list != null)
                {
                    List<T> target = new List<T>();
                    foreach (var item in list)
                    {
                        if (item.CycleNum == 0)
                        {
                            //在本輪命中,用單獨線程去執行業務操作
                            Task.Run(()=> { item.Callback(item); });
                            target.Add(item);
                        }
                        else
                        {
                            //等下一輪
                            item.CycleNum--;
                            System.Diagnostics.Debug.WriteLine($"@@@@@索引:{item.Slot},剩余:{item.CycleNum}");
                        }
                    }
                    //把已過期的移除掉
                    foreach (var item in target)
                    {
                        list.Remove(item);
                    }
                }
                currentIndex++;
                //下一遍從頭開始
                if (currentIndex > queue.Length)
                {
                    currentIndex = 1;
                }
            }
        }
    }

接下來是使用方法。

創建一個管理隊列實例的靜態類,里面封裝對隊列的操作:

    public static class NotifyPlanManager
    {
        private static DelayQueue<NotifyPlan> _queue = new DelayQueue<NotifyPlan>(60);

        public static void Insert(NotifyPlan plan, DateTime time)
        {
            _queue.Insert(plan, time);
        }

        public static void Read()
        {
            _queue.Read();
        }
    }

構建我們的實際業務參數類,派生自DelayQueueParam:

    public class NotifyPlan : DelayQueueParam
    {
        public Guid CamId { get; set; }

        public int PreviousTotal { get; set; }

        public int Amount { get; set; }
    }

生產端往隊列中插入數據:

    Action<object> callback = (result) =>
    {
        var np = result as NotifyPlan;
        //這里做自己的業務操作
        //舉個例子:
        Debug.WriteLine($"活動ID:{np.CamId},已發送數量:{np.PreviousTotal},本次發送數量:{np.Amount}");
    };
    NotifyPlanManager.Insert(new NotifyPlan
    {
        Amount = set.MainAmount,
        CamId = camId,
        PreviousTotal = 0,
        Callback = callback
    }, smsTemplate.SendDate);

再創建一個每秒執行一次的定時器用做消費端,我這里使用的是FluentScheduler,核心代碼:

    internal class NotifyPlanJob : IJob
    {
        /// <summary>
        /// 執行計划
        /// </summary>
        public void Execute()
        {
            NotifyPlanManager.Read();
        }
    }

    internal class JobFactory : Registry
    {
        public JobFactory()
        {
            //每秒運行一次
            Schedule<NotifyPlanJob >().ToRunEvery(1).Seconds();
        }
    }

  JobManager.Initialize(new JobFactory());

然后開啟調試運行,打開本機的系統時間面板,對着時間看輸出結果。親測有效。

 

總結

 這種方案的好處是避免了頻繁地掃描數據庫和不必要的業務操作,另外也很方便控制時間精度。帶來的問題是如果web服務異常或重啟可能會發生任務丟失的情況,我目前的處理方法是在數據庫中標記任務狀態,服務啟動時把狀態為“排隊中”的任務重新加載到隊列中等待消費。

以上方案在單機環境測試沒問題,多節點情況下暫時沒有深究。若有設計實現上的缺陷,歡迎討論與指正,要是有更好的方案,那就當拋磚引玉,再好不過了~

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM