首先說明一點,像Confluent.Kafka這種開源的組件,三天兩頭的更新。在搜索引擎搜索到的結果往往用不了,浪費時間。建議以后遇到類似的情況直接看官網給的Demo。
因為搜索引擎搜到的文章,作者基本上都沒有說明用的是哪個版本的dll。所以你nuget安裝了后,不一定能使用。
截止目前,我用的Confluent.Kafka是最新版本:1.2.1。
GitHub上源碼地址:https://github.com/confluentinc/confluent-kafka-dotnet,上面附有生產和消費的示例。直接去看吧。往下就不要看了,是我自己用到的,只是方便我自己查看。
生產:
static async void Produce() { var config = new ProducerConfig { BootstrapServers = "192.168.3.250:9092" }; using (var p = new ProducerBuilder<Null, string>(config).Build()) { try { var dr = await p.ProduceAsync("mytopic", new Message<Null, string> { Value = "test" }); Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'"); } catch (ProduceException<Null, string> e) { Console.WriteLine($"Delivery failed: {e.Error.Reason}"); } } }
消費:
static async void Consume() { var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.3.250:9092", AutoOffsetReset = AutoOffsetReset.Earliest }; using (var c = new ConsumerBuilder<Ignore, string>(conf).Build()) { c.Subscribe("mytopic"); CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; // prevent the process from terminating. cts.Cancel(); }; try { while (true) { try { var cr = c.Consume(cts.Token); Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { Console.WriteLine($"Error occured: {e.Error.Reason}"); } } } catch (OperationCanceledException) { c.Close(); } } }