Kafka .net 開發入門


  Kafka安裝

  首先我們需要在windows服務器上安裝kafka以及zookeeper,有關zookeeper的介紹將會在后續進行講解。

       在網上可以找到相應的安裝方式,我采用的是騰訊雲服務器,借鑒的是https://www.cnblogs.com/lnice/p/9668750.html

       根據上面博客安裝完成后,我們在kafka中新建了一個名叫test的Topic,並新建了一個生產者和一個消費者。

  注:控制台生產者和控制台消費者的數據不同是因為我用.net開發了一個生產者所致,后面會講

  

  .net生產者  

  當前市面上比較好的.NET的kafka開源包有兩個:kafka-net和rdkafka,我采用的是RdKafka;

  新建項目后首先添加Nuget包,我的生產者源碼如下所示

 1             string brokerList = "118.24.184.36:9092";
 2             string topicName = "test";
 3 
 4 
 5             using (Producer producer = new Producer("118.24.184.36:9092"))
 6             using (Topic topic = producer.Topic(topicName))
 7             {
 8                 Console.WriteLine("{" + producer.Name + "} producing on {" + topic.Name + "}. q to exit.");
 9 
10                 string text;
11                 while ((text = Console.ReadLine()) != "q")
12                 {
13                     byte[] data = Encoding.UTF8.GetBytes(text);
14                     Task<DeliveryReport> deliveryReport = topic.Produce(data);
15                     var unused = deliveryReport.ContinueWith(task =>
16                     {
17                         Console.WriteLine("Partition: {" + task.Result.Partition + "}, Offset: {" + task.Result.Offset + "}");
18                     });
19                  }
20             }
21             

  如你所見,當我們在軟件的輸入框輸入hellow world並回車后,控制台的消費者就收到了。

  

  如上圖,控制台消費者收到了.net生產者發布的消息。

  

  .net 消費者

  我采用的還是RfKafka這個插件,采用的代碼如下

 

 1 static void Main(string[] args)
 2         {
 3             string brokerList = "118.24.184.36:9092";
 4             List<string> topic = new List<string>();
 5             topic.Add("test");
 6             Run(brokerList, topic);
 7         }
 8         public static void Run(string brokerList, List<string> topics)
 9         {
10             bool enableAutoCommit = false;
11 
12             var config = new Config()
13             {
14                 GroupId = "advanced-csharp-consumer",
15                 EnableAutoCommit = enableAutoCommit,
16                 StatisticsInterval = TimeSpan.FromSeconds(60)
17             };
18 
19             using (var consumer = new EventConsumer(config, brokerList))
20             {
21                 consumer.OnMessage += (obj, msg) =>
22                 {
23                     string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);
24                     Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");
25                     Console.WriteLine("1 Response: Partition {0},Offset {1} : {2}",
26                     msg.Partition, msg.Offset, text);
27                     if (!enableAutoCommit && msg.Offset % 10 == 0)
28                     {
29                         Console.WriteLine("Committing offset");
30                         consumer.Commit(msg).Wait();
31                         Console.WriteLine("Committed offset");
32                     }
33                 };
34 
35                 consumer.OnConsumerError += (obj, errorCode) =>
36                 {
37                     Console.WriteLine($"Consumer Error: {errorCode}");
38                 };
39 
40                 consumer.OnEndReached += (obj, end) =>
41                 {
42                     Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
43                 };
44 
45                 consumer.OnError += (obj, error) =>
46                 {
47                     Console.WriteLine($"Error: {error.ErrorCode} {error.Reason}");
48                 };
49 
50                 if (enableAutoCommit)
51                 {
52                     consumer.OnOffsetCommit += (obj, commit) =>
53                     {
54                         if (commit.Error != ErrorCode.NO_ERROR)
55                         {
56                             Console.WriteLine($"Failed to commit offsets: {commit.Error}");
57                         }
58                         Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
59                     };
60                 }
61 
62                 consumer.OnPartitionsAssigned += (obj, partitions) =>
63                 {
64                     Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
65                     consumer.Assign(partitions);
66                 };
67 
68                 consumer.OnPartitionsRevoked += (obj, partitions) =>
69                 {
70                     Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
71                     consumer.Unassign();
72                 };
73 
74                 consumer.OnStatistics += (obj, json) =>
75                 {
76                     Console.WriteLine($"Statistics: {json}");
77                 };
78 
79                 consumer.Subscribe(topics);
80                 consumer.Start();
81 
82                 Console.WriteLine($"Assigned to: [{string.Join(", ", consumer.Assignment)}]");
83                 Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");
84 
85                 Console.WriteLine($"Started consumer, press enter to stop consuming");
86                 Console.ReadLine();
87             }
88         }

  運行后得到如下效果,以下四個框分別是.net生產者,控制台生產者,.net消費者,控制台消費者。

  

  至此.net關於kafka的使用就講完了。

 

       這里有一篇文章分享給大家,把很多kafka的一些深入理解進行了很通俗的講解:

http://melanx.com/2019/01/07/深入淺出理解基於-kafka-和-zookeeper-的分布式消息隊列/#12kafkatopic

 

  如果你在使用上訴代碼時遇到了如下問題:

解決方案如下:將工程切到.net4.0,再將工程切到.net4.5或你需要的版本

  不要問我為什么,我是這樣解決的,否則就需要根據缺失的庫進行相關插件安裝。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM