原理
使用filebeat來上傳日志數據,logstash進行日志收集與處理,elasticsearch作為日志存儲與搜索引擎,最后使用kibana展現日志的可視化輸出。所以不難發現,日志解析主要還是logstash做的事情。

從上圖中可以看到,logstash主要包含三大模塊:
- INPUTS: 收集所有數據源的日志數據([源有file、redis、beats等,filebeat就是使用了beats源*);
- FILTERS: 解析、整理日志數據(本文重點);
- OUTPUTS: 將解析的日志數據輸出至存儲器([elasticseach、file、syslog等);
FILTERS是重點,來看看它常用到的幾個插件:
- grok:采用正則的方式,解析原始日志格式,使其結構化;
- geoip:根據IP字段,解析出對應的地理位置、經緯度等;
- date:解析選定時間字段,將其時間作為logstash每條記錄產生的時間(若沒有指定該字段,默認使用read line的時間作為該條記錄時間);
*注意:codec也是經常會使用到的,它主要作用在INPUTS和OUTPUTS中,[提供有json的格式轉換、multiline的多行日志合並等
配置文件
一個簡單的配置文件:
input { log4j { port => "5400" } beats { port => "5044" } } filter { # 多個過濾器會按聲明的先后順序執行 grok { match => { "message" => "%{COMBINEDAPACHELOG}"} } geoip { source => "clientip" } } output { elasticsearch { action => "index" hosts => "127.0.0.1:9200" # 或者 ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"] ,支持均衡的寫入ES的多個節點,一般為非master節點 index => "logstash-%{+YYYY-MM}" } stdout { codec=> rubydebug } file { path => "/path/to/target/file" } }
場景
1. NodeJS 日志
- 日志格式
$time - $remote_addr $log_level $path - $msg
- 日志內容
2017-03-15 18:34:14.535 - 112.65.171.98 INFO /root/ws/socketIo.js - xxxxxx與ws server斷開連接
- filebeat配置(建議filebeat使用rpm安裝,以systemctl start filebeat方式啟動)
filebeat: prospectors: - document_type: nodejs #申明type字段為nodejs,默認為log paths: - /var/log/nodejs/log #日志文件地址 input_type: log #從文件中讀取 tail_files: true #以文件末尾開始讀取數據 output: logstash: hosts: ["${LOGSTASH_IP}:5044"] #General Setting name: "server1" #設置beat的名稱,默認為主機hostname
- logstash中FILTERS配置
filter { if [type] == "nodejs" { #根據filebeat中設置的type字段,來過濾不同的解析規則 grok{ match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} - %{IPORHOST:clientip} %{LOGLEVEL:level} %{PATH:path} - %{GREEDYDATA:msg}" } } geoip { source => "clientip" #填寫IP字段 } } }
- 結果(為方便演示,數據有刪減)

- Filter配置講解
- grok中的match內容:
- key:表示所需解析的內容;
- value:表示解析的匹配規則,提取出對應的字段;
- 解析語法:%{正則模板:自定義字段},其中TIMESTAMP_ISO8601、IPORHOST等都是grok提供的正則模板;
- geoip:通過分析IP值,產生IP對應的地理位置信息;
這里是否發現@timestamp與timestamp不一致,@timestamp表示該日志的讀取時間,在elasticsearch中作為時間檢索索引。下面講解Nginx日志時,會去修正這一問題。
2. Nginx 訪問日志
- 日志格式
$remote_addr - $remote_user [$time_local]
"$request" $status $body_bytes_sent "$http_referer"
"$http_user_agent" "$http_x_forwarded_for"
- 日志內容
112.65.171.98 - - [15/Mar/2017:18:18:06 +0800] "GET /index.html HTTP/1.1" 200 1150 "http://www.yourdomain.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36" "-"
- filebeat中prospectors的配置
- document_type: nginx paths: - /var/log/nginx/access.log #日志文件地址 input_type: log #從文件中讀取 tail_files: true #以文件末尾開始讀取數據
- logstash中FILTERS配置
filter { if [type] == "nginx" { grok{ match => { "message" => "%{COMBINEDAPACHELOG}" } } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z", "ISO8601" ] target => "@timestamp" #可省略 } } }
- 結果
- Filter配置講解
- grok:
- 是不是很不可思議,上一示例中我們匹配規則寫了一長串,這個僅僅一個COMBINEDAPACHELOG就搞定了!
- grok除了提供上面那種基礎的正則規則,還對常用的日志(java,http,syslog等)提供的相應解析模板,本質還是那么一長串正則,[詳情見grok的120中正則模板;
- date:
- match:數組中第一個值為要匹配的時間字段,后面的n個是匹配規則,它們的關系是or的關系,滿足一個即可;
- target:將match中匹配的時間替換該字段,默認替換@timestamp;
目前為止我們解析的都是單行的日志,向JAVA這樣的,若果是多行的日志我們又該怎么做呢?
3. JAVA Log4j 日志
- 日志內容
'2017-03-16 15:52:39,580 ERROR TestController:26 - test: java.lang.NullPointerException at com.test.TestController.tests(TestController.java:22) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221) at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:137)'
- filebeat中prospectors的配置
- document_type: tomcat paths: - /var/log/java/log #日志文件地址 input_type: log #從文件中讀取 tail_files: true #以文件末尾開始讀取數據 multiline: pattern: ^\d{4} match: after negate: true
- logstash中FILTERS配置
filter { if [type] == "tomcat" { grok{ match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{JAVALOGMESSAGE:msg}" } } date { match => [ "timestamp" , "yyyy-MM-dd HH:mm:ss,S", "ISO8601" ] } } }
- 結果
-
Filebeat配置講解
-
multiline 合並多行日志:
- pattern:匹配規則,這里指匹配每條日志開始的年份;
- match:有before與after,這里指從該行開始向后匹配;
- negate:是否開始一個新記錄,這里指當pattern匹配后,結束之前的記錄,創建一條新日志記錄;
當然在logstash input中使用codec multiline設置是一樣的
-
小技巧:關於grok的正則匹配,官方有給出Grok Constructor方法,在這上面提供了debugger、自動匹配等工具,方便大家編寫匹配規則
ES Output插件
主要的選項包括:
# action,默認是index,索引文檔(logstash的事件)(ES架構與核心概念參考)。 # host,聲明ES服務器地址端口 # index,事件寫入的ES index,默認是logstash-%{+YYYY.MM.dd},按天分片index,一般來說我們會按照時間分片,時間格式參考http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html。
詳情請參考我的另外一篇文章:https://www.cnblogs.com/caoweixiong/p/11791396.html
參考: