linux系統常見日志采集
系統日志
/var/log/messages
安全日志
/var/log/secure
審計日志
/var/log/audit/audit.log
組件介紹
filebeat采集日志,然后發送到消息隊列kafka,然后logstash去獲取,利用filter功能過濾格式,然后存儲到elasticsearch中,最后通過kibana展示。
filebeat
輕量級的日志收集工具,本地文件的日志數據采集器。 作為服務器上的代理安裝,Filebeat監視日志目錄或特定日志文件,並將它們轉發給kafka或Elasticsearch、Logstash等。
kafka
kafka使用Scala語言編寫,Kafka是一個分布式、分區的、多副本的、多訂閱者的消息中間件,在ELK日志系統中用於日志的暫存,。
logstash
用於對日志進行收集、過濾,對數據進行格式化處理,並將所搜集的日志傳輸到相關系統進行存儲。Logstash是用Ruby語言開發的,由數據輸入端、過濾器和輸出端3部分組成。其中數據輸入端可以從數據源采集數據,常見的數據源如Kafka等;過濾器是數據處理層,包括對數據進行格式化處理、數據類型轉換、數據過濾等,支持正則表達式;數據輸出端是將Logstash收集的數據經由過濾器處理之后輸出到Elasticsearch。
elasticsearch
ElasticSearch是一個基於Lucene構建的開源的分布式的搜索引擎。設計用於雲計算中,能夠達到實時搜索,穩定,可靠,快速,安裝使用方便。ELK中主要用於數據的永久性存儲。
kibana
一款針對Elasticsearch開源分析以及可視化平台,使用node.js開發,可以用來搜索,展示存儲在Elasticsearch中的數據。同時提供了豐富的圖標模板,只需要通過簡單的配置就可以方便的進行高級數據分析和繪制各種圖表。在Kibana界面可以通過拖拽各個圖表進行排版,同時Kibana也支持條件查詢、過濾檢索等,還支持導入相應插件的儀表盤。
架構圖
日志的處理流程為:filebeat --> kafka --> logstash --> elasticsearch。
架構圖如下所示:

配置示例
filebeat配置
filebeat從日志文件讀寫日志,輸出到kafka。
filebeat.inputs:
#------- 系統相關日志 ----------
- type: log
paths:
- /var/log/messages*
fields:
logtopic: messages
fields_under_root: true
- type: log
paths:
- /var/log/secure*
fields:
logtopic: secure
fields_under_root: true
- type: log
paths:
- /var/log/audit/audit.log*
fields:
logtopic: audit
fields_under_root: true
- type: log
paths:
- /var/log/history.log*
fields:
logtopic: history
fields_under_root: true
#-----------tomcat日志 ----------
- type: log
paths:
- /opt/tomcat/logs/catalina.out*
fields:
logtopic: tomcat-catalina-log
fields_under_root: true
multiline.pattern: ^\[
multiline.negate: true
multiline.match: after
- type: log
json.keys_under_root: true
json.overwrite_keys: true
paths:
- /opt/tomcat/logs/tomcat_access_log.log
fields:
logtopic: tomcat-access-log
fields_under_root: true
#-----------apache日志 ----------
- type: log
json.keys_under_root: true
json.overwrite_keys: true
paths:
- /var/log/httpd/access_log
fields:
logtopic: apache-access-log
fields_under_root: true
#-----------nginx日志 ----------
- type: log
json.keys_under_root: true
json.overwrite_keys: true
paths:
- /usr/local/nginx/logs/access.log
fields:
logtopic: nginx-access-log
fields_under_root: true
- type: log
paths:
- /usr/local/nginx/logs/error.log
fields:
logtopic: nginx-error-log
fields_under_root: true
#-----------mysql日志 ----------
- type: log
paths:
- /alidata/mysql/logs/mysql-slow.log
fields:
logtopic: mysql-slow-log
fields_under_root: true
multiline.pattern: '^(# Time)'
multiline.negate: true
multiline.match: after
- type: log
paths:
- /var/log/mysqld-error.log
fields:
logtopic: mysqld-error-log
fields_under_root: true
#msyql的binlog需先使用maxwell工具輸出為 json格式的file,也可以直接輸出到kafka
- type: log
json.keys_under_root: true
json.overwrite_keys: true
paths:
- /alidata/mysql/logs/mysql_binlog_data.log
fields:
logtopic: mysqld-binlog
fields_under_root: true
#------- 全局設置 ------------
fields:
host: 主機IP
fields_under_root: true
#-------輸出到kafka ----------
output.kafka:
enabled: true
hosts: ["kafka_ip:port"]
topic: "elk-%{[logtopic]}"
partition.round_robin:
reachable_only: true
required_acks: 1
compression: gzip
max_message_bytes: 10000000
logstash配置
logstash作為消費者從kafka消費日志,經過格式化處理后輸出到elasticsearch。
input {
kafka {
topics_pattern => "elk-.*"
bootstrap_servers => "x.x.x.x:9092"
auto_offset_rest => "latest"
codec => json {
charset => "UTF-8"
}
group_id => "xxxxx"
client_id => "xxxx"
consumer_threads => 5
decorate_events => true //此屬性會將當前topic、offset、group、partition等信息也帶到message中
}
}
filter{
if[logtopic] == "tomcatlog"{
grok{
........
}
}
if[logtopic] == "history"{
grok{
........
}
}
}
output {
elasticsearch {
hosts => ["x.x.x.x:9200"]
#index => "elk-%{logtopic}-%{+YYYY.MM.dd}"
index => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
}
#stdout { codec => rubydebug }
}
