.net core使用Kafka可以像上一篇介紹的封裝那樣使用(Kafka基礎教程(三):C#使用Kafka消息隊列),但是我還是覺得再做一層封裝比較好,同時還能使用它做一個日志收集的功能。
因為代碼比較多,所有就直接放到碼雲(Gitee)上去了,地址:https://gitee.com/shanfeng1000/dotnetcore-demo/tree/master/Kafka(為什么不是github,因為github太慢了-_-!!)
感興趣的可以克隆下來再按照自己的需求修改,這里簡單介紹一下使用的Demo(Demo基於.net core3.1的版本,其他版本可能需要自行測試)
生產者(AspNetCore.WebApi.Producer)
首選需要在ConfigureServices中添加相關依賴項:
public void ConfigureServices(IServiceCollection services) { var hosts = new string[] { "192.168.209.133:9092", "192.168.209.134:9092", "192.168.209.135:9092" }; #region 日志記錄 services.AddLogging(builder => { builder.SetMinimumLevel(LogLevel.Trace); }); services.AddKafkaLogger(options => { options.BootstrapServers = hosts; options.Category = "Home"; options.InitializeCount = 10; options.Key = "log"; options.MinLevel = LogLevel.Trace; options.Topic = "topic.logger"; options.ApplicationName = "AspNetCore.WebApi.Producer"; }); #endregion #region Kafka services.AddKafkaProducer(options => { options.BootstrapServers = hosts; options.InitializeCount = 3; options.Key = "kafka"; options.Topic = "topic.kafka"; }); #endregion ...... }
AddKafkaLogger是添加日志的相關依賴服務配置,之后使用.net core的ILogger對象記錄消息時就可以直接將消息發布到Kafka了。
AddKafkaProducer是添加Kafka發布者的相關配置,可以指定一個名稱,使用時使用IKafkaProducerFactory接口注入即可,比如在Home控制器中使用:
[ApiController] [Route("[controller]")] public class HomeController : ControllerBase { IKafkaProducerFactory kafkaProducerFactory; ILoggerFactory loggerFactory; public HomeController(IKafkaProducerFactory kafkaProducerFactory, ILoggerFactory loggerFactory) { this.kafkaProducerFactory = kafkaProducerFactory; this.loggerFactory = loggerFactory; } /// <summary> /// 發布消息 /// </summary> /// <param name="message">消息</param> /// <returns>success</returns> [HttpGet("Kafka")] public string Kafka(string message) { message = message ?? ""; var producer = kafkaProducerFactory.Create(); producer.Publish(message); return "success"; } /// <summary> /// 日志 /// </summary> /// <param name="message">消息</param> /// <returns>success</returns> [HttpGet("Logger")] public string Logger(string message) { var logger1 = loggerFactory.CreateLogger("logger"); logger1.LogTrace($"logger1(LogTrace):{message}"); logger1.LogDebug($"logger1(LogDebug):{message}"); logger1.LogInformation($"logger1(LogInformation):{message}"); logger1.LogWarning($"logger1(LogWarning):{message}"); logger1.LogError($"logger1(LogError):{message}"); logger1.LogCritical($"logger1(LogCritical):{message}"); var logger2 = loggerFactory.CreateLogger("123456"); logger2.LogTrace($"logger2(LogTrace):{message}"); logger2.LogDebug($"logger2(LogDebug):{message}"); logger2.LogInformation($"logger2(LogInformation):{message}"); logger2.LogWarning($"logger2(LogWarning):{message}"); logger2.LogError($"logger2(LogError):{message}"); logger2.LogCritical($"logger2(LogCritical):{message}"); return "success"; } }
消費者(AspNetCore.WebApi.Consumer)
首選需要在ConfigureServices中添加相關依賴項:
public void ConfigureServices(IServiceCollection services) { var hosts = new string[] { "192.168.209.133:9092", "192.168.209.134:9092", "192.168.209.135:9092" }; #region 日志記錄 services.AddKafkaConsumer(options => { options.BootstrapServers = hosts; options.EnableAutoCommit = true;//自動提交 options.GroupId = "group.1"; options.Subscribers = KafkaSubscriber.From("topic.logger"); }).AddListener(result => { Console.WriteLine("Message From topic.logger:" + result.Message); }); #endregion #region Kafka services.AddKafkaConsumer(options => { options.BootstrapServers = hosts; options.EnableAutoCommit = false; options.GroupId = "group.2"; options.Subscribers = KafkaSubscriber.From("topic.kafka"); }).AddListener(result =>//直接在lambda表達式中完成消費邏輯 { Console.WriteLine("Message From topic.kafka:" + result.Message); result.Commit(); }).AddListener<KafkaConsumerListener>();//實現IKafkaConsumerListener接口完成消費邏輯 #endregion ...... }
無論是日志的消息消費還是自定義的消息消費,都是先使用AddKafkaConsumer方法聲明Kafka消費者的配置,然后使用AddListener方法添加消息消費的處理程序,AddListener有幾個委托,可以接受一個lambda表達式,可以使用一個實現了IKafkaConsumerListener接口的類,就比如上面的KafkaConsumerListener類:
public class KafkaConsumerListener : IKafkaConsumerListener { public Task ConsumeAsync(RecieveResult recieveResult) { Console.WriteLine("KafkaConsumerListener:" + recieveResult.Message); recieveResult.Commit(); return Task.CompletedTask; } }