基於Confluent.Kafka實現的KafkaConsumer消費者類和KafkaProducer消息生產者類型


一、引言

  研究Kafka有一段時間了,略有心得,基於此自己就寫了一個Kafka的消費者的類和Kafka消息生產者的類,進行了單元測試和生產環境的測試,還是挺可靠的。

二、源碼

  話不多說,直接上代碼,代碼不是很難,注釋很全,希望大家多多發表意見,繼續提升。

 1     /// <summary>
 2     /// Kafka消息消費者接口
 3     /// </summary>
 4     public interface IKafkaConsumer
 5     {
 6         /// <summary>
 7         /// 指定的組別的消費者開始消費指定主題的消息
 8         /// </summary>
 9         /// <param name="broker">Kafka消息服務器的地址</param>
10         /// <param name="topic">Kafka消息所屬的主題</param>
11         /// <param name="groupID">Kafka消費者所屬的組別</param>
12         /// <param name="action">可以對已經消費的消息進行相關處理</param>
13         void Consume(string broker, string topic, string groupID, Action<ConsumerResult> action = null);
14     }

  以上類型是接口定義,這個類定義的抽象類,可以重復使用相關的代碼定義其中,但是目前這兩個方法沒有使用。

 1     /// <summary>
 2     /// Kafka抽象基類,提供公共接口實現
 3     /// </summary>
 4     public abstract class KafkaBase
 5     {
 6         /// <summary>
 7         /// 獲取Kafka服務器地址
 8         /// </summary>
 9         /// <param name="brokerNameKey">配置文件中Broker服務器地址的key的名稱</param>
10         /// <returns>返回獲取到的Kafka服務器的地址明細</returns>
11         public string GetKafkaBroker(string brokerNameKey = "Broker")
12         {
13             string kafkaBroker = string.Empty;            
14 
15             if (!ConfigurationManager.AppSettings.AllKeys.Contains(brokerNameKey))
16             {
17                 kafkaBroker = "http://localhost:9092";
18             }
19             else
20             {
21                 kafkaBroker = ConfigurationManager.AppSettings[brokerNameKey];
22             }
23             return kafkaBroker;
24         }
25 
26         /// <summary>
27         /// 在配置文件中獲取系統中已經生成的主題名稱
28         /// </summary>
29         /// <param name="topicNameKey">配置文件中主題的key名稱</param>
30         /// <returns>返回獲取到的主題的具體值</returns>
31         public string GetTopicName(string topicNameKey = "Topic")
32         {
33             string topicName = string.Empty;
34 
35             if (!ConfigurationManager.AppSettings.AllKeys.Contains(topicNameKey))
36             {
37                 throw new Exception("Key \"" + topicNameKey + "\" not found in Config file -> configuration/AppSettings");
38             }
39             else
40             {
41                 topicName = ConfigurationManager.AppSettings[topicNameKey];
42             }
43             return topicName;
44         }
45     }

  還有一個用於數據傳遞的工具類,代碼如下:

 1     /// <summary>
 2     /// Kafka消息消費者設置對象,提供Kafka消費消息的參數對象(Consumer.Consum)
 3     /// </summary>
 4     public sealed class ConsumerSetting
 5     {
 6         /// <summary>
 7         /// Kafka消息服務器的地址
 8         /// </summary>
 9         public string Broker { get; set; }
10 
11         /// <summary>
12         /// Kafka消息所屬的主題
13         /// </summary>
14         public string Topic { get; set; }
15 
16         /// <summary>
17         /// Kafka消息消費者分組主鍵
18         /// </summary>
19         public string GroupID { get; set; }
20 
21         /// <summary>
22         /// 消費消息后可以執行的方法
23         /// </summary>
24         public Action<ConsumerResult> Action { get; set; }
25     }

  我們可以對消息進行消費,該類型用於對消息進行整理,代碼如下:

 1     /// <summary>
 2     /// 已經消費的消息的詳情信息
 3     /// </summary>
 4     public sealed class ConsumerResult
 5     {
 6         /// <summary>
 7         /// Kafka消息服務器的地址
 8         /// </summary>
 9         public string Broker { get; set; }
10 
11         /// <summary>
12         /// Kafka消息所屬的主題
13         /// </summary>
14         public string Topic { get; set; }
15 
16         /// <summary>
17         /// Kafka消息消費者分組主鍵
18         /// </summary>
19         public string GroupID { get; set; }
20 
21         /// <summary>
22         /// 我們需要處理的消息具體的內容
23         /// </summary>
24         public string Message { get; set; }
25 
26         /// <summary>
27         /// Kafka數據讀取的當前位置
28         /// </summary>
29         public long Offset { get; set; }
30 
31         /// <summary>
32         /// 消息所在的物理分區
33         /// </summary>
34         public int Partition { get; set; }
35     }

  最后階段了,該是消費者的代碼了,代碼如下:

  1     /// <summary>
  2     /// Kafka消息消費者
  3     /// </summary>
  4     public sealed class KafkaConsumer : KafkaBase, IKafkaConsumer
  5     {
  6         #region 私有字段
  7 
  8         private bool isCancelled;
  9 
 10         #endregion
 11 
 12         #region 構造函數
 13 
 14         /// <summary>
 15         /// 構造函數,初始化IsCancelled屬性
 16         /// </summary>
 17         public KafkaConsumer()
 18         {
 19             isCancelled = false;
 20         }
 21 
 22         #endregion
 23 
 24         #region 屬性
 25 
 26         /// <summary>
 27         /// 是否應該取消繼續消費Kafka的消息,默認值是false,繼續消費消息
 28         /// </summary>
 29         public bool IsCancelled
 30         {
 31             get { return isCancelled; }
 32             set { isCancelled = value; }
 33         }
 34 
 35         #endregion
 36 
 37         #region 同步版本
 38 
 39         /// <summary>
 40         /// 指定的組別的消費者開始消費指定主題的消息
 41         /// </summary>
 42         /// <param name="broker">Kafka消息服務器的地址</param>
 43         /// <param name="topic">Kafka消息所屬的主題</param>
 44         /// <param name="groupID">Kafka消費者所屬的組別</param>
 45         /// <param name="action">可以對已經消費的消息進行相關處理</param>
 46         public void Consume(string broker, string topic, string groupID, Action<ConsumerResult> action = null)
 47         {
 48             if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0)
 49             {
 50                 throw new ArgumentNullException("Kafka消息服務器的地址不能為空!");
 51             }
 52 
 53             if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0)
 54             {
 55                 throw new ArgumentNullException("消息所屬的主題不能為空!");
 56             }
 57 
 58             if (string.IsNullOrEmpty(groupID) || string.IsNullOrWhiteSpace(groupID) || groupID.Length <= 0)
 59             {
 60                 throw new ArgumentNullException("用戶分組ID不能為空!");
 61             }
 62 
 63             var config = new Dictionary<string, object>
 64                 {
 65                     { "bootstrap.servers", broker },
 66                     { "group.id", groupID },
 67                     { "enable.auto.commit", true },  // this is the default
 68                     { "auto.commit.interval.ms", 5000 },
 69                     { "statistics.interval.ms", 60000 },
 70                     { "session.timeout.ms", 6000 },
 71                     { "auto.offset.reset", "smallest" }
 72                 };
 73             
 74 
 75             using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
 76             {
 77                 // Note: All event handlers are called on the main thread.
 78                 //consumer.OnMessage += (_, message) => Console.WriteLine("Topic:" + message.Topic + " Partition:" + message.Partition + " Offset:" + message.Offset + " " + message.Value);
 79                 //consumer.OnMessage += (_, message) => Console.WriteLine("Offset:【" + message.Offset + "】Message:【" + message.Value + "】");
 80                 if (action != null)
 81                 {
 82                     consumer.OnMessage += (_, message) => {
 83                         ConsumerResult messageResult = new ConsumerResult();
 84                         messageResult.Broker = broker;
 85                         messageResult.Topic = message.Topic;
 86                         messageResult.Partition = message.Partition;
 87                         messageResult.Offset = message.Offset.Value;
 88                         messageResult.Message = message.Value;
 89 
 90                         //執行外界自定義的方法
 91                         action(messageResult);
 92                     }; 
 93                 }
 94 
 95                 consumer.OnPartitionEOF += (_, end) => Console.WriteLine("Reached end of topic " + end.Topic + " partition " + end.Partition + ", next message will be at offset " + end.Offset);
 96 
 97                 consumer.OnError += (_, error) => Console.WriteLine("Error:" + error);
 98 
 99                 //引發反序列化錯誤或消費消息出現錯誤!= NoError。
100                 consumer.OnConsumeError += (_, message) => Console.WriteLine("Error consuming from topic/partition/offset " + message.Topic + "/" + message.Partition + "/" + message.Offset + ": " + message.Error);
101 
102                 consumer.OnOffsetsCommitted += (_, commit) => Console.WriteLine(commit.Error ? "Failed to commit offsets:" + commit.Error : "Successfully committed offsets:" + commit.Offsets);
103 
104                 // 當消費者被分配一組新的分區時引發。
105                 consumer.OnPartitionsAssigned += (_, partitions) =>
106                 {
107                     Console.WriteLine("Assigned Partitions:" + partitions + ", Member ID:" + consumer.MemberId);
108                     //如果您未向OnPartitionsAssigned事件添加處理程序,則會自動執行以下.Assign調用。 如果你為它添加了事件處理程序,你必須明確地調用.Assign以便消費者開始消費消息。
109                     consumer.Assign(partitions);
110                 };
111 
112                 // Raised when the consumer's current assignment set has been revoked.
113                 //當消費者的當前任務集已被撤銷時引發。
114                 consumer.OnPartitionsRevoked += (_, partitions) =>
115                 {
116                     Console.WriteLine("Revoked Partitions:" + partitions);
117                     // If you don't add a handler to the OnPartitionsRevoked event,the below .Unassign call happens automatically. If you do, you must call .Unassign explicitly in order for the consumer to stop consuming messages from it's previously assigned partitions.
118                     //如果您未向OnPartitionsRevoked事件添加處理程序,則下面的.Unassign調用會自動發生。 如果你為它增加了事件處理程序,你必須明確地調用.Usessign以便消費者停止從它先前分配的分區中消費消息。
119                     consumer.Unassign();
120                 };
121 
122                 //consumer.OnStatistics += (_, json) => Console.WriteLine("Statistics: " + json);
123 
124                 consumer.Subscribe(topic);
125 
126                 //Console.WriteLine("Subscribed to:" + consumer.Subscription);
127 
128                 while (!IsCancelled)
129                 {
130                     consumer.Poll(TimeSpan.FromMilliseconds(100));
131                 }
132             }
133         }
134 
135         #endregion
136 
137         #region 異步版本
138 
139         /// <summary>
140         /// 指定的組別的消費者開始消費指定主題的消息
141         /// </summary>
142         /// <param name="broker">Kafka消息服務器的地址</param>
143         /// <param name="topic">Kafka消息所屬的主題</param>
144         /// <param name="groupID">Kafka消費者所屬的組別</param>
145         /// <param name="action">可以對已經消費的消息進行相關處理</param>
146         public void ConsumeAsync(string broker, string topic, string groupID, Action<ConsumerResult> action = null)
147         {
148             if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0)
149             {
150                 throw new ArgumentNullException("Kafka消息服務器的地址不能為空!");
151             }
152 
153             if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0)
154             {
155                 throw new ArgumentNullException("消息所屬的主題不能為空!");
156             }
157 
158             if (string.IsNullOrEmpty(groupID) || string.IsNullOrWhiteSpace(groupID) || groupID.Length <= 0)
159             {
160                 throw new ArgumentNullException("用戶分組ID不能為空!");
161             }
162 
163             ThreadPool.QueueUserWorkItem(KafkaAutoCommittedOffsets, new ConsumerSetting() { Broker = broker, Topic = topic, GroupID = groupID, Action=action });
164         }
165 
166         #endregion
167 
168         #region 兩種提交Offsets的版本
169 
170         /// <summary>
171         /// Kafka消息隊列服務器自動提交offset
172         /// </summary>
173         /// <param name="state">消息消費者信息</param>
174         private void KafkaAutoCommittedOffsets(object state)
175         {
176             ConsumerSetting setting = state as ConsumerSetting;
177 
178             var config = new Dictionary<string, object>
179                 {
180                     { "bootstrap.servers", setting.Broker },
181                     { "group.id", setting.GroupID },
182                     { "enable.auto.commit", true },  // this is the default
183                     { "auto.commit.interval.ms", 5000 },
184                     { "statistics.interval.ms", 60000 },
185                     { "session.timeout.ms", 6000 },
186                     { "auto.offset.reset", "smallest" }
187                 };
188 
189             using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
190             {
191                 if (setting.Action != null)
192                 {
193                     consumer.OnMessage += (_, message) =>
194                     {
195                         ConsumerResult messageResult = new ConsumerResult();
196                         messageResult.Broker = setting.Broker;
197                         messageResult.Topic = message.Topic;
198                         messageResult.Partition = message.Partition;
199                         messageResult.Offset = message.Offset.Value;
200                         messageResult.Message = message.Value;
201 
202                         //執行外界自定義的方法
203                         setting.Action(messageResult);                        
204                     };
205                 }
206 
207                 //consumer.OnStatistics += (_, json)=> Console.WriteLine("Statistics: {json}");
208 
209                 //可以寫日志
210                 //consumer.OnError += (_, error)=> Console.WriteLine("Error:"+error);
211 
212                 //可以寫日志
213                 //consumer.OnConsumeError += (_, msg) => Console.WriteLine("Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}");
214 
215                 consumer.Subscribe(setting.Topic);
216                
217                 while (!IsCancelled)
218                 {
219                     consumer.Poll(TimeSpan.FromMilliseconds(100));
220                 }
221             }
222         }
223 
224         /// <summary>
225         /// Kafka消息隊列服務器手動提交offset
226         /// </summary>
227         /// <param name="state">消息消費者信息</param>
228         private void  KafkaManuallyCommittedOffsets(object state)
229         {
230             ConsumerSetting setting = state as ConsumerSetting;
231 
232             var config = new Dictionary<string, object>
233                 {
234                     { "bootstrap.servers", setting.Broker },
235                     { "group.id", setting.GroupID },
236                     { "enable.auto.commit", false },//不是自動提交的
237                     { "auto.commit.interval.ms", 5000 },
238                     { "statistics.interval.ms", 60000 },
239                     { "session.timeout.ms", 6000 },
240                     { "auto.offset.reset", "smallest" }
241                 };
242 
243             using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
244             {                                
245                 //可以寫日志
246                 //consumer.OnError += (_, error) => Console.WriteLine("Error:"+error);
247 
248                 //可以寫日志
249                 // Raised on deserialization errors or when a consumed message has an error != NoError.
250                 //consumer.OnConsumeError += (_, error)=> Console.WriteLine("Consume error:"+error);
251 
252                 consumer.Subscribe(setting.Topic);
253 
254                 Message<Ignore, string> message = null;
255 
256                 while (!isCancelled)
257                 {
258                     if (!consumer.Consume(out message, TimeSpan.FromMilliseconds(100)))
259                     {
260                         continue;
261                     }
262 
263                     if (setting.Action != null)
264                     {
265                         ConsumerResult messageResult = new ConsumerResult();
266                         messageResult.Broker = setting.Broker;
267                         messageResult.Topic = message.Topic;
268                         messageResult.Partition = message.Partition;
269                         messageResult.Offset = message.Offset.Value;
270                         messageResult.Message = message.Value;
271 
272                         //執行外界自定義的方法
273                         setting.Action(messageResult);
274                     }
275 
276                     if (message.Offset % 5 == 0)
277                     {
278                         var committedOffsets = consumer.CommitAsync(message).Result;
279                         //Console.WriteLine("Committed offset:"+committedOffsets);
280                     }
281                 }
282             }
283         }
284 
285         #endregion
286     }

  有了消息的消費者代碼,那消息的生產者代碼肯定是少不了的。代碼如下:

 1     /// <summary>
 2     /// Kafka消息生產者
 3     /// </summary>
 4     public sealed class KafkaProducer : KafkaBase, IKafkaProducer
 5     {
 6         /// <summary>
 7         /// 生產消息並發送消息
 8         /// </summary>
 9         /// <param name="broker">kafka的服務器地址</param>
10         /// <param name="topic">kafka的消息主題名稱</param>
11         /// <param name="message">需要傳送的消息</param>
12         public bool Produce(string broker, string topic, string message)
13         {
14             bool result = false;
15             if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0)
16             {
17                 throw new ArgumentNullException("Kafka消息服務器地址不能為空!");
18             }
19 
20             if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0)
21             {
22                 throw new ArgumentNullException("消息所屬的主題不能為空!");
23             }
24 
25             if (string.IsNullOrEmpty(message) || string.IsNullOrWhiteSpace(message) || message.Length <= 0)
26             {
27                 throw new ArgumentNullException("消息內容不能為空!");
28             }
29 
30             var config = new Dictionary<string, object> { { "bootstrap.servers", broker } };
31             using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
32             {
33                 var deliveryReport = producer.ProduceAsync(topic, null, message);
34                 deliveryReport.ContinueWith(task =>
35                 {
36                     if (task.Result.Error.Code == ErrorCode.NoError)
37                     {
38                         result = true;
39                     }
40                     //可以在控制台使用以下語句
41                     //Console.WriteLine("Producer:" + producer.Name + "\r\nTopic:" + topic + "\r\nPartition:" + task.Result.Partition + "\r\nOffset:" + task.Result.Offset + "\r\nMessage:" + task.Result.Value);
42                 });
43 
44                 producer.Flush(TimeSpan.FromSeconds(10));
45             }
46             return result;
47         }
48     }

  好了,以上就是全部代碼,大家可以整理使用。

三、總結

  繼續學習,先使用,慢慢了解內部的細節,我已經邁出了第一步,不忘初心,繼續努力。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM