通過elasticsearch對日志進行搜索熱詞統計


通過logstash搜集日志

這里搜集日志可以使用ELK的一個插件filebeat對日志進行處理,並傳輸到后端的程序
在這里有一個不好的地方, 如果想要直接使用filebeat將日志發送到elasticsearch的話, 它並不能對任何字段進行替換等處理
比較明顯的問題就是, 一般我們需要將@timestamp替換成日志里面的時間而不是程序對日志的處理時間, 這一點它無法做到
還有一點, 使用filebeat對多行日志進行處理時似乎會發生日志收集錯亂的現象, 這個問題有待測試, 因為filebeat程序是自帶處理多行日志的
當然好處也是有點, 可以比較省資源

input {
    file {
        path => "/tmp/test.log"
        add_field => {"area"=>"beijing"}
        codec => multiline {
            pattern => "^\["
            negate => true
            what => previous
        }
    }
}
filter {
    grok {
            match => { "message" => "^\[(%{WORD:loglevel}\s+)?%{TIMESTAMP_ISO8601:timestamp}?(?<file>[^\@]+)\s+@\s+(?<pid>[^\]]+)\]\s+\-\s+?%{GREEDYDATA:result}" }
            remove_field => [ "message" ]
    }

    if ([result] =~ "visitlog|") {
        mutate {
            split => ["result","visitlog|"]
                add_field => {
                    "field2" => "%{[result][1]}"
                }
                remove_field => [ "result" ]
        }

        json {
            source => "field2"
            target => "results"
            remove_field => [ "field2" ]
        }
        date {
            match => [ "[results][reqTime]", "yyyy-MM-dd HH:mm:ss" ]
        }
    }
}
output {
    elasticsearch {
        hosts => [ "127.0.0.1:9200" ]
        index => "logstash-name-%{+YYYY.MM.dd.HH}"
        flush_size => 20
        idle_flush_time => 3
        sniffing => true
        template_overwrite => true
    }
}
output {
    stdout {
        codec => rubydebug
    }
}

上面是一個logstash的配置文件,處理的日志格式大概是這樣的

[ERROR 2017-05-04 10:12:24,281 ./connect_info.py:336 @ 8299] - socket send and recieve Error: Traceback (most recent call last):
  File "./connect_info.py", line 305, in get_request
    retdata['handstr']=unpack('10s',client_socket.recv(10) )
error: unpack requires a string argument of length 10

[INFO 2017-05-04 10:12:24,282 ./connect_info.py:84 @ 8299] - before doing clean up...
[INFO 2017-05-04 10:12:24,282 ./connect_info.py:92 @ 8299] - end clean up.
[INFO 2017-05-04 10:12:24,289 ./connect_info.py:320 @ 8299] - from engine:{"data":{"isFromCache":0,"results":[{"aa":"bb","cc":dd"}],"semantic":[{"aa":"bb","cc":"dd"}],"total":1},"errmsg":"","retcode":0,"tolerance":["abc"]}
[INFO 2017-05-04 10:12:24,290 /xxx/ooo/music_service.py:95 @ 8299] - visitlog|{"reqTime":"2017-05-04 10:12:24","time":{"receive": 0.006849050521850586, "init": 4.0531158447265625e-06, "reqTime": 0.008450031280517578, "send": 1.5974044799804688e-05},"req":{"pageSize": 20, "text": "abc", "appId": "appid", "uuid": "1e4e45365ae43b12cf31004f41013b23", "lengthMin": 0, "isCorrect": "1", "sessionId": "1493863935", "sid": "1493863935", "sort": "1", "pageIndex": 1, "searchFunc": "searchmusic", "lengthMax": 0, "timestamp": "1493863935", "isSemantic": "1", "isFilter": "0", "releaseDateMin": 0, "path": "/aa/bb/cc/searchmusic", "_": "1493863109797", "releaseDateMax": 0, "callback": "jQuery1900565385167_1456109742", "token": "aaaaaaaaaaaaaaaaaa", "queryId": "dfbab18a3bd7cfb28acb33f323ada1cd"},"response":{"data":{"isFromCache":0,"results":[{"aa":"bb","cc":dd"}],"semantic":[{"aa":"bb","cc":"dd"}],"total":1},"errmsg":"","retcode":0,"tolerance":["abc"]}}

這里分為三個段落

input段:

采用文件的形式, path可以采用*來匹配任意字符(匹配單個字符待測試),
add_field 可以增加字段, 可以很好的區分開日志
codec => multiline 采用多行的模式 如果不是以[開頭的將后面的行算作第一行

filter段:

這里采用的是 grok 匹配前面的無規則(非json格式)內容, 其后的json格式內容統一存到 result 字段, 並移除message字段
再通過 if 判斷, 提取需要處理的日志 使用 mutate 對日志進行切分, 標准的json格式日志將保存在 field2 字段 之后通過 json 進行格式化該字段
最好將格式化好的字段中的時間 替換默認的 @timestamp 字段

output字段:

elasticsearch 將日志輸出到elasticsearch 中
stdout 將日志輸出到屏幕終端

通過elasticsearch對日志進行檢索

先通過results.req.searchFunc字段過濾出包含 searchmusic的內容, 再判斷 results.response.data.total 是否大於 1 排除搜索無結果的內容
最后使用 aggregations 對 results.req.text.keyword 字段結果進行聚合 統計出該字段的每個內容的個數, size控制顯示多少個內容

aggregations 上面的size控制不顯示其他搜索內容, 只關注aggregations 統計結果

GET /logstash-name-2017.06*/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "results.req.searchFunc": "searchmusic"
          }
        },
        {
          "range": {
            "results.response.data.total": {
              "gte": "1"
            }
          }
        }
      ]
    }
  },
  "size":0,
  "aggregations": {
    "topSearch": {
      "terms": {
        "field": "results.req.text.keyword",
        "size": 100
      }
    }
  }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM