ELK日志分析系統(3)-logstash數據處理


 

1. 概述

  logspout收集數據以后,就會把數據發送給logstash進行處理,本文主要講解logstash的input, filter, output處理

2. input

  數據的輸入處理

  支持tcp,udp等協議

 

  晚上找資料建議在使用 LogStash::Inputs::Syslog 的時候走 TCP 協議來傳輸數據。

  因為具體實現中,UDP 監聽器只用了一個線程,而 TCP 監聽器會在接收每個連接的時候都啟動新的線程來處理后續步驟。

  如果你已經在使用 UDP 監聽器收集日志,用下行命令檢查你的 UDP 接收隊列大小:# netstat -plnu | awk 'NR==1 || $4~/:514$/{print $2}'

  Recv-Q

  228096

  228096 是 UDP 接收隊列的默認最大大小,這時候 linux 內核開始丟棄數據包了!

  2.1. 語法

  基本語法如下:

input{
        tcp {
                mode => "server"
                port => 5000
                codec => json_lines
                tags => ["data-http"]
        }
}

 

  2.2. multiline

  有時候日志是這樣多行顯示的:

[2019-10-12 15:24:50 ACCOUNT 97364 4658800064 INFO] http_ip=127.0.0.1        http_uri=/account/v1/binding        http_method=POST        http_time=182ms        http_status=401        
http_headers=Content-Type:application/x-www-form-urlencoded
Content-Length:27
Accept-Encoding:identity
Host:localhost:8800
User-Agent:Python-urllib/3.6
Key:424518e4d27b11e8ada274e5f95979ae
Version:1.1.0
Time:1570865090.412524
Token:y66AHLNmRoscIIsoWnKzxosojSg=
User-Id:0
Connection:close        
http_kwargs={'sns_type': 'wechat', 'code': 'CG9DEj', 'user_id': 0, 'language': 1}        
http_response={"code":"usr_sns_code_error","message":"\u7b2c\u4e09\u65b9sns\u5e10\u53f7code\u65e0\u6548"}

 

    默認情況下logstash會把一行日志轉換成elasticsearch的一個doc,上面這個日志就會存儲成15條日志。這樣就不能滿足我們的需求,我們只是想要一條日志

  我們可以這么配置input:

input{ tcp { port => 5001 type => syslog tags => ["syslog"] codec=>multiline{ pattern => "\[%{TIMESTAMP_ISO8601:timestamp}" negate => true what => "previous" } } }

  紅色代碼的作用是:匹配到以[2019-10-08 16:57:42開頭的一行日志作為previous,不是以這個格式開頭的將作為子行出現,然后把多行記錄合並成一行記錄

 

3. filter

  數據的過濾轉化處理

  3.1. 語法

  基本語法如下:

filter {
  grok {
    match => { "message" => "%{SYSLOGBASE} %{DATA:message}" }
    overwrite => [ "message" ]
  }
}

 

  3.2. grok范式匹配

  

  grok適合用來解析syslog,apache,mysql等日志

  假如你的日志格式是這樣的

[2019-10-12 15:44:52 ACCOUNT 1 140058162291304 WARNING] HashCache::_rds_get, cache not existed!!! id_ls:[]

   日志的格式是這樣的:

"[%(asctime)s %(service)s %(process)d %(thread)d %(levelname)s] %(message)s"

  那么針對這樣有特定格式的日志,我們要怎樣提取這里面的字段呢?

    可以這么配置你的filter:

filter{
        if [type] == "syslog" {
                grok {
                        match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp} %{DATA:service} %{DATA:pid} %{DATA:tid} %{LOGLEVEL:log-level}\] %{GREEDYDATA:msg_body}" }
                }
        }
}

    使用grok的match正則表達式匹配可以方便的從message中提取字段

  從elasticsearch可以發現增加了timestamp、server、pid、tid和log-level等字段。

  

  

  附上官網文檔:

    # grok調試器

    https://grokdebug.herokuapp.com =>debugger

    # 官方文檔

    https://www.elastic.co/guide/en/kibana/7.3/xpack-grokdebugger.html

 

  3.3. gsub字符串替換

  經過logspout處理以后,會增加一些metadata(container name, container id, etc)

  紅色部分是logspout添加的:

  <14>1 2019-10-08T18:00:15Z zfswalk0 mage-device-11283 16901 - - [2019-10-09 09:49:08 WARNING SACCOUNT C P1 T140004171454120 P1 P2 P3] start listen on HTTP:0.0.0.0:17698, start listen on HTTP:0.0.0.0:17698

    如何去除這部分多余的數據呢?

  logstash需要使用gsub進行字符串替換:

  

filter{
        if [type] == "syslog" {
                mutate {
                        gsub => [ "message", "<\d+>.*?- -", "" ]
                }
        }
}

 

  這個正則表達式的意義是選擇從“<14>”開始到“- -”結束的子字符串,然后替換成空字符串,實現metadata的刪除

 

  3.4. remove_filed刪除字段

  ELK是采用json字典的方式來存儲數據的

  如果你有哪些字段是不需要的,可以通過remove_field來刪除

  假如你不想要grop解析出來的msg_body字段和test字段,可以這么操作,那么最后存儲到elasticsearch那邊將不會出現這2字段

filter{
        if [type] == "syslog" {
                mutate {
                        remove_field => [ "msg_body", "test" ]
                }
        }
}

 

 

3.5. kv過濾器解析kv數據

  官方文檔kv filter:https://www.elastic.co/guide/en/logstash/current/plugins-filters-kv.html

  動態的解析kv可以很方便的支持日志擴展,不需要后期去修改

 

  它會把這個字符串:ip=1.2.3.4 error=REFUSED解析成kv字典形式:{"ip":"1.2.3.4", "error": "REFUSED"}

  

filter{
        if [type] == "syslog" {
                kv {
                        source => "msg_body"
                        field_split => "\t\t"
                }
        }
}

    這邊的配置意思是:從msg_body這個字段去解析kv字段,字段的分隔符是"\t\t"

  當然這也要求日志寫入的時候需要采用"\t\t"來區分多個字段,類似這樣:

[2019-10-12 15:24:50 ACCOUNT 97364 4658800064 INFO] http_ip=192.168.1.136		http_uri=/account/v1/binding		http_method=POST

    http_ip=127.0.0.1、http_uri=/account/v1/binding與http_method=POST這三個字段是采用'\t\t'分割的

  這樣kv filter就會解析成功,並往doc里面設置http_ip, http_uri,http_method這三個值:

 

 

4. output

  過濾轉化后的數據的輸出處理

  這里是把數據存儲到elasticsearch的9200端口,index是"syslog-%{+YYYY.MM.dd}"

output{
    if "syslog" in [tags]{
        elasticsearch{
                hosts=>["elasticsearch:9200"]
                index => "syslog-%{+YYYY.MM.dd}"
        }
        stdout{codec => rubydebug}
    }
}

   然后elasticsearch就能得到數據了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM