Parsing nested JSON data with Logstash

1. Source files

  1. The original log file:

2019-10-28 09:49:44:947 [http-nio-8080-exec-23] INFO  [siftLog][qewrw123ffwer2323fdsafd] - logTime:2019-10-28 09:49:25.833-receiveTime:2019-10-28 09:49:44.044-{"area":"","frontInitTime":0,"initiatePaymentMode":"plugin_manual","network":"電信","os":"Microsoft Windows 7","payStatus":"1","reqs":[{"curlCode":"0","end":"2019-10-28 09:49:25.233","errorCode":"","errorDesc":"","totalTime":2153}],"settleAccountsTime":0}

  First we need to write a grok pattern for the segment in front of the JSON. Since this data has no real significance in production, it is not broken into fields in any detail:

  DATETIME %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?
  ACCESSLOG %{DATETIME:logTime} \[%{DATA:threadName}\] %{DATA:loglevel}  \[%{DATA:logType}\]\[%{DATA:appId}\] - logTime:%{DATETIME:logTime2}-receiveTime:%{DATETIME:receiveTime}-%{GREEDYDATA:jsonMsg}
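
  Applied to the sample line above, the ACCESSLOG pattern should capture roughly the following fields (a sketch of the grok output; the JSON payload stays in jsonMsg as a raw string):

logTime     => "2019-10-28 09:49:44:947"
threadName  => "http-nio-8080-exec-23"
loglevel    => "INFO"
logType     => "siftLog"
appId       => "qewrw123ffwer2323fdsafd"
logTime2    => "2019-10-28 09:49:25.833"
receiveTime => "2019-10-28 09:49:44.044"
jsonMsg     => {"area":"","frontInitTime":0,...,"reqs":[{...}],"settleAccountsTime":0}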

  The JSON in this line itself contains another nested JSON (the reqs array), so the inner JSON has to be pulled out and parsed again. The logstash config file should therefore be written as:

input {
  kafka {
    #bootstrap_servers => "kafka-service.ops:9092"
    bootstrap_servers => "172.27.27.220:9092,172.27.27.221:9092,172.27.27.222:9092"
    topics => ["test-grok"]
    codec => "json"
    type => "test-grok"
  }
}

filter {
  if [type] == "test-grok" {
    grok {
        patterns_dir => [ "/opt/appl/logstash/patterns" ]
        match => { "message" => "%{ACCESSLOG}" }
    }
    mutate {
      # strip the [ and ] around the reqs array so the element inside
      # can be re-parsed as a plain JSON object (only safe while the
      # array holds exactly one element)
      gsub => [
        "jsonMsg", "\[", "",
        "jsonMsg", "\]", ""
      ]
    }
    json {
      source => "jsonMsg"
    }
    mutate {
      # serialize the parsed reqs object back into a string so the
      # json filter below can flatten it into top-level fields
      add_field => { "reqs_json" => "%{reqs}" }
    }
    json {
      source => "reqs_json"
      remove_field => ["reqs","reqs_json","message","jsonMsg"]
    }
  }

  ruby {
    # commonly used to shift @timestamp to the local timezone
    code => "event.timestamp.time.localtime"
  }

}

output {
  elasticsearch {
    hosts => ["172.27.27.220:9200","172.27.27.221:9200","172.27.27.222:9200"]
    index => "logstash-test-grok-%{+YYYY.MM.dd}"
    template_overwrite => true
  }
}
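
  The gsub workaround above only behaves because reqs holds a single element here; with two or more elements, stripping the brackets would leave invalid JSON behind. If the array can grow, a safer alternative is Logstash's split filter, which keeps reqs as an array and emits one event per element. A minimal sketch of just the filter section, using the same field names as above:

filter {
  json {
    source => "jsonMsg"
  }
  # one output event per element of reqs; each element's keys
  # (curlCode, end, errorCode, errorDesc, totalTime) end up under [reqs]
  split {
    field => "reqs"
  }
}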


  2. The original log file:

[2019-10-28 10:01:01.169] [Thread-13086] INFO  [192.168.2.1, 192.168.1.1, 192.168.1.2_1572_smallTrade] [INTERFACE] - [HTTP] [request] - {"latitude":"","cardCode":"","memberCouponNo":"","transAmount":"900","hbFqNum":"","confirmCode":"9357","couponAmount":"","lastCost":"2360","memberMobile":"","timestamp":"1572228060000","longitude":""}

  Only the lines containing the keyword lastCost are needed, so the filebeat config should be:

- type: log
  enabled: true
  paths:
    - /opt/appl/tomcat/logs/test/test.log
  # ship only the lines that contain the lastCost keyword
  include_lines: ['.*lastCost.*']
  tail_files: true
  fields:
    type: interface
    log_module: test-interface
output.kafka:
  enabled: true
  hosts: ["172.27.27.220:9092", "172.27.27.221:9092", "172.27.27.222:9092"]
  # resolves to "interface" via the custom field defined above
  topic: '%{[fields][type]}'
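
  For a quick sanity check that include_lines really keeps only the lastCost lines, the Kafka output can be swapped temporarily for Filebeat's console output (Filebeat runs exactly one output at a time); a minimal sketch:

output.console:
  pretty: true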

  The developers put the client's IP first in the fourth bracketed field (ahead of the proxy IPs), so this IP has to be pulled out separately for analysis. The same custom DATETIME pattern is used again:

  DATETIME %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?

input {
  kafka {
    bootstrap_servers => "172.27.27.220:9092,172.27.27.221:9092,172.27.27.222:9092"
    topics => ["interface"]
    codec => "json"
    type => "test-interface"
  }
}

filter {
  if [type] == "test-interface" {
    grok {
      patterns_dir => [ "/opt/logstash/patterns" ]
      match => { "message" => "\[%{DATETIME:log_timestamp}\] \[%{DATA:ThreadName}\] %{LOGLEVEL:logLevel}  \[%{DATA:IP}\] \[%{DATA:InterfaceTag}\] - \[%{DATA:Protocol}\] \[%{DATA:LogType}\] - %{GREEDYDATA:jsonMsg2}" }
    }
    json {
      source => "jsonMsg2"
      remove_field => [ "jsonMsg2","message" ]
    }
    mutate {
      convert => [ "lastCost","float" ]
      # the IP field looks like "client, proxy1, proxy2_xxx_yyy";
      # split it on ", " and copy the elements into their own fields
      split => ["IP",", "]
      add_field => { "clientIp" => "%{[IP][0]}" }
      add_field => { "proxyIp" => "%{[IP][1]}" }
      add_field => { "time" => "%{[IP][2]}" }
    }
    geoip {
      source => "clientIp"
      #database => "/opt/logstash-interface/Geoip/GeoLite2-City_20191022/GeoLite2-City.mmdb"
    }
  }
  ruby {
    # commonly used to shift @timestamp to the local timezone
    code => "event.timestamp.time.localtime"
  }
}

output {
  elasticsearch {
    hosts => ["172.27.27.220:9200","172.27.27.221:9200","172.27.27.222:9200"]
    index => "logstash-test-interface-%{+YYYY.MM.dd}"
    template_overwrite => true
  }
}
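
  For the sample line above, the split turns the IP field into a three-element array and the add_field lines copy its elements out, so the resulting event should contain roughly the following (a sketch; geoip fields omitted):

IP       => ["192.168.2.1", "192.168.1.1", "192.168.1.2_1572_smallTrade"]
clientIp => "192.168.2.1"
proxyIp  => "192.168.1.1"
time     => "192.168.1.2_1572_smallTrade"
lastCost => 2360.0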
