前言
通常來說,各種日志的格式都比較靈活復雜比如nginx訪問日志或者並不純粹是一行一事件比如java異常堆棧,而且還不一定對大部分開發或者運維那么友好,所以如果可以在最終展現前對日志進行解析並歸類到各個字段中,可用性會提升很多。
grok過濾器插件就是用來完成這個功能的。默認可用。
- grok的主要選項是match和overwrite,前者用來解析message到相應字段,后者用來重寫message,這樣原始message就可以被覆蓋,對於很多的日志來說,原始的message重復存儲一份沒有意義。
- 雖然Grok過濾器可以用來進行格式化,但是對於多行事件來說,並不適合在filter或者input(multiline codec,如果希望在logstash中處理多行事件,可以參考https://www.elastic.co/guide/en/logstash/current/multiline.html)中處理,因為使用ELK的平台通常日志使用beats input插件,此時在logstash中進行多行事件的處理會導致數據流混亂,所以需要在事件發送到logstash之前就處理好,也就是應該在filebeat中預處理。
grok 截取過濾
grok正則表達式:(?<temMsg>(.*)(?=Report)/?) 獲取Report之前的字符
grok正則表達式:(?<temMsg>(?=Report)(.*)/?) 獲取Report之后的字符 grok{ match => { #截取<Report>之前的字符作為temMsg字段的值 "message" => "(?<temMsg>(.*)(?=Report)/?)" } } 這個是截取特定的字符集日志,要日志中包含了【Report】關鍵字 (注:表達式中(?=Report)中的等於【=】符號如果換成【<=】這表示就不包含本身了,例如(?<temMsg>(.*)(?=Report)/?)可以寫成(?<temMsg>(.*)(?<=Report)/?)這樣輸出的結果就不包含Report了,同理下面的一樣)
grok正則表達式:(?<temMsg>(?<=report).*?(?=msg)) 截取report和msg之間的值 不包含report和msg本身
grok正則表達式:(?<temMsg>(report).*?(?=msg)) 截取 包含report但不包含msg grok正則表達式:(?<temMsg>(?<=report).*?(msg))截取 不包含report但包含msg grok正則表達式:(?<temMsg>(report).*?(msg|request))輸出以report開頭,以msg或者以request結尾的所有包含頭尾信息 grok正則表達式:(?<temMsg>(report).*?(?=(msg|request)))輸出以report開頭,以msg或者以request結尾的不包含頭尾信息 grok{ match => { #截取<Report>之后的和<msg>之前的值作為temMsg字段的值 "message" => "(?<temMsg>(?<=report).*?(?=msg))" } } 這個是截取特定的字符集日志,要日志中包含了【report和msg和request】關鍵字 之間的表達式只要替換一下就可以使用了 (注過個表達式中出現異常,在單個的字符串中可以將小括號【()】去掉,例如:(report).*?(?=msg) 可以寫成report.*?(?=msg))
grok正則表達式:(?<MYELF>([\s\S]{500}))
grok{
match => { #截取日志500個字符 作為MYELF的值 "message" => "(?<MYELF>([\s\S]{500}))" } } 對有所日志截取500個字符,可以加入if()做為判斷條件,根據自身項目來
grok正則表達式:%{LOGLEVEL:level}
grok {
#這個patterns_dir大家都應該正對 單獨寫表達式的地方
#patterns_dir => "/usr/local/nlp/logstash-6.0.1/config/patterns" match => [ "message","%{LOGLEVEL:level}" ] } 這個比較簡單 就不多說了
結合上面的 這個是對level級別的日志做判斷 如果日志中含有DEBUG的,就drop掉
if [level] == "DEBUG" { drop { } } 這個其實和上面差不多,加了一個【~】表示對單條的前后日志做匹配 if[message]=~"ASPECT"{ drop { } }
這個是說對temMsg賦值的所有的日志從新命名打印message
mutate {
#重命名字段temMsg為message
rename => {"temMsg" => "message"} }
#logstash過濾器切割
filter {
if [type] == "simple" { mutate{ split => ["message","|"] #按 | 進行split切割message add_field => { "requestId" => "%{[message][0]}" } add_field => { "timeCost" => "%{[message][1]}" } add_field => { "responseStatus" => "%{[message][2]}" } add_field => { "channelCode" => "%{[message][3]}" } add_field => { "transCode" => "%{[message][4]}" } } mutate { convert => ["timeCost", "integer"] #修改timeCost字段類型為整型 } } else if [type] == "detail" { grok{ match => { #將message里面 TJParam后面的內容,分隔並新增為ES字段和值 "message" => ".*TJParam %{PROG:requestId} %{PROG:channelCode} %{PROG:transCode}" } } grok{ match => { #截取TJParam之前的字符作為temMsg字段的值 "message" => "(?<temMsg>(.*)(?=TJParam)/?)" #刪除字段message remove_field => ["message"] } } mutate { #重命名字段temMsg為message rename => {"temMsg" => "message"} } } }
過濾截取完整例子:
input {
redis {
data_type => "list" host => "localhost1" port => "5100" key => "nlp_log_file" db => 0 threads => 1 #線程數量 codec => "json" } redis { data_type => "list" host => "localhost2" port => "5101" key => "nlp_log_file" db => 0 threads => 1 #線程數量 codec => "json" } } filter { grok { #patterns_dir => "/usr/local/nlp/logstash-6.0.1/config/patterns" match => [ "message","%{LOGLEVEL:level}" ] } grok{ match => { #截取<ReportPdf>之前的字符作為temMsg字段的值 "message" => "(?<temMsg>(.*)(?=<ReportPdf>)/?)" } } mutate { #重命名字段temMsg為message rename => {"temMsg" => "message"} } if [level] == "DEBUG" { drop { } } if[message]=~"ASPECT"{ drop { } } #獲取日志文件帶RAWT關鍵字的 if[message]=~"[RAW]"{ grok{ match => { #截取帶RAW關鍵字的日志500個字符 作為MYELF的值 "message" => "(?<MYELF>([\s\S]{500}))" } } mutate { rename => {"MYELF" => "message"} #重命名字段MYELF為message } } } output { elasticsearch { hosts => ["localhost:9200"] index => "logstash-%{+YYYY.MM.dd}" action => "index" template_overwrite => true #user => "elastic" #password => "admins-1" } stdout{codec => dots} }
根據不同情況進行不同的匹配原則
filter{
if "start" in [message]{ --message就是指原始消息 grok{ match => xxxxxxxxx } }else if "complete" in [message]{ grok{ xxxxxxxxxx } }else{ grok{ xxxxxxx } } }
多項匹配
filter {
grok {
match => [ "message" , "%{DATA:hostname}\|%{DATA:tag}\|%{DATA:types}\|%{DATA:uid}\|%{GREEDYDATA:msg}", "message" , "%{DATA:hostname}\|%{DATA:tag}\|%{GREEDYDATA:msg}" ] remove_field => ['type','_id','input_type','tags','message','beat','offset'] } }
正則匹配
太多使用DATA和GREEDYDAYA會導致性能cpu負載嚴重。建議多使用正則匹配,或者ruby代碼塊。
filter {
grok {
match => [ "message", "(?<hostname>[a-zA-Z0-9._-]+)\|%{DATA:tag}\|%{NUMBER:types}\|(?<uid>[0-9]+)\|%{GREEDYDATA:msg}", "message", "(?<hostname>[a-zA-Z0-9._-]+)\|%{DATA:tag}\|%{GREEDYDATA:msg}", ] remove_field => ['type','_id','input_type','tags','message','beat','offset'] } }
ruby代碼塊匹配
太多使用DATA和GREEDYDAYA會導致性能cpu負載嚴重。建議多使用正則匹配,或者ruby代碼塊。
filter {
ruby {
code =>' arr = event["message"].split("|") if arr.length == 5 event["hostname"] = arr[0] event["tag"] = arr[1] event["types"] = arr[2] event["uid"] = arr[3] event["msg"] = arr[4] elsif arr.length == 3 event["hostname"] = arr[0] event["tag"] = arr[1] event["msg"] = arr[2] end' remove_field => ['type','_id','input_type','tags','message','beat','offset'] } }
本人完整例子
input { kafka { bootstrap_servers => "172.xxx.xxx.91:9092,172.16.10.92:9092,172.xxx.xxx.93:9092" topics => ["logstash-log"] consumer_threads => 1 decorate_events => true codec => json } } filter { if [message]=~"ERROR" { # 截取日志級別為ERROR的日志2000個字符作為ERRORMSG的值(因為包含了堆棧信息內容會很長,導致下面[logBegin][logEnd]的正則匹配很慢,然后超時). grok { match => { "message" => "(?<ERRORMSG>([\s\S]{0,2000}))" } } #重命名字段ERRORMSG為message,給下面的正則使用 mutate { rename => {"ERRORMSG" => "message"} } } grok { match => [ "message", "\s*%{TIMESTAMP_ISO8601:logTimestamp} \[%{DATA:threadName}\s*\] \[%{LOGLEVEL:logLevel}\s*\] \[%{DATA:methodName}\s*\]\s+MessageTree=+(?<traceMsg>(\S+)).*", "message", "\s*%{TIMESTAMP_ISO8601:logTimestamp} \[%{DATA:threadName}\s*\] \[%{LOGLEVEL:logLevel}\s*\] \[%{DATA:methodName}\s*\]\s+warningMessage=+(?<warningId>(\S+)).*&+(?<warningMsg>([\s\S]*})).*", "message", "\s*%{TIMESTAMP_ISO8601:logTimestamp} \[%{DATA:threadName}\s*\] \[%{LOGLEVEL:logLevel}\s*\] \[%{DATA:methodName}\s*\]\s*(?<logInfo>([\s\S]*))", "message", "\s*(?<logInfo>([\s\S]*))" ] remove_tag => ["beats_input_codec_plain_applied"] remove_field => ["message","prospector"] } date { match => ["logTimestamp", "ISO8601"] target => "logTimestamp" } #if [traceId] =~ /\d/ or [warningId] =~ /[0-9a-z_A-Z_]/ { # mutate { # replace => {"logInfo" => "%{message}"} # } #} } output { if [traceMsg] =~ /\S/ { kafka { bootstrap_servers => "172.16.xxx.xxx:9092,172.xxx.xxx.92:9092,172.xxx.xxx.93:9092" topic_id => "logstash-trace" retries => 1 compression_type => "snappy" codec => plain{ format => "%{traceMsg}" charset => "UTF-8" } } } if [warningId] =~ /[0-9a-z_A-Z_]/ { kafka { bootstrap_servers => "172.16.xxx.xxx:9092,172.16.xxx.xxx:9092,172.xxx.xxx.93:9092" topic_id => "warning-topic" retries => 1 compression_type => "snappy" codec => plain{ format => "%{warningMsg}" charset => "UTF-8" } } } if [traceId] =~ /\d/ { elasticsearch{ hosts => ["10.xxx.xxx.100:9200", "10.xxx.xxx.101:9200", "10.xxx.xxx.102:9200"] index => "%{[fields][product_type]}-logs-transaction-%{+YYYY-MM}" manage_template => false template_name => "business_logs_template" } #stdout { # codec => rubydebug #} } #stdout { # codec => rubydebug #} if "sas" in [tags]{ elasticsearch{ hosts => ["10.xxx.xxx.100:9200", "10.xxx.xxx.101:9200", "10.xxx.xxx.102:9200"] index => "%{[fields][product_type]}-logs-%{+YYYY-MM-dd}" manage_template => false template_name => "business_logs_template" } #stdout { # codec => rubydebug #} } }
參考:
https://www.cnblogs.com/JetpropelledSnake/p/9893560.html
https://blog.csdn.net/cai750415222/article/details/86614854
https://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html —— Logstash 最佳實踐