最近在項目中使用 Logstash 做日志的采集和過濾,感覺 Logstash 還是很強大的。
# Input: tail a syslog file from its beginning, and use the multiline
# codec to stitch continuation lines onto the previous event before the
# event enters the pipeline.
input {
  file {
    path => "/XXX/syslog.txt"
    # Fixed: string values in the Logstash config must be quoted;
    # the original bareword `beginning` is not valid string syntax.
    start_position => "beginning"
    codec => multiline {
      patterns_dir => ["/XX/logstash-1.5.3/patterns"]
      # negate => true: any line that does NOT match ^%{MESSAGE} is
      # treated as a continuation and appended to the previous line.
      pattern => "^%{MESSAGE}"
      negate => true
      what => "previous"
    }
  }
}
# Filter: split the "|"-delimited raw message into positional header
# fields, derive type/version from the first segment, then extract the
# remaining key=value pairs with the kv filter.
filter {
  # Split on "|" and lift the positional header fields out of the array.
  # (add_field runs after split within a single mutate, so the indexed
  # references are valid here.)
  mutate {
    split => ["message", "|"]
    add_field => {
      "tmp"           => "%{[message][0]}"
      "DeviceProduct" => "%{[message][2]}"
      "DeviceVersion" => "%{[message][3]}"
      "Signature ID"  => "%{[message][4]}"
      "Name"          => "%{[message][5]}"
    }
  }
  # The first segment is itself ":"-delimited; keep parts 1 and 2 and
  # drop the scratch field.
  mutate {
    split => ["tmp", ":"]
    add_field => {
      "tmp1"    => "%{[tmp][1]}"
      "Version" => "%{[tmp][2]}"
    }
    remove_field => ["tmp"]
  }
  # Pull the event type out of tmp1 with a custom TYPE pattern.
  grok {
    patterns_dir => ["/XXX/logstash-1.5.3/patterns"]
    match => { "tmp1" => "%{TYPE:type}" }
    remove_field => ["tmp1"]
  }
  # Extract only the whitelisted key=value pairs from the message body.
  kv {
    include_keys => ["eventId", "msg", "end", "mrt", "modelConfidence", "severity", "relevance","assetCriticality","priority","art","rt","cs1","cs2","cs3","locality","cs2Label","cs3Label","cs4Label","flexString1Label","ahost","agt","av","atz","aid","at","dvc","deviceZoneID","deviceZoneURI","dtz","eventAnnotationStageUpdateTime","eventAnnotationModificationTime","eventAnnotationAuditTrail","eventAnnotationVersion","eventAnnotationFlags","eventAnnotationEndTime","eventAnnotationManagerReceiptTime","_cefVer","ad.arcSightEventPath"]
  }
  # Keep only the first path component, then drop the scratch field and
  # the now-redundant raw message.
  mutate {
    split => ["ad.arcSightEventPath", ","]
    add_field => {
      "arcSightEventPath" => "%{[ad.arcSightEventPath][0]}"
    }
    remove_field => ["ad.arcSightEventPath", "message"]
  }
}
# Output: publish each event as JSON to the Kafka "rawlog" topic, and
# echo it to stdout (rubydebug) for local debugging.
output {
  kafka {
    topic_id => "rawlog"
    batch_num_messages => 20
    broker_list => "10.3.162.193:39192,10.3.162.194:39192,10.3.162.195:39192"
    codec => "json"
  }
  stdout {
    codec => rubydebug
  }
}
# Fixed: the closing brace of the output section was missing in the
# original, which makes the whole config fail to parse.
配置文件分為三個部分:
input:接入數據源。
filter:對數據進行過濾和解析處理。
output:定義處理後數據的輸出目的地。
其中最重要的是filter的處理,目前我們的需求是需要對字符串進行key-value的提取
1、使用了mutate中的split,能通過分隔符對文本進行分割處理。
2、通過grok使用正則對字符串進行截取處理。
3、使用kv 提取所有的key-value
