kafka日志同步至elasticsearch和kibana展示
一 kafka consumer准備
前面的章節進行了分布式job的自動計算的概念講解以及實踐。上次分布式日志說過日志寫進kafka,是需要進行處理,以便合理的進行展示,分布式日志的量和我們對日志的重視程度,決定了我們必須要有一個大數據檢索,和友好展示的需求。那么自然就是elasticsearch和kibana,elasticsearch是可以檢索TB級別數據的一個分布式NOSQL數據庫,而kibana,不僅僅可以展示詳情,而且有針對不同展示需求的功能,並且定制了很多很多日志格式的模板和采集數據的插件,這里不多介紹了,我自己感覺是比percona的pmm強大很多。
書歸正傳,我們這一節是要做同步前的准備工作。第一,對kafka的consumer進行封裝。第二,讀取kafka數據是需要一個后台程序去處理,但是不需要job,我們上次做的框架是基於zookeeper的分布式job,而kafka的分布式是在服務器端的,當然將job分布式設計方案用在輪詢或者阻塞方式的后台程序,也是可以的,但是這次就不講解了。下面我們就將kafka分布式的原理分析下,kafka的客戶端有一個組的概念,borker端有一個topic的概念,product在發送消息的時候,會有一個key值。因為kafka存數據就是以key-value的方式存儲數據的,所以broker就是用product傳遞過來的這個key進行運算,合理的將數據存儲到某個topic的某個分區。而consumer端訂閱topic,可以訂閱多個topic,它的分派是這樣的,每一個topic下的分區會有多個consuer,但是這些consumer必須屬於不同的組,而每一個consumer可以訂閱多個topic下的分區,但是不能重復。下面看圖吧,以我們這次實際的日志為例,在kafka中mylog topic有5個分區。

那么如果我們有三個程序需要用這個mylog topic怎么辦?而且我們需要很快的處理完這個數據,所以有可能這三個程序每一個程序都要兩台服務器。想着都很頭大,對吧?當然如果有我們前面講解的分布式job也可以處理,但是要把分布式的功能遷移到這個后台程序,避免不了又大動干戈,開發,調試,測試,修改bug,直到程序穩定,那又是一場苦功。但是在kafka這里,不用擔心,三個程序,比如訂單,庫存,顧客,我們為這三個程序的kafka客戶端對應的設置為三個組,每一個組中consumer數量只要不超過5個,假如訂單需要用到名為mylog的topic的消息,只要訂單處理這個topic的實例數量,必須不能超過5個,當然可以少於5個,也可以等於0個。而同時一個consumer又可以去訂閱多個topic,這也是kafka可以媲美rabbit的重要的一個原因,先天支持並發和擴展。我們看圖:

如果一個組的consumer數量沒有topic的分區多,kafka會自動分派給這個組的consumer,如果某一個consumer失敗,kafka也會自動的將這個consumer的offset記錄並且分派給另外一個consumer。
但是要注意一點,kafka的topic中的每個分區是線性的,但是所有的分區看起來就不會是線性的,如果需要topic是線性的,就必須將分區設置為1個。
下面看看我們封裝的kafka客戶端方法:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Options;
namespace Walt.Framework.Service.Kafka
{
public class KafkaService : IKafkaService
{
private KafkaOptions _kafkaOptions;
private Producer _producer;
private Consumer _consumer;
public Action<Message> GetMessageDele{ get; set; }
public Action<Error> ErrorDele{ get; set; }
public Action<LogMessage> LogDele{ get; set; }
public KafkaService(IOptionsMonitor<KafkaOptions> kafkaOptions)
{
_kafkaOptions=kafkaOptions.CurrentValue;
kafkaOptions.OnChange((kafkaOpt,s)=>{
_kafkaOptions=kafkaOpt;
System.Diagnostics.Debug
.WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(kafkaOpt)+"---"+s);
});
_producer=new Producer(_kafkaOptions.Properties);
_consumer=new Consumer(_kafkaOptions.Properties);
}
private byte[] ConvertToByte(string str)
{
return System.Text.Encoding.Default.GetBytes(str);
}
public async Task<Message> Producer<T>(string topic,string key,T t)
{
if(string.IsNullOrEmpty(topic)
|| t==null)
{
throw new ArgumentNullException("topic或者value不能為null.");
}
string data = Newtonsoft.Json.JsonConvert.SerializeObject(t);
var task= await _producer.ProduceAsync(topic,ConvertToByte(key),ConvertToByte(data));
return task;
}
public void AddProductEvent()
{
_producer.OnError+=new EventHandler<Error>(Error);
_producer.OnLog+=new EventHandler<LogMessage>(Log);
}
///以事件的方式獲取message
public void AddConsumerEvent(IEnumerable<string> topics)
{
_consumer.Subscribe(topics);
_consumer.OnMessage += new EventHandler<Message>(GetMessage);
_consumer.OnError += new EventHandler<Error>(Error);
_consumer.OnLog += new EventHandler<LogMessage>(Log);
}
private void GetMessage(object sender, Message mess)
{
if(GetMessageDele!=null)
{
GetMessageDele(mess);
}
}
private void Error(object sender, Error mess)
{
if(ErrorDele!=null)
{
ErrorDele(mess);
}
}
private void Log(object sender, LogMessage mess)
{
if(LogDele!=null)
{
LogDele(mess);
}
}
//以輪詢的方式獲取message
public Message Poll(int timeoutMilliseconds)
{
Message message =default(Message);
_consumer.Consume(out message, timeoutMilliseconds);
return message;
}
}
}
以事件激發的方式,因為是線程安全的方式調用,而本實例是后台方式執行,少不了多線程,所以還是以輪詢的方式。以輪詢的方式,這樣的程序需要放那塊尼?就是我們的后台程序框架。
二 后台程序管理框架開發

他的原理和job幾乎差不多,比job要簡單多了。看入口程序:
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using EnvironmentName = Microsoft.Extensions.Hosting.EnvironmentName;
using Walt.Framework.Log;
using Walt.Framework.Service;
using Walt.Framework.Service.Kafka;
using Walt.Framework.Configuration;
using MySql.Data.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
using System.Threading;
using IApplicationLife =Microsoft.Extensions.Hosting;
using IApplicationLifetime = Microsoft.Extensions.Hosting.IApplicationLifetime;
namespace Walt.Framework.Console
{
public class Program
{
public static void Main(string[] args)
{
//這里獲取程序及和工作線程配置信息
Dictionary<string, Assembly> assmblyColl = new Dictionary<string, Assembly>();
var host = new HostBuilder()
.UseEnvironment(EnvironmentName.Development)
.ConfigureAppConfiguration((hostContext, configApp) =>
{
//這里netcore支持多數據源,所以可以擴展到數據庫或者redis,集中進行配置。
//
configApp.SetBasePath(Directory.GetCurrentDirectory());
configApp.AddJsonFile(
$"appsettings.{hostContext.HostingEnvironment.EnvironmentName}.json",
optional: true);
configApp.AddEnvironmentVariables("PREFIX_");
configApp.AddCommandLine(args);
}).ConfigureLogging((hostContext, configBuild) =>
{
configBuild.AddConfiguration(hostContext.Configuration.GetSection("Logging"));
configBuild.AddConsole();
configBuild.AddCustomizationLogger();
})
.ConfigureServices((hostContext, service) =>
{
service.Configure<HostOptions>(option =>
{
option.ShutdownTimeout = System.TimeSpan.FromSeconds(10);
});
service.AddKafka(KafkaBuilder =>
{
KafkaBuilder.AddConfiguration(hostContext.Configuration.GetSection("KafkaService"));
});
service.AddElasticsearchClient(config=>{
config.AddConfiguration(hostContext.Configuration.GetSection("ElasticsearchService"));
});
service.AddDbContext<ConsoleDbContext>(option =>
option.UseMySQL(hostContext.Configuration.GetConnectionString("ConsoleDatabase")), ServiceLifetime.Transient, ServiceLifetime.Transient);
///TODO 待實現從數據庫中pull數據,再將任務添加進DI
service.AddSingleton<IConsole,KafkaToElasticsearch>();
})
.Build();
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken token = source.Token;
var task=Task.Run(async () =>{
IConsole console = host.Services.GetService<IConsole>();
await console.AsyncExcute(source.Token);
},source.Token);
Dictionary<string, Task> dictTask = new Dictionary<string, Task>();
dictTask.Add("kafkatoelasticsearch", task);
int recordRunCount = 0;
var fact = host.Services.GetService<ILoggerFactory>();
var log = fact.CreateLogger<Program>();
var disp = Task.Run(() =>
{
while (true)
{
if (!token.IsCancellationRequested)
{
++recordRunCount;
foreach (KeyValuePair<string, Task> item in dictTask)
{
if (item.Value.IsCanceled
|| item.Value.IsCompleted
|| item.Value.IsCompletedSuccessfully
|| item.Value.IsFaulted)
{
log.LogWarning("console任務:{0},參數:{1},執行異常,task狀態:{2}", item.Key, "", item.Value.Status);
if (item.Value.Exception != null)
{
log.LogError(item.Value.Exception, "task:{0},參數:{1},執行錯誤.", item.Key, "");
//TODO 根據參數更新數據庫狀態,以便被監控到。
}
//更新數據庫狀態。
}
}
}
System.Threading.Thread.Sleep(2000);
log.LogInformation("循環:{0}次,接下來等待2秒。", recordRunCount);
}
},source.Token);
IApplicationLifetime appLiftTime = host.Services.GetService<IApplicationLifetime>();
appLiftTime.ApplicationStopping.Register(()=>{
log.LogInformation("程序停止中。");
source.Cancel();
log.LogInformation("程序停止完成。");
});
host.RunAsync().GetAwaiter().GetResult();
}
}
}
因為分布式job有quartz,是有自己的設計理念,但是這個console后台框架不需要,是自己開發,所以完全和Host通用主機兼容,所有的部件都可以DI。設計原理就是以數據庫的配置,構造Task,然后使用
CancellationTokenSource和TaskCompletionSource去管理Task。運行結果根據狀態去更新數據庫,以便監控。當然咱們這個例子功能沒實現全,后面可以完善
,感興趣的可以去我的github上pull代碼。咱們看任務中的例子代碼:
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Nest;
using Walt.Framework.Log;
using Walt.Framework.Service.Elasticsearch;
using Walt.Framework.Service.Kafka;
namespace Walt.Framework.Console
{
public class KafkaToElasticsearch : IConsole
{
ILoggerFactory _logFact;
IConfiguration _config;
IElasticsearchService _elasticsearch;
IKafkaService _kafkaService;
public KafkaToElasticsearch(ILoggerFactory logFact,IConfiguration config
,IElasticsearchService elasticsearch
,IKafkaService kafkaService)
{
_logFact = logFact;
_config = config;
_elasticsearch = elasticsearch;
_kafkaService = kafkaService;
}
public async Task AsyncExcute(CancellationToken cancel=default(CancellationToken))
{
var log = _logFact.CreateLogger<KafkaToElasticsearch>();
_kafkaService.AddConsumerEvent(new List<string>(){"mylog"});
//以事件方式獲取message不工作,因為跨線程
// _kafkaService.GetMessageDele = (message) => {
// var id = message.Key;
// var offset = string.Format("{0}---{2}",message.Offset.IsSpecial,message.Offset.Value);
// var topic = message.Topic;
// var topicPartition = message.TopicPartition.Partition.ToString();
// var topicPartitionOffsetValue = message.TopicPartitionOffset.Offset.Value;
// // log.LogInformation("id:{0},offset:{1},topic:{2},topicpatiton:{3},topicPartitionOffsetValue:{4}"
// // ,id,offset,topic,topicPartition,topicPartitionOffsetValue);
// };
// _kafkaService.ErrorDele = (message) => {
// log.LogError(message.ToString());
// };
// _kafkaService.LogDele = (message) => {
// log.LogInformation(message.ToString());
// };
// log.LogInformation("事件添加完畢");
// var waitForStop =
// new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
// cancel.Register(()=>{
// log.LogInformation("task執行被取消回掉函數");
// waitForStop.SetResult(null);
// });
// waitForStop.Task.Wait();
// log.LogInformation("任務已經被取消。");
//下面以輪詢方式。
if(!cancel.IsCancellationRequested)
{
while (true)
{
Message message = _kafkaService.Poll(2000);
if (message != null)
{
if(message.Error!=null&&message.Error.Code!=ErrorCode.NoError)
{
//log.LogError("consumer獲取message出錯,詳細信息:{0}",message.Error);
System.Console.WriteLine("consumer獲取message出錯,詳細信息:{0}",message.Error);
System.Threading.Thread.Sleep(200);
continue;
}
var id =message.Key==null?"":System.Text.Encoding.Default.GetString(message.Key);
var offset = string.Format("{0}---{1}", message.Offset.IsSpecial, message.Offset.Value);
var topic = message.Topic;
var topicPartition = message.TopicPartition.Partition.ToString();
var topicPartitionOffsetValue = message.TopicPartitionOffset.Offset.Value;
var val =System.Text.Encoding.Default.GetString( message.Value);
EntityMessages entityMess =
Newtonsoft.Json.JsonConvert.DeserializeObject<EntityMessages>(val);
await _elasticsearch.CreateIndexIfNoExists<LogElasticsearch>("mylog"+entityMess.OtherFlag);
// _elasticsearch.CreateMappingIfNoExists<LogElasticsearch>("mylog"+entityMess.OtherFlag
// ,"mylog"+entityMess.OtherFlag+"type",null);
//為elasticsearch添加document
var addDocumentResponse = await _elasticsearch.CreateDocument<LogElasticsearch>("mylog" + entityMess.OtherFlag
, new LogElasticsearch()
{
Id = entityMess.Id,
Time = entityMess.DateTime,
LogLevel = entityMess.LogLevel,
Exception = entityMess.Message
}
);
if (addDocumentResponse != null)
{
if (!addDocumentResponse.ApiCall.Success)
{
}
}
}
}
}
return ;
}
}
}
三 elasticsearch 服務開發
服務已經開發很多了,主要就是構建和配置的設計,還有就是對組件的封裝,看程序結構:

配置:
{
"Logging": {
"LogLevel": {
"Default": "Information",
"System": "Information",
"Microsoft": "Information"
},
"KafkaLog":{
"Prix":"console", //目前這個屬性,可以放程序類別,比如用戶中心,商品等。
"LogStoreTopic":"mylog"
}
},
"KafkaService":{
"Properties":{
"bootstrap.servers":"192.168.249.106:9092",
"group.id":"group2"
}
},
"ConnectionStrings": {
"ConsoleDatabase":"Server=192.168.249.106;Database=quartz;Uid=quartz;Pwd=quartz"
},
"ElasticsearchService":{
"Host":["http://192.168.249.105:9200","http://localhost:9200"],
"TimeOut":"10000",
"User":"",
"Pass":""
}
}
服務類:這里有必要說下,elasticsearch是基於api的接口,最底層就是http請求,在接口上,實現了一個高級的接口和一個低級別的接口,當然低級別的接口需要熟悉elasticsearch的協議,
而高級別的api,使用強類型去使用,對開發很有幫助。下面是封裝elasticsearch的服務類:
using System;
using System.Net.Http;
using Elasticsearch.Net;
using Microsoft.Extensions.Options;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Nest;
namespace Walt.Framework.Service.Elasticsearch
{
public class ElasticsearchService:IElasticsearchService
{
private ElasticsearchOptions _elasticsearchOptions=null;
private ElasticClient _elasticClient = null;
private ILoggerFactory _loggerFac;
public ElasticsearchService(IOptionsMonitor<ElasticsearchOptions> options
,ILoggerFactory loggerFac)
{
_elasticsearchOptions = options.CurrentValue;
options.OnChange((elasticsearchOpt,s)=>{
_elasticsearchOptions=elasticsearchOpt;
System.Diagnostics.Debug
.WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(elasticsearchOpt)+"---"+s);
});
//連接客戶端需,支持多個節點,防止單點故障
var lowlevelClient = new ElasticLowLevelClient();
var urlColl = new Uri[_elasticsearchOptions.Host.Length];
for (int i = 0; i < _elasticsearchOptions.Host.Length;i++)
{
urlColl[i] = new Uri(_elasticsearchOptions.Host[i]);
}
_loggerFac = loggerFac;
var connectionPool = new SniffingConnectionPool(urlColl);
var settings = new ConnectionSettings(connectionPool)
.RequestTimeout(TimeSpan.FromMinutes(_elasticsearchOptions.TimeOut))
.DefaultIndex("mylogjob");//設置默認的index
_elasticClient = new ElasticClient(settings);
}
//如果index存在,則返回,如果不存在,則創建,type的創建方式是為文檔類型打標簽ElasticsearchTypeAttribute
public async Task<bool> CreateIndexIfNoExists<T>(string indexName) where T : class
{
var log = _loggerFac.CreateLogger<ElasticsearchService>();
var exists = await _elasticClient.IndexExistsAsync(Indices.Index(indexName));
if (exists.Exists)
{
log.LogWarning("index:{0}已經存在", indexName.ToString());
return await Task.FromResult(true);
}
var response = await _elasticClient.CreateIndexAsync(indexName
,c=>c.Mappings(mm=>mm.Map<T>(m=>m.AutoMap())));//將類型的屬性自動映射到index的type上,也可以打標簽控制那個可以映射,那些不可以
log.LogInformation(response.DebugInformation);
if (response.Acknowledged)
{
log.LogInformation("index:{0},創建成功", indexName.ToString());
return await Task.FromResult(false);
}
else
{
log.LogError(response.ServerError.ToString());
log.LogError(response.OriginalException.ToString());
return await Task.FromResult(false);
}
}
//創建document
public async Task<ICreateResponse> CreateDocument<T>(string indexName,T t) where T:class
{
var log=_loggerFac.CreateLogger<ElasticsearchService>();
if(t==null)
{
log.LogError("bulk 參數不能為空。");
return null;
}
IndexRequest<T> request = new IndexRequest<T>(indexName, TypeName.From<T>()) { Document = t };
var createResponse = await _elasticClient.CreateDocumentAsync<T>(t);
log.LogInformation(createResponse.DebugInformation);
if (createResponse.ApiCall.Success)
{
log.LogInformation("index:{0},type:{1},創建成功", createResponse.Index, createResponse.Type);
return createResponse;
}
else
{
log.LogError(createResponse.ServerError.ToString());
log.LogError(createResponse.OriginalException.ToString());
return null;
}
}
}
}
poco類型,這個類會和index的typ相關聯的:
using System;
using Nest;
namespace Walt.Framework.Console
{
[ElasticsearchTypeAttribute(Name="LogElasticsearchDefaultType")] //可以使用類型生成和查找type
public class LogElasticsearch
{
public string Id { get; set; }
public DateTime Time { get; set; }
public string LogLevel{ get; set; }
public string Exception{ get; set; }
public string Mess{ get; set; }
}
}
然后就是執行我們console后台程序,就可以在kibana看到日志被同步的情況:

所有程序都提交到github,如果調試代碼,再看這篇文章,或許理解能更快。

