一、引言
有段時間沒有寫東西了,當然不是沒得寫,還有MongoDB的系列沒有寫完呢,那個系列還要繼續。今天正好是周末,有點時間,來寫新東西吧。最近公司用了Kafka做為消息的中間件,最開始寫的那個版本不是很好,我就要來優化它,所以就抽了一些時間來研究Kafka。很多概念性的東西就不寫了,今天主要是上干貨,主要是代碼,今天就把Kafka的消費者和生產者的代碼貼出來,以供大家參考,當然這個是代碼樣板,最后我也會把地址貼出來。以后有時間我會把我自己實現的Kafka消息的生產者和消費者的代碼貼出來。好了,話不多說,言歸正傳。
說明一點,如果想調試這里的代碼,必須引入Confluent.Kafka這個dll才可以,直接在Visual Studio 項目的 Nuget 里面可以查找,直接安裝就可以了。
二、消息的生產者(Kafka消息的Producer)
大多數的消息中間件都包含三個部分,一個是消息的生產者,一個是存放消息的隊列,另外一個就是消息的消費者,我們就按着這個順序,我就先把消息生產者的代碼寫出來。直接上代碼,其實不是很難,里面有很多備注,只要有基本的概念理解起來還是很容易的。
第一個版本,同步版本!
1 using System; 2 using System.IO; 3 using System.Text; 4 using System.Collections.Generic; 5 using Confluent.Kafka; 6 using Confluent.Kafka.Serialization; 7 8 9 namespace Confluent.Kafka.Examples.Producer 10 { 11 public class Program 12 { 13 public static void Main(string[] args) 14 { 15 if (args.Length != 2) 16 { 17 Console.WriteLine("Usage: .. brokerList topicName"); 18 return; 19 } 20 21 string brokerList = args[0]; 22 string topicName = args[1]; 23 24 var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } }; 25 26 using (var producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8))) 27 { 28 var cancelled = false; 29 Console.CancelKeyPress += (_, e) => { 30 e.Cancel = true; // 阻止進程退出 31 cancelled = true; 32 }; 33 34 while (!cancelled) 35 { 36 Console.Write("> "); 37 38 string text; 39 try 40 { 41 text = Console.ReadLine(); 42 } 43 catch (IOException) 44 { 45 // IO 異常拋出的時候設置此值ConsoleCancelEventArgs.Cancel == true. 46 break; 47 } 48 if (text == null) 49 { 50 break; 51 } 52 53 string key = null; 54 string val = text; 55 56 // 如果指定了鍵和值,則拆分行. 57 int index = text.IndexOf(" "); 58 if (index != -1) 59 { 60 key = text.Substring(0, index); 61 val = text.Substring(index + 1); 62 } 63 64 // 在下面的異步生產請求上調用.Result會導致它阻塞,直到它完成。 通常,您應該避免同步生成,因為這會對吞吐量產生巨大影響。對於這個交互式控制台的例子,這是我們想要的。 65 var deliveryReport = producer.ProduceAsync(topicName, key, val).Result; 66 Console.WriteLine( 67 deliveryReport.Error.Code == ErrorCode.NoError 68 ? "delivered to: "+deliveryReport.TopicPartitionOffset 69 : "failed to deliver message: "+deliveryReport.Error.Reason 70 ); 71 } 72 73 // 由於我們是同步的生產消息,此時不會有消息在傳輸並且也不需要等待消息到達的確認應答, 銷毀生產者之前我們是不需要調用 producer.Flush 方法, 就像正常使用一樣。 74 } 75 } 76 } 77 }
第二個版本,異步版本,推薦使用
1 using System; 2 using System.IO; 3 using System.Text; 4 using System.Collections.Generic; 5 using Confluent.Kafka; 6 using Confluent.Kafka.Serialization; 7 8 9 namespace Confluent.Kafka.Examples.Producer 10 { 11 public class Program 12 { 13 public static void Main(string[] args) 14 { 15 if (args.Length != 2) 16 { 17 Console.WriteLine("Usage: .. brokerList topicName"); 18 return; 19 } 20 21 string brokerList = args[0]; 22 string topicName = args[1]; 23 string message="我就是要傳輸的消息內容"; 24 25 //這是以異步方式生產消息的代碼實例 26 var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } }; 27 using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8))) 28 { 29 var deliveryReport = producer.ProduceAsync(topicName, null, message); 30 deliveryReport.ContinueWith(task => 31 { 32 Console.WriteLine("Producer: "+producer.Name+"\r\nTopic: "+topicName+"\r\nPartition: "+task.Result.Partition+"\r\nOffset: "+task.Result.Offset); 33 }); 34 35 producer.Flush(TimeSpan.FromSeconds(10)); 36 } 37 } 38 } 39 }
好了,上面給出了兩個版本的消息生產者的代碼,一個是同步版本,第二個是異步版本的,推薦使用異步版本的代碼實現。
三、消息的消費者(Kafka消息的Consumer)
在消息的生產者中已經說明了消息中間件的三個部分,第一個是消息的生產者,沒有消息的生產者,就沒有消息的消費者了,巧婦難為無米之炊吧。在上一節我們已經寫了消息生產者的代碼,這一節,我們主要來貼出消息消費者的代碼。代碼同樣很簡單,注釋也很全。
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using Confluent.Kafka.Serialization; 6 7 8 /// <summary> 9 /// 演示如何使用Consumer客戶端. 10 /// </summary> 11 namespace Confluent.Kafka.Examples.Consumer 12 { 13 public class Program 14 { 15 /// <summary> 16 // 在這個例子中: 17 /// - offsets 是自動提交的。 18 /// - consumer.Poll / OnMessage 是用於消息消費的。 19 /// - 沒有為輪詢循環創建(Poll)二外的線程,當然可以創建 20 /// </summary> 21 public static void Run_Poll(string brokerList, List<string> topics) 22 { 23 var config = new Dictionary<string, object> 24 { 25 { "bootstrap.servers", brokerList }, 26 { "group.id", "csharp-consumer" }, 27 { "enable.auto.commit", true }, // 默認值 28 { "auto.commit.interval.ms", 5000 }, 29 { "statistics.interval.ms", 60000 }, 30 { "session.timeout.ms", 6000 }, 31 { "auto.offset.reset", "smallest" } 32 }; 33 34 using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8))) 35 { 36 // 注意: 所有事件處理程序的執行都是在主線程中執行的,就是同步的。 37 38 //當成功消費了消息就會觸發該事件 39 consumer.OnMessage += (_, msg) => Console.WriteLine("Topic: "+msg.Topic+" Partition: "+msg.Partition+" Offset: "+msg.Offset+" "+msg.Value); 40 41 consumer.OnPartitionEOF += (_, end) => Console.WriteLine("Reached end of topic "+end.Topic+" partition "+end.Partition+", next message will be at offset "+end.Offset); 42 43 //當然發生了嚴重錯誤,比如,連接丟失或者Kafka服務器無效就會觸發該事件 44 consumer.OnError += (_, error) => Console.WriteLine("Error: "+error); 45 46 //當反序列化有錯誤,或者消費的過程中發生了錯誤,即error != NoError,就會觸發該事件 47 consumer.OnConsumeError += (_, msg) 48 => Console.WriteLine("Error consuming from topic/partition/offset "+msg.Topic+"/"+msg.Partition+"/"+msg.Offset+": "+msg.Error); 49 50 //成功提交了Offsets會觸發該事件 51 consumer.OnOffsetsCommitted += (_, commit) => Console.WriteLine(commit.Error ? "Failed to commit offsets: "+commit.Error : "Successfully committed offsets: "+commit.Offsets); 52 53 // 當消費者被分配一組新的分區時觸發該事件 54 consumer.OnPartitionsAssigned += (_, partitions) => 55 { 56 Console.WriteLine("Assigned partitions:"+partitions+" "+member id: "+consumer.MemberId); 57 // 如果您未向OnPartitionsAssigned事件添加處理程序,則會自動執行以下.Assign調用。 如果你這樣做,你必須明確地調用.Assign以便消費者開始消費消息。 58 //開始從分區中消息消息 59 consumer.Assign(partitions); 60 }; 61 62 // 當消費者的當前分區集已被撤銷時引發該事件。 63 consumer.OnPartitionsRevoked += (_, partitions) => 64 { 65 Console.WriteLine("Revoked partitions:"+partitions); 66 // 如果您未向OnPartitionsRevoked事件添加處理程序,則下面的.Unassign調用會自動發生。 如果你這樣做了,你必須明確地調用.Usessign以便消費者停止從它先前分配的分區中消費消息。 67 68 //停止從分區中消費消息 69 consumer.Unassign(); 70 }; 71 72 consumer.OnStatistics += (_, json) => Console.WriteLine("Statistics: "+json); 73 74 consumer.Subscribe(topics); 75 76 Console.WriteLine("Subscribed to:"+consumer.Subscription); 77 78 var cancelled = false; 79 Console.CancelKeyPress += (_, e) => { 80 e.Cancel = true; // 組織進程退出 81 cancelled = true; 82 }; 83 84 Console.WriteLine("Ctrl-C to exit."); 85 while (!cancelled) 86 { 87 consumer.Poll(TimeSpan.FromMilliseconds(100)); 88 } 89 } 90 } 91 92 /// <summary> 93 /// 在這實例中 94 /// - offsets 是手動提交的。 95 /// - consumer.Consume方法用於消費消息 96 /// (所有其他事件仍由事件處理程序處理) 97 /// -沒有為了 輪詢(消耗)循環 創建額外的線程。 98 /// </summary> 99 public static void Run_Consume(string brokerList, List<string> topics) 100 { 101 var config = new Dictionary<string, object> 102 { 103 { "bootstrap.servers", brokerList }, 104 { "group.id", "csharp-consumer" }, 105 { "enable.auto.commit", false }, 106 { "statistics.interval.ms", 60000 }, 107 { "session.timeout.ms", 6000 }, 108 { "auto.offset.reset", "smallest" } 109 }; 110 111 using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8))) 112 { 113 // 注意:所有事件處理都是在主線程中處理的,也就是說同步的 114 115 consumer.OnPartitionEOF += (_, end) 116 => Console.WriteLine("Reached end of topic "+end.Topic+" partition "+end.Partition+", next message will be at offset "+end.Offset); 117 118 consumer.OnError += (_, error)=> Console.WriteLine("Error: "+error); 119 120 // 當反序列化有錯誤,或者消費的過程中發生了錯誤,即error != NoError,就會觸發該事件 121 consumer.OnConsumeError += (_, error)=> Console.WriteLine("Consume error: "+error); 122 123 // 當消費者被分配一組新的分區時觸發該事件 124 consumer.OnPartitionsAssigned += (_, partitions) => 125 { 126 Console.WriteLine("Assigned partitions:"+partitions+" "+member id: "+consumer.MemberId); 127 // 如果您未向OnPartitionsAssigned事件添加處理程序,則會自動執行以下.Assign調用。 如果你這樣做,你必須明確地調用.Assign以便消費者開始消費消息。 128 //開始從分區中消息消息 129 consumer.Assign(partitions); 130 }; 131 132 // 當消費者的當前分區集已被撤銷時引發該事件。 133 consumer.OnPartitionsRevoked += (_, partitions) => 134 { 135 Console.WriteLine("Revoked partitions:"+partitions); 136 // 如果您未向OnPartitionsRevoked事件添加處理程序,則下面的.Unassign調用會自動發生。 如果你這樣做了,你必須明確地調用.Usessign以便消費者停止從它先前分配的分區中消費消息。 137 138 //停止從分區中消費消息 139 consumer.Unassign(); 140 }; 141 142 consumer.OnStatistics += (_, json) => Console.WriteLine("Statistics: "+json); 143 144 consumer.Subscribe(topics); 145 146 Console.WriteLine("Started consumer, Ctrl-C to stop consuming"); 147 148 var cancelled = false; 149 Console.CancelKeyPress += (_, e) => { 150 e.Cancel = true; // 防止進程退出 151 cancelled = true; 152 }; 153 154 while (!cancelled) 155 { 156 if (!consumer.Consume(out Message<Ignore, string> msg, TimeSpan.FromMilliseconds(100))) 157 { 158 continue; 159 } 160 161 Console.WriteLine("Topic: "+msg.Topic+" Partition: "+msg.Partition+" Offset: "+msg.Offset+" "+msg.Value); 162 163 if (msg.Offset % 5 == 0) 164 { 165 var committedOffsets = consumer.CommitAsync(msg).Result; 166 Console.WriteLine("Committed offset: "+committedOffsets); 167 } 168 } 169 } 170 } 171 172 /// <summary> 173 /// 在這個例子中 174 /// - 消費者組功能(即.Subscribe +offset提交)不被使用。 175 /// - 將消費者手動分配給分區,並始終從特定偏移量(0)開始消耗。 176 /// </summary> 177 public static void Run_ManualAssign(string brokerList, List<string> topics) 178 { 179 var config = new Dictionary<string, object> 180 { 181 // 即使您不打算使用任何使用者組功能,也必須在創建使用者時指定group.id屬性。 182 { "group.id", new Guid().ToString() }, 183 { "bootstrap.servers", brokerList }, 184 // 即使消費者沒有訂閱該組,也可以將分區偏移量提交給一個組。 在這個例子中,自動提交被禁用以防止發生這種情況。 185 { "enable.auto.commit", false } 186 }; 187 188 using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8))) 189 { 190 //總是從0開始消費 191 consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 0, Offset.Beginning)).ToList()); 192 193 // 引發嚴重錯誤,例如 連接失敗或所有Kafka服務器失效。 194 consumer.OnError += (_, error) => Console.WriteLine("Error: "+error); 195 196 // 這個事件是由於在反序列化出現錯誤,或者在消息消息的時候出現錯誤,也就是 error != NoError 的時候引發該事件 197 consumer.OnConsumeError += (_, error) => Console.WriteLine("Consume error: "+error); 198 199 while (true) 200 { 201 if (consumer.Consume(out Message<Ignore, string> msg, TimeSpan.FromSeconds(1))) 202 { 203 Console.WriteLine("Topic: "+msg.Topic+" Partition: "+msg.Partition+" Offset: "+msg.Offset+" "+msg.Value); 204 } 205 } 206 } 207 } 208 209 private static void PrintUsage()=> Console.WriteLine("Usage: .. <poll|consume|manual> <broker,broker,..> <topic> [topic..]"); 210 211 public static void Main(string[] args) 212 { 213 if (args.Length < 3) 214 { 215 PrintUsage(); 216 return; 217 } 218 219 var mode = args[0]; 220 var brokerList = args[1]; 221 var topics = args.Skip(2).ToList(); 222 223 switch (mode) 224 { 225 case "poll": 226 Run_Poll(brokerList, topics); 227 break; 228 case "consume": 229 Run_Consume(brokerList, topics); 230 break; 231 case "manual": 232 Run_ManualAssign(brokerList, topics); 233 break; 234 default: 235 PrintUsage(); 236 break; 237 } 238 } 239 } 240 }
以上代碼也有兩個版本,第一個版本是自動提交Offset,第二個版本是人工提交Offset,但是代碼沒有分開寫,只是不同的版本用了不同的方法。
四、結束
好了,今天就寫到這里了,這是一個引子,所有代碼都是真實有效的,我已經全部測試過,所以大家可以放心使用或者改造成自己的消息的生產者和消息消費者的實現。原文的地址如下,https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples ,內容差不多。不忘初心,繼續努力吧。