通過Confluent.Kafka.dll在.net core下使用Kafka
1.在項目中安裝Confluent.Kafka的Nuget包
2.添加生產者
1 /// <summary> 2 /// Kafka消息生產者 3 /// </summary> 4 public class KafkaProducer 5 { 6 public static string brokerUrl = "localhost:9092"; 7 public static string topic = "TestTopic"; 9 private static readonly object Locker = new object(); 10 private static IProducer<string, string> _producer; 11 /// <summary> 12 /// 單例生產 13 /// </summary> 14 public KafkaProducer() 15 { 16 if (_producer == null) 17 { 18 lock (Locker) 19 { 20 if (_producer == null) 21 { 22 var config = new ProducerConfig 23 { 24 BootstrapServers = brokerUrl 25 }; 26 _producer = new ProducerBuilder<string, string>(config).Build(); 27 } 28 } 29 } 30 } 31 32 /// <summary> 33 /// 生產消息並發送消息 34 /// </summary> 35 /// <param name="key">key</param> 36 /// <param name="message">需要傳送的消息</param> 37 public static void Produce(string key, string message) 38 { 39 bool result = false; 40 new KafkaProducer(); 41 if (string.IsNullOrEmpty(message) || string.IsNullOrWhiteSpace(message) || message.Length <= 0) 42 { 43 throw new ArgumentNullException("消息內容不能為空!"); 44 } 45 _producer.Produce(topic, new Message<string, string> { Key = key, Value = message }); 46 _producer.Flush(TimeSpan.FromSeconds(10)); 47 } 48 }
外部調用Produce方法即可產生消息
參考文檔:
https://blog.csdn.net/qq_23009105/article/details/87993827(C#實現kafka消息隊列-Confluent.Kafka)
https://zhuanlan.zhihu.com/p/139101754(c#操作kafka(下)使用confluent.kafka)