場景
有這樣一個場景,一個郵件提醒的windows服務,獲取所有開啟郵件提醒的用戶,循環獲取這些用戶的郵件,發送一條服務號消息。但問題來了,用戶比較少的情況下,輪詢一遍時間還能忍受,如果用戶多了,那用戶名稱排序靠后的人,收到郵件提醒的消息,延遲時間就非常長了。
准備
方案
1、生產者線程一獲取所有開啟郵件提醒的用戶。
2、根據配置來決定使用多少個隊列,以及每個隊列的容量。
3、線程一,獲取未滿的隊列,將當前用戶入隊。如果所有的隊列已滿,則掛起2s,然后重新獲取未滿的隊列,用戶入隊。
4、根據配置開啟消費者線程,每個線程獨立處理邏輯。如果獲取的用戶為空或者當前隊列為空,掛起2s。否則通過EWS服務拉取該用戶的郵件,並提醒。
5、如果在獲取用戶郵件的過程中出錯,則將該用戶重新入當前隊列,等待下次拉取。
測試
隊列
測試代碼
/// <summary> /// 消息隊列管理 /// </summary> public class MyRedisQueueBus : IDisposable { /// <summary> /// 線程個數 /// </summary> private int _threadCount; /// <summary> /// 每個線程中itcode的容量 /// </summary> private int _threadCapacity; /// <summary> /// 線程 /// </summary> private Thread[] _threads; /// <summary> /// 生產者線程 /// </summary> private Thread _producerThread; /// <summary> /// 掛起時間 /// </summary> private const int WAITSECONDE = 2000; /// <summary> /// 隊列名稱前綴 /// </summary> private string _queuePrefix; /// <summary> /// 構造函數 /// </summary> /// <param name="threadCount">線程個數</param> /// <param name="threadCapacity">每個線程處理的隊列容量</param> /// <param name="queuePrefix">每個線程處理的隊列容量</param> public MyRedisQueueBus(int threadCount, int threadCapacity, string queuePrefix) { this._threadCapacity = threadCapacity; this._threadCount = threadCount; this._queuePrefix = queuePrefix + "_{0}"; } /// <summary> /// 開啟生產者 /// </summary> public void StartProducer() { _producerThread = new Thread(() => { IRedisClientFactory factory = RedisClientFactory.Instance; EmailAlertsData emailAlertsData = new EmailAlertsData(); //白名單 string[] userIdsWhiteArray = TaskGloableParameter.WhiteList.Split(new char[] { ',', ',' }, StringSplitOptions.RemoveEmptyEntries); //入隊 using (IRedisClient client = factory.CreateRedisClient(WebConfig.RedisServer, WebConfig.RedisPort)) { client.Password = WebConfig.RedisPwd; client.Db = WebConfig.RedisServerDb; while (true) { //獲取所有開啟郵件提醒的用戶 List<EmailAlerts> lstEmails = emailAlertsData.GetAllStartAlerts(SyncState.ALL, userIdsWhiteArray); foreach (var item in lstEmails) { int queueIndex = -1; string queueName = string.Format(this._queuePrefix, queueIndex); for (int i = 0; i < _threadCount; i++) { queueName = string.Format(this._queuePrefix, i); //如果當前隊列沒有填滿,則直接跳出,使用該隊列進行入隊 if (client.GetListCount(queueName) < _threadCapacity) { queueIndex = i; break; } } //如果所有隊列都已經滿了,則掛起2s等待消費者消耗一部分數據,然后重新開始 if (queueIndex == -1) { Thread.SpinWait(WAITSECONDE); //重新獲取隊列 for (int i = 0; i < _threadCount; i++) { queueName = string.Format(this._queuePrefix, i); //如果當前隊列沒有填滿,則直接跳出,使用該隊列進行入隊 if (client.GetListCount(queueName) < _threadCapacity) { queueIndex = i; break; } } } else { //入隊 client.EnqueueItemOnList(queueName, JsonConvert.SerializeObject(new MyQueueItem { UserId = item.itcode, SyncState = item.Email_SyncState })); } } } } }); _producerThread.Start(); } /// <summary> /// 開啟消費者 /// </summary> public void StartCustomer() { _threads = new Thread[_threadCount]; for (int i = 0; i < _threads.Length; i++) { _threads[i] = new Thread(CustomerRun); _threads[i].Start(i); } } private void CustomerRun(object obj) { int threadIndex = Convert.ToInt32(obj); string queueName = string.Format(this._queuePrefix, threadIndex); IRedisClientFactory factory = RedisClientFactory.Instance; using (IRedisClient client = factory.CreateRedisClient(WebConfig.RedisServer, WebConfig.RedisPort)) { while (true) { client.Password = WebConfig.RedisPwd; client.Db = WebConfig.RedisServerDb; if (client.GetListCount(queueName) > 0) { string resultJson = client.DequeueItemFromList(queueName); //如果獲取的結果為空,則掛起2s if (string.IsNullOrEmpty(resultJson)) { Thread.SpinWait(WAITSECONDE); } else { try { //耗時業務處理 MyQueueItem item = JsonConvert.DeserializeObject<MyQueueItem>(resultJson); Console.WriteLine("Threadid:{0},User:{1}", Thread.CurrentThread.ManagedThreadId.ToString(), item.UserId); } catch (Exception ex) { //如果出錯,重新入隊 client.EnqueueItemOnList(queueName, resultJson); } } } else { //當前隊列為空,掛起2s Thread.SpinWait(WAITSECONDE); } } } } public void Dispose() { //釋放資源時,銷毀線程 if (this._threads != null) { for (int i = 0; i < this._threads.Length; i++) { this._threads[i].Abort(); } } GC.Collect(); } }
Main方法調用
static void Main(string[] args) { MyRedisQueueBus bus = new MyRedisQueueBus(10, 10, "mail_reminder_queue"); bus.StartProducer(); Thread.SpinWait(2000); bus.StartCustomer(); Console.Read(); }
總結
通過配置的方式,確定開啟的隊列數和線程數,如果用戶增加可以增加線程數,或者添加機器的方式解決。這樣,可以解決排名靠后的用戶,通過隨機分發隊列,有機會提前獲取郵件提醒,可以縮短郵件提醒的延遲時間。當然,這種方案並不太完美,目前也只能想到這里了。這里把這個思路寫出來,也是希望獲取一個更好的解決方案。
上面的代碼只是測試用的代碼,后來發現將創建IRedisClient寫在循環內,很容易出問題,頻繁創建client,也以為這頻繁打開關閉,如果釋放不及時,那么會產生很多的redis連接,造成redis服務器負擔。如果放在循環外邊,這個client負責一直從隊列中取數據就行,直到該線程停止。