Kafka.net使用編程入門


最近研究分布式消息隊列,分享下!

首先zookeeper  和 kafka 壓縮包 解壓 並配置好!

我本機zookeeper環境配置如下:

D:\Worksoftware\ApacheZookeeper3\conf\zoo.cfg

以下是kafka的配置

D:\Worksoftware\Apachekafka2.11\config\server.properties

我已經加了path環境變量,沒加的話需要到zookeeper對應bin目錄下執行zkServer

然后執行cmd命令:

 

結果:

 然后打開第二個dos窗口,我沒加環境變量path,執行kafka命令如下:

 

 

重頭戲來了,開始kafka C#客戶端處理:

首先引用kafka-net.dll,可以用vs2013的nuget下載,

以下是Prorame.cs

 1 class Program  2  {  3         static void Main(string[] args)  4  {  5             const string topicName = "test";  6             var options = new KafkaOptions(new Uri("http://localhost:9092"))  7  {  8                 Log = new ConsoleLog()  9  }; 10             
11             Task.Run(() =>
12  { 13                 var consumer = new Consumer(new ConsumerOptions(topicName, new BrokerRouter(options)) { Log = new ConsoleLog() }); 14                 foreach (var data in consumer.Consume()) 15  { 16                     Console.WriteLine("Response: PartitionId={0},Offset={1} :Value={2}", data.Meta.PartitionId, data.Meta.Offset, data.Value.ToUtf8String()); 17  } 18  }); 19 
20             //創建一個生產者發消息
21             var producer = new Producer(new BrokerRouter(options)) 22  { 23                 BatchSize = 100, 24                 BatchDelayTime = TimeSpan.FromMilliseconds(2000) 25  }; 26 
27             Console.WriteLine("打出一條消息按 enter..."); 28             while (true) 29  { 30                 var message = Console.ReadLine(); 31                 if (message == "quit") break; 32 
33                 if (string.IsNullOrEmpty(message)) 34  { 35                     //
36                     SendRandomBatch(producer, topicName, 200); 37  } 38                 else
39  { 40                     producer.SendMessageAsync(topicName, new[] { new Message(message) }); 41  } 42  } 43 
44             //釋放資源
45             using (producer) 46  { 47 
48  } 49  } 50         private static async void SendRandomBatch(Producer producer, string topicName, int count) 51  { 52             //發送多個消息
53             var sendTask = producer.SendMessageAsync(topicName, Enumerable.Range(0, count).Select(x => new Message(x.ToString()))); 54 
55             Console.WriteLine("傳送了 #{0} messages. Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount); 56 
57             var response = await sendTask; 58 
59             Console.WriteLine("已完成批量發送: {0}. Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount); 60             foreach (var result in response.OrderBy(x => x.PartitionId)) 61  { 62                 Console.WriteLine("主題:{0} PartitionId:{1} Offset:{2}", result.Topic, result.PartitionId, result.Offset); 63  } 64 
65  } 66     }

結果:

閑的蛋疼,隨便研究一些好東西,.net環境太封閉,每個.net程序員都要擴展視野,技術交流,本人QQ827937686


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM