ELK Part 9 -- Logstash with Filebeat collecting system and nginx logs through Kafka


I. Beats: lightweight data shippers

The Beats platform is a collection of single-purpose data shippers. Installed as lightweight agents, they send data from hundreds or thousands of machines to Logstash or Elasticsearch.

II. Using Logstash with Filebeat to collect system logs through Kafka

Architecture diagram:

Environment:

Host A: elasticsearch, IP address: 192.168.7.100

Host B: logstash, IP address: 192.168.7.102

Host C: filebeat, IP address: 192.168.7.103

Host D: kafka/zookeeper, IP address: 192.168.7.104

Host E: kafka/zookeeper, IP address: 192.168.7.105

1. Download and install the filebeat package

Official downloads: https://www.elastic.co/cn/downloads/past-releases#filebeat

Official documentation: https://www.elastic.co/guide/en/beats/filebeat/6.8/filebeat-configuration.html

1. Install the filebeat package from the official download

[root@filebate ~]# yum install filebeat-6.8.1-x86_64.rpm -y
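If the rpm is not already on the host, it can be fetched first from the elastic artifacts site (the standard URL pattern for the 6.8.1 x86_64 build; adjust the version to match yours):

# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.8.1-x86_64.rpm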

2. Edit the filebeat configuration file so that the logs filebeat collects are written to Kafka

[root@filebate tmp]# vim /etc/filebeat/filebeat.yml 
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages
  fields:
    host: "192.168.7.103"
    type: "filebeat-syslog-7-103"
    app: "syslog"

#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:  # comment out this line
  # Array of hosts to connect to.
  #hosts: ["192.168.7.100:9200"]  # comment out this line

output.kafka:  # write to the Kafka hosts
  hosts: ["192.168.7.104:9092","192.168.7.105:9092"] # Kafka broker addresses
  topic: "filebeat-systemlog-7-103" 
  partition.round_robin:
    reachable_only: true
  required_acks: 1  # ack once the leader has written the message locally
  compression: gzip  # enable compression
  max_message_bytes: 1000000  # maximum message size
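Before starting the service, the configuration can be syntax-checked with filebeat's built-in test subcommand:

[root@filebate tmp]# filebeat test config -c /etc/filebeat/filebeat.yml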

3. Start the filebeat service

[root@filebate tmp]# systemctl start filebeat
[root@filebate tmp]# systemctl status  filebeat
[root@filebate tmp]# ps -ef |grep filebeat
root       5464      1  0 23:11 ?        00:00:00 /usr/share/filebeat/bin/filebeat -c /etc/filebeat/filebeat.yml -path.home /usr/share/filebeat -path.config /etc/filebeat -path.data /var/lib/filebeat -path.logs /var/log/filebeat
root       5488   1299  0 23:33 pts/0    00:00:00 grep --color=auto filebeat
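If nothing arrives in Kafka, filebeat's own log (written under the -path.logs directory visible in the ps output above) is the first place to look for connection errors:

[root@filebate tmp]# tail -f /var/log/filebeat/filebeat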

2. Test on the Kafka cluster hosts

1. Check whether data has been written to the Kafka cluster

[root@web1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.7.104:2181,192.168.7.105:2181
__consumer_offsets
connect-test
filebeat-systemlog-7-103
filebeat-systemlog-7-108
hello
kafka-nginx-access-log-7-101
kafka-syslog-log-7-101
logstash
[root@tomcat-web2 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.7.104:2181,192.168.7.105:2181
__consumer_offsets
connect-test
filebeat-systemlog-7-103
filebeat-systemlog-7-108
hello
kafka-nginx-access-log-7-101
kafka-syslog-log-7-101
logstash
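Listing topics only proves the topic exists; to confirm that messages are actually flowing, attach a console consumer to the topic (same Kafka install path as above):

[root@web1 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.7.104:9092 --topic filebeat-systemlog-7-103 --from-beginning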

3. Configure the logstash host

1. Create a config file under /etc/logstash/conf.d that consumes the logs from Kafka

[root@logstash conf.d]# vim kafa-to-es.conf 
input {
   kafka {
      topics => "filebeat-systemlog-7-103" # must match the topic in the filebeat config
      bootstrap_servers => "192.168.7.105:9092" # any reachable broker in the Kafka cluster
      codec => "json"
   }}

output {
  if [fields][app] == "syslog" {  # must match the app field set on the filebeat host
    elasticsearch {
      hosts => ["192.168.7.100:9200"] # forward to the elasticsearch server
      index => "filebeat-syslog-7-103-%{+YYYY.MM.dd}"
    }}
}
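The file can be syntax-checked before restarting the service (-t is short for --config.test_and_exit; the binary path below is the default for a package install):

[root@logstash conf.d]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafa-to-es.conf -t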

2. Restart the logstash service

# systemctl restart logstash

3. Check in the elasticsearch-head plugin whether logs are arriving; at this point the index has been created
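The same check works without the head plugin, directly against the Elasticsearch REST API:

[root@logstash conf.d]# curl 'http://192.168.7.100:9200/_cat/indices?v'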

4. Create the index pattern on the Kibana host

1. Create the index pattern

2. The log entries are then visible under the Discover tab

III. Logstash with Filebeat collecting system and nginx logs through Kafka

1. Filebeat writes the system log and the nginx log to Kafka

1. Compile and install nginx from source (not covered in detail here)

2. Edit the filebeat configuration file to collect both the system log and the nginx access log and write them to Kafka

[root@filebate tmp]# vim /etc/filebeat/filebeat.yml 
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages
  fields:
    host: "192.168.7.103"
    type: "filebeat-syslog-7-103"
    app: "syslog"
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log 
  fields:
    host: "192.168.7.103"
    type: "filebeat-nginx-accesslog-7-103"
    app: "nginx"

output.kafka:  # appended at the end of the file
  hosts: ["192.168.7.104:9092","192.168.7.105:9092"] # Kafka cluster addresses
  topic: "filebeat-systemlog-7-103"
  partition.round_robin:
    reachable_only: true
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000 

Restart the filebeat service

# systemctl restart filebeat
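To make sure the nginx input has something to ship, hit the local nginx once and check that a line lands in the access log (paths as configured above; assumes nginx is listening on port 80):

# curl -s http://192.168.7.103/ >/dev/null
# tail -1 /var/log/nginx/access.log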

3. Review the key configuration settings

[root@filebate tmp]# grep -v "#" /etc/filebeat/filebeat.yml |grep -v "^$"
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages
  fields:
    host: "192.168.7.103"
    type: "filebeat-syslog-7-103"
    app: "syslog"
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log 
  fields:
    host: "192.168.7.103"
    type: "filebeat-nginx-accesslog-7-103"
    app: "nginx"
filebeat.config.modules: # present by default
  path: ${path.config}/modules.d/*.yml # present by default
  reload.enabled: false # present by default
setup.template.settings: # present by default
  index.number_of_shards: 3 # present by default
setup.kibana: # present by default
processors: # present by default
  - add_host_metadata: ~ # present by default
  - add_cloud_metadata: ~ # present by default
output.kafka:  # appended at the end of the file
  hosts: ["192.168.7.104:9092","192.168.7.105:9092"]
  topic: "filebeat-systemlog-7-103"
  partition.round_robin:
    reachable_only: true
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000 

2. Test on the Kafka cluster hosts

1. Check that both the nginx and the system log data are now being written (both inputs share the topic filebeat-systemlog-7-103)

[root@web1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.7.104:2181,192.168.7.105:2181
__consumer_offsets
connect-test
filebeat-systemlog-7-103
filebeat-systemlog-7-108
hello
kafka-nginx-access-log-7-101
kafka-syslog-log-7-101
logstash
[root@tomcat-web2 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.7.104:2181,192.168.7.105:2181
__consumer_offsets
connect-test
filebeat-systemlog-7-103
filebeat-systemlog-7-108
hello
kafka-nginx-access-log-7-101
kafka-syslog-log-7-101
logstash
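Because both inputs share one topic, the two log types are distinguished by the fields.app value inside each JSON message. A rough way to eyeball just the nginx entries (the compact "app":"nginx" serialization is an assumption about filebeat's JSON output):

[root@web1 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.7.104:9092 --topic filebeat-systemlog-7-103 --from-beginning | grep '"app":"nginx"'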

3. Logstash pulls the logs from Kafka into elasticsearch

1. On the logstash host, create a file under /etc/logstash/conf.d that consumes the Kafka logs

input {
   kafka {
      topics => "filebeat-systemlog-7-103"
      bootstrap_servers => "192.168.7.104:9092" # one broker is enough to bootstrap the consumer
      codec => "json"
   }
}


output {
  if [fields][app] == "syslog" { # must match the app field set on the filebeat host
    elasticsearch {
      hosts => ["192.168.7.100:9200"] # system logs go to the elasticsearch host
      index => "filebeat-syslog-7-103-%{+YYYY.MM.dd}"
    }}

  if [fields][app] == "nginx" { # must match the app field set on the filebeat host
    elasticsearch {
      hosts => ["192.168.7.100:9200"] # nginx logs go to the elasticsearch host
      index => "filebeat-nginx-accesslog-7-103-%{+YYYY.MM.dd}"
    }}
}

2. Restart the logstash service

# systemctl  restart logstash
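Both indices should now exist in Elasticsearch; a wildcard query against the _cat API confirms it:

# curl 'http://192.168.7.100:9200/_cat/indices/filebeat-*?v'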

3. In the head plugin, the names of both collected log indices are now visible

4. Create the index patterns on the Kibana host

1. Create the index patterns in the Kibana web UI

2. Query the newly created indices under Discover

