一、引言
研究Kafka有一段時間了,略有心得,基於此自己就寫了一個Kafka的消費者的類和Kafka消息生產者的類,進行了單元測試和生產環境的測試,還是挺可靠的。
二、源碼
話不多說,直接上代碼,代碼不是很難,注釋很全,希望大家多多發表意見,繼續提升。
1 /// <summary> 2 /// Kafka消息消費者接口 3 /// </summary> 4 public interface IKafkaConsumer 5 { 6 /// <summary> 7 /// 指定的組別的消費者開始消費指定主題的消息 8 /// </summary> 9 /// <param name="broker">Kafka消息服務器的地址</param> 10 /// <param name="topic">Kafka消息所屬的主題</param> 11 /// <param name="groupID">Kafka消費者所屬的組別</param> 12 /// <param name="action">可以對已經消費的消息進行相關處理</param> 13 void Consume(string broker, string topic, string groupID, Action<ConsumerResult> action = null); 14 }
以上類型是接口定義,這個類定義的抽象類,可以重復使用相關的代碼定義其中,但是目前這兩個方法沒有使用。
1 /// <summary> 2 /// Kafka抽象基類,提供公共接口實現 3 /// </summary> 4 public abstract class KafkaBase 5 { 6 /// <summary> 7 /// 獲取Kafka服務器地址 8 /// </summary> 9 /// <param name="brokerNameKey">配置文件中Broker服務器地址的key的名稱</param> 10 /// <returns>返回獲取到的Kafka服務器的地址明細</returns> 11 public string GetKafkaBroker(string brokerNameKey = "Broker") 12 { 13 string kafkaBroker = string.Empty; 14 15 if (!ConfigurationManager.AppSettings.AllKeys.Contains(brokerNameKey)) 16 { 17 kafkaBroker = "http://localhost:9092"; 18 } 19 else 20 { 21 kafkaBroker = ConfigurationManager.AppSettings[brokerNameKey]; 22 } 23 return kafkaBroker; 24 } 25 26 /// <summary> 27 /// 在配置文件中獲取系統中已經生成的主題名稱 28 /// </summary> 29 /// <param name="topicNameKey">配置文件中主題的key名稱</param> 30 /// <returns>返回獲取到的主題的具體值</returns> 31 public string GetTopicName(string topicNameKey = "Topic") 32 { 33 string topicName = string.Empty; 34 35 if (!ConfigurationManager.AppSettings.AllKeys.Contains(topicNameKey)) 36 { 37 throw new Exception("Key \"" + topicNameKey + "\" not found in Config file -> configuration/AppSettings"); 38 } 39 else 40 { 41 topicName = ConfigurationManager.AppSettings[topicNameKey]; 42 } 43 return topicName; 44 } 45 }
還有一個用於數據傳遞的工具類,代碼如下:
1 /// <summary> 2 /// Kafka消息消費者設置對象,提供Kafka消費消息的參數對象(Consumer.Consum) 3 /// </summary> 4 public sealed class ConsumerSetting 5 { 6 /// <summary> 7 /// Kafka消息服務器的地址 8 /// </summary> 9 public string Broker { get; set; } 10 11 /// <summary> 12 /// Kafka消息所屬的主題 13 /// </summary> 14 public string Topic { get; set; } 15 16 /// <summary> 17 /// Kafka消息消費者分組主鍵 18 /// </summary> 19 public string GroupID { get; set; } 20 21 /// <summary> 22 /// 消費消息后可以執行的方法 23 /// </summary> 24 public Action<ConsumerResult> Action { get; set; } 25 }
我們可以對消息進行消費,該類型用於對消息進行整理,代碼如下:
1 /// <summary> 2 /// 已經消費的消息的詳情信息 3 /// </summary> 4 public sealed class ConsumerResult 5 { 6 /// <summary> 7 /// Kafka消息服務器的地址 8 /// </summary> 9 public string Broker { get; set; } 10 11 /// <summary> 12 /// Kafka消息所屬的主題 13 /// </summary> 14 public string Topic { get; set; } 15 16 /// <summary> 17 /// Kafka消息消費者分組主鍵 18 /// </summary> 19 public string GroupID { get; set; } 20 21 /// <summary> 22 /// 我們需要處理的消息具體的內容 23 /// </summary> 24 public string Message { get; set; } 25 26 /// <summary> 27 /// Kafka數據讀取的當前位置 28 /// </summary> 29 public long Offset { get; set; } 30 31 /// <summary> 32 /// 消息所在的物理分區 33 /// </summary> 34 public int Partition { get; set; } 35 }
最后階段了,該是消費者的代碼了,代碼如下:
1 /// <summary> 2 /// Kafka消息消費者 3 /// </summary> 4 public sealed class KafkaConsumer : KafkaBase, IKafkaConsumer 5 { 6 #region 私有字段 7 8 private bool isCancelled; 9 10 #endregion 11 12 #region 構造函數 13 14 /// <summary> 15 /// 構造函數,初始化IsCancelled屬性 16 /// </summary> 17 public KafkaConsumer() 18 { 19 isCancelled = false; 20 } 21 22 #endregion 23 24 #region 屬性 25 26 /// <summary> 27 /// 是否應該取消繼續消費Kafka的消息,默認值是false,繼續消費消息 28 /// </summary> 29 public bool IsCancelled 30 { 31 get { return isCancelled; } 32 set { isCancelled = value; } 33 } 34 35 #endregion 36 37 #region 同步版本 38 39 /// <summary> 40 /// 指定的組別的消費者開始消費指定主題的消息 41 /// </summary> 42 /// <param name="broker">Kafka消息服務器的地址</param> 43 /// <param name="topic">Kafka消息所屬的主題</param> 44 /// <param name="groupID">Kafka消費者所屬的組別</param> 45 /// <param name="action">可以對已經消費的消息進行相關處理</param> 46 public void Consume(string broker, string topic, string groupID, Action<ConsumerResult> action = null) 47 { 48 if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0) 49 { 50 throw new ArgumentNullException("Kafka消息服務器的地址不能為空!"); 51 } 52 53 if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0) 54 { 55 throw new ArgumentNullException("消息所屬的主題不能為空!"); 56 } 57 58 if (string.IsNullOrEmpty(groupID) || string.IsNullOrWhiteSpace(groupID) || groupID.Length <= 0) 59 { 60 throw new ArgumentNullException("用戶分組ID不能為空!"); 61 } 62 63 var config = new Dictionary<string, object> 64 { 65 { "bootstrap.servers", broker }, 66 { "group.id", groupID }, 67 { "enable.auto.commit", true }, // this is the default 68 { "auto.commit.interval.ms", 5000 }, 69 { "statistics.interval.ms", 60000 }, 70 { "session.timeout.ms", 6000 }, 71 { "auto.offset.reset", "smallest" } 72 }; 73 74 75 using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8))) 76 { 77 // Note: All event handlers are called on the main thread. 78 //consumer.OnMessage += (_, message) => Console.WriteLine("Topic:" + message.Topic + " Partition:" + message.Partition + " Offset:" + message.Offset + " " + message.Value); 79 //consumer.OnMessage += (_, message) => Console.WriteLine("Offset:【" + message.Offset + "】Message:【" + message.Value + "】"); 80 if (action != null) 81 { 82 consumer.OnMessage += (_, message) => { 83 ConsumerResult messageResult = new ConsumerResult(); 84 messageResult.Broker = broker; 85 messageResult.Topic = message.Topic; 86 messageResult.Partition = message.Partition; 87 messageResult.Offset = message.Offset.Value; 88 messageResult.Message = message.Value; 89 90 //執行外界自定義的方法 91 action(messageResult); 92 }; 93 } 94 95 consumer.OnPartitionEOF += (_, end) => Console.WriteLine("Reached end of topic " + end.Topic + " partition " + end.Partition + ", next message will be at offset " + end.Offset); 96 97 consumer.OnError += (_, error) => Console.WriteLine("Error:" + error); 98 99 //引發反序列化錯誤或消費消息出現錯誤!= NoError。 100 consumer.OnConsumeError += (_, message) => Console.WriteLine("Error consuming from topic/partition/offset " + message.Topic + "/" + message.Partition + "/" + message.Offset + ": " + message.Error); 101 102 consumer.OnOffsetsCommitted += (_, commit) => Console.WriteLine(commit.Error ? "Failed to commit offsets:" + commit.Error : "Successfully committed offsets:" + commit.Offsets); 103 104 // 當消費者被分配一組新的分區時引發。 105 consumer.OnPartitionsAssigned += (_, partitions) => 106 { 107 Console.WriteLine("Assigned Partitions:" + partitions + ", Member ID:" + consumer.MemberId); 108 //如果您未向OnPartitionsAssigned事件添加處理程序,則會自動執行以下.Assign調用。 如果你為它添加了事件處理程序,你必須明確地調用.Assign以便消費者開始消費消息。 109 consumer.Assign(partitions); 110 }; 111 112 // Raised when the consumer's current assignment set has been revoked. 113 //當消費者的當前任務集已被撤銷時引發。 114 consumer.OnPartitionsRevoked += (_, partitions) => 115 { 116 Console.WriteLine("Revoked Partitions:" + partitions); 117 // If you don't add a handler to the OnPartitionsRevoked event,the below .Unassign call happens automatically. If you do, you must call .Unassign explicitly in order for the consumer to stop consuming messages from it's previously assigned partitions. 118 //如果您未向OnPartitionsRevoked事件添加處理程序,則下面的.Unassign調用會自動發生。 如果你為它增加了事件處理程序,你必須明確地調用.Usessign以便消費者停止從它先前分配的分區中消費消息。 119 consumer.Unassign(); 120 }; 121 122 //consumer.OnStatistics += (_, json) => Console.WriteLine("Statistics: " + json); 123 124 consumer.Subscribe(topic); 125 126 //Console.WriteLine("Subscribed to:" + consumer.Subscription); 127 128 while (!IsCancelled) 129 { 130 consumer.Poll(TimeSpan.FromMilliseconds(100)); 131 } 132 } 133 } 134 135 #endregion 136 137 #region 異步版本 138 139 /// <summary> 140 /// 指定的組別的消費者開始消費指定主題的消息 141 /// </summary> 142 /// <param name="broker">Kafka消息服務器的地址</param> 143 /// <param name="topic">Kafka消息所屬的主題</param> 144 /// <param name="groupID">Kafka消費者所屬的組別</param> 145 /// <param name="action">可以對已經消費的消息進行相關處理</param> 146 public void ConsumeAsync(string broker, string topic, string groupID, Action<ConsumerResult> action = null) 147 { 148 if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0) 149 { 150 throw new ArgumentNullException("Kafka消息服務器的地址不能為空!"); 151 } 152 153 if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0) 154 { 155 throw new ArgumentNullException("消息所屬的主題不能為空!"); 156 } 157 158 if (string.IsNullOrEmpty(groupID) || string.IsNullOrWhiteSpace(groupID) || groupID.Length <= 0) 159 { 160 throw new ArgumentNullException("用戶分組ID不能為空!"); 161 } 162 163 ThreadPool.QueueUserWorkItem(KafkaAutoCommittedOffsets, new ConsumerSetting() { Broker = broker, Topic = topic, GroupID = groupID, Action=action }); 164 } 165 166 #endregion 167 168 #region 兩種提交Offsets的版本 169 170 /// <summary> 171 /// Kafka消息隊列服務器自動提交offset 172 /// </summary> 173 /// <param name="state">消息消費者信息</param> 174 private void KafkaAutoCommittedOffsets(object state) 175 { 176 ConsumerSetting setting = state as ConsumerSetting; 177 178 var config = new Dictionary<string, object> 179 { 180 { "bootstrap.servers", setting.Broker }, 181 { "group.id", setting.GroupID }, 182 { "enable.auto.commit", true }, // this is the default 183 { "auto.commit.interval.ms", 5000 }, 184 { "statistics.interval.ms", 60000 }, 185 { "session.timeout.ms", 6000 }, 186 { "auto.offset.reset", "smallest" } 187 }; 188 189 using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8))) 190 { 191 if (setting.Action != null) 192 { 193 consumer.OnMessage += (_, message) => 194 { 195 ConsumerResult messageResult = new ConsumerResult(); 196 messageResult.Broker = setting.Broker; 197 messageResult.Topic = message.Topic; 198 messageResult.Partition = message.Partition; 199 messageResult.Offset = message.Offset.Value; 200 messageResult.Message = message.Value; 201 202 //執行外界自定義的方法 203 setting.Action(messageResult); 204 }; 205 } 206 207 //consumer.OnStatistics += (_, json)=> Console.WriteLine("Statistics: {json}"); 208 209 //可以寫日志 210 //consumer.OnError += (_, error)=> Console.WriteLine("Error:"+error); 211 212 //可以寫日志 213 //consumer.OnConsumeError += (_, msg) => Console.WriteLine("Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}"); 214 215 consumer.Subscribe(setting.Topic); 216 217 while (!IsCancelled) 218 { 219 consumer.Poll(TimeSpan.FromMilliseconds(100)); 220 } 221 } 222 } 223 224 /// <summary> 225 /// Kafka消息隊列服務器手動提交offset 226 /// </summary> 227 /// <param name="state">消息消費者信息</param> 228 private void KafkaManuallyCommittedOffsets(object state) 229 { 230 ConsumerSetting setting = state as ConsumerSetting; 231 232 var config = new Dictionary<string, object> 233 { 234 { "bootstrap.servers", setting.Broker }, 235 { "group.id", setting.GroupID }, 236 { "enable.auto.commit", false },//不是自動提交的 237 { "auto.commit.interval.ms", 5000 }, 238 { "statistics.interval.ms", 60000 }, 239 { "session.timeout.ms", 6000 }, 240 { "auto.offset.reset", "smallest" } 241 }; 242 243 using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8))) 244 { 245 //可以寫日志 246 //consumer.OnError += (_, error) => Console.WriteLine("Error:"+error); 247 248 //可以寫日志 249 // Raised on deserialization errors or when a consumed message has an error != NoError. 250 //consumer.OnConsumeError += (_, error)=> Console.WriteLine("Consume error:"+error); 251 252 consumer.Subscribe(setting.Topic); 253 254 Message<Ignore, string> message = null; 255 256 while (!isCancelled) 257 { 258 if (!consumer.Consume(out message, TimeSpan.FromMilliseconds(100))) 259 { 260 continue; 261 } 262 263 if (setting.Action != null) 264 { 265 ConsumerResult messageResult = new ConsumerResult(); 266 messageResult.Broker = setting.Broker; 267 messageResult.Topic = message.Topic; 268 messageResult.Partition = message.Partition; 269 messageResult.Offset = message.Offset.Value; 270 messageResult.Message = message.Value; 271 272 //執行外界自定義的方法 273 setting.Action(messageResult); 274 } 275 276 if (message.Offset % 5 == 0) 277 { 278 var committedOffsets = consumer.CommitAsync(message).Result; 279 //Console.WriteLine("Committed offset:"+committedOffsets); 280 } 281 } 282 } 283 } 284 285 #endregion 286 }
有了消息的消費者代碼,那消息的生產者代碼肯定是少不了的。代碼如下:
1 /// <summary> 2 /// Kafka消息生產者 3 /// </summary> 4 public sealed class KafkaProducer : KafkaBase, IKafkaProducer 5 { 6 /// <summary> 7 /// 生產消息並發送消息 8 /// </summary> 9 /// <param name="broker">kafka的服務器地址</param> 10 /// <param name="topic">kafka的消息主題名稱</param> 11 /// <param name="message">需要傳送的消息</param> 12 public bool Produce(string broker, string topic, string message) 13 { 14 bool result = false; 15 if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0) 16 { 17 throw new ArgumentNullException("Kafka消息服務器地址不能為空!"); 18 } 19 20 if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0) 21 { 22 throw new ArgumentNullException("消息所屬的主題不能為空!"); 23 } 24 25 if (string.IsNullOrEmpty(message) || string.IsNullOrWhiteSpace(message) || message.Length <= 0) 26 { 27 throw new ArgumentNullException("消息內容不能為空!"); 28 } 29 30 var config = new Dictionary<string, object> { { "bootstrap.servers", broker } }; 31 using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8))) 32 { 33 var deliveryReport = producer.ProduceAsync(topic, null, message); 34 deliveryReport.ContinueWith(task => 35 { 36 if (task.Result.Error.Code == ErrorCode.NoError) 37 { 38 result = true; 39 } 40 //可以在控制台使用以下語句 41 //Console.WriteLine("Producer:" + producer.Name + "\r\nTopic:" + topic + "\r\nPartition:" + task.Result.Partition + "\r\nOffset:" + task.Result.Offset + "\r\nMessage:" + task.Result.Value); 42 }); 43 44 producer.Flush(TimeSpan.FromSeconds(10)); 45 } 46 return result; 47 } 48 }
好了,以上就是全部代碼,大家可以整理使用。
三、總結
繼續學習,先使用,慢慢了解內部的細節,我已經邁出了第一步,不忘初心,繼續努力。