filter初級
Logstash安裝
### 設置YUM源
# rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
# tee /etc/yum.repos.d/elastic.repo << EOF
[logstash-5.x]
name=Elastic repository for 5.x packages
baseurl=https://artifacts.elastic.co/packages/5.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
# yum install -y logstash
基本使用
# tee filter.conf << EOF
input {
stdin {
}
}
filter {
mutate {
split => ["message", "|"]
}
}
output {
stdout {
}
}
EOF
# /usr/share/logstash/bin/logstash -f filter.conf --path.settings /etc/logstash
Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
The stdin plugin is now waiting for input:
12|fwd|343|dd
2017-09-18T01:35:03.342Z dnode [12, fwd, 343, dd]
ruby語法基本使用
# tee filter.conf << EOF
input {
stdin {
}
}
filter {
mutate {
split => ["message", "|"]
}
ruby {
code => '
msgs = event.get("message")
puts msgs.length
'
}
}
output {
stdout {
codec => "rubydebug"
}
}
EOF
# /usr/share/logstash/bin/logstash -f ruby.conf --path.settings /etc/logstash
Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
The stdin plugin is now waiting for input:
r|g
2
{
"@version" => "1",
"host" => "dnode",
"@timestamp" => 2017-09-18T09:06:12.546Z,
"message" => [
[0] "r",
[1] "g"
]
}
filter高級用法
grok插件
- 自定義正則: 將需要提取的正則表達式用
()
括起來,然后使用?<tag_name>
的固定語法格式給匹配項打上標簽 - 內置正則: 使用
%{WORD:tag_name}
內置正則地址
如果想要給一串很長的字符的很多字段都打上標簽,即多個自定義組合的情況,那么正則必須能完全匹配整個字符串(可以使用.*的方式跳過不關心的字段)
ruby插件
### 1. 先實現rb腳本,輸入從變量讀取,輸出也保存到變量
### 2. 腳本的輸入由變量改成event.get("name")
### 3. 腳本的輸出由變量改成event.set("name", $value)
舉例
樣例字符串一
[NEW] tcp
- 使用grok內置正則
- 自定義正則
樣例字符串二
[MAN] name=fwd age=12#[WONMEN]name=xb age=10
將字符串轉換成JSON
### 編寫rb腳本實現所需功能
# vim ruby.rb
$result = Hash.new
$people = []
begin
msgs = "[MAN] name=fwd age=12#[WONMEN]name=xb age=10"
msgs.split("#").each { |msg|
ret = Hash.new
item = msg[/(?<=\[)MAN(?=\])|(?<=\[)WONMEN(?=\])/]
if item.empty?
raise "Invalid format"
end
ret["sex"] = item
beg = msg.index("name")
if beg == nil
raise "Invalid format"
end
msg[beg..-1].split().each { |item|
key, value = item.split("=")
ret[key] = value
}
$people.push(ret)
}
$result["peoples"] = $people
puts $result
end
# ruby ruby.rb
{"peoples"=>[{"sex"=>"MAN", "name"=>"fwd", "age"=>"12"}, {"sex"=>"WONMEN", "name"=>"xb", "age"=>"10"}]}
將ruby腳本放入Logstash的filter插件中
# vim ruby.conf
input {
stdin {
}
}
filter {
ruby {
code => '
$result = Hash.new
$people = []
begin
msgs = event.get("message")
msgs.split("#").each { |msg|
# 分割后的字符串樣例 => [MAN] name=fwd age=12
ret = Hash.new
# 匹配頭部的[MAN]或[WONMEN]
item = msg[/(?<=\[)MAN(?=\])|(?<=\[)WONMEN(?=\])/]
if item.empty?
raise "Invalid format"
end
ret["sex"] = item
# 獲取從name到結束的字符串 => name=fwd age=12
beg = msg.index("name")
if beg == nil
raise "Invalid format"
end
msg[beg..-1].split().each { |item|
# 分割后的字符串樣例 => name=fwd
key, value = item.split("=")
ret[key] = value
}
$people.push(ret)
}
$result["peoples"] = $people
event.set("message", $result)
event.set("[@metadata][drop]", false)
rescue
puts $!
event.set("[@metadata][drop]", true)
end
'
}
}
output {
if ![@metadata][drop] {
stdout {
codec => rubydebug
}
}
}
# /usr/share/logstash/bin/logstash -f ruby.conf --path.settings /etc/logstash
Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
The stdin plugin is now waiting for input:
[MAN] name=fwd age=12#[WONMEN]name=xb age=10
{
"@version" => "1",
"host" => "dnode",
"@timestamp" => 2017-09-20T08:40:26.293Z,
"message" => {
"peoples" => [
[0] {
"name" => "fwd",
"age" => "12",
"sex" => "MAN"
},
[1] {
"name" => "xb",
"age" => "10",
"sex" => "WONMEN"
}
]
}
}