Kafka基礎教程(四):.net core集成使用Kafka消息隊列


  .net core使用Kafka可以像上一篇介紹的封裝那樣使用(Kafka基礎教程(三):C#使用Kafka消息隊列),但是我還是覺得再做一層封裝比較好,同時還能使用它做一個日志收集的功能。

  因為代碼比較多,所有就直接放到碼雲(Gitee)上去了,地址:https://gitee.com/shanfeng1000/dotnetcore-demo/tree/master/Kafka(為什么不是github,因為github太慢了-_-!!)

  感興趣的可以克隆下來再按照自己的需求修改,這里簡單介紹一下使用的Demo(Demo基於.net core3.1的版本,其他版本可能需要自行測試)

  生產者(AspNetCore.WebApi.Producer)

  首選需要在ConfigureServices中添加相關依賴項:  

    public void ConfigureServices(IServiceCollection services)
    {
        var hosts = new string[] { "192.168.209.133:9092", "192.168.209.134:9092", "192.168.209.135:9092" };

        #region 日志記錄

        services.AddLogging(builder =>
        {
            builder.SetMinimumLevel(LogLevel.Trace);
        });
        services.AddKafkaLogger(options =>
        {
            options.BootstrapServers = hosts;
            options.Category = "Home";
            options.InitializeCount = 10;
            options.Key = "log";
            options.MinLevel = LogLevel.Trace;
            options.Topic = "topic.logger";
            options.ApplicationName = "AspNetCore.WebApi.Producer";
        });

        #endregion

        #region Kafka

        services.AddKafkaProducer(options =>
        {
            options.BootstrapServers = hosts;
            options.InitializeCount = 3;
            options.Key = "kafka";
            options.Topic = "topic.kafka";
        });

        #endregion

        ......
    }

  AddKafkaLogger是添加日志的相關依賴服務配置,之后使用.net core的ILogger對象記錄消息時就可以直接將消息發布到Kafka了。

  AddKafkaProducer是添加Kafka發布者的相關配置,可以指定一個名稱,使用時使用IKafkaProducerFactory接口注入即可,比如在Home控制器中使用:  

    [ApiController]
    [Route("[controller]")]
    public class HomeController : ControllerBase
    {
        IKafkaProducerFactory kafkaProducerFactory;
        ILoggerFactory loggerFactory;

        public HomeController(IKafkaProducerFactory kafkaProducerFactory, ILoggerFactory loggerFactory)
        {
            this.kafkaProducerFactory = kafkaProducerFactory;
            this.loggerFactory = loggerFactory;
        }

        /// <summary>
        /// 發布消息
        /// </summary>
        /// <param name="message">消息</param>
        /// <returns>success</returns>
        [HttpGet("Kafka")]
        public string Kafka(string message)
        {
            message = message ?? "";
            var producer = kafkaProducerFactory.Create();
            producer.Publish(message);

            return "success";
        }
        /// <summary>
        /// 日志
        /// </summary>
        /// <param name="message">消息</param>
        /// <returns>success</returns>
        [HttpGet("Logger")]
        public string Logger(string message)
        {
            var logger1 = loggerFactory.CreateLogger("logger");
            logger1.LogTrace($"logger1(LogTrace):{message}");
            logger1.LogDebug($"logger1(LogDebug):{message}");
            logger1.LogInformation($"logger1(LogInformation):{message}");
            logger1.LogWarning($"logger1(LogWarning):{message}");
            logger1.LogError($"logger1(LogError):{message}");
            logger1.LogCritical($"logger1(LogCritical):{message}");

            var logger2 = loggerFactory.CreateLogger("123456");
            logger2.LogTrace($"logger2(LogTrace):{message}");
            logger2.LogDebug($"logger2(LogDebug):{message}");
            logger2.LogInformation($"logger2(LogInformation):{message}");
            logger2.LogWarning($"logger2(LogWarning):{message}");
            logger2.LogError($"logger2(LogError):{message}");
            logger2.LogCritical($"logger2(LogCritical):{message}");

            return "success";
        }
    }

 

  消費者(AspNetCore.WebApi.Consumer)

   首選需要在ConfigureServices中添加相關依賴項: 

    public void ConfigureServices(IServiceCollection services)
    {
        var hosts = new string[] { "192.168.209.133:9092", "192.168.209.134:9092", "192.168.209.135:9092" };

        #region 日志記錄

        services.AddKafkaConsumer(options =>
        {
            options.BootstrapServers = hosts;
            options.EnableAutoCommit = true;//自動提交
            options.GroupId = "group.1";
            options.Subscribers = KafkaSubscriber.From("topic.logger");

        }).AddListener(result =>
        {
            Console.WriteLine("Message From topic.logger:" + result.Message);
        });

        #endregion

        #region Kafka

        services.AddKafkaConsumer(options =>
        {
            options.BootstrapServers = hosts;
            options.EnableAutoCommit = false;
            options.GroupId = "group.2";
            options.Subscribers = KafkaSubscriber.From("topic.kafka");

        }).AddListener(result =>//直接在lambda表達式中完成消費邏輯
        {
            Console.WriteLine("Message From topic.kafka:" + result.Message);
            result.Commit();
        }).AddListener<KafkaConsumerListener>();//實現IKafkaConsumerListener接口完成消費邏輯

        #endregion

        ......
    }

  無論是日志的消息消費還是自定義的消息消費,都是先使用AddKafkaConsumer方法聲明Kafka消費者的配置,然后使用AddListener方法添加消息消費的處理程序,AddListener有幾個委托,可以接受一個lambda表達式,可以使用一個實現了IKafkaConsumerListener接口的類,就比如上面的KafkaConsumerListener類:  

    public class KafkaConsumerListener : IKafkaConsumerListener
    {
        public Task ConsumeAsync(RecieveResult recieveResult)
        {
            Console.WriteLine("KafkaConsumerListener:" + recieveResult.Message);
            recieveResult.Commit();
            return Task.CompletedTask;
        }
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM