一 kafka介紹
kafka是基於zookeeper的一個分布式流平台,既然是流,那么大家都能猜到它的存儲結構基本上就是線性的了。硬盤大家都知道讀寫非常的慢,那是因為在隨機情況下,線性下,硬盤的讀寫非常快。kafka官方文檔,一直拿傳統的消息隊列來和kafka對比,這樣大家會觸類旁通更快了解kafka的特性。最熟悉的消息隊列框架有ActiveMQ 和 RabbitMQ.熟悉消息隊列的,最熟悉的特性就是隊列和發布訂閱功能,因為這是大家最常用的,kafka實現了一些特有的機制,去規避傳統的消息隊列的一些瓶頸,比如並發,rabbitMQ在多個處理程序下,並不能保證執行順序,還是必須自己去處理獨占,而kafka使用consumer group的方式,實現了可以多個處理程序處理一個topic下的記錄。如圖:
每個分區的記錄保證能被每個組接受,這樣可以並發去處理一個topic的記錄,而且擴展組,則可以隨意根據應用需求去擴展你的應用程序,但是每個組的消費者不能超過分區的數量。
kafka Distribution 提供了容錯的功能,每一個partition都有一個服務器叫leader,還有零個或者一個以上的服務器叫follower,當這些follower都在同步數據的時候,leader扛起所有的寫和讀,當leader掛掉,follower會隨機選取一個服務器當leader,當然必須有幾個follower同步時 in-sync的。還有kafka雖然的那個記錄具有原子性,但是並不支持事務。
因為這一篇並不是專門講解kafka,所以點到為止。
二 擴展服務 開發
以前講過,netcore的一個很重要的特性就是支持依賴注入,在這里一切皆服務。那么如果需要kafka作為日志服務的終端,就首先需要kafka服務,下面咱們就開發一個kafka服務。
首先,服務就是需要構建,這是netcore開發服務的第一步,我們首先建立一個IKafkaBuilder.cs接口類,如下:
homusing Microsoft.Extensions.DependencyInjection; namespace Walt.Freamwork.Service { public interface IKafkaBuilder { /// <summary> /// Gets the <see cref="IServiceCollection"/> where Logging services are configured. /// </summary> IServiceCollection Services { get; } } }
再實現它,KafkaBuilder.cs
using Microsoft.Extensions.DependencyInjection; namespace Walt.Freamwork.Service { public class KafkaBuilder : IKafkaBuilder { public IServiceCollection Services {get;} public KafkaBuilder(IServiceCollection services) { Services=services; } } }
再利用擴展方法為serviceCollection類加上擴展方法:
using System; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Walt.Framework.Service.Kafka; namespace Walt.Framework.Service { public static class ServiceCollectionExtensions { /// <summary> /// Adds logging services to the specified <see cref="IServiceCollection" />. /// </summary> /// <param name="services">The <see cref="IServiceCollection" /> to add services to.</param> /// <returns>The <see cref="IServiceCollection"/> so that additional calls can be chained.</returns> public static IServiceCollection AddKafka(this IServiceCollection services) { return AddKafka(services, builder => { }); } public static IServiceCollection AddKafka(this IServiceCollection services , Action<IKafkaBuilder> configure) { if (services == null) { throw new ArgumentNullException(nameof(services)); } services.AddOptions(); configure(new KafkaBuilder(services)); services.TryAddSingleton<IKafkaService,KafkaService>(); //kafka的服務類 return services; } } }
KafkaService的實現:
using System; using System.Collections.Generic; using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.Options; namespace Walt.Framework.Service.Kafka { public class KafkaService : IKafkaService { private KafkaOptions _kafkaOptions; private Producer _producer; public KafkaService(IOptionsMonitor<KafkaOptions> kafkaOptions) { _kafkaOptions=kafkaOptions.CurrentValue; kafkaOptions.OnChange((kafkaOpt,s)=>{ _kafkaOptions=kafkaOpt; System.Diagnostics.Debug .WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(kafkaOpt)+"---"+s); }); _producer=new Producer(_kafkaOptions.Properties); } private byte[] ConvertToByte(string str) { return System.Text.Encoding.Default.GetBytes(str); } public async Task<Message> Producer(string topic,string key,string value) { if(string.IsNullOrEmpty(topic) ||string.IsNullOrEmpty(value)) { throw new ArgumentNullException("topic或者value不能為null."); } var task= await _producer.ProduceAsync(topic,ConvertToByte(key),ConvertToByte(value)); return task; } } }
那么咱們是不是忘記什么了,看上面的代碼,是不是那個配置類KafkaOptions 還沒有說明?
在這個位置添加kafka的配置類KafkaConfigurationOptions:
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Options; using Walt.Freamwork.Service; namespace Walt.Freamwork.Configuration { public class KafkaConfigurationOptions : IConfigureOptions<KafkaOptions> { private readonly IConfiguration _configuration; public KafkaConfigurationOptions(IConfiguration configuration) { _configuration=configuration; } public void Configure(KafkaOptions options) { //這里僅僅自定義一些你自己的代碼,使用上面configuration配置中的配置節,處理程序沒法自動綁定的
一些事情。 } } }
然后,將配置類添加進服務:
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Options; using Walt.Framework.Service; namespace Walt.Framework.Configuration { public static class KafkaConfigurationExtensioncs { public static IKafkaBuilder AddConfiguration(this IKafkaBuilder builder ,IConfiguration configuration) { InitService( builder,configuration); return builder; } public static void InitService(IKafkaBuilder builder,IConfiguration configuration) { builder.Services.TryAddSingleton<IConfigureOptions<KafkaOptions>>( new KafkaConfigurationOptions(configuration)); //配置類和配置內容 builder.Services.TryAddSingleton (ServiceDescriptor.Singleton<IOptionsChangeTokenSource<KafkaOptions>>( new ConfigurationChangeTokenSource<KafkaOptions>(configuration)) );//這個是觀察類,如果更改,會激發onchange方法 builder.Services .TryAddEnumerable(ServiceDescriptor.Singleton<IConfigureOptions<KafkaOptions>> (new ConfigureFromConfigurationOptions<KafkaOptions>(configuration))); //這個是option類,沒這個,配置無法將類綁定 builder.Services.AddSingleton(new KafkaConfiguration(configuration)); } } }
ok,推送nuget,業務部分調用。
三 kafka服務調用
在project中引用然后restore:
引入命名空間:
調用:
using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading.Tasks; using Microsoft.AspNetCore; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Walt.Framework.Log; using Walt.Framework.Configuration; using Walt.Framework.Service; namespace Walt.TestMcroServoces.Webapi { public class Program { public static void Main(string[] args) { var host = new WebHostBuilder() .ConfigureAppConfiguration((hostingContext, configContext) =>{ var en=hostingContext.HostingEnvironment; if(en.IsDevelopment()) { configContext.AddJsonFile($"appsettings.{en.EnvironmentName}.json"); } else { configContext.AddJsonFile("appsettings.json"); } configContext.AddCommandLine(args) .AddEnvironmentVariables() .SetBasePath(Directory.GetCurrentDirectory()).Build(); }).ConfigureServices((context,configureServices)=>{ configureServices.AddKafka(KafkaBuilder=>{ KafkaBuilder.AddConfiguration(context.Configuration.GetSection("KafkaService")); }); }) //kafka的調用。 .ConfigureLogging((hostingContext, logging) => { logging.AddConfiguration(hostingContext.Configuration.GetSection("Logging")) .AddCustomizationLogger(); }).UseKestrel(KestrelServerOption=>{ KestrelServerOption.ListenAnyIP(801); }) .UseStartup<Startup>().Build(); host.Run(); Console.ReadKey(); } } }
然后提交git,讓jenkins構建docker發布運行:
jenkin是是非常牛的一款構建工具,不僅僅根據插件可以擴展不同環境,還支持分布式構建.
這是我們用jenikins構建的的:
讓它跑起來:
調用看看:
這個方法是輸出Properties數組的,這個配置結構只是演示,后面的結構要變,因為要放kafka的配置,比如連接服務ip等,
改動也很簡單,在配置好configuration和service后,改動這個類KafkaOptions和配置文件中kafka節點中的json結構就行。:
四 集成kafka
kafka的接口不多,看看都有那些:
https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Producer.html
Consumer和Producer是咱們發布消息和消費消息的兩個主類,代碼在上文已經實現的service。
客戶端代碼:
使用my-replicated-topic-morepart這兒topic,還是希望多分區,因為后面consumer使用分布式計算讀取。
consumer先在客戶端監聽:
product端的調用代碼:
執行這個接口后,再看consumer接收到的消息:
最后一步,將咱們kafka日志部分替換為真實的kafka環境,看結果:
那么最后的配置是這樣的:
{ "Logging": { "LogLevel": { "Default": "Debug", "System": "Debug", "Microsoft": "Debug" }, "KafkaLog":{ "Prix":"這是我的自定義日志提供程序" } }, "KafkaService":{ "Properties":{ "bootstrap.servers":"192.168.249.106:9092" } } }
log使用這個kafka服務就很簡單了,在前面文章中實現的log擴展類中,直接構造函數注入這個kafkaService,就可以以使用了。