.net6 操作kafka
使用docker部署kafka(單服務)
本人使用的docker部署kafka 具體的部署流程可以參考如下地址:
https://blog.csdn.net/steve_frank/article/details/109782212
https://www.jianshu.com/p/e642793cd5de
詳解Kafka消息隊列的兩種模式
Kafka是一種高吞吐量的分布式發布訂閱消息系統,有如下特性:
- 通過O的磁盤數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能。
- 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒數百萬 [2] 的消息。
- 支持通過Kafka服務器和消費機集群來分區消息。
- 支持Hadoop並行數據加載。
Kafka通過官網發布了最新版本2.3.0
1. 點對點模式(一對一)
1)模式特點:
消息生產者生產消息發送到Queue中,然后消息消費者從Queue中取出並且消費消息。消息被消費以后,queue中不再存儲該條消息,所以消息消費者不可能消費到已經被消費的消息。Queue支持存在多個消費者,但是對一個消息而言,只會有一個消費者消費。
2)數據拉取方式:消費者主動拉取。
3)模式缺點:消息不能被重復消費。
點對點模式下包括三個角色:
消息隊列
發送者 (生產者)
接收者(消費者)
2. 發布/訂閱模式(一對多)
1)模式特點:
消息生產者(發布)將消息發布到topic中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,發布到topic的消息會被所有訂閱者消費。
2)數據拉取方式:消費者主動拉取、消費者被動接受(類似微信公眾號)
3)模式缺點:當數據拉取方式為消費者被動接受時,消費者的消費速度可能跟不上生產者的生產速度。
發布/訂閱模式下包括三個角色:
角色主題(Topic)
發布者(Publisher)
訂閱者(Subscriber)
kafka更多詳細資料:https://lotabout.me/2018/kafka-introduction/
代碼實例(點對點模式)
public interface IKafkaService { /// <summary> /// 發生消息至指定主題 /// </summary> /// <typeparam name="TMessage"></typeparam> /// <param name="topicName"></param> /// <param name="message"></param> /// <returns></returns> Task PublishAsync<TMessage>(string host,string topicName, TMessage message) where TMessage : class; /// <summary> /// 從指定主題訂閱消息 /// </summary> /// <typeparam name="TMessage"></typeparam> /// <param name="topics"></param> /// <param name="messageFunc"></param> /// <param name="cancellationToken"></param> /// <returns></returns> Task SubscribeAsync<TMessage>(IEnumerable<string> topics,Action<TMessage> messageFunc,CancellationToken cancellationToken) where TMessage : class; }
//封裝
public class KafkaService : IKafkaService { public readonly string host = "120.79.77.91:9092"; public async Task PublishAsync<TMessage>(string host, string topicName, TMessage message) where TMessage : class { var config = new ProducerConfig { BootstrapServers = host }; using var producer = new ProducerBuilder<string, string>(config).Build(); var data = new Message<string, string> { Key = Guid.NewGuid().ToString(), Value = Newtonsoft.Json.JsonConvert.SerializeObject(message) }; await producer.ProduceAsync(topicName, data); } public async Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class { var config = new ConsumerConfig { BootstrapServers = host, GroupId = "consumer", EnableAutoCommit = false, StatisticsIntervalMs = 5000, SessionTimeoutMs = 6000, AutoOffsetReset = AutoOffsetReset.Earliest, EnablePartitionEof = true }; //const int commitPeriod = 5; using var consumer = new ConsumerBuilder<Ignore, string>(config) .SetErrorHandler((_, e) => { Console.WriteLine($"Error: {e.Reason}"); }) .SetStatisticsHandler((_, json) => { Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息監聽中.."); }) .SetPartitionsAssignedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); Console.WriteLine($" - 分配的 kafka 分區: {partitionsStr}"); }) .SetPartitionsRevokedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); Console.WriteLine($" - 回收了 kafka 的分區: {partitionsStr}"); }) .Build(); consumer.Subscribe(topics); try { while (true) { try { var consumeResult = consumer.Consume(cancellationToken); Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'."); if (consumeResult.IsPartitionEOF) { Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已經到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}."); continue; } TMessage messageResult = null; try { messageResult = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message.Value); } catch (Exception ex) { var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失敗,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}"; Console.WriteLine(errorMessage); messageResult = null; } if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/) { messageFunc(messageResult); try { consumer.Commit(consumeResult); } catch (KafkaException e) { Console.WriteLine(e.Message); } } } catch (ConsumeException e) { Console.WriteLine($"Consume error: {e.Error.Reason}"); } } } catch (OperationCanceledException) { Console.WriteLine("Closing consumer."); consumer.Close(); } await Task.CompletedTask; } }
//生產者
string[] configList = { "xxx.xx.xx.xx:9092", "sun" }; //調用接口定義的方法 KafkaService kafkaService = new KafkaService(); while (true) { var data = Console.ReadLine(); await kafkaService.PublishAsync<string>(configList.First(), configList.Last(), data); }
//消費者
KafkaService kafkaService = new KafkaService(); await kafkaService.SubscribeAsync<string>(new string[] { "sun" }, like, new CancellationToken()); static void like(string like) { Console.WriteLine($"這個是接受到的結果:{like}"); }
代碼實例(發布/訂閱模式)