今天有點時間,我就來說兩句。最近接觸的Kafka相關的東西要多一些,其實以前也接觸過,但是在項目使用中的經驗不是很多。最近公司的項目里面使用了Kafka消息中間件,由於以前的人員編寫的客戶端的類不是很好,沒有設計的概念,就是一個簡單類的功能羅列,沒有考慮到后期的擴展和維護(以后可能會兼容其他形式的消息隊列,要做到無縫銜接),所以這個重構的任務就落到我的身上。
先說說我的感受,然后再貼出代碼的實現吧。我第一次是基於Confluent.Kafka編寫的Kafka消息生產者,后來經過測試,同步操作的時間比較長,要完成20萬數據發送消息並更新到數據庫的時間大概是16-18分鍾,這個結果有點讓人不能接受。為了提高性能,也做了很多測試,都沒有辦法解決這個問題。后來抱着試試看的想法,我又基於kafka-net重新實現了Kafka消息的生產者。經過測試,完成同樣的任務,時間大概需要3分鍾左右。兩種實現方法完成同樣的任務,都是以同步的方式生產消息,並將消息成功發送到Broker后,再將數據插入到數據庫做記錄。大家不要糾結為什么這樣使用消息隊列,這是上頭的做法,我還不能做大的改動,我也無奈。
目前看,基於kafka-net實現的消息生產者在生產消息並發送成功所需要的時間要比基於Confluent.Kafka實現的消息生產者的所需要的時間要少,尤其是發送的數據越多,這個時間的差距越大。具體的原因還不清楚,如果有高手可以不吝賜教。好了,我該上代碼了。
開始代碼之前,要說明一點:Confluent.Kafka的Broker是不需要帶Http://這個前綴的,但是 kafka-net 的Broker是有http://這個前綴的,大家要注意這個,剛開始的時候我也被坑了一下子。
using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Enterprise.Framework.MessageQueue { /// <summary> /// 消息生產者的接口定義,所有消息生產者的實現必須繼承該接口 /// </summary> public interface IMessageProducer { /// <summary> /// 將指定的消息內容發送到消息服務器並存放在指定的主題名稱里 /// </summary> /// <param name="topic">發送消息的主題名稱,這個主題就是對消息的分類,不同的主題存放不同的消息,該參數不能為空,空值會拋出異常</param> /// <param name="message">需要發送的消息內容,該參數不能為空,空值會拋出異常</param> void Produce(string topic, string message); } }
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace Enterprise.Framework.MessageQueue 8 { 9 /// <summary> 10 /// Kafka消息生產者的接口定義,所有Kafka消息生產者的實現必須繼承該接口 11 /// </summary> 12 public interface IKafkaMessageProducer : IMessageProducer 13 { 14 /// <summary> 15 /// 將指定的消息內容發送到消息服務器並存放在指定的主題名稱里 16 /// </summary> 17 /// <param name="topic">發送消息的主題名稱,這個主題就是對消息的分類,不同的主題存放不同的消息,該參數不能為空,空值會拋出異常</param> 18 /// <param name="message">需要發送的消息內容,該參數不能為空,空值會拋出異常</param> 19 /// <param name="producedAction">當消息生產完成並成功發送到服務器后,可以對成功生產並發送的消息執行代理所封裝方法的操作,默認值為空</param> 20 void Produce(string topic, string message, Action<MessageResult> producedAction = null); 21 } 22 }
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Runtime.InteropServices; 5 using System.Text; 6 7 namespace Enterprise.Framework.AbstractInterface 8 { 9 /// <summary> 10 /// 該抽象類定義了所有需要釋放資源類型的抽象類 11 /// </summary> 12 public abstract class DisposableBase : IDisposable 13 { 14 private bool disposed = false; 15 16 /// <summary> 17 /// 實現IDisposable中的Dispose方法 18 /// </summary> 19 public void Dispose() 20 { 21 //必須為true 22 Dispose(true); 23 //通知垃圾回收機制不再調用終結器(析構器) 24 GC.SuppressFinalize(this); 25 } 26 27 /// <summary> 28 /// 不是必要的,提供一個Close方法僅僅是為了更符合其他語言(如C++)的規范 29 /// </summary> 30 public void Close() 31 { 32 Dispose(); 33 } 34 35 /// <summary> 36 /// 必須,以備程序員忘記了顯式調用Dispose方法 37 /// </summary> 38 ~DisposableBase() 39 { 40 //必須為false 41 Dispose(false); 42 } 43 44 /// <summary> 45 /// 非密封類修飾用protected virtual 46 /// 密封類修飾用private 47 /// </summary> 48 /// <param name="disposing">是否要清理托管資源,true表示需要清理托管資源,false表示不需要清理托管資源</param> 49 protected virtual void Dispose(bool disposing) 50 { 51 if (disposed) 52 { 53 return; 54 } 55 if (disposing) 56 { 57 // 清理托管資源 58 DisposeManagedResources(); 59 } 60 // 清理非托管資源 61 DisposeUnmanagedResource(); 62 //讓類型知道自己已經被釋放 63 disposed = true; 64 } 65 66 /// <summary> 67 /// 釋放托管資源 68 /// </summary> 69 protected abstract void DisposeManagedResources(); 70 71 /// <summary> 72 /// 釋放非托管資源 73 /// </summary> 74 protected abstract void DisposeUnmanagedResource(); 75 } 76 }
1 using KafkaNet; 2 using KafkaNet.Model; 3 using KafkaNet.Protocol; 4 using System; 5 using System.Collections.Generic; 6 using System.IO; 7 using System.Linq; 8 using System.Text; 9 using System.Threading; 10 using System.Threading.Tasks; 11 using ThreeSoft.Framework.AbstractInterface; 12 13 namespace Enterprise.Framework.MessageQueue 14 { 15 /// <summary> 16 /// Kafka消息生產者具體的實現類,可以針對長鏈接進行消息發送處理,不用頻繁進行消息組件的創建和銷毀的工作 17 /// </summary> 18 public sealed class KafkaMessageKeepAliveProducer : DisposableBase, IDisposable, IKafkaMessageProducer 19 { 20 #region 私有字段 21 22 private KafkaNet.Producer _producer; 23 private BrokerRouter _brokerRouter; 24 private string _broker; 25 26 #endregion 27 28 #region 構造函數 29 30 /// <summary> 31 /// 通過構造函數初始化消息隊列的服務器 32 /// </summary> 33 /// <param name="broker">消息隊列服務器地址,該值不能為空</param> 34 public KafkaMessageKeepAliveProducer(string broker) 35 { 36 if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker)) 37 { 38 throw new ArgumentNullException("消息隊列服務器的地址不可以為空!"); 39 } 40 41 #region kafka-net實現 42 43 Uri[] brokerUriList = null; 44 45 if (broker.IndexOf(',') >= 0) 46 { 47 string[] brokers = broker.Split(','); 48 brokerUriList = new Uri[brokers.Length]; 49 50 for (int i = 0; i < brokers.Length; i++) 51 { 52 brokerUriList[i] = new Uri(brokers[i]); 53 } 54 } 55 else 56 { 57 brokerUriList = new Uri[] { new Uri(broker) }; 58 } 59 60 var kafkaOptions = new KafkaOptions(brokerUriList); 61 _brokerRouter = new BrokerRouter(kafkaOptions); 62 _producer = new KafkaNet.Producer(_brokerRouter); 63 64 #endregion 65 66 _broker = broker; 67 } 68 69 #endregion 70 71 #region 實例屬性 72 73 /// <summary> 74 /// 獲取消息服務器的地址 75 /// </summary> 76 public string Broker 77 { 78 get { return _broker; } 79 } 80 81 #endregion 82 83 #region 發送消息的方法 84 85 /// <summary> 86 /// 將指定的消息內容發送到消息服務器並存放在指定的主題名稱里 87 /// </summary> 88 /// <param name="topic">發送消息的主題名稱,這個主題就是對消息的分類,不同的主題存放不同的消息,該參數不能為空,空值會拋出異常</param> 89 /// <param name="message">需要發送的消息內容,該參數不能為空,空值會拋出異常</param> 90 /// <param name="producedAction">當消息生產完成並成功發送到服務器后,可以對成功生產並發送的消息執行代理所封裝方法的操作</param> 91 public void Produce(string topic, string message, Action<MessageResult> producedAction = null) 92 { 93 #region 同步實現 94 95 var currentDatetime = DateTime.Now; 96 var key = currentDatetime.Second.ToString(); 97 var events = new[] { new KafkaNet.Protocol.Message(message, key) }; 98 List<ProduceResponse> result = _producer.SendMessageAsync(topic, events).Result; 99 100 if (producedAction != null && result != null && result.Count > 0) 101 { 102 MessageResult messageResult = new MessageResult { Broker = Broker, GroupID = null, Message = message, Offset = result[0].Offset, Partition = result[0].PartitionId, Topic = result[0].Topic }; 103 producedAction(messageResult); 104 } 105 106 #endregion 107 } 108 109 /// <summary> 110 /// 將指定的消息內容發送到消息服務器並存放在指定的主題名稱里 111 /// </summary> 112 /// <param name="topic">發送消息的主題名稱,這個主題就是對消息的分類,不同的主題存放不同的消息,該參數不能為空,空值會拋出異常</param> 113 /// <param name="message">需要發送的消息內容,該參數不能為空,空值會拋出異常</param> 114 public void Produce(string topic, string message) 115 { 116 Produce(topic, message, null); 117 } 118 119 #endregion 120 121 #region 實現消息隊列資源的釋放 122 123 /// <summary> 124 /// 析構函數釋放資源 125 /// </summary> 126 ~KafkaMessageKeepAliveProducer() 127 { 128 Dispose(false); 129 } 130 131 /// <summary> 132 /// 釋放托管資源 133 /// </summary> 134 protected override void DisposeManagedResources() 135 { 136 if (_producer != null) 137 { 138 _producer.Dispose(); 139 } 140 if (_brokerRouter != null) 141 { 142 _brokerRouter.Dispose(); 143 } 144 } 145 146 /// <summary> 147 /// 釋放非托管資源 148 /// </summary> 149 protected override void DisposeUnmanagedResource(){} 150 151 #endregion 152 } 153 }
好了,今天就寫到這里了,每天進步一點點,努力堅持。不忘初心,繼續努力吧,歡迎大家前來討論。