Kafka.net使用编程入门


最近研究分布式消息队列,分享下!

首先zookeeper  和 kafka 压缩包 解压 并配置好!

我本机zookeeper环境配置如下:

D:\Worksoftware\ApacheZookeeper3\conf\zoo.cfg

以下是kafka的配置

D:\Worksoftware\Apachekafka2.11\config\server.properties

我已经加了path环境变量,没加的话需要到zookeeper对应bin目录下执行zkServer

然后执行cmd命令:

 

结果:

 然后打开第二个dos窗口,我没加环境变量path,执行kafka命令如下:

 

 

重头戏来了,开始kafka C#客户端处理:

首先引用kafka-net.dll,可以用vs2013的nuget下载,

以下是Prorame.cs

 1 class Program  2  {  3         static void Main(string[] args)  4  {  5             const string topicName = "test";  6             var options = new KafkaOptions(new Uri("http://localhost:9092"))  7  {  8                 Log = new ConsoleLog()  9  }; 10             
11             Task.Run(() =>
12  { 13                 var consumer = new Consumer(new ConsumerOptions(topicName, new BrokerRouter(options)) { Log = new ConsoleLog() }); 14                 foreach (var data in consumer.Consume()) 15  { 16                     Console.WriteLine("Response: PartitionId={0},Offset={1} :Value={2}", data.Meta.PartitionId, data.Meta.Offset, data.Value.ToUtf8String()); 17  } 18  }); 19 
20             //创建一个生产者发消息
21             var producer = new Producer(new BrokerRouter(options)) 22  { 23                 BatchSize = 100, 24                 BatchDelayTime = TimeSpan.FromMilliseconds(2000) 25  }; 26 
27             Console.WriteLine("打出一条消息按 enter..."); 28             while (true) 29  { 30                 var message = Console.ReadLine(); 31                 if (message == "quit") break; 32 
33                 if (string.IsNullOrEmpty(message)) 34  { 35                     //
36                     SendRandomBatch(producer, topicName, 200); 37  } 38                 else
39  { 40                     producer.SendMessageAsync(topicName, new[] { new Message(message) }); 41  } 42  } 43 
44             //释放资源
45             using (producer) 46  { 47 
48  } 49  } 50         private static async void SendRandomBatch(Producer producer, string topicName, int count) 51  { 52             //发送多个消息
53             var sendTask = producer.SendMessageAsync(topicName, Enumerable.Range(0, count).Select(x => new Message(x.ToString()))); 54 
55             Console.WriteLine("传送了 #{0} messages. Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount); 56 
57             var response = await sendTask; 58 
59             Console.WriteLine("已完成批量发送: {0}. Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount); 60             foreach (var result in response.OrderBy(x => x.PartitionId)) 61  { 62                 Console.WriteLine("主题:{0} PartitionId:{1} Offset:{2}", result.Topic, result.PartitionId, result.Offset); 63  } 64 
65  } 66     }

结果:

闲的蛋疼,随便研究一些好东西,.net环境太封闭,每个.net程序员都要扩展视野,技术交流,本人QQ827937686


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM