[Original] Big Data Basics: Logstash (3) Applications: Parsing Files (grok/ruby/kv)


Parsing URLs from nginx logs

/v1/test?param2=v2&param3=v3&time=2019-03-18%2017%3A34%3A14
->
{'param1':'v1','param2':'v2','param3':'v3','time':'2019-03-18 17:34:14'}

Sample nginx log line:

1.119.132.168 - - [18/Mar/2019:09:13:50 +0000] "POST /param1/test?param2=1&param3=2&time=2019-03-18%2017%3A34%3A14 HTTP/1.1" 200 929 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "-"

1 Using grok

input {
  file {
    path => [ "/var/log/nginx/access.log" ]
    start_position => "beginning"
  }
}

filter {
  if [message] =~ /test/ {
    grok {
      match => { "message" => "%{IPORHOST:client_ip} (%{USER:ident}|-) (%{USER:auth}|-) \[%{HTTPDATE:access_time_raw}\] \"(?:%{WORD:verb} (/%{PARAMVALUE:param1}/test\?param2=%{PARAMVALUE:param2}&param3=%{PARAMVALUE:param3}&time=%{PARAMVALUE:send_time_raw})(?: HTTP/%{NUMBER:http_version})?|-)\" (%{NUMBER:response}|-) (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent} %{QS:x_forward_for}" }
      pattern_definitions => { "PARAMVALUE" => "[^& ]*" }
    }
    urldecode {
      all_fields => true
    }
    date {
      match => [ "access_time_raw", "dd/MMM/yyyy:HH:mm:ss Z" ]
      target => "access_time_tmp"
    }
    # send_time defaults to access_time; both are microseconds since the epoch, as strings
    ruby {
      code => "event.set('access_time', (event.get('access_time_tmp').to_i * 1000000).to_s)
               event.set('send_time', event.get('access_time'))"
    }
    if [send_time_raw] {
      date {
        match => [ "send_time_raw", "yyyy-MM-dd HH:mm:ss" ]
        target => "send_time_tmp"
        timezone => "UTC"
      }
      ruby {
        code => "event.set('send_time', (event.get('send_time_tmp').to_i * 1000000).to_s)"
      }
    }
    mutate {
      remove_field => ["message", "ident", "auth", "verb", "bytes", "x_forward_for", "http_version", "access_time_raw", "access_time_tmp", "path", "response", "send_time_raw", "send_time_tmp"]
    }
  } else {
    drop {}
  }
}

output {
  if [param1] and [param2] and [param3] and "_grokparsefailure" not in [tags] {
    stdout { codec => json }
  }
}

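Notes:
1) The URL parameter names and their order are hard-coded in the pattern, which is inflexible;
2) A custom pattern, PARAMVALUE, is used;
3) urldecode is required; otherwise the value of time is 2019-03-18%2017%3A34%3A14, and the date plugin in Logstash, which parses with Joda-Time patterns, reports an error because the value contains the letter A;
4) If time is empty, access_time is used instead;
5) Records that do not match are dropped;
6) Only records that meet the conditions are output;
7) if-else branches are used in both filter and output;
8) Pay attention to timezone in the date plugin; otherwise the parsed time is shifted by the time zone offset;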
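
Notes 3 and 8 can be checked outside Logstash. The following is a minimal plain-Ruby sketch, not the plugins' implementation: it assumes the standard library's CGI.unescape as a stand-in for the urldecode filter, and Time.strptime with an explicit +0000 offset as a stand-in for the date filter with timezone => "UTC". The helper name to_micros is made up for illustration.

```ruby
require 'cgi'
require 'time'

# Hypothetical helper (not part of Logstash): decode a percent-encoded
# timestamp, read it as UTC, and return microseconds since the epoch as a
# string, the same shape the ruby filter writes into send_time.
def to_micros(raw)
  decoded = CGI.unescape(raw) # "2019-03-18%2017%3A34%3A14" -> "2019-03-18 17:34:14"
  # Appending +0000 pins the zone; without it the local zone would be assumed.
  t = Time.strptime(decoded + ' +0000', '%Y-%m-%d %H:%M:%S %z')
  (t.to_i * 1_000_000).to_s
end

puts to_micros('2019-03-18%2017%3A34%3A14')
```

Dropping the explicit offset makes the result depend on the machine's local time zone, which is the shift note 8 warns about.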

2 Using grok + ruby

 
         

  input {
    file {
      path => [ "/var/log/nginx/access.log" ]
      start_position => "beginning"
    }
  }

  filter {
    if [message] =~ /test/ {
      grok {
        match => { "message" => "%{IPORHOST:client_ip} (%{USER:ident}|-) (%{USER:auth}|-) \[%{HTTPDATE:access_time_raw}\] \"(?:%{WORD:verb} (%{URIPATHPARAM:request}|-)(?: HTTP/%{NUMBER:http_version})?|-)\" (%{NUMBER:response}|-) (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}" }
      }
      urldecode {
        all_fields => true
      }
      date {
        match => [ "access_time_raw", "dd/MMM/yyyy:HH:mm:ss Z" ]
        target => "access_time_tmp"
      }
      ruby {
        code => "event.set('access_time', (event.get('access_time_tmp').to_i * 1000000).to_s)
                 event.set('send_time', event.get('access_time'))"
      }
      if [request] {
        # init runs once at startup; convertName is defined here but not called by code below.
        # In code, to_s guards against requests that have no query string.
        ruby {
          init => "
            def convertName(name)
              result = ''
              name.each_char{|ch| result += (if ch < 'a' then '_' + ch.downcase else ch end)}
              result
            end
          "
          code => "
            event.set('param1', event.get('request').split('?')[0].split('/')[1])
            pairs = event.get('request').split('?')[1].to_s.split('&')
            pairs.each{ |item| arr=item.split('='); event.set(arr[0], arr[1])}
          "
        }
        if [time] {
          date {
            match => [ "time", "yyyy-MM-dd HH:mm:ss" ]
            target => "send_time_tmp"
            timezone => "UTC"
          }
          ruby {
            code => "event.set('send_time', (event.get('send_time_tmp').to_i * 1000000).to_s)"
          }
        }
      }
      mutate {
        remove_field => ["message", "ident", "auth", "verb", "bytes", "x_forward_for", "http_version", "access_time_raw", "access_time_tmp", "path", "response", "time", "send_time_tmp"]
      }
    } else {
      drop {}
    }
  }

  output {
    if [param1] and [param2] and [param3] and "_grokparsefailure" not in [tags] {
      stdout { codec => json }
    }
  }

Notes:
1) The default grok pattern for nginx logs is used directly;
2) key=value pairs are parsed directly in ruby, which is more flexible;
3) A custom function is defined;
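
The key=value parsing from note 2 and the custom function from note 3 can be tried in plain Ruby before they go into a filter. This is only a sketch under assumptions: parse_request and convert_name are hypothetical names, and convert_name deliberately differs from the convertName in the config. The `ch < 'a'` comparison there also matches digits and underscores, so param2 would become param_2; the version here rewrites only A-Z.

```ruby
# Hypothetical helper: camelCase -> snake_case, touching only A-Z so that
# digits and underscores pass through unchanged.
def convert_name(name)
  name.gsub(/[A-Z]/) { |ch| '_' + ch.downcase }
end

# Hypothetical helper: turn a request such as "/v1/test?a=1&b=2" into a hash.
# The first path segment becomes param1, as in the filter above.
def parse_request(request)
  path, query = request.split('?', 2)
  fields = { 'param1' => path.split('/')[1] }
  query.to_s.split('&').each do |item|   # to_s covers a missing query string
    key, value = item.split('=', 2)
    next if key.nil? || key.empty?       # skip empty items such as "a=1&&b=2"
    fields[convert_name(key)] = value
  end
  fields
end

p parse_request('/v1/test?param2=v2&param3=v3&sendTime=2019-03-18 17:34:14')
```

Inside a ruby filter, each pair of the returned hash would be written with event.set(key, value) rather than collected.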

 

In Logstash ruby code, fields must be read and written through the getter and setter methods, e.g. event.get('field'); event['field'] cannot be used, because it fails with:

[2019-03-19T17:15:32,729][ERROR][logstash.filters.ruby ] Ruby exception occurred: Direct event field references (i.e. event['field'] = 'value') have been disabled in favor of using event get and set methods (e.g. event.set('field', 'value')). Please consult the Logstash 5.0 breaking changes documentation for more details.
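
One way to exercise get/set style filter code without starting Logstash is a small stand-in object. The sketch below is an assumption-laden illustration, not the Logstash Event API: FakeEvent is a hypothetical class that models only get and set, and a plain Ruby Time stands in for the timestamp that the date filter would produce.

```ruby
# Hypothetical stand-in for a Logstash event; only get and set are modelled.
class FakeEvent
  def initialize(fields = {})
    @fields = fields.dup
  end

  def get(name)
    @fields[name]
  end

  def set(name, value)
    @fields[name] = value
  end
end

# Assumption: a Ruby Time replaces the timestamp object set by the date filter.
event = FakeEvent.new('access_time_tmp' => Time.utc(2019, 3, 18, 9, 13, 50))

# The same two statements as in the ruby filter's code string.
event.set('access_time', (event.get('access_time_tmp').to_i * 1000000).to_s)
event.set('send_time', event.get('access_time'))

puts event.get('send_time')
```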

3 Using grok + kv

input {
    file {
        path => [ "/data/tmp/access.log" ]
        start_position => "beginning"
    }
}

filter {
  if [message] =~ /dataone\/u1/ {
    grok {
        match => { "message" => "%{IPORHOST:client_ip} (%{USER:ident}|-) (%{USER:auth}|-) \[%{HTTPDATE:access_time_raw}\] \"(?:%{WORD:verb} (%{URIPATHPARAM:request}|-)(?: HTTP/%{NUMBER:http_version})?|-)\" (%{NUMBER:response}|-) (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}" }
    }
    kv {
      source => "request"
      field_split => "&?"
      value_split => "="
    }
    urldecode {
        all_fields => true 
    }
    date {
        match => [ "access_time_raw","dd/MMM/yyyy:HH:mm:ss Z"]
        target => "access_time_tmp"
    }
    ruby {
        code => "event.set('access_time', (event.get('access_time_tmp').to_i * 1000000).to_s)
                event.set('send_time', event.get('access_time'))"
    }
    # kv creates one field per query parameter, so the timestamp arrives in [time]
    # (assuming the URL carries a time parameter, as in the example at the top)
    if [time] {
      date {
          match => [ "time","yyyy-MM-dd HH:mm:ss"]
          target => "send_time_tmp"
          timezone => "UTC"
      }
      ruby {
          code => "event.set('send_time', (event.get('send_time_tmp').to_i * 1000000).to_s)"
      }
    }
    mutate {
        remove_field => ["message", "ident", "auth", "verb", "bytes", "x_forward_for", "http_version", "access_time_raw", "access_time_tmp", "path", "response", "time", "send_time_tmp"]
    }
  } else {
    drop {}
  }
}
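
To see what field_split => "&?" and value_split => "=" do to a request string, the splitting can be approximated in plain Ruby. This is a rough sketch, not the kv plugin's code: kv_split is a hypothetical helper, and it folds in the later urldecode step by calling CGI.unescape on each value.

```ruby
require 'cgi'

# Hypothetical helper approximating kv with field_split => "&?" and
# value_split => "=": split on either character and keep only tokens that
# contain '='. The path segment has no '=', so it yields no field.
def kv_split(request)
  request.split(/[&?]/).each_with_object({}) do |token, fields|
    key, value = token.split('=', 2)
    next if value.nil? || key.empty?
    fields[key] = CGI.unescape(value) # stands in for the urldecode filter
  end
end

p kv_split('/v1/test?param2=v2&param3=v3&time=2019-03-18%2017%3A34%3A14')
```

Because the field names come from the URL itself, adding or reordering query parameters needs no change to the config, unlike the hard-coded grok pattern in section 1.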

 

Reference: https://www.elastic.co/guide/en/logstash/current/plugins-filters-kv.html

 

