Collecting common logs with Logstash: Java, nginx, system, and more


 

Contents

1. Collecting Java logs with the codec multiline plugin

2. Collecting nginx logs

3. Collecting system syslog logs

4. Collecting MySQL logs with the filter grok module

 

 

1. Collecting Java logs with the codec multiline plugin

For anyone using ELK for application logs, readable display of multi-line messages is essential; without it, much of ELK's value is lost. To handle multi-line messages correctly, use the multiline plugin.

 

For Java logs, for example, you can use the following options (shown here in Filebeat's multiline syntax; the equivalent Logstash codec options pattern, negate, and what appear in the full config below):

multiline.pattern: '^\['

multiline.negate: true

multiline.match: after

 

 

With these settings, a multi-line log such as the one below is treated as a single event.
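As a hypothetical illustration (the com.example names are made up), only the first line below begins with '[' ; the exception and stack-trace lines that follow do not match the pattern, so they are merged into the same event:

[2016-01-08 15:46:14,512] ERROR com.example.DemoService - request failed
java.lang.NullPointerException: null
        at com.example.DemoService.handle(DemoService.java:42)
        at com.example.DemoServer.dispatch(DemoServer.java:17)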

 

input {
    file {
        path => "/var/log/elasticsearch/chuck-clueser.log"
        type => "es-error"
        start_position => "beginning"
        codec => multiline {
            pattern => "^\["    # regex: a line starting with '[' begins a new log entry
            negate => true
            what => "previous"  # non-matching lines are appended to the previous event
        }
    }
}
output {
    if [type] == "es-error" {
        elasticsearch {
            hosts => ["192.168.100.163:9200"]
            index => "es-error-%{+YYYY.MM.dd}"
        }
    }
}
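To try it, assuming the config above is saved as java.conf (the filename is ours, not from the original setup), start Logstash the same way as in the later sections:

[root@linux-node1 ~]# /usr/local/logstash/bin/logstash -f java.conf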

 

2. Collecting nginx logs

Use the codec json plugin to split the log into key-value fields. This makes the log format clearer and easier to search, and it also lowers CPU load.

2.1 Change the log format in the nginx configuration file to JSON

[root@linux-node1 ~]# vim /etc/nginx/nginx.conf   # add the json log format; comment out the built-in one
http {
    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';
    #access_log  /var/log/nginx/access.log  main;
    log_format json '{ "@timestamp": "$time_local", '
                    '"@fields": { '
                    '"remote_addr": "$remote_addr", '
                    '"remote_user": "$remote_user", '
                    '"body_bytes_sent": "$body_bytes_sent", '
                    '"request_time": "$request_time", '
                    '"status": "$status", '
                    '"request": "$request", '
                    '"request_method": "$request_method", '
                    '"http_referrer": "$http_referer", '
                    '"http_x_forwarded_for": "$http_x_forwarded_for", '
                    '"http_user_agent": "$http_user_agent" } }';
    access_log /var/log/nginx/access_json.log json;
[root@linux-node1 ~]# nginx -t  # check the configuration
[root@linux-node1 ~]# systemctl start nginx

The log format then looks like this.
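The following is a hypothetical entry from /var/log/nginx/access_json.log (made-up request values) matching the log_format above:

{ "@timestamp": "08/Jan/2016:15:46:14 +0800", "@fields": { "remote_addr": "192.168.100.1", "remote_user": "-", "body_bytes_sent": "612", "request_time": "0.000", "status": "200", "request": "GET / HTTP/1.1", "request_method": "GET", "http_referrer": "-", "http_x_forwarded_for": "-", "http_user_agent": "curl/7.29.0" } }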

 

 

2.2 Use logstash to collect the nginx access log

[root@linux-node1 ~]# cat log_nginx.conf
input {
    file {
        path => "/var/log/nginx/access_json.log"
        codec => "json"
        start_position => "beginning"
        type => "nginx-log"
    }
}
output {
    elasticsearch {
        hosts => ["http://192.168.100.163:9200"]
        index => "nginx-%{+YYYY.MM.dd}"
    }
}

[root@linux-node1 ~]# /usr/local/logstash/bin/logstash -f log_nginx.conf
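To confirm the pipeline works end to end, you can send one request to nginx and check that the index appears in Elasticsearch (a quick sanity check; adjust the addresses to your environment):

[root@linux-node1 ~]# curl -I http://127.0.0.1/
[root@linux-node1 ~]# curl http://192.168.100.163:9200/_cat/indices?v | grep nginx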

 

 

3. Collecting system syslog logs

[root@linux-node1 ~]# vim syslog.conf
input {
    syslog {
        type => "system-syslog"
        # bind this IP and listen on port 514; once running, other machines
        # can ship their logs here over the network
        host => "192.168.100.161"
        port => "514"
    }
}
output {
    elasticsearch {
        hosts => ["192.168.100.161:9200"]
        index => "system-syslog-%{+YYYY.MM.dd}"
    }
}

 

[root@linux-node1 ~]# /usr/local/logstash/bin/logstash -f syslog.conf
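Once it is running, a quick check that Logstash is actually listening on port 514 (the syslog input opens both TCP and UDP listeners):

[root@linux-node1 ~]# netstat -lntup | grep 514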

 

Modify the syslog configuration on the client server so that its log messages are sent to port 514:

[root@linux-node2 ~]# vim /etc/rsyslog.conf
*.* @@192.168.100.161:514    # '@@' forwards over TCP; a single '@' would use UDP
[root@linux-node2 ~]# systemctl restart rsyslog
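To generate a test message from the client (the message text here is made up), use logger; the message should show up in the system-syslog index shortly after:

[root@linux-node2 ~]# logger "hello from linux-node2"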

 

 

 

4. Collecting MySQL logs with the filter grok module

There are many filter plugins; here we look at grok, which uses regular-expression matching to split a log message into fields. In real production use, Apache logs cannot be emitted as JSON, so grok matching is the only option, as sketched below; likewise, the MySQL slow-query log cannot be split any other way and must be parsed with grok regular expressions.
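For instance, a standard Apache combined-format access log can be parsed with the stock COMBINEDAPACHELOG pattern; a minimal sketch (not part of the original setup):

filter {
    grok {
        # COMBINEDAPACHELOG is a ready-made pattern shipped with Logstash
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
}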

 

GitHub hosts many ready-made grok patterns that can be referenced directly:

https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns

 

 

An installed Logstash also ships with grok pattern files that can be referenced directly; they live at:

[root@linux-node1 patterns]# pwd
/usr/local/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.0/patterns
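Each file in this directory maps a pattern name to a regular expression, and patterns are referenced in configs as %{NAME:field}. A few representative entries from the grok-patterns file (abridged from memory; check the file itself for the exact definitions) show the style; the slow-query config below uses USER, IP, and NUMBER from here:

USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
INT (?:[+-]?(?:[0-9]+))
NUMBER (?:%{BASE10NUM})
IP (?:%{IPV6}|%{IPV4})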

 

 

 

4.1 The log file

[root@linux-node1 ~]# cat slow.log
# Time: 160108 15:46:14
# User@Host: dev_select_user[dev_select_user] @  [192.168.97.86]  Id: 714519
# Query_time: 1.638396  Lock_time: 0.000163 Rows_sent: 40  Rows_examined: 939155
SET timestamp=1452239174;
SELECT DATE(create_time) as day,HOUR(create_time) as h,round(avg(low_price),2) as low_price
    FROM t_actual_ad_num_log WHERE create_time>='2016-01-07' and ad_num<=10
    GROUP BY DATE(create_time),HOUR(create_time);

 

4.2 Writing mysql-slow.conf

[root@linux-node1 ~]# cat mysql-slow.conf
input {
    file {
        path => "/root/slow.log"
        type => "mysql-slow-log"
        start_position => "beginning"
        codec => multiline {
            # a slow-log entry starts at the "# User@Host:" line;
            # everything up to the next one belongs to the previous event
            pattern => "^# User@Host:"
            negate => true
            what => "previous"
        }
    }
}
filter {
    # drop sleep events
    grok {
        match => { "message" => "SELECT SLEEP" }
        add_tag => [ "sleep_drop" ]
        tag_on_failure => [] # prevent the default _grokparsefailure tag on real records
    }
    if "sleep_drop" in [tags] {
        drop {}
    }
    grok {
        match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id: %{NUMBER:row_id:int}\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)\n#\s*" ]
    }
    date {
        # use the SET timestamp value as the event time, then drop the field
        match => [ "timestamp", "UNIX" ]
        remove_field => [ "timestamp" ]
    }
}
output {
    stdout {
        codec => "rubydebug"
    }
}

 

 

Run the configuration file and inspect the grok match results.
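Running it mirrors the earlier sections; for the sample entry above, the rubydebug output printed to stdout should include fields such as user => "dev_select_user", query_time => 1.638396, and rows_examined => 939155:

[root@linux-node1 ~]# /usr/local/logstash/bin/logstash -f mysql-slow.conf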

 

