ELK (13): ELK + Kafka + Filebeat (high-volume logs)
https://blog.51cto.com/cloumn/blog/240
https://www.cnblogs.com/whych/p/9958188.html
Use Filebeat to collect logs and write them straight into Kafka, then have Logstash read from Kafka and write to Elasticsearch.
If that still hits a performance bottleneck:
Use Filebeat to collect logs and forward them to a Logstash (logstash1) on the beat side, have logstash1 forward to Kafka, and then have logstash2 read from Kafka and write to Elasticsearch.
https://mp.weixin.qq.com/s/F8TVva8tDgN0tNsUcLoySg
The whole system uses ten hosts in total (Filebeat runs on the clients and is not counted): four for Logstash, two for Elasticsearch, three for the Kafka cluster, and one for Kibana behind an Nginx proxy.
How the architecture fits together:
(1) Users reach the ELK log platform through the Nginx proxy; Nginx can also put a password on the UI.
(2) Nginx forwards the requests to Kibana.
(3) Kibana fetches its data from Elasticsearch. Elasticsearch here is a two-node cluster, and a log record may end up on either of the two servers.
(4) Logstash1 pulls data out of Kafka and sends it to Elasticsearch.
(5) The Kafka servers persist the log data, so that when the web servers produce more logs than can be collected and stored in step, nothing gets lost. Kafka can run as a cluster, with the Logstash servers continuously consuming from it.
(6) Logstash2 takes the log events received from Filebeat and publishes them into Kafka for safekeeping.
(7) Filebeat collects the logs on the client side.
Note 1: Why Kafka was added and what it does
Kafka is added so the whole system is cleanly layered. As a message-streaming and persistence component, it hides the differences between the log files on the agent side from the master side: the people responsible for the log-shipping end can focus on producing data into Kafka, while the people responsible for analysis and aggregation can focus on consuming data from Kafka. That is why Kafka belongs in the deployment.
Kafka is also used for log transport because it buffers data and lets it be consumed again. Kafka itself is highly available, guards well against data loss, offers good throughput, and is widely used. It effectively prevents log loss and covers for a crashed Logstash. In short, it evens out network transfer, reducing congestion and, above all, the chance of losing data.
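To see the buffer at work, watch the consumer group's offsets. A minimal sketch, assuming the broker address and the systemlog-filebeat group name that appear in the configs later in this article:
/home/admin/elk/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 10.6.76.27:9092 --describe --group systemlog-filebeat
# While Logstash is catching up, CURRENT-OFFSET trails LOG-END-OFFSET (the LAG column grows);
# the unread messages wait in Kafka instead of being dropped.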
Note 2: What the two Logstash layers do
Why put two Logstash nodes in front of Kafka? When a large volume of log data is written at once, it is easy for data to get lost or mixed up. The extra Logstash layer aggregates and classifies events by type, which keeps the data stream lean.
With a single Logstash layer, the same instances would have to aggregate the logs collected by every client's Filebeat and parse and analyze them too. At scale that leads to muddled processing and a much heavier load. The two-layer structure balances the load and splits responsibilities: one layer aggregates and does simple routing, the other parses and filters. Each layer runs two Logstash instances for high availability, which makes the whole architecture more stable.
The sections below walk through each step and how the components interact (with their configuration files).
1 Install Filebeat
Filebeat configuration reference manual
sudo rpm -ivh filebeat-7.2.0-x86_64.rpm
# The configuration lives in /etc/filebeat
# [admin@ris-1 filebeat]$ ls
# fields.yml  filebeat.reference.yml  filebeat.yml  filebeat.yml_bak  modules.d
2 Filebeat collects system logs into a file
2.1 Configuration
Set enabled: true, otherwise nothing is ever written out. What a trap! I couldn't even find an article about it online.
https://www.elastic.co/guide/en/beats/filebeat/current/file-output.html#_literal_enabled_literal_14
# [admin@ris-1 filebeat]$ pwd
# /etc/filebeat
# [admin@ris-1 filebeat]$ sudo cat filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log
    - /var/log/messages
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
output.file:
  path: "/tmp/"
  filename: filebeat
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
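Before starting the service, it is worth letting Filebeat validate the YAML itself; this subcommand prints "Config OK" when the file parses:
sudo filebeat test config -c /etc/filebeat/filebeat.yml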
2.2 Start the service
sudo systemctl restart filebeat
sudo systemctl status filebeat
2.3 Check the log file
tail -f /tmp/filebeat
The log entries are in JSON format:
{"@timestamp":"2019-07-19T08:34:20.637Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.2.0"},"host":{"architecture":"x86_64","os":{"family":"redhat","name":"CentOS Linux","kernel":"3.10.0-123.el7.x86_64","codename":"Core","platform":"centos","version":"7 (Core)"},"id":"6a0204048ec74c879526b4a6bc131c07","containerized":false,"name":"ris-1","hostname":"ris-1"},"agent":{"hostname":"ris-1","id":"178be62d-8b55-42f8-ae83-7d217b6b9806","version":"7.2.0","type":"filebeat","ephemeral_id":"9b9bdff3-b5db-44f5-9353-46aa71aacc6f"},"log":{"offset":1652416,"file":{"path":"/var/log/messages"}},"message":"11","input":{"type":"log"},"ecs":{"version":"1.0.0"}}
2.4 Add a fields tag
Version 6.2 and later removed the document_type setting from Filebeat's prospectors configuration; a custom fields entry is used instead.
Configuration
# [admin@ris-1 filebeat]$ sudo cat filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log
    - /var/log/messages
  fields:
    service: "ris-1-systemlog-filebeat"
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
output.file:
  path: "/tmp/"
  filename: filebeat
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
Output
In another terminal:
echo '11' | sudo tee -a /var/log/messages
{"@timestamp":"2019-07-19T08:55:09.753Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.2.0"},"message":"11","input":{"type":"log"},"fields":{"service":"ris-1-systemlog-filebeat"},"ecs":{"version":"1.0.0"},"host":{"name":"ris-1","hostname":"ris-1","architecture":"x86_64","os":{"platform":"centos","version":"7 (Core)","family":"redhat","name":"CentOS Linux","kernel":"3.10.0-123.el7.x86_64","codename":"Core"},"id":"6a0204048ec74c879526b4a6bc131c07","containerized":false},"agent":{"version":"7.2.0","type":"filebeat","ephemeral_id":"aaa7e375-9ef6-4bb9-a62a-d75f5aea477f","hostname":"ris-1","id":"178be62d-8b55-42f8-ae83-7d217b6b9806"},"log":{"offset":1704952,"file":{"path":"/var/log/messages"}}}
3 Filebeat to Logstash
3.1 Logstash listens for Beats input and prints to the console
# [admin@ris-1 ~]$ cat /etc/logstash/conf.d/beats.conf
input {
  beats {
    port => "5044"
    codec => "json"
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
}
3.2 Run Logstash in the foreground
sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/beats.conf -t   # -t only tests the configuration
sudo /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/beats.conf
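Once the pipeline is up, confirm that the beats input is really listening on port 5044, for example with ss:
sudo ss -lntp | grep 5044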
3.3 Filebeat sends system logs to Logstash
Configuration
# Only one output can be active at a time: with output.logstash enabled,
# output.file must stay commented out. Odd, but that is how it works.
# [admin@ris-1 filebeat]$ sudo cat filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log
    - /var/log/messages
  fields:
    service: "ris-1-systemlog-filebeat"
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
output.logstash:
  hosts: ["10.6.75.171:5044"]  # one or more Logstash addresses; this is the local host, so one is plenty
  #hosts: ["192.168.108.191:5044", "192.168.108.87:5044"]  # send to two Logstash-collect nodes
  loadbalance: true
  #loadbalance: false  # send to one Logstash only; if it dies, fail over to the other (active/standby)
  #loadbalance: true   # spread events across all listed Logstash hosts; dead ones are skipped
  worker: 2            # number of workers
  #compression_level: 3  # compression level
#output.file:
#  path: "/tmp/"
#  filename: filebeat
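Filebeat can also test connectivity to the configured Logstash output by itself, which saves guessing when nothing arrives:
sudo filebeat test output -c /etc/filebeat/filebeat.yml
# checks DNS resolution and the TCP connection to 10.6.75.171:5044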
Output
echo '---------------' | sudo tee -a /var/log/messages
Viewed in the Logstash foreground. Note the _jsonparsefailure tag: the beats input declares codec => "json", but lines from /var/log/messages are plain text, so the JSON codec fails and the line is passed through as message:
{ "message" => "---------------", "host" => { "name" => "ris-1" }, "@version" => "1", "tags" => [ [0] "_jsonparsefailure", [1] "beats_input_codec_json_applied" ], "log" => { "file" => { "path" => "/var/log/messages" }, "offset" => 3864943 }, "fields" => { "service" => "ris-1-systemlog-filebeat" }, "input" => { "type" => "log" }, "@timestamp" => 2019-07-20T01:01:04.156Z, "ecs" => { "version" => "1.0.0" }, "agent" => { "hostname" => "ris-1", "version" => "7.2.0", "id" => "178be62d-8b55-42f8-ae83-7d217b6b9806", "type" => "filebeat", "ephemeral_id" => "4c5284f4-6ba0-4227-a3da-3cd72603c1a2" } }
4 Logstash to Kafka
4.1 Check the Kafka topics
List the Kafka topics. If a ris-1-systemlog-filebeat topic was created earlier, delete it first so it is easy to tell when Logstash recreates it automatically; if it never existed, all the better.
/home/admin/elk/kafka/bin/kafka-topics.sh --list --zookeeper kafka1:2181,kafka2:2181,kafka3:2181
[admin@pe-jira ~]$ /home/admin/elk/kafka/bin/kafka-topics.sh --list --zookeeper kafka1:2181,kafka2:2181,kafka3:2181
__consumer_offsets
messagetest
ris-1-systemlog
ris-api-nginx-1
[admin@pe-jira ~]$
4.2 Filebeat configuration
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log
    - /var/log/messages
  fields:
    service: "ris-1-systemlog-filebeat"
    # service is a custom field. Once defined, route on it in Logstash with
    # a condition such as: if [fields][service] == "ris-1-systemlog-filebeat".
    # See the forwarding rules below.
  multiline:        # merge multi-line entries (e.g. stack traces) into one event
    pattern: '^\['  # lines that do not start with "[" are appended to the previous event
    negate: true
    match: after
  # Exclude lines. A list of regular expressions to match. It drops the lines that match.
  # # drop lines starting with DBG:
  # exclude_lines: ['^DBG']
output.logstash:
  hosts: ["10.6.75.171:5044"]  # one or more Logstash addresses; local host here, so one is enough
  worker: 1                    # number of workers
  compression_level: 3         # compression level
  #hosts: ["192.168.108.191:5044","192.168.108.87:5044"]  # send to two Logstash-collect nodes
  #loadbalance: false  # send to one Logstash only; if it dies, fail over to the other (active/standby)
  #loadbalance: true   # spread events across all listed Logstash hosts; dead ones are skipped
4.3 Receiving Logstash configuration
# [admin@ris-1 conf.d]$ cat /etc/logstash/conf.d/beats.conf
input {
  beats {
    port => "5044"
    codec => "json"
  }
}
output {
  if [fields][service] == "ris-1-systemlog-filebeat" {
    kafka {
      bootstrap_servers => "10.6.76.27:9092,10.6.76.28:9092,10.6.76.18:9092"
      topic_id => "ris-1-systemlog-filebeat"
      batch_size => "5"
      codec => "json"
    }
  }
}
4.4 Restart Filebeat and Logstash
sudo systemctl restart logstash
sudo systemctl restart filebeat
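If events do not show up after the restart, check the services and their logs first (the log path is the RPM-install default):
sudo systemctl status logstash filebeat
sudo journalctl -u logstash -f
sudo tail -f /var/log/logstash/logstash-plain.log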
4.5 Check the Kafka topics again
# Seeing the new topic proves the filebeat -> logstash -> kafka leg is working
/home/admin/elk/kafka/bin/kafka-topics.sh --list --zookeeper kafka1:2181,kafka2:2181,kafka3:2181
[admin@pe-jira ~]$ /home/admin/elk/kafka/bin/kafka-topics.sh --list --zookeeper kafka1:2181,kafka2:2181,kafka3:2181
__consumer_offsets
messagetest
ris-1-systemlog
ris-1-systemlog-filebeat
ris-api-nginx-1
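Listing the topic only proves it exists; to see the events themselves, read a few messages back out with the console consumer (any broker of the cluster will do):
/home/admin/elk/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.6.76.27:9092 --topic ris-1-systemlog-filebeat --from-beginning --max-messages 5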
4.6 Logstash reads from Kafka and writes to Elasticsearch
First watch the events arrive with Logstash in the foreground.
Configuration
[admin@pe-jira conf.d]$ cat /etc/logstash/conf.d/sys-kafka-es.conf
input {
  kafka {
    bootstrap_servers => "10.6.76.27:9092"  # Kafka broker address
    topics => "ris-1-systemlog-filebeat"
    group_id => "systemlog-filebeat"
    decorate_events => true                 # attach Kafka metadata to each event
    consumer_threads => 1
    codec => "json"   # decode as JSON, since the events were produced to Kafka as JSON
  }
}
output {
  stdout {
    codec => "rubydebug"
  }
}
Log output
{ "fields" => { "multiline" => { "pattern" => "^[", "match" => "after", "negate" => true }, "service" => "ris-1-systemlog-filebeat" }, "input" => { "type" => "log" }, "@version" => "1", "message" => "Jul 22 10:29:01 ris-1 systemd: Started Session 581591 of user admin.", "agent" => { "hostname" => "ris-1", "type" => "filebeat", "id" => "178be62d-8b55-42f8-ae83-7d217b6b9806", "ephemeral_id" => "3bd63afa-c26d-4088-9288-a19a6fc5399d", "version" => "7.2.0" }, "ecs" => { "version" => "1.0.0" }, "host" => { "name" => "ris-1" }, "tags" => [ [0] "_jsonparsefailure", [1] "beats_input_codec_json_applied" ], "@timestamp" => 2019-07-22T02:29:01.409Z, "log" => { "file" => { "path" => "/var/log/messages" }, "offset" => 14606656 } }
Start as a service
Configuration
# [admin@pe-jira conf.d]$ cat /etc/logstash/conf.d/sys-kafka-es.conf
input {
  kafka {
    bootstrap_servers => "10.6.76.27:9092"  # Kafka broker address
    topics => "ris-1-systemlog-filebeat"
    group_id => "systemlog-filebeat"
    decorate_events => true                 # attach Kafka metadata to each event
    consumer_threads => 1
    codec => "json"   # decode as JSON, since the events were produced to Kafka as JSON
  }
}
output {
#  stdout {
#    codec => "rubydebug"
#  }
  #if [type] == "ris-1-systemlog-filebeat" {
  if [fields][service] == "ris-1-systemlog-filebeat" {
    elasticsearch {
      hosts => ["10.6.76.27:9200"]
      index => "logstash-ris-1-systemlog-beat-%{+YYYY.MM.dd}"
    }
  }
}
# Start
sudo systemctl restart logstash
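To confirm that the daily index is actually being created, ask Elasticsearch directly from any host that can reach it:
curl -s 'http://10.6.76.27:9200/_cat/indices?v' | grep ris-1-systemlog
# a line like "green open logstash-ris-1-systemlog-beat-2019.07.22 ..." means data is flowing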
5 Add to Kibana
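The index pattern is normally created in the Kibana UI (Management > Index Patterns > Create index pattern, with logstash-ris-1-systemlog-beat-* and @timestamp as the time field). As a sketch, the same step can be scripted through Kibana's saved-objects API; the Kibana address here is an assumption:
curl -X POST 'http://localhost:5601/api/saved_objects/index-pattern' -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d '{"attributes":{"title":"logstash-ris-1-systemlog-beat-*","timeFieldName":"@timestamp"}}'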
6 Collect Nginx logs
6.1 Filebeat configuration
# Appended after the system-log input
# [admin@ris-1 filebeat]$ sudo cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log
    - /var/log/messages
  fields:
    service: "ris-1-systemlog-filebeat"
    # service is a custom field; route on it in Logstash with
    # if [fields][service] == "..." (see the forwarding rules below)
  multiline:        # merge multi-line entries (e.g. stack traces) into one event
    pattern: '^\['
    negate: true
    match: after
  # drop lines starting with DBG, and rotated .gz files (the two patterns
  # share one exclude_lines list; YAML forbids duplicate keys):
  exclude_lines: ['^DBG', '.gz$']
# Collect the Nginx log
- type: log
  enabled: true
  paths:
    - /home/admin/webserver/logs/api/api.log
  fields:
    service: "ris-1-api-nginx"
  multiline:
    pattern: '^\['
    negate: true
    match: after
  exclude_lines: ['^DBG', '.gz$']
output.logstash:
  hosts: ["10.6.75.171:5044"]  # one or more Logstash addresses; local host here, so one is enough
  worker: 1                    # number of workers
  compression_level: 3         # compression level
  #hosts: ["192.168.108.191:5044","192.168.108.87:5044"]  # send to two Logstash-collect nodes
  #loadbalance: false  # send to one Logstash only; if it dies, fail over to the other (active/standby)
  #loadbalance: true   # spread events across all listed Logstash hosts; dead ones are skipped
6.2 Logstash collection configuration
# Appended after the system-log rule
# [admin@ris-1 filebeat]$ cat /etc/logstash/conf.d/beats.conf
input {
  beats {
    port => "5044"
    codec => "json"
  }
}
output {
  if [fields][service] == "ris-1-systemlog-filebeat" {
    kafka {
      bootstrap_servers => "10.6.76.27:9092,10.6.76.28:9092,10.6.76.18:9092"
      topic_id => "ris-1-systemlog-filebeat"
      batch_size => "5"
      codec => "json"
    }
  }
  if [fields][service] == "ris-1-api-nginx" {
    kafka {
      bootstrap_servers => "10.6.76.27:9092,10.6.76.28:9092,10.6.76.18:9092"
      topic_id => "ris-1-api-nginx"
      batch_size => "5"
      codec => "json"
    }
  }
}
6.3 Check the Kafka topic
/home/admin/elk/kafka/bin/kafka-topics.sh --list --zookeeper kafka1:2181,kafka2:2181,kafka3:2181
[admin@pe-jira ~]$ /home/admin/elk/kafka/bin/kafka-topics.sh --list --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 | grep ris-1-api-nginx
ris-1-api-nginx
[admin@pe-jira ~]$
6.4 Logstash writes to Elasticsearch
[admin@pe-jira conf.d]$ cat /etc/logstash/conf.d/ris-1-nginx-kafka-es.conf
input {
  kafka {
    bootstrap_servers => "10.6.76.27:9092"  # Kafka broker address
    topics => "ris-1-api-nginx"
    group_id => "ris-api-nginx"
    decorate_events => true                 # attach Kafka metadata to each event
    consumer_threads => 1
    codec => "json"   # decode as JSON, since the events were produced to Kafka as JSON
  }
}
output {
  if [fields][service] == "ris-1-api-nginx" {
    elasticsearch {
      hosts => ["10.6.76.27:9200"]
      index => "logstash-ris-1-api-nginx-%{+YYYY.MM.dd}"
    }
  }
}
6.5 Add to Kibana
7 Collect Java logs
Java logs need a regex so multi-line entries can be stitched back together.
Example log line:
[22 12:00:00,003 DEBUG] [mqScheduler-9] spring.SqlSessionUtils - Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@5c90e795]
Matching regex:
^\[[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}
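Before wiring the pattern into Filebeat, it can be sanity-checked against the real file with grep; the count should match the number of log entries rather than the number of raw lines (catalina.out path as used in the config below):
grep -cE '^\[[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}' /home/admin/ris-api-8080/logs/catalina.out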
7.1 Filebeat configuration
# The system-log input is commented out; the Nginx input from section 6 stays in the same file
# [admin@ris-1 filebeat]$ sudo cat /etc/filebeat/filebeat.yml
filebeat.inputs:
#- type: log
#  enabled: true
#  paths:
#    - /var/log/*.log
#    - /var/log/messages
#  fields:
#    service: "ris-1-systemlog-filebeat"
#  multiline:
#    pattern: '^\['
#    negate: true
#    match: after
#  exclude_lines: ['^DBG', '.gz$']
# Collect the Nginx log
- type: log
  enabled: true
  paths:
    - /home/admin/webserver/logs/api/api.log
  fields:
    service: "ris-1-api-nginx"
    # service is a custom field; route on it in Logstash with
    # if [fields][service] == "..."
  multiline:        # merge multi-line entries (e.g. stack traces) into one event
    pattern: '^\['
    negate: true
    match: after
  # drop lines starting with DBG, and rotated .gz files:
  exclude_lines: ['^DBG', '.gz$']
# Collect the Java log
- type: log
  enabled: true
  paths:
    - /home/admin/ris-api-8080/logs/catalina.out
  fields:
    service: "ris-1-api-catalina"
  multiline:
    pattern: '^\[[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}'
    negate: true
    match: after
  #exclude_lines: ['^DBG']  # intentionally not excluded: I want the DEBUG lines
  exclude_lines: ['.gz$']
output.logstash:
  hosts: ["10.6.75.171:5044"]  # one or more Logstash addresses; local host here, so one is enough
  worker: 1                    # number of workers
  compression_level: 3         # compression level
  #hosts: ["192.168.108.191:5044","192.168.108.87:5044"]  # send to two Logstash-collect nodes
  #loadbalance: false  # send to one Logstash only; if it dies, fail over to the other (active/standby)
  #loadbalance: true   # spread events across all listed Logstash hosts; dead ones are skipped
7.2 Logstash collection configuration
# The Nginx rule from section 6 stays in the same file
# [admin@ris-1 filebeat]$ cat /etc/logstash/conf.d/beats.conf
input {
  beats {
    port => "5044"
    codec => "json"
  }
}
output {
  if [fields][service] == "ris-1-api-nginx" {
    kafka {
      bootstrap_servers => "10.6.76.27:9092,10.6.76.28:9092,10.6.76.18:9092"
      topic_id => "ris-1-api-nginx"
      batch_size => "5"
      codec => "json"
    }
  }
  if [fields][service] == "ris-1-api-catalina" {
    kafka {
      bootstrap_servers => "10.6.76.27:9092,10.6.76.28:9092,10.6.76.18:9092"
      topic_id => "ris-1-api-catalina"
      batch_size => "5"
      codec => "json"
    }
  }
}
7.3 Check the Kafka topic
/home/admin/elk/kafka/bin/kafka-topics.sh --list --zookeeper kafka1:2181,kafka2:2181,kafka3:2181
[admin@pe-jira ~]$ /home/admin/elk/kafka/bin/kafka-topics.sh --list --zookeeper kafka1:2181,kafka2:2181,kafka3:2181
__consumer_offsets
messagetest
ris-1-api-catalina
ris-1-api-nginx
ris-1-systemlog
ris-1-systemlog-filebeat
ris-api-nginx-1
[admin@pe-jira ~]$
7.4 Logstash writes to Elasticsearch
# [admin@pe-jira ~]$ cat /etc/logstash/conf.d/ris-1-catalina-kafka-es.conf
input {
  kafka {
    bootstrap_servers => "10.6.76.27:9092"  # Kafka broker address
    topics => "ris-1-api-catalina"
    group_id => "ris-api-catalina"
    decorate_events => true                 # attach Kafka metadata to each event
    consumer_threads => 1
    codec => "json"   # decode as JSON, since the events were produced to Kafka as JSON
  }
}
output {
  if [fields][service] == "ris-1-api-catalina" {
    elasticsearch {
      hosts => ["10.6.76.27:9200"]
      index => "logstash-ris-1-api-catalina-%{+YYYY.MM.dd}"
    }
  }
}
7.5 Add to Kibana