我們通過filebeat收集的日志通常沒有進行敏感信息的脫敏工作,以及一些base64超長字符如果不做處理,存儲到es及在kibana展示也會耗費性能。
針對這兩個問題我們可以進行如下配置:
1.設置超長字符截取
進入logstash安裝目錄,config文件夾下新建文件logstash_to_es相關配置如下:
filter {
if [type] == "filebeat" {
grok {
match => [
"message","%{LOGLEVEL:level}",
"message","%{DATA:message}"
]
}
if [level] == "DEBUG" {
drop { }
}
if[message]=~"ASPECT"{
drop { }
}
if [message]=~"OVERLENGTH"{
grok{
match => {
"message" => "(?<MYELF>([\s\S]{500}))" #截取帶RAW-CONTENT關鍵字的日志500個字符 作為MYELF的值
}
}
mutate {
rename => {"MYELF" => "message"} #重命名字段MYELF為message
}
}
if[message]=~"traceId"{
grok {
match => ["message","%{UUID:traceId}"] #自定義kibana篩選關鍵字
}
}
}
}
2.敏感字段脫敏
進入logstash安裝目錄,config文件夾下新建文件logstash_to_es相關配置如下:
filter {
if [type] == "filebeat" {
ruby {
path => "/usr/local/logstash-6.0.1/config/text.rb"
script_params => { "message" => "message" }
}
}
}
text.rb也就是我們配置的敏感字段脫敏ruby的過濾條件。
def register(params)
@message = params["message"] # 通過params獲取script_params傳參
end
def filter(event)
msg = event.get(@message)
if(msg =~ /certNo:(.*)/) # 獲取日志中的關鍵字certNo
cerNoExp = /(?<=certNo:)(\w{6})(\w{8})(\w{4})/
cerNoExp =~ msg
cerNoInFor = $2
msg.gsub!(cerNoInFor, "******")
end
if(msg =~ /mobileNo:(.*)/) # 獲取日志中的關鍵字mobileNo
mobileNoExp = /(?<=mobileNo:)(\w{3})(\w{4})(\w{4})/
mobileNoExp =~ msg
mobileNoInFor = $2
msg.gsub!(mobileNoInFor, "****")
end
#event.set("msg", msg)
return [event]
end