使用logstash過濾出特定格式的日志


1.一個例子

  • 項目日志生成在某個路徑,如/var/log/project,里面有warn,info,error目錄,分別對應不同級別的日志,需要采集這些日志。
  • 需要采集特定格式的日志,如:
[2018-11-24 08:33:43,253][ERROR][http-nio-8080-exec-4][com.hh.test.logs.LogsApplication][code:200,msg:測試錄入錯誤日志,param:{}]
  • 篩選采集到的日志,生成自定義字段,如 date, level, thread, class, msg
filter {
  if "nova" in [tags]{
    grok {
      # 篩選過濾
      match => {
        "message" => "(?<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\[(?<level>[A-Z]{4,5})\]\[(?<thread>[A-Za-z0-9/-]{4,40})\]\[(?<class>[A-Za-z0-9/.]{4,40})\]\[(?<msg>.*)"
      }
    mutate {
      remove_field => [
        "message",
      ]
    }
    # 不匹配正則則刪除,匹配正則用=~
    if [level] !~ "(ERROR|WARN|INFO)" {
      # 刪除日志
      drop {}
    }
  }
}

備注:grok里邊有定義好的現場模板可以用,但是更多的是自定義模板,規則是這樣的,小括號里邊包含一個key和value,例子:(? value),比如以上的信息,第一個定義的key是date,表示方法為:? 前邊一個問號,然后用<>把key包含在里邊去。value就是純正則了。這有個在線的調試庫,以供參考: http://grokdebug.herokuapp.com/

2.多項匹配(提高性能)

grok {
  "match" => {
    "message => [
      '%{IPORHOST:clientip} %{DATA:process_name}\[%{NUMBER:process_id}\]: %{WORD:word_1} %{WORD:word_2} %{NUMBER:number_1} %{NUMBER:number_2} %{DATA:data}',
      '%{IPORHOST:clientip} %{DATA:process_name}\[%{NUMBER:process_id}\]: %{WORD:word_1} %{NUMBER:number_1} %{NUMBER:number_2} %{NUMBER:number_3} %{DATA:data};%{NUMBER:number_4}',
      '%{IPORHOST:clientip} %{DATA:process_name}\[%{NUMBER:process_id}\]: %{DATA:data} | %{NUMBER:number}'
    ]
  }
}

logstash 會按照這個定義次序依次嘗試匹配,直到匹配成功為止。雖說效果跟用 | 分割寫個大大的正則是一樣的,但是可閱讀性好很多。

3.grok匹配失敗(性能基准測試)

當grok匹配失敗的時候,插件會為這個事件打個tag,默認是 _grokparsefailure。LogStash允許你把這些處理失敗的事件路由到其他地方做后續的處理,例如:

input { # ... }
filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{IPV4:ip};%{WORD:environment}\] %{LOGLEVEL:log_level} %{GREEDYDATA:message}" }
  }
}
output {
  if "_grokparsefailure" in [tags] {
    # write events that didn't match to a file
    file { "path" => "/tmp/grok_failures.txt" }
  } else {
     elasticsearch { }
  }
}

4.快速失敗,設置錨點(提高性能)

^%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}$

5.數據修改(Mutate)

filter {
  mutate {
    convert => ["request_time", "float"]
  }
}

6.大小寫轉換:uppercase 和 lowercase

filter {
  mutate {
    uppercase => [ "fieldname" ]
  }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM