c#之Redis隊列在郵件提醒中的應用


場景

有這樣一個場景,一個郵件提醒的windows服務,獲取所有開啟郵件提醒的用戶,循環獲取這些用戶的郵件,發送一條服務號消息。但問題來了,用戶比較少的情況下,輪詢一遍時間還能忍受,如果用戶多了,那用戶名稱排序靠后的人,收到郵件提醒的消息,延遲時間就非常長了。

准備

c#之Redis實踐list,hashtable

c#之Redis隊列

方案

1、生產者線程一獲取所有開啟郵件提醒的用戶。

2、根據配置來決定使用多少個隊列,以及每個隊列的容量。

3、線程一,獲取未滿的隊列,將當前用戶入隊。如果所有的隊列已滿,則掛起2s,然后重新獲取未滿的隊列,用戶入隊。

4、根據配置開啟消費者線程,每個線程獨立處理邏輯。如果獲取的用戶為空或者當前隊列為空,掛起2s。否則通過EWS服務拉取該用戶的郵件,並提醒。

5、如果在獲取用戶郵件的過程中出錯,則將該用戶重新入當前隊列,等待下次拉取。

測試

隊列

測試代碼

    /// <summary>
    /// 消息隊列管理
    /// </summary>
    public class MyRedisQueueBus : IDisposable
    {
        /// <summary>
        /// 線程個數
        /// </summary>
        private int _threadCount;
        /// <summary>
        /// 每個線程中itcode的容量
        /// </summary>
        private int _threadCapacity;
        /// <summary>
        /// 線程
        /// </summary>
        private Thread[] _threads;
        /// <summary>
        /// 生產者線程
        /// </summary>
        private Thread _producerThread;
        /// <summary>
        /// 掛起時間
        /// </summary>
        private const int WAITSECONDE = 2000;
        /// <summary>
        /// 隊列名稱前綴
        /// </summary>
        private string _queuePrefix;
        /// <summary>
        /// 構造函數
        /// </summary>
        /// <param name="threadCount">線程個數</param>
        /// <param name="threadCapacity">每個線程處理的隊列容量</param>
        ///  <param name="queuePrefix">每個線程處理的隊列容量</param>
        public MyRedisQueueBus(int threadCount, int threadCapacity, string queuePrefix)
        {
            this._threadCapacity = threadCapacity;
            this._threadCount = threadCount;
            this._queuePrefix = queuePrefix + "_{0}";
        }
        /// <summary>
        /// 開啟生產者
        /// </summary>
        public void StartProducer()
        {
            _producerThread = new Thread(() =>
            {
                IRedisClientFactory factory = RedisClientFactory.Instance;
                EmailAlertsData emailAlertsData = new EmailAlertsData();
                //白名單
                string[] userIdsWhiteArray = TaskGloableParameter.WhiteList.Split(new char[] { ',', '' },
 StringSplitOptions.RemoveEmptyEntries);
                //入隊                  
                using (IRedisClient client = factory.CreateRedisClient(WebConfig.RedisServer, WebConfig.RedisPort))
                {
                    client.Password = WebConfig.RedisPwd;
                    client.Db = WebConfig.RedisServerDb;
                    while (true)
                    {
                        //獲取所有開啟郵件提醒的用戶
                        List<EmailAlerts> lstEmails = emailAlertsData.GetAllStartAlerts(SyncState.ALL, userIdsWhiteArray);

                        foreach (var item in lstEmails)
                        {
                            int queueIndex = -1;
                            string queueName = string.Format(this._queuePrefix, queueIndex);
                            for (int i = 0; i < _threadCount; i++)
                            {
                                queueName = string.Format(this._queuePrefix, i);
                                //如果當前隊列沒有填滿,則直接跳出,使用該隊列進行入隊
                                if (client.GetListCount(queueName) < _threadCapacity)
                                {
                                    queueIndex = i;
                                    break;
                                }
                            }
                            //如果所有隊列都已經滿了,則掛起2s等待消費者消耗一部分數據,然后重新開始
                            if (queueIndex == -1)
                            {
                                Thread.SpinWait(WAITSECONDE);
                                //重新獲取隊列
                                for (int i = 0; i < _threadCount; i++)
                                {
                                    queueName = string.Format(this._queuePrefix, i);
                                    //如果當前隊列沒有填滿,則直接跳出,使用該隊列進行入隊
                                    if (client.GetListCount(queueName) < _threadCapacity)
                                    {
                                        queueIndex = i;
                                        break;
                                    }
                                }
                            }
                            else
                            {
                                //入隊
                                client.EnqueueItemOnList(queueName, JsonConvert.SerializeObject(new MyQueueItem
                                {
                                    UserId = item.itcode,
                                    SyncState = item.Email_SyncState
                                }));
                            }
                        }                    
                  
                    }
                }

            });
            _producerThread.Start();
        }

        /// <summary>
        /// 開啟消費者
        /// </summary>
        public void StartCustomer()
        {
            _threads = new Thread[_threadCount];
            for (int i = 0; i < _threads.Length; i++)
            {
                _threads[i] = new Thread(CustomerRun);
                _threads[i].Start(i);
            }
        }
        private void CustomerRun(object obj)
        {
            int threadIndex = Convert.ToInt32(obj);
            string queueName = string.Format(this._queuePrefix, threadIndex);

            IRedisClientFactory factory = RedisClientFactory.Instance;
            using (IRedisClient client = factory.CreateRedisClient(WebConfig.RedisServer, WebConfig.RedisPort))
            {
                while (true)
                {
                    client.Password = WebConfig.RedisPwd;
                    client.Db = WebConfig.RedisServerDb;
                    if (client.GetListCount(queueName) > 0)
                    {
                        string resultJson = client.DequeueItemFromList(queueName);
                        //如果獲取的結果為空,則掛起2s
                        if (string.IsNullOrEmpty(resultJson))
                        {
                            Thread.SpinWait(WAITSECONDE);
                        }
                        else
                        {
                            try
                            {
                                //耗時業務處理
                                MyQueueItem item = JsonConvert.DeserializeObject<MyQueueItem>(resultJson);
                                Console.WriteLine("Threadid:{0},User:{1}", Thread.CurrentThread.ManagedThreadId.ToString(), item.UserId);
                            }
                            catch (Exception ex)
                            {
                                //如果出錯,重新入隊
                                client.EnqueueItemOnList(queueName, resultJson);

                            }

                        }
                    }
                    else
                    {
                        //當前隊列為空,掛起2s
                        Thread.SpinWait(WAITSECONDE);
                    }
                }
            }

        }
        public void Dispose()
        {
            //釋放資源時,銷毀線程
            if (this._threads != null)
            {
                for (int i = 0; i < this._threads.Length; i++)
                {
                    this._threads[i].Abort();
                }
            }
            GC.Collect();
        }
    }

Main方法調用

        static void Main(string[] args)
        {         
            MyRedisQueueBus bus = new MyRedisQueueBus(10, 10, "mail_reminder_queue");
            bus.StartProducer();
            Thread.SpinWait(2000);
            bus.StartCustomer();
            Console.Read();
        }

總結

通過配置的方式,確定開啟的隊列數和線程數,如果用戶增加可以增加線程數,或者添加機器的方式解決。這樣,可以解決排名靠后的用戶,通過隨機分發隊列,有機會提前獲取郵件提醒,可以縮短郵件提醒的延遲時間。當然,這種方案並不太完美,目前也只能想到這里了。這里把這個思路寫出來,也是希望獲取一個更好的解決方案。

上面的代碼只是測試用的代碼,后來發現將創建IRedisClient寫在循環內,很容易出問題,頻繁創建client,也以為這頻繁打開關閉,如果釋放不及時,那么會產生很多的redis連接,造成redis服務器負擔。如果放在循環外邊,這個client負責一直從隊列中取數據就行,直到該線程停止。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM