Grok正則提取日志
環境延續我上一篇ELK單機版的filebeat-->redis-->logstash-->elasticsearch-->kibana環境,詳情請參考:
Elasticsearch + Logstash + Kibana +Redis +Filebeat 單機版日志收集環境搭建
正則表達式
普通正則表達式
. 任意一個字符
* 前面一個字符出現0次或者多次
[abc] 中括號內任意一個字符
[^abc] 非中括號內的字符
[0-9] 表示一個數字
[a-z] 小寫字母
[A-Z] 大寫字母
[a-zA-Z] 所有字母
[a-zA-Z0-9] 所有字母+數字
[^0-9] 非數字
^xx 以xx開頭
xx$ 以xx結尾
\d 任何一個數字
\s 任何一個空白字符
擴展正則表達式,在普通正則符號再進行了擴展
? 前面字符出現0或者1次
+ 前面字符出現1或者多次
{n} 前面字符匹配n次
{a,b} 前面字符匹配a到b次
{,b} 前面字符匹配0次到b次
{a,} 前面字符匹配a或a+次
(string1|string2) string1或string2
在Kibana的grokdebugger上進行測試
在編寫grok提取正則配置前可以在Kibana的grokdebugger上進行測試:
比如我想提取一個如下的Nginx日志:
192.168.237.1 - - [24/Feb/2019:17:48:47 +0800] "GET /shijiange HTTP/1.1" 404 571 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"
那么可以按照正則表達式和grok語法在grokdebugger進行如下測試:
可以看到我的Grok成功提取了我想要的內容,我的Grok匹配規則如下:
(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<requesttime>[^ ]+ \+[0-9]+)\] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"
(?<字段名>正則)表示將匹配的內容提取為字段,其他不用提取為字段的地方原樣寫上或者用正則匹配即可。
在配置文件中引入Grok提取規則
vim ./logstash_grok.conf # logstash_grok.conf內容 input {
redis {
host => '192.168.1.4'
port => 6379
key => "queue"
data_type => "list"
}
}
filter {
grok {
match => {
"message" => '(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<requesttime>[^ ]+ \+[0-9]+)\] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"'
}
}
}
output {
elasticsearch {
hosts => ["http://192.168.1.4:9200"]
}
}
啟動logstah:
/opt/es/logstash-7.2.0/bin/logstash -f ./logstash_grok.conf
測試是否成功:
echo '192.168.237.1 - - [24/Feb/2019:17:48:47 +0800] "GET /shijiange HTTP/1.1" 404 571 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"' >> example.log
在Kinaba中配置index,方便查看:
點擊Management-->Kibana-->Index patterns會出現如下提示:
點擊Create Index Pattern按照如下步驟創建新的Kibana索引:
輸入索引匹配規則,如logstash*會匹配所有logstash開頭的ES索引數據,然后點擊下一步選擇時間字段用於按時間戳篩選數據:
選擇@timestamp將會根據@timestamp字段的時間過濾數據,點擊Create index pattern即可在Discover頁面選擇你創建的Kibana索引來查看數據:
可以看到我配置的Kibana起作用了。這里要注意的是Kibana采用的是UTC時間,比東八區時間要快8個小時,所以在篩選時間范圍的時候我將范圍往后擴大了一點,否則可能看不到數據。
展開最上方的記錄可以看到我剛才插入的數據被成功提取了:
去除字段
通過上面的圖可以看到Grok配置的確起作用了,但是Logstash給我們添加了很多字段,有時候這些字段是不必要的,這時候就要可以使用remove_field配置來去除一些字段。
修改配置文件如下:
# logstash_grok.conf內容 input { redis { host => '192.168.1.4' port => 6379 key => "queue" data_type => "list" } } filter { grok { match => { "message" => '(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<requesttime>[^ ]+ \+[0-9]+)\] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"' } remove_field => ["message","@version","path"] } } output { elasticsearch { hosts => ["http://192.168.1.4:9200"] } }
重啟重復上述測試,數秒后查看Kibana:
Logstash確實為我們去除了我們不需要的字段。
使用日志時間而非插入時間
由於日志輸出和消息隊列同步以及ELK中的數據傳輸都是需要時間的,使用ES自動生成的時間戳可能和日志產生的時間並不一致,而且由於時區問題會產生更大的影響,所以需要自定義時間字段,而非使用自動生成的時間字段。
更改配置文件如下:
# logstash_grok.conf內容 input { redis { host => '192.168.1.4' port => 6379 key => "queue" data_type => "list" } } filter { grok { match => { "message" => '(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<requesttime>[^ ]+ \+[0-9]+)\] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"' } remove_field => ["message","@version","path"] }
date {
match => ["requesttime", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
} } output { elasticsearch { hosts => ["http://192.168.1.4:9200"] } }
重復上述測試,擴大時間范圍只2018年某月可以看到有一條2019年2月的數據:
提取json格式的數據
Grok對於提取非結構化的數據是很方便的,但是對於json格式的數據如果還用Grok來提取未免也太麻煩了點,畢竟采用json這種半結構化數據來輸出日志本來就是為了方便處理。還好Logstash早就考慮到了這點,並提供了json格式數據的提取規則。
修改配置文件以提取json數據:
# logstash_json.conf input { redis { host => '192.168.1.4' port => 6379 key => "queue" data_type => "list" } } filter { json { source => "message" remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"] } date { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" } } output { elasticsearch { hosts => ["http://192.168.1.4:9200"] } }
測試是否配置成功:
以logstash_json.conf啟動Logstash:
/opt/es/logstash-7.2.0/bin/logstash -f ./logstash_json.conf
向example.log追加一條json數據:
echo '{"@timestamp":"24/Feb/2019:21:08:34 +0800","clientip":"192.168.1.5","status":0,"bodysize":"1m","referer":"http_referer","ua":"http_user_agent","handletime":"request_time","url":"uri"}' >> example.log
數秒后在kibana查看數據:
可以看到我們的配置是生效的。
filebeat監視多個日志
修改filebeat配置文件:
vim ./filebeat-7.2.0-linux-x86_64/filebeat.yml # filebeat.yml內容 filebeat.inputs: - type: log tail_files: true backoff: "1s" paths: - /opt/es/example.log fields: type: example fields_under_root: true - type: log tail_files: true backoff: "1s" paths: - /opt/es/example2.log fields: type: example2 fields_under_root: true output: redis: hosts: ["192.168.1.4:6379"] key: 'queue'
修改logstash配置文件:
vim ./logstash-7.2.0/config/logstash_muti.conf # logstash_muti.conf內容 input { redis { host => '192.168.1.4' port => 6379 key => "queue" data_type => "list" } } filter { if [type] == "example" { json { source => "message" remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"] } date { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" } }else if [type] == "example2"{ grok { match => { "message" => '(?<clientip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}) - - \[(?<timestamp>[^ ]+ \+[0-9]+)\] "(?<requesttype>[A-Z]+) (?<requesturl>[^ ]+) HTTP/\d.\d" (?<status>[0-9]+) (?<bodysize>[0-9]+) "[^"]+" "(?<ua>[^"]+)"' } remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"] } date { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" } } } output { elasticsearch { hosts => ["http://192.168.1.4:9200"] } }
以新的配置啟動filebeat和logstash:
# 以logstash_muti.conf啟動logstash /opt/es/logstash-7.2.0/bin/logstash -f ./logstash_muti.conf # 以filebeat.yml啟動filebeat ./filebeat-7.2.0-linux-x86_64/filebeat -e -c ./filebeat-7.2.0-linux-x86_64/filebeat.yml
新建一個example2.log文件,路徑要和filebeat配置的路徑一致,然后在終端執行以下命令進行測試:
# 測試example.log收集情況 echo '{"timestamp":"11/Apr/2019:21:08:34 +0800","clientip":"192.168.1.5","status":0,"bodysize":"1m","referer":"http_referer","ua":"http_user_agent","handletime":"request_time","url":"uri"}' >> example.log # 測試example2.log收集情況 echo '192.168.237.1 - - [24/Apr/2019:17:48:47 +0800] "GET /shijiange HTTP/1.1" 404 571 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"' >>example2.log
數秒后查看kibana數據:
example.log輸入產生的數據:
example2.log測試產生的數據:
可以看到我們的配置是成功的。
這里我們把兩個日志都輸出到了同一個索引下,生產中一般都會加以區分的,一般是每個日志對應一個索引,要實現這種效果可以在logstash的配置文件的output中進行如下配置:
output{ if [type] == "example" { elasticsearch { hosts => ["http://192.168.1.4:9200"] index => "example-%{+YYYY.MM.dd}" } } else if [type] == "example2" { elasticsearch { hosts => ["http://192.168.1.4:9200"] index => "example2-%{+YYYY.MM.dd}" } }