Collecting logs with logstash
For collecting the logs you can use filebeat, a shipper from the ELK ecosystem, to pick up the log files and forward them to a backend program.
One drawback: if you want filebeat to ship logs directly to elasticsearch, it cannot replace or otherwise rewrite any fields.
The most obvious consequence is that we usually want @timestamp to be the time recorded inside the log entry rather than the time the entry was processed, and filebeat alone cannot do that.
Another point: filebeat seems to occasionally mix up events when collecting multi-line logs, even though it has built-in multiline handling; this still needs testing.
The upside is that it is fairly light on resources.
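If you do put filebeat in front of logstash, a minimal 5.x-era sketch might look like the following; the paths and the logstash host/port 5044 are assumptions rather than part of the setup described here, and logstash would then need a beats input instead of the file input shown below.
filebeat.prospectors:
  - input_type: log
    paths:
      - /tmp/test.log
    # fold lines that do not start with "[" into the preceding event
    multiline.pattern: '^\['
    multiline.negate: true
    multiline.match: after
output.logstash:
  hosts: ["127.0.0.1:5044"]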
input {
  file {
    path => "/tmp/test.log"
    add_field => { "area" => "beijing" }
    codec => multiline {
      pattern => "^\["
      negate => true
      what => previous
    }
  }
}
filter {
  grok {
    # parse the unstructured prefix; everything after "- " ends up in "result"
    match => { "message" => "^\[(%{WORD:loglevel}\s+)?%{TIMESTAMP_ISO8601:timestamp}?(?<file>[^\@]+)\s+@\s+(?<pid>[^\]]+)\]\s+\-\s+?%{GREEDYDATA:result}" }
    remove_field => [ "message" ]
  }
  # only lines carrying the "visitlog|" marker need further processing
  # (the pipe must be escaped, otherwise the regex matches every event)
  if [result] =~ /visitlog\|/ {
    mutate {
      split => [ "result", "visitlog|" ]
      add_field => {
        "field2" => "%{[result][1]}"
      }
      remove_field => [ "result" ]
    }
    json {
      source => "field2"
      target => "results"
      remove_field => [ "field2" ]
    }
    # replace @timestamp with the request time taken from the log itself
    date {
      match => [ "[results][reqTime]", "yyyy-MM-dd HH:mm:ss" ]
    }
  }
}
output {
  elasticsearch {
    hosts => [ "127.0.0.1:9200" ]
    index => "logstash-name-%{+YYYY.MM.dd.HH}"
    flush_size => 20
    idle_flush_time => 3
    sniffing => true
    template_overwrite => true
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
The above is a logstash configuration file; the logs it processes look roughly like this:
[ERROR 2017-05-04 10:12:24,281 ./connect_info.py:336 @ 8299] - socket send and recieve Error: Traceback (most recent call last):
File "./connect_info.py", line 305, in get_request
retdata['handstr']=unpack('10s',client_socket.recv(10) )
error: unpack requires a string argument of length 10
[INFO 2017-05-04 10:12:24,282 ./connect_info.py:84 @ 8299] - before doing clean up...
[INFO 2017-05-04 10:12:24,282 ./connect_info.py:92 @ 8299] - end clean up.
[INFO 2017-05-04 10:12:24,289 ./connect_info.py:320 @ 8299] - from engine:{"data":{"isFromCache":0,"results":[{"aa":"bb","cc":dd"}],"semantic":[{"aa":"bb","cc":"dd"}],"total":1},"errmsg":"","retcode":0,"tolerance":["abc"]}
[INFO 2017-05-04 10:12:24,290 /xxx/ooo/music_service.py:95 @ 8299] - visitlog|{"reqTime":"2017-05-04 10:12:24","time":{"receive": 0.006849050521850586, "init": 4.0531158447265625e-06, "reqTime": 0.008450031280517578, "send": 1.5974044799804688e-05},"req":{"pageSize": 20, "text": "abc", "appId": "appid", "uuid": "1e4e45365ae43b12cf31004f41013b23", "lengthMin": 0, "isCorrect": "1", "sessionId": "1493863935", "sid": "1493863935", "sort": "1", "pageIndex": 1, "searchFunc": "searchmusic", "lengthMax": 0, "timestamp": "1493863935", "isSemantic": "1", "isFilter": "0", "releaseDateMin": 0, "path": "/aa/bb/cc/searchmusic", "_": "1493863109797", "releaseDateMax": 0, "callback": "jQuery1900565385167_1456109742", "token": "aaaaaaaaaaaaaaaaaa", "queryId": "dfbab18a3bd7cfb28acb33f323ada1cd"},"response":{"data":{"isFromCache":0,"results":[{"aa":"bb","cc":dd"}],"semantic":[{"aa":"bb","cc":"dd"}],"total":1},"errmsg":"","retcode":0,"tolerance":["abc"]}}
The configuration has three sections.
The input section:
It reads from a file; path accepts * to match any characters (single-character wildcard matching still needs testing).
add_field adds extra fields, which makes it easy to tell logs from different sources apart.
codec => multiline enables multi-line handling: lines that do not start with [ are appended to the previous event instead of starting a new one.
The filter section:
grok parses the unstructured (non-JSON) prefix of each line; everything after it is captured into the result field, and the original message field is removed.
An if conditional then picks out the lines that need further processing; mutate splits them on the visitlog| marker, the clean JSON payload ends up in the field2 field, and the json filter parses that field into results.
Finally, the date filter takes the request time from the parsed fields and replaces the default @timestamp field with it.
The output section:
elasticsearch sends the events to elasticsearch.
stdout prints the events to the terminal.
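For reference, after the filter chain runs on the visitlog line in the sample above, the event printed by the rubydebug output looks roughly like this (abridged, with illustrative values; the exact @timestamp depends on the timezone the date filter assumes):
{
         "area" => "beijing",
     "loglevel" => "INFO",
    "timestamp" => "2017-05-04 10:12:24,290",
         "file" => "/xxx/ooo/music_service.py:95",
          "pid" => "8299",
      "results" => {
         "reqTime" => "2017-05-04 10:12:24",
             "req" => { "searchFunc" => "searchmusic", "text" => "abc", ... },
        "response" => { "data" => { "total" => 1, ... }, "retcode" => 0, ... }
    },
   "@timestamp" => 2017-05-04T02:12:24.000Z
}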
Searching the logs with elasticsearch
First filter on the results.req.searchFunc field to keep only documents whose value is searchmusic, then require results.response.data.total to be at least 1, which drops searches that returned no results.
Finally, aggregations groups the results.req.text.keyword field and counts how often each search text occurs; its size controls how many buckets are returned.
The "size": 0 above the aggregations suppresses the matching documents themselves, so the response shows only the aggregation results.
GET /logstash-name-2017.06*/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "results.req.searchFunc": "searchmusic"
          }
        },
        {
          "range": {
            "results.response.data.total": {
              "gte": "1"
            }
          }
        }
      ]
    }
  },
  "size": 0,
  "aggregations": {
    "topSearch": {
      "terms": {
        "field": "results.req.text.keyword",
        "size": 100
      }
    }
  }
}
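The request above is written for Kibana's console; outside of it the same query can be sent with curl (the file name top_search.json is just an example holding the request body above):
curl -s -H 'Content-Type: application/json' \
  'http://127.0.0.1:9200/logstash-name-2017.06*/_search?pretty' \
  -d @top_search.json
The per-text counts come back under aggregations.topSearch.buckets, one bucket per search text with its doc_count.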