Initially, Flume was used to ship logs to Kafka. In practice, each Flume agent consumed a noticeable amount of system resources (at least 50% of one CPU core). Meanwhile the LogServer for another service was already under heavy load, with CPU especially tight, so collecting and analyzing logs in real time called for a lighter-weight log collection framework with a smaller footprint.
Download Filebeat, Logstash, and Elasticsearch
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.8.1-linux-x86_64.tar.gz
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.8.1.tar.gz
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.8.1-linux-x86_64.tar.gz
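All three packages are plain tarballs and can be extracted with tar; the /opt/server target directory below is an assumption that matches the paths used later in this post, adjust as needed:
tar -zxvf filebeat-7.8.1-linux-x86_64.tar.gz -C /opt/server/
tar -zxvf logstash-7.8.1.tar.gz -C /opt/server/
tar -zxvf elasticsearch-7.8.1-linux-x86_64.tar.gz -C /opt/server/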
Extract Filebeat, then edit its configuration file.
Start commands:
./filebeat -e -c filebeat.yml
nohup ./filebeat -e -c filebeat.yml >> filebeat.log 2>&1 &
filebeat.yml configuration
filebeat.inputs (log type):
filebeat.inputs:
- type: log
  # Change to true to enable this input
  enabled: true
  # Paths that should be crawled
  paths:
    - /home/lw/test/filebeat/*.log
  fields:
    log_topic: lw_filebeat_t_2
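To route several directories to different topics, add one input per path group, each with its own log_topic; a sketch, where the second path and topic name are hypothetical:
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /home/lw/test/filebeat/*.log
  fields:
    log_topic: lw_filebeat_t_2
- type: log
  enabled: true
  paths:
    - /var/log/nginx/*.log        # hypothetical second source
  fields:
    log_topic: nginx_access_log   # hypothetical topic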
Kafka output:
output.kafka:
  enabled: true
  # brokers of the target Kafka cluster (host:port)
  hosts: ["xxx.xxx.xxx.xxx:9092", "xxx.xxx.xxx.xxx:9092", "xxx.xxx.xxx.xxx:9092"]
  topic: '%{[fields.log_topic]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip        # none also works
  max_message_bytes: 1000000
  version: 0.9.0.1
  codec.format:
    string: '%{[host.name]}-%{[message]}'
- hosts is the broker list of the Kafka cluster.
- topic: '%{[fields.log_topic]}' specifies which topic in the Kafka cluster to write to. Note that it references the custom field we defined in the filebeat.yml input above; this is how data collected from different paths can be written to different topics. The limitation is that everything goes to a single Kafka cluster, since this version of Filebeat does not allow more than one output to be configured at a time.
- codec.format specifies the format of the messages written to Kafka; here each line read from the log file is prefixed with the current machine's hostname.
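A quick end-to-end sanity check, using the watched path and topic from the config above and assuming one of the brokers is reachable: append a line to a monitored file, then consume it back; each message should arrive prefixed with the collector's hostname.
echo "hello filebeat" >> /home/lw/test/filebeat/app.log
bin/kafka-console-consumer.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --topic lw_filebeat_t_2 --from-beginning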
Create the corresponding topic on Kafka
# List topics
bin/kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
# Create a topic
bin/kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 3 --topic test_log_r3p3
Other Kafka commands
# Produce data
bin/kafka-console-producer.sh --broker-list ip:9094 --topic test_log_r3p3
# Consume data
bin/kafka-console-consumer.sh --bootstrap-server ip:6667 --topic test_log_r3p3 --from-beginning
# Describe a topic
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test_log_r3p3
# Delete a topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test_log_r3p3
Extract Logstash
It can be started directly from the command line:
bin/logstash -e 'input { stdin { } } output { stdout {codec=>rubydebug} }'
(type into the console and each line is echoed back as a structured event)
Or load a config file and start it in the background:
nohup ./bin/logstash -f config/fbet_es.conf --config.reload.automatic >> /opt/server/logstash/logs/logstash_es.log 2>&1 &
Edit the configuration file as needed:
Kafka input plugin reference: https://blog.csdn.net/weixin_34405354/article/details/88730394
Kafka to ES, with optional filtering along the way
input {
  kafka {
    ## app-log-<service name>
    topics_pattern => "app-log-.*"
    #topics => ["test_weblog","..."]
    bootstrap_servers => "ip:9092"
    codec => json
    consumer_threads => 1   ## only one partition, so one consumer thread
    #auto_offset_reset => "latest"
    group_id => "app-log-group"
  }
  kafka {
    ## error-log-<service name>
    topics_pattern => "error-log-.*"
    bootstrap_servers => "ip:9092"
    codec => json
    consumer_threads => 1
    decorate_events => true
    #auto_offset_reset => "latest"   # or earliest
    group_id => "error-log-group"
  }
}

filter {
  ## time zone conversion
  ruby {
    code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
  }
  if "app-log" in [fields][logtopic] {
    grok {
      ## pattern
      match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    }
  }
  if "error-log" in [fields][logtopic] {
    grok {
      ## pattern
      match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    }
  }
}

## elasticsearch:
output {
  if "app-log" in [fields][logtopic] {
    ## es plugin
    elasticsearch {
      # ES address
      hosts => ["ip:9200"]
      # username and password
      user => "elastic"
      password => "123456"
      ## index name; the part after a + is treated as a date format, e.g.
      ## javalog-app-service-2019.01.23
      index => "app-log-%{[fields][logbiz]}-%{index_time}"
      # sniffing: usually true; see http://ip:9200/_nodes/http?pretty
      # load-balances writes across the ES cluster via node sniffing
      sniffing => true
      # logstash ships a default mapping template; overwrite it
      template_overwrite => true
    }
  }
  if "error-log" in [fields][logtopic] {
    elasticsearch {
      hosts => ["ip:9200"]
      user => "elastic"
      password => "123456"
      index => "error-log-%{[fields][logbiz]}-%{index_time}"   #"test_log-%{+YYYY.MM.dd}"
      sniffing => true
      template_overwrite => true
    }
  }
}
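For reference, a made-up application log line in the layout the grok pattern above expects would look like the following, with the trailing '' standing in for an absent throwable:
[2020-08-01T12:00:00.000] [INFO] [thread-1] [com.example.DemoController] [host01] [172.31.5.6] [demo-service] [DemoController.java:88] [request handled] ## ''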
Kafka to HDFS:
The webhdfs output plugin needs to be installed first.
# Install the plugin
./bin/logstash-plugin install logstash-output-webhdfs

# Method 2
## download the plugin package
https://github.com/logstash-plugins/logstash-output-webhdfs
## in the Gemfile under the logstash directory, change
gem "logstash-output-webhdfs"
## to
gem "logstash-output-webhdfs", :path => "/root/logstash-output-webhdfs/"
## then run the install command again and wait for "Installation successful"
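Either way, you can confirm the plugin registered by listing installed plugins:
./bin/logstash-plugin list | grep webhdfs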
Configuration
#=========== kafka-hdfs ==================
input {
  kafka {
    bootstrap_servers => "ip:9092, ip2:9092, ip3:9092"
    topics => ["rsyslog_nginx"]
    codec => "json"
  }
}
filter {
  date {
    match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"]
    target => "time_local"
  }
  ruby {
    code => "event.set('index.date', event.get('time_local').time.localtime.strftime('%Y%m%d'))"
  }
  ruby {
    code => "event.set('index.hour', event.get('time_local').time.localtime.strftime('%H'))"
  }
}
output {
  webhdfs {
    workers => 2
    host => "master01"
    port => 50070
    user => "hdfs"
    path => "/logs/nginx/%{index.date}/%{index.hour}.log"   #"/test_logstash/logstash-%{+YYYY}-%{+MM}-%{+dd}/test.log"
    codec => "json"
  }
  stdout { codec => rubydebug }
}
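Once events flow, the output files can be checked from any machine with an HDFS client, using the path from the config above:
hdfs dfs -ls /logs/nginx/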
Elasticsearch cluster installation
Elasticsearch 7.x requires JDK 1.8 or later.
Extract it, then edit the configuration file
vim ./config/elasticsearch.yml
Master node
# Full option descriptions are covered in another blog post
cluster.name: elasticsearch
# ------------------------------------ Node ------------------------------------
node.name: es-master
node.master: true
node.data: true
# ----------------------------------- Paths ------------------------------------
path.data: /opt/server/elasticsearch-7.8.1/data
path.logs: /opt/server/elasticsearch-7.8.1/logs
# ---------------------------------- Network -----------------------------------
network.host: 0.0.0.0
http.cors.enabled: true
http.cors.allow-origin: "*"
# --------------------------------- Discovery ----------------------------------
cluster.initial_master_nodes: ["es-master"]
discovery.seed_hosts: ["ip1:9300","ip2:9300", "ip3:9300"]
# index auto-creation rules
action.auto_create_index: .monitoring-kibana*,.monitoring-data*,.watches,.kibana,.watcher-history*,.monitoring-es*,.security,.triggered_watches,logstash-*
Data node 1
# ---------------------------------- Cluster -----------------------------------
cluster.name: elasticsearch
# ------------------------------------ Node ------------------------------------
node.name: es-node01
node.master: false
node.data: true
# ----------------------------------- Paths ------------------------------------
path.data: /opt/server/elasticsearch-7.8.1/data
path.logs: /opt/server/elasticsearch-7.8.1/logs
# ---------------------------------- Network -----------------------------------
network.host: 0.0.0.0
http.cors.enabled: true
http.cors.allow-origin: "*"
# --------------------------------- Discovery ----------------------------------
cluster.initial_master_nodes: ["es-master"]
discovery.seed_hosts: ["ip1:9300","ip2:9300", "ip3:9300"]
# index auto-creation rules
action.auto_create_index: .monitoring-kibana*,.monitoring-data*,.watches,.kibana,.watcher-history*,.monitoring-es*,.security,.triggered_watches,logstash-*
For data node 2, only node.name: needs to be changed.
Edit system limits as root
vim /etc/security/limits.conf
* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096
soft nofile: maximum number of open file descriptors (soft limit)
hard nofile: maximum number of open file descriptors (hard limit)
soft nproc: maximum number of processes per user (soft limit)
hard nproc: maximum number of processes per user (hard limit)
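After logging in again, the effective limits for the current shell can be checked with ulimit:
ulimit -Sn   # soft nofile
ulimit -Hn   # hard nofile
ulimit -Su   # soft nproc
ulimit -Hu   # hard nproc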
Edit sysctl.conf (the -w form takes effect immediately; the file entry makes it persistent)
sysctl -w vm.max_map_count=655360
vim /etc/sysctl.conf
vm.max_map_count=655360
Check:
sysctl -a|grep vm.max_map_count
Start
Elasticsearch refuses to run as root; create a dedicated user, then switch to it before starting.
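A minimal sketch of the user setup (the install path matches the config above; the user name es is arbitrary):
useradd es
chown -R es:es /opt/server/elasticsearch-7.8.1
su - es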
./elasticsearch
Background start: ./elasticsearch -d
Visit http://ip:9200/_cat/nodes?v to see information on all three nodes, or query it from the command line with curl 172.31.5.6:9200/_cat/nodes?v.
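Cluster health is another quick check; the status should be green once all nodes have joined:
curl http://172.31.5.6:9200/_cluster/health?pretty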
ElasticHD
Download:
wget https://github.com/360EntSecGroup-Skylar/ElasticHD/releases/download/1.4/elasticHD_linux_amd64.zip
Unzip the package and adjust permissions
unzip elasticHD_linux_amd64.zip
chmod 777 ElasticHD
Background start
# replace 127.0.0.1 with the ES cluster IP
nohup ./ElasticHD -p 127.0.0.1:9800 > /opt/servers/elasticsearch/elasticHD.log 2>&1 &
View in the browser
http://ip:9800/
Kibana
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.8.1-linux-x86_64.tar.gz
nohup ./bin/kibana >> ./logs/kibana.log &
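Before starting, point config/kibana.yml at the cluster. A minimal sketch; the address and credentials below are placeholders matching the ES config earlier, and the username/password lines only apply if ES security is enabled:
server.host: "0.0.0.0"
server.port: 5601
elasticsearch.hosts: ["http://ip:9200"]
elasticsearch.username: "elastic"
elasticsearch.password: "123456"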
Filebeat notes: https://www.yuque.com/stonejin/cbt4vt/blqise
Kibana tutorial: https://www.cnblogs.com/chenqionghe/p/12503181.html?utm_source=tuicool&utm_medium=referral
ELK reference: https://www.kancloud.cn/noahs/linux/1397312
Logstash plugin usage: https://blog.csdn.net/chenleiking/article/details/73563930
Elasticsearch basics: https://www.pianshen.com/article/739610986/
Elasticsearch usage: http://blog.cheyo.net/136.html