基於Confluent.Kafka實現的Kafka客戶端操作類使用詳解


一、引言

      有段時間沒有寫東西了,當然不是沒得寫,還有MongoDB的系列沒有寫完呢,那個系列還要繼續。今天正好是周末,有點時間,來寫新東西吧。最近公司用了Kafka做為消息的中間件,最開始寫的那個版本不是很好,我就要來優化它,所以就抽了一些時間來研究Kafka。很多概念性的東西就不寫了,今天主要是上干貨,主要是代碼,今天就把Kafka的消費者和生產者的代碼貼出來,以供大家參考,當然這個是代碼樣板,最后我也會把地址貼出來。以后有時間我會把我自己實現的Kafka消息的生產者和消費者的代碼貼出來。好了,話不多說,言歸正傳。

      說明一點,如果想調試這里的代碼,必須引入Confluent.Kafka這個dll才可以,直接在Visual Studio 項目的 Nuget 里面可以查找,直接安裝就可以了。

二、消息的生產者(Kafka消息的Producer)

      大多數的消息中間件都包含三個部分,一個是消息的生產者,一個是存放消息的隊列,另外一個就是消息的消費者,我們就按着這個順序,我就先把消息生產者的代碼寫出來。直接上代碼,其實不是很難,里面有很多備注,只要有基本的概念理解起來還是很容易的。

     第一個版本,同步版本!

 1 using System;
 2 using System.IO;
 3 using System.Text;
 4 using System.Collections.Generic;
 5 using Confluent.Kafka;
 6 using Confluent.Kafka.Serialization;
 7 
 8 
 9 namespace Confluent.Kafka.Examples.Producer
10 {
11     public class Program
12     {
13         public static void Main(string[] args)
14         {
15             if (args.Length != 2)
16             {
17                 Console.WriteLine("Usage: .. brokerList topicName");
18                 return;
19             }
20 
21             string brokerList = args[0];
22             string topicName = args[1];
23 
24             var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } };
25 
26             using (var producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
27             {
28                 var cancelled = false;
29                 Console.CancelKeyPress += (_, e) => {
30                     e.Cancel = true; // 阻止進程退出
31                     cancelled = true;
32                 };
33 
34                 while (!cancelled)
35                 {
36                     Console.Write("> ");
37 
38                     string text;
39                     try
40                     {
41                         text = Console.ReadLine();
42                     }
43                     catch (IOException)
44                     {
45                         // IO 異常拋出的時候設置此值ConsoleCancelEventArgs.Cancel == true.
46                         break;
47                     }
48                     if (text == null)
49                     {
50                         break;
51                     }
52 
53                     string key = null;
54                     string val = text;
55 
56                     // 如果指定了鍵和值,則拆分行.
57                     int index = text.IndexOf(" ");
58                     if (index != -1)
59                     {
60                         key = text.Substring(0, index);
61                         val = text.Substring(index + 1);
62                     }
63 
64                     // 在下面的異步生產請求上調用.Result會導致它阻塞,直到它完成。 通常,您應該避免同步生成,因為這會對吞吐量產生巨大影響。對於這個交互式控制台的例子,這是我們想要的。
65                     var deliveryReport = producer.ProduceAsync(topicName, key, val).Result;
66                     Console.WriteLine(
67                         deliveryReport.Error.Code == ErrorCode.NoError
68                             ? "delivered to: "+deliveryReport.TopicPartitionOffset
69                             : "failed to deliver message: "+deliveryReport.Error.Reason
70                     );
71                 }
72 
73                 // 由於我們是同步的生產消息,此時不會有消息在傳輸並且也不需要等待消息到達的確認應答, 銷毀生產者之前我們是不需要調用 producer.Flush 方法, 就像正常使用一樣。
74             }
75         }
76     }
77 }

 

    第二個版本,異步版本,推薦使用

 1 using System;
 2 using System.IO;
 3 using System.Text;
 4 using System.Collections.Generic;
 5 using Confluent.Kafka;
 6 using Confluent.Kafka.Serialization;
 7 
 8 
 9 namespace Confluent.Kafka.Examples.Producer
10 {
11     public class Program
12     {
13         public static void Main(string[] args)
14         {
15             if (args.Length != 2)
16             {
17                 Console.WriteLine("Usage: .. brokerList topicName");
18                 return;
19             }
20 
21             string brokerList = args[0];
22             string topicName = args[1];
23             string message="我就是要傳輸的消息內容";
24 
25             //這是以異步方式生產消息的代碼實例
26             var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } };
27             using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
28             {
29                 var deliveryReport = producer.ProduceAsync(topicName, null, message);
30                 deliveryReport.ContinueWith(task =>
31                 {
32                     Console.WriteLine("Producer: "+producer.Name+"\r\nTopic: "+topicName+"\r\nPartition: "+task.Result.Partition+"\r\nOffset: "+task.Result.Offset);
33                 });
34  
35                 producer.Flush(TimeSpan.FromSeconds(10));
36            }
37         }
38     }
39 }


      好了,上面給出了兩個版本的消息生產者的代碼,一個是同步版本,第二個是異步版本的,推薦使用異步版本的代碼實現。

三、消息的消費者(Kafka消息的Consumer)

      在消息的生產者中已經說明了消息中間件的三個部分,第一個是消息的生產者,沒有消息的生產者,就沒有消息的消費者了,巧婦難為無米之炊吧。在上一節我們已經寫了消息生產者的代碼,這一節,我們主要來貼出消息消費者的代碼。代碼同樣很簡單,注釋也很全。

  1 using System;
  2 using System.Collections.Generic;
  3 using System.Linq;
  4 using System.Text;
  5 using Confluent.Kafka.Serialization;
  6 
  7 
  8 /// <summary>
  9 ///     演示如何使用Consumer客戶端.
 10 /// </summary>
 11 namespace Confluent.Kafka.Examples.Consumer
 12 {
 13     public class Program
 14     {
 15         /// <summary>
 16         //      在這個例子中:
 17         ///         - offsets 是自動提交的。
 18         ///         - consumer.Poll / OnMessage 是用於消息消費的。
 19         ///         - 沒有為輪詢循環創建(Poll)二外的線程,當然可以創建
 20         /// </summary>
 21         public static void Run_Poll(string brokerList, List<string> topics)
 22         {
 23             var config = new Dictionary<string, object>
 24             {
 25                 { "bootstrap.servers", brokerList },
 26                 { "group.id", "csharp-consumer" },
 27                 { "enable.auto.commit", true },  // 默認值
 28                 { "auto.commit.interval.ms", 5000 },
 29                 { "statistics.interval.ms", 60000 },
 30                 { "session.timeout.ms", 6000 },
 31                 { "auto.offset.reset", "smallest" }
 32             };
 33 
 34             using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
 35             {
 36                 // 注意: 所有事件處理程序的執行都是在主線程中執行的,就是同步的。
 37 
 38                 //當成功消費了消息就會觸發該事件
 39                 consumer.OnMessage += (_, msg) => Console.WriteLine("Topic: "+msg.Topic+" Partition: "+msg.Partition+" Offset: "+msg.Offset+" "+msg.Value);
 40 
 41                 consumer.OnPartitionEOF += (_, end) => Console.WriteLine("Reached end of topic "+end.Topic+" partition "+end.Partition+", next message will be at offset "+end.Offset);
 42 
 43                 //當然發生了嚴重錯誤,比如,連接丟失或者Kafka服務器無效就會觸發該事件
 44                 consumer.OnError += (_, error) => Console.WriteLine("Error: "+error);
 45 
 46                 //當反序列化有錯誤,或者消費的過程中發生了錯誤,即error != NoError,就會觸發該事件
 47                 consumer.OnConsumeError += (_, msg)
 48                     => Console.WriteLine("Error consuming from topic/partition/offset "+msg.Topic+"/"+msg.Partition+"/"+msg.Offset+": "+msg.Error);
 49 
 50                 //成功提交了Offsets會觸發該事件
 51                 consumer.OnOffsetsCommitted += (_, commit) => Console.WriteLine(commit.Error ? "Failed to commit offsets: "+commit.Error : "Successfully committed offsets: "+commit.Offsets);
 52 
 53                 // 當消費者被分配一組新的分區時觸發該事件
 54                 consumer.OnPartitionsAssigned += (_, partitions) =>
 55                 {
 56                     Console.WriteLine("Assigned partitions:"+partitions+"  "+member id: "+consumer.MemberId);
 57                     // 如果您未向OnPartitionsAssigned事件添加處理程序,則會自動執行以下.Assign調用。 如果你這樣做,你必須明確地調用.Assign以便消費者開始消費消息。
 58                     //開始從分區中消息消息
 59                     consumer.Assign(partitions);
 60                 };
 61 
 62                 // 當消費者的當前分區集已被撤銷時引發該事件。
 63                 consumer.OnPartitionsRevoked += (_, partitions) =>
 64                 {
 65                     Console.WriteLine("Revoked partitions:"+partitions);
 66                     // 如果您未向OnPartitionsRevoked事件添加處理程序,則下面的.Unassign調用會自動發生。 如果你這樣做了,你必須明確地調用.Usessign以便消費者停止從它先前分配的分區中消費消息。
 67 
 68                     //停止從分區中消費消息
 69                     consumer.Unassign();
 70                 };
 71 
 72                 consumer.OnStatistics += (_, json) => Console.WriteLine("Statistics: "+json);
 73 
 74                 consumer.Subscribe(topics);
 75 
 76                 Console.WriteLine("Subscribed to:"+consumer.Subscription);
 77 
 78                 var cancelled = false;
 79                 Console.CancelKeyPress += (_, e) => {
 80                     e.Cancel = true;  // 組織進程退出
 81                     cancelled = true;
 82                 };
 83 
 84                 Console.WriteLine("Ctrl-C to exit.");
 85                 while (!cancelled)
 86                 {
 87                     consumer.Poll(TimeSpan.FromMilliseconds(100));
 88                 }
 89             }
 90         }
 91 
 92         /// <summary>
 93         ///     在這實例中
 94         ///         - offsets 是手動提交的。
 95         ///         - consumer.Consume方法用於消費消息
 96         ///             (所有其他事件仍由事件處理程序處理)
 97         ///         -沒有為了 輪詢(消耗)循環 創建額外的線程。
 98         /// </summary>
 99         public static void Run_Consume(string brokerList, List<string> topics)
100         {
101             var config = new Dictionary<string, object>
102             {
103                 { "bootstrap.servers", brokerList },
104                 { "group.id", "csharp-consumer" },
105                 { "enable.auto.commit", false },
106                 { "statistics.interval.ms", 60000 },
107                 { "session.timeout.ms", 6000 },
108                 { "auto.offset.reset", "smallest" }
109             };
110 
111             using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
112             {
113                 // 注意:所有事件處理都是在主線程中處理的,也就是說同步的
114 
115                 consumer.OnPartitionEOF += (_, end)
116                     => Console.WriteLine("Reached end of topic "+end.Topic+" partition "+end.Partition+", next message will be at offset "+end.Offset);
117 
118                 consumer.OnError += (_, error)=> Console.WriteLine("Error: "+error);
119 
120                 // 當反序列化有錯誤,或者消費的過程中發生了錯誤,即error != NoError,就會觸發該事件
121                 consumer.OnConsumeError += (_, error)=> Console.WriteLine("Consume error: "+error);
122 
123                 // 當消費者被分配一組新的分區時觸發該事件
124                 consumer.OnPartitionsAssigned += (_, partitions) =>
125                 {
126                     Console.WriteLine("Assigned partitions:"+partitions+"  "+member id: "+consumer.MemberId);
127                     // 如果您未向OnPartitionsAssigned事件添加處理程序,則會自動執行以下.Assign調用。 如果你這樣做,你必須明確地調用.Assign以便消費者開始消費消息。
128                     //開始從分區中消息消息
129                     consumer.Assign(partitions);
130                 };
131 
132                 // 當消費者的當前分區集已被撤銷時引發該事件。
133                 consumer.OnPartitionsRevoked += (_, partitions) =>
134                 {
135                     Console.WriteLine("Revoked partitions:"+partitions);
136                     // 如果您未向OnPartitionsRevoked事件添加處理程序,則下面的.Unassign調用會自動發生。 如果你這樣做了,你必須明確地調用.Usessign以便消費者停止從它先前分配的分區中消費消息。
137 
138                     //停止從分區中消費消息
139                     consumer.Unassign();
140                 };
141 
142                 consumer.OnStatistics += (_, json) => Console.WriteLine("Statistics: "+json);
143 
144                 consumer.Subscribe(topics);
145 
146                 Console.WriteLine("Started consumer, Ctrl-C to stop consuming");
147 
148                 var cancelled = false;
149                 Console.CancelKeyPress += (_, e) => {
150                     e.Cancel = true; // 防止進程退出
151                     cancelled = true;
152                 };
153 
154                 while (!cancelled)
155                 {
156                     if (!consumer.Consume(out Message<Ignore, string> msg, TimeSpan.FromMilliseconds(100)))
157                     {
158                         continue;
159                     }
160 
161                     Console.WriteLine("Topic: "+msg.Topic+" Partition: "+msg.Partition+" Offset: "+msg.Offset+" "+msg.Value);
162 
163                     if (msg.Offset % 5 == 0)
164                     {
165                         var committedOffsets = consumer.CommitAsync(msg).Result;
166                         Console.WriteLine("Committed offset: "+committedOffsets);
167                     }
168                 }
169             }
170         }
171 
172         /// <summary>
173         ///     在這個例子中
174         ///         - 消費者組功能(即.Subscribe +offset提交)不被使用。
175         ///         - 將消費者手動分配給分區,並始終從特定偏移量(0)開始消耗。
176         /// </summary>
177         public static void Run_ManualAssign(string brokerList, List<string> topics)
178         {
179             var config = new Dictionary<string, object>
180             {
181                 // 即使您不打算使用任何使用者組功能,也必須在創建使用者時指定group.id屬性。
182                 { "group.id", new Guid().ToString() },
183                 { "bootstrap.servers", brokerList },
184                 // 即使消費者沒有訂閱該組,也可以將分區偏移量提交給一個組。 在這個例子中,自動提交被禁用以防止發生這種情況。
185                 { "enable.auto.commit", false }
186             };
187 
188             using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
189             {
190                 //總是從0開始消費
191                 consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 0, Offset.Beginning)).ToList());
192 
193                 // 引發嚴重錯誤,例如 連接失敗或所有Kafka服務器失效。
194                 consumer.OnError += (_, error) => Console.WriteLine("Error: "+error);
195 
196                 // 這個事件是由於在反序列化出現錯誤,或者在消息消息的時候出現錯誤,也就是 error != NoError 的時候引發該事件
197                 consumer.OnConsumeError += (_, error) => Console.WriteLine("Consume error: "+error);
198 
199                 while (true)
200                 {
201                     if (consumer.Consume(out Message<Ignore, string> msg, TimeSpan.FromSeconds(1)))
202                     {
203                         Console.WriteLine("Topic: "+msg.Topic+" Partition: "+msg.Partition+" Offset: "+msg.Offset+" "+msg.Value);
204                     }
205                 }
206             }
207         }
208 
209         private static void PrintUsage()=> Console.WriteLine("Usage: .. <poll|consume|manual> <broker,broker,..> <topic> [topic..]");
210 
211         public static void Main(string[] args)
212         {
213             if (args.Length < 3)
214             {
215                 PrintUsage();
216                 return;
217             }
218 
219             var mode = args[0];
220             var brokerList = args[1];
221             var topics = args.Skip(2).ToList();
222 
223             switch (mode)
224             {
225                 case "poll":
226                     Run_Poll(brokerList, topics);
227                     break;
228                 case "consume":
229                     Run_Consume(brokerList, topics);
230                     break;
231                 case "manual":
232                     Run_ManualAssign(brokerList, topics);
233                     break;
234                 default:
235                     PrintUsage();
236                     break;
237             }
238         }
239     }
240 }


      以上代碼也有兩個版本,第一個版本是自動提交Offset,第二個版本是人工提交Offset,但是代碼沒有分開寫,只是不同的版本用了不同的方法。    

四、結束

      好了,今天就寫到這里了,這是一個引子,所有代碼都是真實有效的,我已經全部測試過,所以大家可以放心使用或者改造成自己的消息的生產者和消息消費者的實現。原文的地址如下,https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples    ,內容差不多。不忘初心,繼續努力吧。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM