asp.net core mcroservices 架構之 分布式日志(三):集成kafka


 

  一 kafka介紹                               

           kafka是基於zookeeper的一個分布式流平台,既然是流,那么大家都能猜到它的存儲結構基本上就是線性的了。硬盤大家都知道讀寫非常的慢,那是因為在隨機情況下,線性下,硬盤的讀寫非常快。kafka官方文檔,一直拿傳統的消息隊列來和kafka對比,這樣大家會觸類旁通更快了解kafka的特性。最熟悉的消息隊列框架有ActiveMQRabbitMQ.熟悉消息隊列的,最熟悉的特性就是隊列和發布訂閱功能,因為這是大家最常用的,kafka實現了一些特有的機制,去規避傳統的消息隊列的一些瓶頸,比如並發,rabbitMQ在多個處理程序下,並不能保證執行順序,還是必須自己去處理獨占,而kafka使用consumer group的方式,實現了可以多個處理程序處理一個topic下的記錄。如圖:

image

每個分區的記錄保證能被每個組接受,這樣可以並發去處理一個topic的記錄,而且擴展組,則可以隨意根據應用需求去擴展你的應用程序,但是每個組的消費者不能超過分區的數量。

kafka Distribution 提供了容錯的功能,每一個partition都有一個服務器叫leader,還有零個或者一個以上的服務器叫follower,當這些follower都在同步數據的時候,leader扛起所有的寫和讀,當leader掛掉,follower會隨機選取一個服務器當leader,當然必須有幾個follower同步時 in-sync的。還有kafka雖然的那個記錄具有原子性,但是並不支持事務。

因為這一篇並不是專門講解kafka,所以點到為止。

      擴展服務 開發                          

     以前講過,netcore的一個很重要的特性就是支持依賴注入,在這里一切皆服務。那么如果需要kafka作為日志服務的終端,就首先需要kafka服務,下面咱們就開發一個kafka服務。

首先,服務就是需要構建,這是netcore開發服務的第一步,我們首先建立一個IKafkaBuilder.cs接口類,如下:

 

homusing Microsoft.Extensions.DependencyInjection;

namespace Walt.Freamwork.Service
{
    public interface IKafkaBuilder
    {
         /// <summary>
        /// Gets the <see cref="IServiceCollection"/> where Logging services are configured.
        /// </summary>
        IServiceCollection Services { get; }
    }
}

再實現它,KafkaBuilder.cs

using Microsoft.Extensions.DependencyInjection;

namespace Walt.Freamwork.Service
{
    public class KafkaBuilder : IKafkaBuilder
    {
        public IServiceCollection Services {get;}

        public KafkaBuilder(IServiceCollection services)
        {
            Services=services;
        }
    }
}

再利用擴展方法為serviceCollection類加上擴展方法:

 using System;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Walt.Framework.Service.Kafka;

namespace Walt.Framework.Service
{
  
  
    public static class ServiceCollectionExtensions
    {
        /// <summary>
        /// Adds logging services to the specified <see cref="IServiceCollection" />.
        /// </summary>
        /// <param name="services">The <see cref="IServiceCollection" /> to add services to.</param>
        /// <returns>The <see cref="IServiceCollection"/> so that additional calls can be chained.</returns>
        public static IServiceCollection AddKafka(this IServiceCollection services)
        {
            return AddKafka(services, builder => { });
        }
 
        public static IServiceCollection AddKafka(this IServiceCollection services
        , Action<IKafkaBuilder> configure)
        {
            if (services == null)
            {
                throw new ArgumentNullException(nameof(services));
            }

            services.AddOptions(); 
            configure(new KafkaBuilder(services));
            services.TryAddSingleton<IKafkaService,KafkaService>();  //kafka的服務類             return services;
        }
    }
}
KafkaService的實現:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Options;

namespace  Walt.Framework.Service.Kafka
{
    public class KafkaService : IKafkaService
    {

        private KafkaOptions _kafkaOptions;
        private Producer _producer;
        public KafkaService(IOptionsMonitor<KafkaOptions>  kafkaOptions)
        {
            _kafkaOptions=kafkaOptions.CurrentValue; 
            kafkaOptions.OnChange((kafkaOpt,s)=>{
                _kafkaOptions=kafkaOpt; 
                    System.Diagnostics.Debug
                    .WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(kafkaOpt)+"---"+s);
                    
            });
             _producer=new Producer(_kafkaOptions.Properties);
        }

        private byte[] ConvertToByte(string str)
        {
            return System.Text.Encoding.Default.GetBytes(str);
        }
 
        public  async Task<Message> Producer(string topic,string key,string value)
        {  
            if(string.IsNullOrEmpty(topic)
            ||string.IsNullOrEmpty(value))
            {
                throw new ArgumentNullException("topic或者value不能為null.");
            }
      
           var task=  await _producer.ProduceAsync(topic,ConvertToByte(key),ConvertToByte(value)); 
           return task;
        }
 
    }
}

 

那么咱們是不是忘記什么了,看上面的代碼,是不是那個配置類KafkaOptions 還沒有說明

image這個位置添加kafka的配置類KafkaConfigurationOptions

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
using Walt.Freamwork.Service;

namespace Walt.Freamwork.Configuration
{
    public class KafkaConfigurationOptions : IConfigureOptions<KafkaOptions>
    {

        private readonly IConfiguration _configuration;


        public KafkaConfigurationOptions(IConfiguration configuration)
        {
           _configuration=configuration;
        }


        public void Configure(KafkaOptions options)
        {
                //這里僅僅自定義一些你自己的代碼,使用上面configuration配置中的配置節,處理程序沒法自動綁定的
                  一些事情。
        }
    }
}

然后,將配置類添加進服務:

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using Walt.Framework.Service;

namespace Walt.Framework.Configuration
{
    public static class KafkaConfigurationExtensioncs
    {
          public static IKafkaBuilder AddConfiguration(this IKafkaBuilder builder
          ,IConfiguration configuration)
          {
               
                InitService( builder,configuration); 
                return builder;
          }


          public static void InitService(IKafkaBuilder builder,IConfiguration configuration)
          {
            builder.Services.TryAddSingleton<IConfigureOptions<KafkaOptions>>(
                  new KafkaConfigurationOptions(configuration));  //配置類和配置內容 
            builder.Services.TryAddSingleton
            (ServiceDescriptor.Singleton<IOptionsChangeTokenSource<KafkaOptions>>(
                  new ConfigurationChangeTokenSource<KafkaOptions>(configuration)) );//這個是觀察類,如果更改,會激發onchange方法

            builder.Services
            .TryAddEnumerable(ServiceDescriptor.Singleton<IConfigureOptions<KafkaOptions>>
            (new ConfigureFromConfigurationOptions<KafkaOptions>(configuration))); //這個是option類,沒這個,配置無法將類綁定
            
             builder.Services.AddSingleton(new KafkaConfiguration(configuration));
          }
    }
} 

 

ok,推送nuget,業務部分調用。

      kafka服務調用                          

在project中引用然后restore:

image

引入命名空間:

image

調用:

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging; 
using Newtonsoft.Json;
using Walt.Framework.Log;
using Walt.Framework.Configuration;
using Walt.Framework.Service;

namespace Walt.TestMcroServoces.Webapi
{
    public class Program
    { 
        public static void Main(string[] args)
        { 
             
            var host = new WebHostBuilder()
            .ConfigureAppConfiguration((hostingContext, configContext) =>{
                 var en=hostingContext.HostingEnvironment;
                 if(en.IsDevelopment())
                 {
                     configContext.AddJsonFile($"appsettings.{en.EnvironmentName}.json");
                 }
                 else
                 {
                     configContext.AddJsonFile("appsettings.json");
                 }
                   configContext.AddCommandLine(args)
             .AddEnvironmentVariables()
             .SetBasePath(Directory.GetCurrentDirectory()).Build(); 
              
            }).ConfigureServices((context,configureServices)=>{
                   configureServices.AddKafka(KafkaBuilder=>{
                    KafkaBuilder.AddConfiguration(context.Configuration.GetSection("KafkaService")); }); }) //kafka的調用。
            .ConfigureLogging((hostingContext, logging) => {
 
                logging.AddConfiguration(hostingContext.Configuration.GetSection("Logging"))
                .AddCustomizationLogger();

            }).UseKestrel(KestrelServerOption=>{
                KestrelServerOption.ListenAnyIP(801);
            })
            .UseStartup<Startup>().Build(); 
            host.Run(); 
            Console.ReadKey();
        }
    }

}

 

然后提交git,讓jenkins構建docker發布運行:

jenkin是是非常牛的一款構建工具,不僅僅根據插件可以擴展不同環境,還支持分布式構建.

 

image

這是我們用jenikins構建的的:

image

讓它跑起來:

image

調用看看:

image

這個方法是輸出Properties數組的,這個配置結構只是演示,后面的結構要變,因為要放kafka的配置,比如連接服務ip等,

改動也很簡單,在配置好configuration和service后,改動這個類KafkaOptions和配置文件中kafka節點中的json結構就行。

image

 

 

  四 集成kafka                         

kafka的接口不多,看看都有那些:

https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Producer.html

image

ConsumerProducer是咱們發布消息和消費消息的兩個主類,代碼在上文已經實現的service

客戶端代碼:

使用my-replicated-topic-morepart這兒topic,還是希望多分區,因為后面consumer使用分布式計算讀取。

image

consumer先在客戶端監聽:

image

product端的調用代碼:

image

執行這個接口后,再看consumer接收到的消息:

image

最后一步,將咱們kafka日志部分替換為真實的kafka環境,看結果:

image

那么最后的配置是這樣的:

 

{
  "Logging": {
    "LogLevel": {
      "Default": "Debug",
      "System": "Debug",
      "Microsoft": "Debug"
    },
    "KafkaLog":{
      "Prix":"這是我的自定義日志提供程序"
    }
  },
  "KafkaService":{
    "Properties":{
      "bootstrap.servers":"192.168.249.106:9092" } }
}

 

log使用這個kafka服務就很簡單了,在前面文章中實現的log擴展類中,直接構造函數注入這個kafkaService,就可以以使用了。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM