logstash 配置文件實例


這個配置文件記不起來是從那個地方下載的來的了,感謝那位無私的朋友

input {
  beats {            #shipper 端用的是 filebeat,所以用這個插件
    port => 510        #開510端口,shipper 端發到 這里
  }
  beats {
    port => 511
    codec => json    #原始日志是json格式,這里指定json格式,就可以解析好日志,下面filter 就不用寫grok表達式了
  }
  tcp {                #shipper用rsyslog,不像filebeat會自帶元數據,所以用tcp開個端口就好
    port => 512
    type => commamd    #增加個字段,以此來查找此類型日志
  }
  tcp {
    port => 513
    type => win_event
    codec => multiline {    #win的event日志,是多行的,用多行插件來做全合並
      pattern => "^%{NGINXERR_DATE}"    #這個正則變量,是一種時間日期格式,就是說日志以這種日期開頭,后面的都算一條日志
      negate => true
      what => "previous"
    }
  }
}

filter {
  if [input_type] == "log" {    #判斷值,filebeat 的元數據(即是每一條日志都會附帶上的數據)就有這個字段。
  ruby {    #ruby插件,下面init 是聲明有多少個字段。code 是聲明 使用什么分隔符,分隔出每一個字段的值。要搞這個,前提是你的日志格式,指定生成以XXX符號為間隔啦
    init => "@kname = ['http_x_forwarded_for','timestamp','request','status ','body_bytes_sent','http_referer','http_user_agent','remote_addr','http_head','upstream_addr','request_time','upstream_response_time']"
    code => "new_event = LogStash::Event.new(Hash[@kname.zip(event['message'].split('|'))]); new_event.remove('@timestamp');event.append(new_event)"
  }
  if [request] {    #還是判斷,這是上面分好的字段,這里對request字段再分割一下
    ruby {            #同上啦
      init => "@kname = ['method','URL','verb']"
      code => "new_event = LogStash::Event.new(Hash[@kname.zip(event['request'].split(' '))]); new_event.remove('@timestamp');event.append(new_event)"
    }
  }
  if [URL] {        #跟上面的request判斷 一個意思,就是要把一個字段,分的更細的方便日志分析
    ruby {
      init => "@kname = ['uri','url_args']"
      code => "new_event = LogStash::Event.new(Hash[@kname.zip(event['URL'].split('?'))]); new_event.remove('@timestamp');event.append(new_event)"
      remove_field => ["request","message","URL"]    #這是每一個插件都有的命令,把多余的字段移除掉吧。
        }
  }


  mutate {        #匹配插件,常用的
    convert => [    #轉換,上面切割好字段,就像數據庫表一樣,都是str類型,下面就是把字段轉換成對應的類型。舉例:值是 - ,也會被轉成0 。多個值的會變成數組。
      "body_bytes_sent" , "integer",
      "request_time", "float",
      "upstream_response_time", "float"
    ]
  }
  date {    #時間插件
    match => ["timestamp" , "ISO8601"]    #timestamp這個字段是日志里帶的那個時間,而@timestamp 這個是默認的時間(此描述不准),通常我們多用這個@timestamp畫圖作為時間軸。ISO8601就是一種時間格式。詳見搜下logstash N級目錄下grok-patterns文件
  }
}

  if [input_type] == "test" {
    mutate {split => ["ups_resp_time", ","]}    #就是一個字段可能有多個值。用逗號隔開。
    mutate {convert => ["ups_resp_time", "float"]} #然后再轉成浮點型
    mutate {convert => ["request_time", "float"]}
    date {match => ["timestamp","ISO8601"]}
  }

  if [type] == "commad" {    #這是一條別的日志,下面就是grok 解析日志的寫法。日志長這樣:2016-06-17 08:42:59 ## root@/dev/pts/1 ---> 121.33.26.18 51101 120.26.13.18 22 ##  snmpnetstat -V
    grok {  match => {"message" => "%{NGINXERR_DATE:log_timestamp} %{NOTSPACE:xx} %{USERNAME:user}@%{NOTSPACE:tty} %{NOTSPACE:xxx} %{IPV4:chient_ip} %{NUMBER:client_port} %{IPV4:server_ip} %{NUMBER:server_port} %{NOTSPACE:xxxx} %{GREEDYDATA:command}"}
      remove_field => ['xx']
      remove_field => ['xxx']
      remove_field => ['xxxx']
      remove_field => ['message']  }
    date { match => ["log_timestamp" , "yyyy-MM-dd HH:mm:ss"] }
  }

  if [type] == "win_event" {    #在input模塊那里,已用多行插件處理了。這里當成一行寫grok就好。
    grok {
      match => {"message" => "%{NGINXERR_DATE:winlog_timestamp} %{NOTSPACE:win_hostname} %{NOTSPACE:Level} %{NUMBER:event_id} %{GREEDYDATA:event}" }
      remove_field => ['message']
    }
    date {
      match => ["winlog_timestamp" , "yyyy-MM-dd HH:mm:ss"]
    }
  }

# 下面這些呢,因為kibana顯示問題,想看的直觀些,於是在這里做個替換。存進es里時,就直接這中文啦。kibana展示,自然出是中文
if [host] == "10.168.24.70"  { mutate { replace => { "host" => "精武門" } } }
if [host] == "10.117.16.241" { mutate { replace => { "host" => "nginx_1" } } }
if [host] == "10.117.9.162"  { mutate { replace => { "host" => "nginx_2" } } }
if [host] == "10.51.8.234" { mutate { replace => { "host" => "監控" } } }
if [host] == "10.47.69.198" { mutate { replace => { "host" => "APP" } } }
if [win_hostname] == "iZ23syf95oaZ" { mutate { replace => { "win_hostname" => "數據庫" } } }
if [win_hostname] == "iZ234bmxy7wZ" { mutate { replace => { "win_hostname" => "網站" } } }
if [win_hostname] == "iZ233n40vi4Z" { mutate { replace => { "win_hostname" => "資訊" } } }
if [win_hostname] == "iZ23z5w0bj3Z" { mutate { replace => { "win_hostname" => "DataCenter" } } }

}

# 這個沒啥好說的吧。我就一個索引名。默認完了。
output {
  elasticsearch { hosts => "127.0.0.1:9200" }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM