I. logback + ELK, sending over TCP
For the environment setup, see the previous post: https://www.cnblogs.com/alan6/p/11667758.html
The problem with the TCP approach: when log volume is large and concurrency is high, sending over TCP can drop log messages. Consider buffering log messages in Kafka instead, to smooth out traffic peaks.
II. logback + Kafka + ELK
1. Install ZooKeeper + Kafka with Docker
Pull the images:
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
Run ZooKeeper:
docker run -d --name zookeeper --restart always --publish 2181:2181 --volume /etc/localtime:/etc/localtime wurstmeister/zookeeper:latest
Run Kafka:
docker run -d --name kafka --restart always --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=<IP of the host running Kafka> \
--env KAFKA_ADVERTISED_PORT=9092 \
--volume /etc/localtime:/etc/localtime \
wurstmeister/kafka:latest
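Before configuring logback, it can help to confirm the broker is reachable from the application host. A minimal smoke-test sketch, assuming the org.apache.kafka:kafka-clients library is on the classpath and the broker address used throughout this post; the class name is just an example:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaSmokeTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.33.128:9092"); // KAFKA_ADVERTISED_HOST_NAME:9092
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() is asynchronous; get() blocks until the broker acknowledges the record
            producer.send(new ProducerRecord<>("kafka_elk", "smoke test")).get();
            System.out.println("broker acknowledged the message");
        }
    }
}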
2. Configure logback to send logs to Kafka
Add the following dependency to the server application's pom.xml:
<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
</dependency>
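Note that the LogstashLayout referenced in the appender below ships with logstash-logback-encoder, so that library must be on the classpath as well; a sketch of the extra dependency (the version shown is only an example):

<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>6.3</version>
</dependency>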
Add the appender to the logback-spring.xml configuration file:
<appender name="kafka" class="com.github.danielwegener.logback.kafka.KafkaAppender"> <encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder"> <layout class="net.logstash.logback.layout.LogstashLayout" > <includeContext>true</includeContext> <includeCallerData>true</includeCallerData> <customFields>{"system":"test"}</customFields> <fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/> </layout> <charset>UTF-8</charset> </encoder> <!--kafka topic 需要與配置文件里面的topic一致 --> <topic>kafka_elk</topic> <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostNameKeyingStrategy" /> <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" /> <producerConfig>bootstrap.servers=192.168.33.128:9092</producerConfig> </appender> <!-- 日志輸出級別 --> <root level="INFO"> <appender-ref ref="STDOUT" /> <appender-ref ref="FILE"/> <appender-ref ref="kafka" /> </root>
3. Configure logstash
Start the elk container and open a shell inside it:
#docker exec -it elk /bin/bash
Go into the /etc/logstash/conf.d/ directory and create a configuration file logstash.conf; the key parts are the input and output sections:
input {
  kafka {
    bootstrap_servers => ["192.168.33.128:9092"]
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => true      # adds Kafka metadata (topic, partition, offset) to each event
    group_id => "elk"
    topics => ["kafka_elk"]      # must match the <topic> in the logback appender
    type => "bhy"
  }
}
output {
  stdout {}
  elasticsearch {
    hosts => ["192.168.33.128:9200"]
    index => "kafka-elk-%{+YYYY.MM.dd}"
  }
}
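If events do not show up in Elasticsearch, it helps to check whether they are reaching the topic at all before debugging logstash. A minimal consumer sketch, again assuming kafka-clients on the classpath; it uses its own group id so the offsets of logstash's "elk" consumer group are not disturbed:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TopicPeek {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.33.128:9092");
        props.put("group.id", "peek");              // separate group; does not affect logstash's "elk" group
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("kafka_elk"));
            // each record value is one JSON log event produced by the logback appender
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> r : records) {
                System.out.println(r.value());
            }
        }
    }
}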
Edit /etc/init.d/logstash and change:
LS_USER=root    # was logstash by default
LS_GROUP=root   # was logstash by default
After saving the changes, exit the container and restart elk:
#docker restart elk
4. Configure Kibana
The setup is much the same as in the previous post (https://www.cnblogs.com/alan6/p/11667758.html): for the Index Pattern, select the "kafka-elk-<date>" index configured in logstash.