logstash的filter之grok
Logstash中的filter可以支持對數據進行解析過濾。
grok:支持120多種內置的表達式,有一些簡單常用的內容就可以使用內置的表達式進行解析
https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
自定義的grok表達式 格式(?<usetime>PATTERN) usertime:
表示定義的字段名稱 PATTERN:此處需要寫正則表達式
filebeat采集輸出到logstash
[root@bigdata-sg-a-01 log]# echo qq5201314 >>data.log
input {
beats {port => 5044}
}
filter{
grok{
match=>{"message"=>"%{NUMBER:usertime}"}
}
}
output {
stdout { codec => json}
}
#################output 多了個usertime 字段
{"message":"qq5201314","@version":"1","@timestamp":"2018-07-27T09:21:32.080Z","offset":46,"input_type":"log","fields":null,"count":1,"beat":{"hostname":"bigdata-sg-a-01","name":"bigdata-sg-a-01"},"source":"/mnt/log/data.log","type":"log","host":"bigdata-sg-a-01","tags":["beats_input_codec_plain_applied"],"usertime":"5201314"}
logstash中的if else
如果我們在獲取日志的話是需要同時讀取多個文件,那這多個文件的解析規則肯定不一樣,那就需要根據不同的文件執行
不同的解析邏輯了。 假設我們要解析ngnix日志和tomcat的日志,這樣就需要判斷數據是屬於那個文件的,使用對應的解析規則
根據自己的業務規則修改
filebeat、tags:會向log中添加一個標簽,該標簽可以提供給logstash用於區分不同客戶端不同業務的log
filebeat、document_type:標記,跟tags差不多,區別不同的日志來源
output {
if [type] == "tomcat_ctmpweb" { ##按照type類型創建多個索引
elasticsearch {
hosts => ["192.168.0.148:9200"]
index => "tomcat_ctmpweb_%{+YYYY.MM.dd}"
}
}
if [type] == "nginx_access" { ##按照type類型創建多個索引
elasticsearch {
hosts => ["192.168.0.148:9200"]
index => "nginx_access_%{+YYYY.MM.dd}"
}
}
}
multiline異常信息整合(old)
由於目前是使用filebeat收集日志,所以需要在filebeat端對異常堆棧信息進行整合
修改filebeat.yml文件
grok插件 grok插件有非常強大的功能,他能匹配一切數據,但是他的性能和對資源的損耗同樣讓人詬病。
以下借鑒 :https://yq.aliyun.com/articles/154341?utm_content=m_27283
filter{
grok{
#只說一個match屬性,他的作用是從message 字段中吧時間給摳出來,並且賦值給另個一個字段logdate。
#首先要說明的是,所有文本數據都是在Logstash的message字段中中的,我們要在過濾器里操作的數據就是message。
#第二點需要明白的是grok插件是一個十分耗費資源的插件,這也是為什么我只打算講解一個TIMESTAMP_ISO8601正則表達式的原因。
#第三點需要明白的是,grok有超級多的預裝正則表達式,這里是沒辦法完全搞定的,也許你可以從這個大神的文章中找到你需要的表達式
#http://blog.csdn.net/liukuan73/article/details/52318243
#但是,我還是不建議使用它,因為他完全可以用別的插件代替,當然,對於時間這個屬性來說,grok是非常便利的。
match => ['message','%{TIMESTAMP_ISO8601:logdate}']
}
}
2、mutate插件 mutate插件是用來處理數據的格式的,你可以選擇處理你的時間格式,或者你想把一個字符串變為數字類型(當然需要合法),同樣的你也可以返回去做。可以設置的轉換類型 包括: "integer", "float" 和 "string"。
filter {
mutate {
#接收一個數組,其形式為value,type
#需要注意的是,你的數據在轉型的時候要合法,你總是不能把一個‘abc’的字符串轉換為123的。
convert => [
#把request_time的值裝換為浮點型
"request_time", "float",
#costTime的值轉換為整型
"costTime", "integer"
]
}
}
3、ruby插件 官方對ruby插件的介紹是——無所不能。ruby插件可以使用任何的ruby語法,無論是邏輯判斷,條件語句,循環語句,還是對字符串的操作,對EVENT對象的操作,都是極其得心應手的。
filter {
ruby {
#ruby插件有兩個屬性,一個init 還有一個code
#init屬性是用來初始化字段的,你可以在這里初始化一個字段,無論是什么類型的都可以,這個字段只是在ruby{}作用域里面生效。
#這里我初始化了一個名為field的hash字段。可以在下面的coed屬性里面使用。
init => [field={}]
#code屬性使用兩個冒號進行標識,你的所有ruby語法都可以在里面進行。
#下面我對一段數據進行處理。
#首先,我需要在把message字段里面的值拿到,並且對值進行分割按照“|”。這樣分割出來的是一個數組(ruby的字符創處理)。
#第二步,我需要循環數組判斷其值是否是我需要的數據(ruby條件語法、循環結構)
#第三步,我需要吧我需要的字段添加進入EVEVT對象。
#第四步,選取一個值,進行MD5加密
#什么是event對象?event就是Logstash對象,你可以在ruby插件的code屬性里面操作他,可以添加屬性字段,可以刪除,可以修改,同樣可以進行樹脂運算。
#進行MD5加密的時候,需要引入對應的包。
#最后把冗余的message字段去除。
code => "
array=event。get('message').split('|')
array.each do |value|
if value.include? 'MD5_VALUE'
then
require 'digest/md5'
md5=Digest::MD5.hexdigest(value)
event.set('md5',md5)
end
if value.include? 'DEFAULT_VALUE'
then
event.set('value',value)
end
end
remove_field=>"message"
"
}
}
4、date插件 這里需要合前面的grok插件剝離出來的值logdate配合使用(當然也許你不是用grok去做)。
filter{
date{
#還記得grok插件剝離出來的字段logdate嗎?就是在這里使用的。你可以格式化為你需要的樣子,至於是什么樣子。就得你自己取看啦。
#為什什么要格式化?
#對於老數據來說這非常重要,應為你需要修改@timestamp字段的值,如果你不修改,你保存進ES的時間就是系統但前時間(+0時區)
#單你格式化以后,就可以通過target屬性來指定到@timestamp,這樣你的數據的時間就會是准確的,這對以你以后圖表的建設來說萬分重要。
#最后,logdate這個字段,已經沒有任何價值了,所以我們順手可以吧這個字段從event對象中移除。
match=>["logdate","dd/MMM/yyyy:HH:mm:ss Z"]
target=>"@timestamp"
remove_field => 'logdate'
#還需要強調的是,@timestamp字段的值,你是不可以隨便修改的,最好就按照你數據的某一個時間點來使用,
#如果是日志,就使用grok把時間摳出來,如果是數據庫,就指定一個字段的值來格式化,比如說:"timeat", "%{TIMESTAMP_ISO8601:logdate}"
#timeat就是我的數據庫的一個關於時間的字段。
#如果沒有這個字段的話,千萬不要試着去修改它。
}
}
5、json插件,這個插件也是極其好用的一個插件,現在我們的日志信息,基本都是由固定的樣式組成的,我們可以使用json插件對其進行解析,並且得到每個字段對應的值。
filter{
#source指定你的哪個值是json數據。
json {
source => "value"
}
#注意:如果你的json數據是多層的,那么解析出來的數據在多層結里是一個數組,你可以使用ruby語法對他進行操作,最終把所有數據都裝換為平級的。
}
json插件還是需要注意一下使用的方法的,下圖就是多層結構的弊端:
對應的解決方案為:
ruby{
code=>"
kv=event.get('content')[0]
kv.each do |k,v|
event.set(k,v)
end"
remove_field => ['content','value','receiptNo','channelId','status']
}
Logstash filter組件的插件基本介紹到這里了,這里需要明白的是:
add_field、remove_field、add_tag、remove_tag 是所有 Logstash 插件都有。相關使用反法看字段名就可以知道。不如你也試試吧。。。。
————————————————
版權聲明:本文為CSDN博主「醬g」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/qq_33283716/article/details/81241225