問題
有時候我們想要在Logstash里對收集到的日志等信息進行分割,並且將分割后的字符作為新的字符來index到Elasticsearch里。假定需求如下:
Logstash收集到的日志字段message
的值是由多個字段拼接而成的,分隔符是;,;
,如下:
{
"message": "key_1=value_1;,;key_2=value_2"
}
現在想要將message
的值拆分成2個新的字段:key_1、key_2,並且將它們index到ES里,可以借助Logstash的filter的插件來完成;這里提供兩種解決方案。
方案一:使用mutate插件
filter {
mutate {
split => ["message",";,;"]
}
if [message][0] {
mutate {
add_field => {
"temp1" => "%{[message][0]}"
}
}
}
if [message][1] {
mutate {
add_field => {
"temp2" => "%{[message][1]}"
}
}
}
if [temp1][1] {
mutate {
split => ["temp1","="]
add_field => {
"%{[temp1][0]}" => "%{[temp1][1]}"
}
}
}
if [temp2][1] {
mutate {
split => ["temp2","="]
add_field => {
"%{[temp2][0]}" => "%{[temp2][1]}"
}
remove_field => [ "temp1", "temp2", "message" ]
}
}
}
看得出來,這種做法很麻煩,也不利於日后的維護。每當message
里被拼接的字段的數量增加時,就必須同步改動這里的filter邏輯,而且添加的代碼量也是呈線性遞增的。
此外,這里使用的諸如temp1
等臨時變量,可以用[@metadata][temp1]
的寫法來作為臨時變量,這樣就不需要去手動remove掉了。
方案二:使用ruby插件
filter {
ruby {
code => "
array1 = event.get('message').split(';,;')
array1.each do |temp1|
if temp1.nil? then
next
end
array2 = temp1.split('=')
key = array2[0]
value = array2[1]
if key.nil? then
next
end
event.set(key, value)
end
"
remove_field => [ "message" ]
}
}
ruby插件可以允許你使用ruby的語法來完成各種復雜的邏輯,使用這種方案可以完美解決方案一中的不足之處,便於日后的維護。