.Net Core 商城微服務項目系列(十三):搭建Log4net+ELK+Kafka日志框架


之前是使用NLog直接將日志發送到了ELK,本篇將會使用Docker搭建ELK和kafka,同時替換NLog為Log4net。

一.搭建kafka

1.拉取鏡像

//下載zookeeper
docker pull wurstmeister/zookeeper

//下載kafka
docker pull wurstmeister/kafka:2.11-0.11.0.3

2.啟動

//啟動zookeeper
docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime wurstmeister/zookeeper

//啟動kafka
docker run -d --name kafka --publish 9092:9092 \
--link zookeeper \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.3.131:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.3.131 \
--env KAFKA_ADVERTISED_PORT=9092  \
--volume /etc/localtime:/etc/localtime \
wurstmeister/kafka:2.11-0.11.0.3

3.測試Kafka

//查看Kafka容器ID
docker ps

進入容器
docker exec -it [容器ID] bin/bash

//創建topic
bin/kafka-topics.sh --create --zookeeper 192.168.3.131:2181 --replication-factor 1 --partitions 1 --topic mykafka

//查看topic
bin/kafka-topics.sh --list --zookeeper 192.168.3.131:2181

//創建生產者
bin/kafka-console-producer.sh --broker-list 192.168.3.131:9092 --topic mykafka 

//再開一個客戶端進入容器
//創建消費者
bin/kafka-console-consumer.sh --zookeeper 192.168.3.131:2181 --topic mykafka --from-beginning

再生產端發送消息,消費端可以成功接收到就說明沒問題。

 

二.Docker安裝ELK

1.拉取鏡像
docker pull sebp/elk

2.啟動ELK
docker run -p 5601:5601 -p 9200:9200 -p 9300:9300 -p 5044:5044 -e ES_MIN_MEM=128m  -e ES_MAX_MEM=2048m -d --name elk sebp/elk 

//若啟動過程出錯一般是因為elasticsearch用戶擁有的內存權限太小,至少需要262144
切換到root用戶

執行命令:

sysctl -w vm.max_map_count=262144

查看結果:

sysctl -a|grep vm.max_map_count

顯示:

vm.max_map_count = 262144

上述方法修改之后,如果重啟虛擬機將失效,所以:

解決辦法:

在   /etc/sysctl.conf文件最后添加一行

vm.max_map_count=262144

即可永久修改

等幾十秒,然后訪問9200和5601端口就可以看到ELK相關的面板。

然后我們還需要配置下logstash:

1.查看elk容器ID
docker ps

2.進入elk容器
docker exec -it 容器ID bin/bash

3.執行命令
/opt/logstash/bin/logstash -e 'input { stdin { } } output { elasticsearch { hosts => ["localhost"] } }'

當命令成功被執行后,看到:Successfully started Logstash API endpoint {:port=>9600} 信息后,輸入:this is a dummy entry 然后回車,模擬一條日志進行測試。

打開瀏覽器,輸入:http://:9200/_search?pretty 如圖,就會看到我們剛剛輸入的日志內容。

注意:如果看到這樣的報錯信息 Logstash could not be started because there is already another instance using the configured data directory. If you wish to run multiple instances, you must change the "path.data" setting. 請執行命令:service logstash stop 然后在執行就可以了。

 

OK,測試沒問題的話,說明Logstash和ES之間是可以正常聯通,然后我們需要配置Logstash從Kafka消費消息:

1.找到config文件
cd /opt/logstash/config

2.編輯配置文件
vi logstash.config
input {
        kafka{
                bootstrap_servers =>["192.168.3.131:9092"]
                client_id => "test" group_id => "test"
                consumer_threads => 5
                decorate_events => true
                topics => "mi"
        }
}
filter{
        json{
                source => "message"
        }
}

output {
        elasticsearch {
                hosts => ["localhost"]
                index => "mi-%{app_id}"
                codec => "json"
        }
}

bootstrap_servers:Kafka地址

這里介紹下這個app_id的作用,生產場景下我們根據不同的項目生成不同的ES索引,比如服務是一個單獨的索引,Web是一個,MQ是一個,這些就可以通過傳入的app_id來區分創建。

配置完成后加載該配置:

/opt/logstash/bin/logstash -f  /opt/logstash/config/logstash.conf

沒問題的話,此時Logstash就會從Kafka消費數據了,然后我們新建一個.net core API項目測試一下:

1.通過NuGet引用 Microsoft.Extensions.Logging.Log4Net.AspNetCore;

2.啟動文件中注入 Log4Net:

public static IWebHost BuildWebHost(string[] args) =>
            WebHost.CreateDefaultBuilder(args)
            .ConfigureLogging((logging) =>
            {
                // 過濾掉 System 和 Microsoft 開頭的命名空間下的組件產生的警告級別以下的日志
                logging.AddFilter("System", LogLevel.Warning);
                logging.AddFilter("Microsoft", LogLevel.Warning);
                logging.AddLog4Net();
            })
                .UseStartup<Startup>()
                .Build();

3.根目錄下添加 log4net.config,設置為 “如果較新則復制”

appender 支持自定義,只要符合 appender 定義規則即可,這里我使用了公司同事大佬寫的一個將日志寫入 Kafka 的 Nuget 包, log4net.Kafka.Core,安裝后在 log4net.config 添加 KafkaAppender 配置即可。 log4net.Kafka.Core 源碼 在 github 上 ,可以 Fork 自行修改。
<?xml version="1.0" encoding="utf-8" ?>
<log4net>
    <appender name="KafkaAppender" type="log4net.Kafka.Core.KafkaAppender, log4net.Kafka.Core">
        <KafkaSettings>
            <broker value="192.168.3.131:9092" />
            <topic value="mi" />
        </KafkaSettings>
        <layout type="log4net.Kafka.Core.KafkaLogLayout,log4net.Kafka.Core" >
            <appid value="api-test" />
        </layout>
    </appender>
    <root>
        <level value="ALL"/>
        <appender-ref ref="KafkaAppender" />
    </root>
</log4net>
broker: Kafka 服務地址,集群可使用,分割;
topic:日志對應的 Topic 名稱;
appid:服務唯一標識,輔助識別日志來源;
 
在 ValuesController 修改代碼進行測試:
    [Route("api/[controller]")]
    public class ValuesController : Controller
    {
        private readonly ILogger _logger;

        public ValuesController(ILogger<ValuesController> logger)
        {
            _logger = logger;
        }

        // GET api/values
        [HttpGet]
        public IEnumerable<string> Get()
        {
            _logger.LogInformation("根據appId最后一次測試Kafka!");
            return new string[] { "value1", "value2" };
        }
    }

OK,運行然后訪問5601端口查看:

 

 OK,搭建成功!
 
 
參考文章:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM