最近研究分布式消息隊列,分享下!
首先zookeeper 和 kafka 壓縮包 解壓 並配置好!
我本機zookeeper環境配置如下:
D:\Worksoftware\ApacheZookeeper3\conf\zoo.cfg
以下是kafka的配置
D:\Worksoftware\Apachekafka2.11\config\server.properties
我已經加了path環境變量,沒加的話需要到zookeeper對應bin目錄下執行zkServer
然后執行cmd命令:
結果:
然后打開第二個dos窗口,我沒加環境變量path,執行kafka命令如下:
重頭戲來了,開始kafka C#客戶端處理:
首先引用kafka-net.dll,可以用vs2013的nuget下載,
以下是Prorame.cs
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 const string topicName = "test"; 6 var options = new KafkaOptions(new Uri("http://localhost:9092")) 7 { 8 Log = new ConsoleLog() 9 }; 10
11 Task.Run(() =>
12 { 13 var consumer = new Consumer(new ConsumerOptions(topicName, new BrokerRouter(options)) { Log = new ConsoleLog() }); 14 foreach (var data in consumer.Consume()) 15 { 16 Console.WriteLine("Response: PartitionId={0},Offset={1} :Value={2}", data.Meta.PartitionId, data.Meta.Offset, data.Value.ToUtf8String()); 17 } 18 }); 19
20 //創建一個生產者發消息
21 var producer = new Producer(new BrokerRouter(options)) 22 { 23 BatchSize = 100, 24 BatchDelayTime = TimeSpan.FromMilliseconds(2000) 25 }; 26
27 Console.WriteLine("打出一條消息按 enter..."); 28 while (true) 29 { 30 var message = Console.ReadLine(); 31 if (message == "quit") break; 32
33 if (string.IsNullOrEmpty(message)) 34 { 35 //
36 SendRandomBatch(producer, topicName, 200); 37 } 38 else
39 { 40 producer.SendMessageAsync(topicName, new[] { new Message(message) }); 41 } 42 } 43
44 //釋放資源
45 using (producer) 46 { 47
48 } 49 } 50 private static async void SendRandomBatch(Producer producer, string topicName, int count) 51 { 52 //發送多個消息
53 var sendTask = producer.SendMessageAsync(topicName, Enumerable.Range(0, count).Select(x => new Message(x.ToString()))); 54
55 Console.WriteLine("傳送了 #{0} messages. Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount); 56
57 var response = await sendTask; 58
59 Console.WriteLine("已完成批量發送: {0}. Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount); 60 foreach (var result in response.OrderBy(x => x.PartitionId)) 61 { 62 Console.WriteLine("主題:{0} PartitionId:{1} Offset:{2}", result.Topic, result.PartitionId, result.Offset); 63 } 64
65 } 66 }
結果:
閑的蛋疼,隨便研究一些好東西,.net環境太封閉,每個.net程序員都要擴展視野,技術交流,本人QQ827937686