Linux日志采集方案


linux系統常見日志采集

系統日志

/var/log/messages

安全日志

/var/log/secure

審計日志

/var/log/audit/audit.log

組件介紹

filebeat采集日志,然后發送到消息隊列kafka,然后logstash去獲取,利用filter功能過濾格式,然后存儲到elasticsearch中,最后通過kibana展示。

filebeat

輕量級的日志收集工具,本地文件的日志數據采集器。 作為服務器上的代理安裝,Filebeat監視日志目錄或特定日志文件,並將它們轉發給kafka或Elasticsearch、Logstash等。

kafka

kafka使用Scala語言編寫,Kafka是一個分布式、分區的、多副本的、多訂閱者的消息中間件,在ELK日志系統中用於日志的暫存,。

logstash

用於對日志進行收集、過濾,對數據進行格式化處理,並將所搜集的日志傳輸到相關系統進行存儲。Logstash是用Ruby語言開發的,由數據輸入端、過濾器和輸出端3部分組成。其中數據輸入端可以從數據源采集數據,常見的數據源如Kafka等;過濾器是數據處理層,包括對數據進行格式化處理、數據類型轉換、數據過濾等,支持正則表達式;數據輸出端是將Logstash收集的數據經由過濾器處理之后輸出到Elasticsearch。

elasticsearch

ElasticSearch是一個基於Lucene構建的開源的分布式的搜索引擎。設計用於雲計算中,能夠達到實時搜索,穩定,可靠,快速,安裝使用方便。ELK中主要用於數據的永久性存儲。

kibana

一款針對Elasticsearch開源分析以及可視化平台,使用node.js開發,可以用來搜索,展示存儲在Elasticsearch中的數據。同時提供了豐富的圖標模板,只需要通過簡單的配置就可以方便的進行高級數據分析和繪制各種圖表。在Kibana界面可以通過拖拽各個圖表進行排版,同時Kibana也支持條件查詢、過濾檢索等,還支持導入相應插件的儀表盤。

架構圖

日志的處理流程為:filebeat --> kafka --> logstash --> elasticsearch。

架構圖如下所示: 

配置示例

filebeat配置

filebeat從日志文件讀寫日志,輸出到kafka。

filebeat.inputs:
#------- 系統相關日志 ----------
- type: log
  paths:
    - /var/log/messages*
  fields:
    logtopic: messages 
  fields_under_root: true

- type: log
  paths:
    - /var/log/secure*
  fields:
    logtopic: secure
  fields_under_root: true

- type: log
  paths:
    - /var/log/audit/audit.log*
  fields:
    logtopic: audit
  fields_under_root: true

- type: log
  paths:
    - /var/log/history.log*
  fields:
    logtopic: history
  fields_under_root: true

#-----------tomcat日志 ----------
- type: log
  paths:
    - /opt/tomcat/logs/catalina.out*
  fields:
    logtopic: tomcat-catalina-log
  fields_under_root: true
  multiline.pattern: ^\[
  multiline.negate: true
  multiline.match: after

- type: log
  json.keys_under_root: true
  json.overwrite_keys: true
  paths:
    - /opt/tomcat/logs/tomcat_access_log.log
  fields:
    logtopic: tomcat-access-log
  fields_under_root: true

#-----------apache日志 ----------  
- type: log
  json.keys_under_root: true
  json.overwrite_keys: true
  paths:
    - /var/log/httpd/access_log
  fields:
    logtopic: apache-access-log
  fields_under_root: true

#-----------nginx日志 ----------  
- type: log
  json.keys_under_root: true
  json.overwrite_keys: true
  paths:
    - /usr/local/nginx/logs/access.log
  fields:
    logtopic: nginx-access-log
  fields_under_root: true

- type: log
  paths:
    - /usr/local/nginx/logs/error.log
  fields:
    logtopic: nginx-error-log
  fields_under_root: true

#-----------mysql日志 ----------
- type: log
  paths:
    - /alidata/mysql/logs/mysql-slow.log
  fields:
    logtopic: mysql-slow-log
  fields_under_root: true
  multiline.pattern: '^(# Time)'
  multiline.negate: true
  multiline.match: after

- type: log
  paths:
    - /var/log/mysqld-error.log
  fields:
    logtopic: mysqld-error-log
  fields_under_root: true

#msyql的binlog需先使用maxwell工具輸出為 json格式的file,也可以直接輸出到kafka
- type: log
  json.keys_under_root: true
  json.overwrite_keys: true
  paths:
    - /alidata/mysql/logs/mysql_binlog_data.log
  fields:
    logtopic: mysqld-binlog
  fields_under_root: true

#------- 全局設置 ------------
fields:
  host: 主機IP
fields_under_root: true

#-------輸出到kafka ----------
output.kafka:
  enabled: true
  hosts: ["kafka_ip:port"]
  topic: "elk-%{[logtopic]}"
  partition.round_robin:
    reachable_only: true
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000

logstash配置

logstash作為消費者從kafka消費日志,經過格式化處理后輸出到elasticsearch。

input {
  kafka {
    topics_pattern => "elk-.*"
    bootstrap_servers => "x.x.x.x:9092"
    auto_offset_rest => "latest"
    codec => json {
        charset => "UTF-8"
    }
    group_id => "xxxxx"
    client_id => "xxxx"
    consumer_threads => 5
    decorate_events => true //此屬性會將當前topic、offset、group、partition等信息也帶到message中
   }
}

filter{
    if[logtopic] == "tomcatlog"{
        grok{
           ........
        }
    }
    if[logtopic] == "history"{
        grok{
           ........
        }
    }
}

output {
  elasticsearch {
    hosts => ["x.x.x.x:9200"]
    #index => "elk-%{logtopic}-%{+YYYY.MM.dd}"
    index => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
  }
  #stdout { codec => rubydebug }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM