I. Beats: lightweight data shippers
The Beats platform is a family of single-purpose data shippers. Once installed, they run as lightweight agents that send data from hundreds or thousands of machines to Logstash or Elasticsearch.

II. Collecting system logs with Filebeat and Logstash via Kafka
Architecture diagram:

Environment:
Host A: elasticsearch, IP address: 192.168.7.100
Host B: logstash, IP address: 192.168.7.102
Host C: filebeat, IP address: 192.168.7.103
Host D: kafka/zookeeper, IP address: 192.168.7.104
Host E: kafka/zookeeper, IP address: 192.168.7.105
1. Download and install the filebeat package from the official site
Download page: https://www.elastic.co/cn/downloads/past-releases#filebeat
Documentation: https://www.elastic.co/guide/en/beats/filebeat/6.8/filebeat-configuration.html
(1) Install the downloaded filebeat package
[root@filebate ~]# yum install filebeat-6.8.1-x86_64.rpm -y
(2) Edit the filebeat configuration file so that the collected logs are written to Kafka
[root@filebate tmp]# vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages
  fields:
    host: "192.168.7.103"
    type: "filebeat-syslog-7-103"
    app: "syslog"
#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:            # comment out this line
  # Array of hosts to connect to.
  #hosts: ["192.168.7.100:9200"]  # comment out this line
output.kafka:                     # send to the Kafka cluster instead
  hosts: ["192.168.7.104:9092","192.168.7.105:9092"]  # Kafka broker addresses
  topic: "filebeat-systemlog-7-103"
  partition.round_robin:
    reachable_only: true
  required_acks: 1                # ack once the local (leader) write completes
  compression: gzip               # enable compression
  max_message_bytes: 1000000      # maximum message size
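Before starting the service, the file can be syntax-checked with Filebeat's built-in test subcommand (available in 6.x); it prints "Config OK" when the YAML parses cleanly:
[root@filebate ~]# filebeat test config -c /etc/filebeat/filebeat.yml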
(3) Start the filebeat service
[root@filebate tmp]# systemctl start filebeat
[root@filebate tmp]# systemctl status filebeat
[root@filebate tmp]# ps -ef | grep filebeat
root      5464     1  0 23:11 ?        00:00:00 /usr/share/filebeat/bin/filebeat -c /etc/filebeat/filebeat.yml -path.home /usr/share/filebeat -path.config /etc/filebeat -path.data /var/lib/filebeat -path.logs /var/log/filebeat
root      5488  1299  0 23:33 pts/0    00:00:00 grep --color=auto filebeat
2. Test on the Kafka cluster hosts
(1) Verify that data has been written to the Kafka cluster
[root@web1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.7.104:2181,192.168.7.105:2181
__consumer_offsets
connect-test
filebeat-systemlog-7-103
filebeat-systemlog-7-108
hello
kafka-nginx-access-log-7-101
kafka-syslog-log-7-101
logstash
Running the same command on the other broker host returns the identical list; the new topic filebeat-systemlog-7-103 is present.
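Beyond listing topics, a few actual messages can be read back with the console consumer that ships with Kafka (--from-beginning replays the topic from its start):
[root@web1 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.7.104:9092 --topic filebeat-systemlog-7-103 --from-beginning
Each message should be a JSON document produced by Filebeat, containing the custom fields block (host/type/app) defined in filebeat.yml.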
3. Configure the Logstash host
(1) Create a config file under /etc/logstash/conf.d that consumes the logs from Kafka
[root@logstash conf.d]# vim kafa-to-es.conf
input {
  kafka {
    topics => "filebeat-systemlog-7-103"        # must match the topic in filebeat.yml
    bootstrap_servers => "192.168.7.105:9092"   # any reachable broker in the Kafka cluster
    codec => "json"
  }
}
output {
  if [fields][app] == "syslog" {                # must match the "app" field set on the Filebeat host
    elasticsearch {
      hosts => ["192.168.7.100:9200"]           # forward to the Elasticsearch server
      index => "filebeat-syslog-7-103-%{+YYYY.MM.dd}"
    }
  }
}
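Before restarting, the pipeline file can be syntax-checked with Logstash's test flag (binary path assumed from a standard RPM install):
[root@logstash conf.d]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafa-to-es.conf --config.test_and_exit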
(2) Restart the logstash service
# systemctl restart logstash
(3) Check in the elasticsearch-head plugin that logs are arriving; at this point the index has been created
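If head is not available, the same check works directly against the Elasticsearch REST API:
[root@logstash ~]# curl -s 'http://192.168.7.100:9200/_cat/indices?v' | grep filebeat-syslog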

4. Create the index pattern in Kibana
(1) Create the index pattern in the Kibana web UI

(2) The collected logs now appear under the Discover tab
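Equivalently, a quick query from the shell confirms the documents are searchable (index name as created above):
[root@logstash ~]# curl -s 'http://192.168.7.100:9200/filebeat-syslog-7-103-*/_search?size=1&pretty'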

III. Logstash with Filebeat: collecting system logs and nginx logs via Kafka
1. Filebeat collects the system log and the nginx log and writes them to Kafka
(1) Compile and install nginx from source (not covered in detail here)
(2) Edit the filebeat configuration file to collect both the system log and the nginx access log and write them to Kafka
[root@filebate tmp]# vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages
  fields:
    host: "192.168.7.103"
    type: "filebeat-syslog-7-103"
    app: "syslog"
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  fields:
    host: "192.168.7.103"
    type: "filebeat-nginx-accesslog-7-103"
    app: "nginx"
output.kafka:                     # appended at the end of the file
  hosts: ["192.168.7.104:9092","192.168.7.105:9092"]  # Kafka cluster addresses
  topic: "filebeat-systemlog-7-103"
  partition.round_robin:
    reachable_only: true
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
Restart the filebeat service:
# systemctl restart filebeat
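Note that both inputs share the single topic filebeat-systemlog-7-103 and are distinguished later by the custom fields.app value. The topic's partition and replica layout can be inspected with kafka-topics.sh:
[root@web1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.7.104:2181,192.168.7.105:2181 --topic filebeat-systemlog-7-103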
(3) Review the effective configuration
[root@filebate tmp]# grep -v "#" /etc/filebeat/filebeat.yml | grep -v "^$"
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages
  fields:
    host: "192.168.7.103"
    type: "filebeat-syslog-7-103"
    app: "syslog"
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  fields:
    host: "192.168.7.103"
    type: "filebeat-nginx-accesslog-7-103"
    app: "nginx"
filebeat.config.modules:                 # present by default
  path: ${path.config}/modules.d/*.yml   # present by default
  reload.enabled: false                  # present by default
setup.template.settings:                 # present by default
  index.number_of_shards: 3              # present by default
setup.kibana:                            # present by default
processors:                              # present by default
  - add_host_metadata: ~                 # present by default
  - add_cloud_metadata: ~                # present by default
output.kafka:                            # appended at the end of the file
  hosts: ["192.168.7.104:9092","192.168.7.105:9092"]
  topic: "filebeat-systemlog-7-103"
  partition.round_robin:
    reachable_only: true
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
2. Test on the Kafka cluster hosts
(1) Confirm that both the nginx and system log data are being written
[root@web1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.7.104:2181,192.168.7.105:2181
__consumer_offsets
connect-test
filebeat-systemlog-7-103
filebeat-systemlog-7-108
hello
kafka-nginx-access-log-7-101
kafka-syslog-log-7-101
logstash
The other broker host returns the same topic list.
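Because both log types land in the same topic, the nginx entries specifically can be confirmed by consuming the topic and filtering on the app field (a rough filter, assuming Filebeat's compact JSON encoding):
[root@web1 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.7.104:9092 --topic filebeat-systemlog-7-103 --from-beginning | grep '"app":"nginx"'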
3. Pull the Kafka logs into Elasticsearch with Logstash
(1) On the Logstash host, create a config file under /etc/logstash/conf.d that consumes the Kafka logs
input {
  kafka {
    topics => "filebeat-systemlog-7-103"
    bootstrap_servers => "192.168.7.104:9092"   # one broker is enough to bootstrap the connection
    codec => "json"
  }
}
output {
  if [fields][app] == "syslog" {                # must match the "app" field set on the Filebeat host
    elasticsearch {
      hosts => ["192.168.7.100:9200"]           # system logs go to the Elasticsearch host
      index => "filebeat-syslog-7-103-%{+YYYY.MM.dd}"
    }
  }
  if [fields][app] == "nginx" {                 # likewise for the nginx access log
    elasticsearch {
      hosts => ["192.168.7.100:9200"]
      index => "filebeat-nginx-accesslog-7-103-%{+YYYY.MM.dd}"
    }
  }
}
(2) Restart the logstash service
# systemctl restart logstash
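Once Logstash is consuming, its progress through the topic can be checked with Kafka's consumer-group tool (the Logstash Kafka input plugin uses the group id "logstash" by default):
[root@web1 ~]# /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.7.104:9092 --describe --group logstash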
(3) In the head plugin, both newly collected indices now show up

4. Create the index patterns in Kibana
(1) Create an index pattern for each index in the Kibana web UI

(2) View the indexed documents under Discover

