I recently used Logstash on a project for log collection and filtering, and it turned out to be quite powerful.
input {
  file {
    path => "/XXX/syslog.txt"
    start_position => beginning
    # Merge continuation lines into the previous event
    codec => multiline {
      patterns_dir => ["/XX/logstash-1.5.3/patterns"]
      pattern => "^%{MESSAGE}"
      negate => true
      what => "previous"
    }
  }
}
filter {
  # Split the pipe-delimited header and lift the pieces into named fields
  mutate {
    split => ["message", "|"]
    add_field => { "tmp" => "%{[message][0]}" }
    add_field => { "DeviceProduct" => "%{[message][2]}" }
    add_field => { "DeviceVersion" => "%{[message][3]}" }
    add_field => { "Signature ID" => "%{[message][4]}" }
    add_field => { "Name" => "%{[message][5]}" }
  }
  # Split the first header piece again on ":"
  mutate {
    split => ["tmp", ":"]
    add_field => { "tmp1" => "%{[tmp][1]}" }
    add_field => { "Version" => "%{[tmp][2]}" }
    remove_field => ["tmp"]
  }
  # Extract the type with a custom grok pattern from patterns_dir
  grok {
    patterns_dir => ["/XXX/logstash-1.5.3/patterns"]
    match => { "tmp1" => "%{TYPE:type}" }
    remove_field => ["tmp1"]
  }
  # Pull out the whitelisted key=value pairs
  kv {
    include_keys => ["eventId", "msg", "end", "mrt", "modelConfidence", "severity", "relevance", "assetCriticality", "priority", "art", "rt", "cs1", "cs2", "cs3", "locality", "cs2Label", "cs3Label", "cs4Label", "flexString1Label", "ahost", "agt", "av", "atz", "aid", "at", "dvc", "deviceZoneID", "deviceZoneURI", "dtz", "eventAnnotationStageUpdateTime", "eventAnnotationModificationTime", "eventAnnotationAuditTrail", "eventAnnotationVersion", "eventAnnotationFlags", "eventAnnotationEndTime", "eventAnnotationManagerReceiptTime", "_cefVer", "ad.arcSightEventPath"]
  }
  mutate {
    split => ["ad.arcSightEventPath", ","]
    add_field => { "arcSightEventPath" => "%{[ad.arcSightEventPath][0]}" }
    remove_field => ["ad.arcSightEventPath"]
    remove_field => ["message"]
  }
}
output {
  kafka {
    topic_id => "rawlog"
    batch_num_messages => 20
    broker_list => "10.3.162.193:39192,10.3.162.194:39192,10.3.162.195:39192"
    codec => "json"
  }
  stdout {
    codec => rubydebug
  }
}
input: ingests events from the data source
filter: filters and transforms those events
output: writes the processed events out
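As a minimal sketch of that three-stage layout, the skeleton below uses stdin/stdout plugins purely for local experimentation (they are not part of the pipeline above):

```conf
input {
  stdin { }                # read test events typed into the terminal
}
filter {
  # filter plugins (mutate, grok, kv, ...) go here
}
output {
  stdout {
    codec => rubydebug     # pretty-print each event for debugging
  }
}
```

Running `bin/logstash -f <config>` with a skeleton like this is a convenient way to try out filter plugins interactively before wiring in the real input and output.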
The most important part is the filter stage. Our current requirement is to extract key-value pairs from raw strings, which breaks down into three steps:
1. Use split in the mutate filter to break the text apart on a delimiter.
2. Use grok to cut substrings out with regular expressions.
3. Use kv to extract all the key-value pairs.
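To make the three steps concrete, here is a trimmed-down sketch of how a single event flows through them. The sample line in the comment is hypothetical (a CEF-style header followed by key=value extensions), and `TYPE` stands in for the custom pattern shipped in patterns_dir:

```conf
# Hypothetical input line:
#   CEF:0|Vendor|Product|1.0|sig-01|Some event name|Low| eventId=42 msg=started
filter {
  # Step 1 - mutate/split: cut the pipe-delimited header apart, so
  # [message][2] is the product, [message][3] the version, and so on.
  mutate { split => ["message", "|"] }

  # Step 2 - grok: apply a regex (here a custom pattern from
  # patterns_dir) to extract a substring from one of the pieces.
  grok { match => { "tmp1" => "%{TYPE:type}" } }

  # Step 3 - kv: extract the key=value pairs from the extension part;
  # include_keys whitelists which keys are kept on the event.
  kv { include_keys => ["eventId", "msg"] }
}
```

With the sample line above, step 1 yields the header fields, step 2 isolates the type, and step 3 leaves `eventId` and `msg` as fields on the event while ignoring any keys not in the whitelist.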