之前是使用NLog直接將日志發送到了ELK,本篇將會使用Docker搭建ELK和kafka,同時替換NLog為Log4net。
一.搭建kafka
1.拉取鏡像
//下載zookeeper docker pull wurstmeister/zookeeper //下載kafka docker pull wurstmeister/kafka:2.11-0.11.0.3
2.啟動
//啟動zookeeper docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime wurstmeister/zookeeper //啟動kafka docker run -d --name kafka --publish 9092:9092 \ --link zookeeper \ --env KAFKA_ZOOKEEPER_CONNECT=192.168.3.131:2181 \ --env KAFKA_ADVERTISED_HOST_NAME=192.168.3.131 \ --env KAFKA_ADVERTISED_PORT=9092 \ --volume /etc/localtime:/etc/localtime \ wurstmeister/kafka:2.11-0.11.0.3
3.測試Kafka
//查看Kafka容器ID docker ps 進入容器 docker exec -it [容器ID] bin/bash //創建topic bin/kafka-topics.sh --create --zookeeper 192.168.3.131:2181 --replication-factor 1 --partitions 1 --topic mykafka //查看topic bin/kafka-topics.sh --list --zookeeper 192.168.3.131:2181 //創建生產者 bin/kafka-console-producer.sh --broker-list 192.168.3.131:9092 --topic mykafka //再開一個客戶端進入容器 //創建消費者 bin/kafka-console-consumer.sh --zookeeper 192.168.3.131:2181 --topic mykafka --from-beginning
再生產端發送消息,消費端可以成功接收到就說明沒問題。
二.Docker安裝ELK
1.拉取鏡像 docker pull sebp/elk 2.啟動ELK docker run -p 5601:5601 -p 9200:9200 -p 9300:9300 -p 5044:5044 -e ES_MIN_MEM=128m -e ES_MAX_MEM=2048m -d --name elk sebp/elk //若啟動過程出錯一般是因為elasticsearch用戶擁有的內存權限太小,至少需要262144 切換到root用戶 執行命令: sysctl -w vm.max_map_count=262144 查看結果: sysctl -a|grep vm.max_map_count 顯示: vm.max_map_count = 262144 上述方法修改之后,如果重啟虛擬機將失效,所以: 解決辦法: 在 /etc/sysctl.conf文件最后添加一行 vm.max_map_count=262144 即可永久修改
等幾十秒,然后訪問9200和5601端口就可以看到ELK相關的面板。
然后我們還需要配置下logstash:
1.查看elk容器ID docker ps 2.進入elk容器 docker exec -it 容器ID bin/bash 3.執行命令 /opt/logstash/bin/logstash -e 'input { stdin { } } output { elasticsearch { hosts => ["localhost"] } }'
當命令成功被執行后,看到:Successfully started Logstash API endpoint {:port=>9600} 信息后,輸入:this is a dummy entry 然后回車,模擬一條日志進行測試。
打開瀏覽器,輸入:http://:9200/_search?pretty 如圖,就會看到我們剛剛輸入的日志內容。
注意:如果看到這樣的報錯信息 Logstash could not be started because there is already another instance using the configured data directory. If you wish to run multiple instances, you must change the "path.data" setting. 請執行命令:service logstash stop 然后在執行就可以了。
OK,測試沒問題的話,說明Logstash和ES之間是可以正常聯通,然后我們需要配置Logstash從Kafka消費消息:
1.找到config文件 cd /opt/logstash/config 2.編輯配置文件 vi logstash.config
input { kafka{ bootstrap_servers =>["192.168.3.131:9092"] client_id => "test" group_id => "test" consumer_threads => 5 decorate_events => true topics => "mi" } } filter{ json{ source => "message" } } output { elasticsearch { hosts => ["localhost"] index => "mi-%{app_id}" codec => "json" } }
bootstrap_servers:Kafka地址
這里介紹下這個app_id的作用,生產場景下我們根據不同的項目生成不同的ES索引,比如服務是一個單獨的索引,Web是一個,MQ是一個,這些就可以通過傳入的app_id來區分創建。
配置完成后加載該配置:
/opt/logstash/bin/logstash -f /opt/logstash/config/logstash.conf
沒問題的話,此時Logstash就會從Kafka消費數據了,然后我們新建一個.net core API項目測試一下:
1.通過NuGet引用 Microsoft.Extensions.Logging.Log4Net.AspNetCore;
2.啟動文件中注入 Log4Net:
public static IWebHost BuildWebHost(string[] args) => WebHost.CreateDefaultBuilder(args) .ConfigureLogging((logging) => { // 過濾掉 System 和 Microsoft 開頭的命名空間下的組件產生的警告級別以下的日志 logging.AddFilter("System", LogLevel.Warning); logging.AddFilter("Microsoft", LogLevel.Warning); logging.AddLog4Net(); }) .UseStartup<Startup>() .Build();
3.根目錄下添加 log4net.config,設置為 “如果較新則復制”
<?xml version="1.0" encoding="utf-8" ?> <log4net> <appender name="KafkaAppender" type="log4net.Kafka.Core.KafkaAppender, log4net.Kafka.Core"> <KafkaSettings> <broker value="192.168.3.131:9092" /> <topic value="mi" /> </KafkaSettings> <layout type="log4net.Kafka.Core.KafkaLogLayout,log4net.Kafka.Core" > <appid value="api-test" /> </layout> </appender> <root> <level value="ALL"/> <appender-ref ref="KafkaAppender" /> </root> </log4net>
topic:日志對應的 Topic 名稱;
appid:服務唯一標識,輔助識別日志來源;
[Route("api/[controller]")] public class ValuesController : Controller { private readonly ILogger _logger; public ValuesController(ILogger<ValuesController> logger) { _logger = logger; } // GET api/values [HttpGet] public IEnumerable<string> Get() { _logger.LogInformation("根據appId最后一次測試Kafka!"); return new string[] { "value1", "value2" }; } }
OK,運行然后訪問5601端口查看: