ELK學習筆記之Logstash詳解


 

0x00 Logstash概述

官方介紹:Logstash is an open source data collection engine with real-time pipelining capabilities。簡單來說logstash就是一根具備實時數據傳輸能力的管道,負責將數據信息從管道的輸入端傳輸到管道的輸出端;與此同時這根管道還可以讓你根據自己的需求在中間加上濾網,Logstash提供里很多功能強大的濾網以滿足你的各種應用場景。

 

Logstash常用於日志關系系統中做日志采集設備,最常用於ELK(elasticsearch + logstash + kibane)中作為日志收集器使用;
官網介紹

 

從 Logstash 的名字就能看出,它主要負責跟日志相關的各類操作,在此之前,我們先來看看日志管理的三個境界吧

境界一 
『昨夜西風凋碧樹。獨上高樓,望盡天涯路』,在各台服務器上用傳統的 linux 工具(如 cat, tail, sed, awk, grep 等)對日志進行簡單的分析和處理,基本上可以認為是命令級別的操作,成本很低,速度很快,但難以復用,也只能完成基本的操作。

境界二 
『衣帶漸寬終不悔,為伊消得人憔悴』,服務器多了之后,分散管理的成本變得越來越多,所以會利用 rsyslog 這樣的工具,把各台機器上的日志匯總到某一台指定的服務器上,進行集中化管理。這樣帶來的問題是日志量劇增,小作坊式的管理基本難以滿足需求。

境界三 
『眾里尋他千百度,驀然回首,那人卻在燈火闌珊處』,隨着日志量的增大,我們從日志中獲取去所需信息,並找到各類關聯事件的難度會逐漸加大,這個時候,就是 Logstash 登場的時候了

Logstash 的主要優勢,一個是在支持各類插件的前提下提供統一的管道進行日志處理(就是 input-filter-output 這一套),二個是靈活且性能不錯

logstash里面最基本的概念(先不管codec)

logstash收集日志基本流程:

 input–>filter–>output
input:從哪里收集日志

filter:對日志進行過濾

output:輸出哪里

 

0x01 Logstash架構

Logstash 是由 JRuby 編寫的,使用基於消息的簡單架構,在 JVM 上運行。理念非常簡單,如果說 MapReduce 框架分為 Mapper 和 Reducer 兩大模塊,那么 Logstash 有

Collect: 數據輸入。對應 input
Enrich: 數據處理。對應 filter
Transport: 數據輸出。對應 output

Logstash的事件(logstash將數據流中等每一條數據稱之為一個event)處理流水線有三個主要角色完成:inputs –> filters –> outputs:

inpust:必須,負責產生事件(Inputs generate events),常用:File、syslog、redis、beats(如:Filebeats)
filters:可選,負責數據處理與轉換(filters modify them),常用:grok、mutate、drop、clone、geoip
outpus:必須,負責數據輸出(outputs ship them elsewhere),常用:elasticsearch、file、graphite、statsd

雖然模塊僅僅比 MapReduce 框架多了一個,但是無三不成幾,通過不同的拓撲結構,可以完成各類數據處理應用。不過這里我們主要還是以日志匯總處理系統的思路來進行介紹,一個典型的架構為:

 

0x02 Logstash安裝

1. 環境清單

  • 操作系統:CentOS Linux release 7.3.1611
  • Logstash版本:logstash-5.4.1
  • Jdk版本:1.8.0_131

2. 軟件下載

  • 下載Jdk:
[root@root ~]$ wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-x64.tar.gz
  • 下載Logstash:
[root@root ~]$ wget https://artifacts.elastic.co/downloads/logstash/logstash-5.4.1.tar.gz

3. 安裝Jdk

  • 創建安裝目錄
[root@root ~]$ sudo mkdir /usr/local/Java
  • 解壓縮安裝文件
## 移動安裝包到安裝目錄 ##
[root@root ~]$ sudo mv jdk-8u131-linux-x64.tar.gz /usr/local/Java/
## 進入安裝目錄 ##
[root@root ~]$ cd /usr/local/Java/
## 解壓縮安裝包 ##
[root@root Java]$ sudo tar -zxvf jdk-8u131-linux-x64.tar.gz
## 刪除安裝包 ##
[root@root Java]$ sudo rm jdk-8u131-linux-x64.tar.gz
  • 測試安裝是否成功
## 進入JAVA_HOME ##
[root@root Java]$ cd jdk1.8.0_131/
## 測試java命令是否可以正常執行 ##
[root@root jdk1.8.0_131]$ ./bin/java -version
  • 配置JAVA_HOME環境變量
[root@root ~]$ cd ~
[root@root ~]$ vi .bash_profile
# .bash_profile

# Get the aliases and functions
if [ -f ~/.bashrc ]; then
. ~/.bashrc
fi

# User specific environment and startup programs

## 配置JAVA_HOME環境變量 ##
JAVA_HOME=/usr/local/Java/jdk1.8.0_131

## 將java執行目錄加入到PATH下面 ##
PATH=$PATH:$HOME/.local/bin:$HOME/bin:$JAVA_HOME/bin

export PATH
~  
## 使環境變量生效 ##
[root@root ~]$ source .bash_profile
## 測試JAVA_HOME是否正確配置 ##
[root@root ~]$ java -version

4. 安裝Logstash

  • 創建安裝目錄
[root@root ~]$ sudo mkdir /usr/local/logstash
  • 解壓縮安裝文件
[root@root ~]$ sudo mv logstash-5.4.1.tar.gz /usr/local/logstash/
[root@root ~]$ cd /usr/local/logstash/
[root@root logstash]$ sudo tar -zxvf logstash-5.4.1.tar.gz

測試安裝是否成功

  • 測試一、快速啟動,標准輸入輸出作為input和output,沒有filter
[root@root logstash]$ cd logstash-5.4.1/
[root@root logstash-5.4.1]$ ./bin/logstash -e 'input { stdin {} } output { stdout {} }'
Sending Logstash's logs to /usr/local/logstash/logstash-5.4.1/logs which is now configured via log4j2.properties
[2018-10-31T13:37:13,449][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/usr/local/logstash/logstash-5.4.1/data/queue"}
[2018-10-31T13:37:13,467][INFO ][logstash.agent           ] No persistent UUID file found. Generating new UUID {:uuid=>"dcfdb85f-9728-46b2-91ca-78a0d6245fba", :path=>"/usr/local/logstash/logstash-5.4.1/data/uuid"}
[2018-10-31T13:37:13,579][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>250}
[2018-10-31T13:37:13,612][INFO ][logstash.pipeline        ] Pipeline main started
The stdin plugin is now waiting for input:
[2018-10-31T13:37:13,650][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

## 此時命令窗口停留在等待輸入狀態,鍵盤鍵入任意字符 ##

hello world

## 下方是Logstash輸出到效果 ##

2018-10-31T12:21:14.401Z logstash.master hello world
  • 測試二、在測試一堆基礎上加上codec進行格式化輸出
[root@root logstash-5.4.1]$ ./bin/logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'
Sending Logstash's logs to /usr/local/logstash/logstash-5.4.1/logs which is now configured via log4j2.properties
[2018-10-31T14:01:50,325][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>250}
[2018-10-31T14:01:50,356][INFO ][logstash.pipeline        ] Pipeline main started
The stdin plugin is now waiting for input:
[2018-10-131T14:01:50,406][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

## 此時命令窗口停留在等待輸入狀態,鍵盤鍵入任意字符 ##

hello world

## 下方是Logstash輸出到效果 ##

{
"@timestamp" => 2018-10-31T06:02:19.189Z,
"@version" => "1",
"host" => "chenlei.master",
"message" => "hello world"
}

 

0x03 Logstash參數與配置

Logstash宏觀的配置文件內容格式如下:

# 輸入
input {
  ...
}

# 過濾器
filter {
  ...
}

# 輸出
output {
  ...
}

配置文件參考

input {
    # 從文件讀取日志信息
    file {
        path => "/var/log/error.log"
        type => "error"//type是給結果增加一個type屬性,值為"error"的條目
        start_position => "beginning"//從開始位置開始讀取
        # 使用 multiline 插件,傳說中的多行合並
        codec => multiline {
            # 通過正則表達式匹配,具體配置根據自身實際情況而定
            pattern => "^\d"
            negate => true
            what => "previous"
        }
    }
}

#可配置多種處理規則,他是有順序,所以通用的配置寫下面
# filter {
#    grok {
#       match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
     }
# }

output {
    # 輸出到 elasticsearch
    elasticsearch {
        hosts => ["192.168.22.41:9200"]
        index => "error-%{+YYYY.MM.dd}"//索引名稱
    }
}

上面的file可以配置多個:

file {  
        type => "tms_inbound.log"  
        path => "/JavaWeb/tms2.wltest.com/logs/tms_inbound.es.*.log"  
        codec => json {  
                charset => "UTF-8"  
            }  
    }  

  file {  
        type => "tms_outbound.log"  
        path => "/JavaWeb/tms2.wltest.com/logs/tms_outbound.es.*.log"  
        codec => json {  
                charset => "UTF-8"  
            }  
    } 

 

1. 常用啟動參數

2. 配置文件結構及語法

  • 區段

      Logstash通過{}來定義區域,區域內可以定義插件,一個區域內可以定義多個插件,如下:

input {
    stdin {
    }
    beats {
        port => 5044
    }
}
  • 數據類型

    Logstash僅支持少量的數據類型:

​       Boolean:ssl_enable => true

​       Number:port => 33

​       String:name => “Hello world”

​       Commonts:# this is a comment

  • 字段引用

    Logstash數據流中的數據被稱之為Event對象,Event以JSON結構構成,Event的屬性被稱之為字段,如果你像在配置文件中引用這些字段,只需要把字段的名字寫在中括號[]里就行了,如[type],對於嵌套字段每層字段名稱都寫在[]里就可以了,比如:[tags][type];除此之外,對於Logstash的arrag類型支持下標與倒序下表,如:[tags][type][0],[tags][type][-1]。

  • 條件判斷

    Logstash支持下面的操作符:

​     equality:==, !=, <, >, <=, >=

​     regexp:=~, !~

​     inclusion:in, not in

​     boolean:and, or, nand, xor

​     unary:!

  例如:

if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}
  • 環境變量引用

    Logstash支持引用系統環境變量,環境變量不存在時可以設置默認值,例如:

export TCP_PORT=12345
input {
  tcp {
    port => "${TCP_PORT:54321}"
  }
}

 

0x04 常用輸入插件(Input Plug)

1. File讀取插件

 文件讀取插件主要用來抓取文件的變化信息,將變化信息封裝成Event進程處理或者傳遞。

  • 配置事例
input
  file {
    path => ["/var/log/*.log", "/var/log/message"]
    type => "system"
    start_position => "beginning"
  }
}
  • 常用參數

 

2. Beats監聽插件

Beats插件用於建立監聽服務,接收Filebeat或者其他beat發送的Events;

  • 配置事例
input {
    beats {
        port => 5044
    }
}
  • 常用參數(空 => 同上)

 

3. TCP監聽插件

TCP插件有兩種工作模式,“Client”和“Server”,分別用於發送網絡數據和監聽網絡數據。

  • 配置事例
tcp {
    port => 41414
}
  • 常用參數(空 => 同上)

 

4. Redis讀取插件

用於讀取Redis中緩存的數據信息。

  • 最小化配置
input {
redis {
data_type => "list" #logstash redis插件工作方式
key => "logstash-test-list" #監聽的鍵值
host => "127.0.0.1" #redis地址
port => 6379 #redis端口號
}
}
output {
stdout{}
}
  • 詳細配置
input {

redis {

batch_count => 1 #EVAL命令返回的事件數目

data_type => "list" #logstash redis插件工作方式

key => "logstash-test-list" #監聽的鍵值

host => "127.0.0.1" #redis地址

port => 6379 #redis端口號

password => "123qwe" #如果有安全認證,此項為密碼

db => 0 #redis數據庫的編號

threads => 1 #啟用線程數量

}

}

output {

stdout{}

}

 

5. Syslog監聽插件

監聽操作系統syslog信息

  • 配置事例
syslog {
}
  • 輸出至屏幕
[root@node1 conf.d]# cat syslog.conf
input{
    syslog{
        type => "system-syslog"
    port => 514
    }
}

filter{

}

output{
    stdout{
    codec => rubydebug
    }
}
[root@node1 conf.d]# /opt/logstash/bin/logstash -f syslog.conf
  • 修改rsyslog配置文件
[root@node1 ~]# vim /etc/rsyslog.conf
*.* @@192.168.79.103:514
[root@node1 ~]# systemctl restart rsyslog
  • 輸出至es
[root@node1 conf.d]# cat syslog.conf
input{
    syslog{
        type => "system-syslog"
    port => 514
    }
}

filter{

}

output{
    elasticsearch{
    hosts => ["192.168.79.103:9200"]
    index => "system-syslog-%{+YYYY.MM}"
    }
}
[root@node1 conf.d]# /opt/logstash/bin/logstash -f syslog.conf

 

0x05 常用過濾插件(Filter plugin)

豐富的過濾器插件的是 logstash威力如此強大的重要因素,過濾器插件主要處理流經當前Logstash的事件信息,可以添加字段、移除字段、轉換字段類型,通過正則表達式切分數據等,也可以根據條件判斷來進行不同的數據處理方式。

 

1. grok正則捕獲

  grok 是Logstash中將非結構化數據解析成結構化數據以便於查詢的最好工具,非常適合解析syslog logs,apache log, mysql log,以及一些其他的web log

預定義表達式調用

  Logstash提供120個常用正則表達式可供安裝使用,安裝之后你可以通過名稱調用它們,語法如下:%{SYNTAX:SEMANTIC}

  SYNTAX:表示已經安裝的正則表達式的名稱

  SEMANTIC:表示從Event中匹配到的內容的名稱

  例如:Event的內容為“[debug] 127.0.0.1 - test log content”,匹配%{IP:client}將獲得“client: 127.0.0.1”的結果,前提安裝了IP表達式;如果你在捕獲數據時想進行數據類型轉換可以使用%{NUMBER:num:int}這種語法,默認情況下,所有的返回結果都是string類型,當前Logstash所支持的轉換類型僅有“int”和“float”;

  一個稍微完整一點的事例:

​ 日志文件http.log內容:55.3.244.1 GET /index.html 15824 0.043

​ 表達式:%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

​ 配置文件內容:

input {
  file {
    path => "/var/log/http.log"
  }
}
filter {
  grok {
    match => {"message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"}
  }
}

輸出結果:

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043
  • 自定義表達式調用

    語法:(?<field_name>the pattern here)
    舉例:捕獲10或11和長度的十六進制queue_id可以使用表達式(?<queue_id>[0-9A-F]{10,11})

  • 安裝自定義表達式

    與預定義表達式相同,你也可以將自定義的表達式配置到Logstash中,然后就可以像於定義的表達式一樣使用;以下是操作步驟說明:

    1、在Logstash根目錄下創建文件夾“patterns”,在“patterns”文件夾中創建文件“extra”(文件名稱無所謂,可自己選擇有意義的文件名稱);

    2、在文件“extra”中添加表達式,格式:patternName regexp,名稱與表達式之間用空格隔開即可,如下:

# contents of ./patterns/postfix:
POSTFIX_QUEUEID [0-9A-F]{10,11}

    3、使用自定義的表達式時需要指定“patterns_dir”變量,變量內容指向表達式文件所在的目錄,舉例如下:

## 日志內容 ##
Jan  1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>
## Logstash配置 ##
filter {
  grok {
    patterns_dir => ["./patterns"]
    match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
  }
}
## 運行結果 ##
timestamp: Jan 1 06:25:43
logsource: mailserver14
program: postfix/cleanup
pid: 21403
queue_id: BEF25A72965
  • grok常用配置參數(空 => 同上)

- 其他
- 一般的正則表達式只能匹配單行文本,如果一個Event的內容為多行,可以在pattern前加“(?m)”
- 對於Hash和Array類型,Hash表示鍵值對,Array表示數組
- Grok表達式在線debug地址:http://grokdebug.herokuapp.com
- 預定義正則表達式參考地址:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

2. date時間處理插件

  該插件用於時間字段的格式轉換,比如將“Apr 17 09:32:01”(MMM dd HH:mm:ss)轉換為“MM-dd HH:mm:ss”。而且通常情況下,Logstash會為自動給Event打上時間戳,但是這個時間戳是Event的處理時間(主要是input接收數據的時間),和日志記錄時間會存在偏差(主要原因是buffer),我們可以使用此插件用日志發生時間替換掉默認是時間戳的值。

  • 常用配置參數(空 => 同上)

 

3. mutate數據修改插件

mutate 插件是 Logstash另一個重要插件。它提供了豐富的基礎類型數據處理能力。可以重命名,刪除,替換和修改事件中的字段。

# logstash-filter-mutate 插件是Logstash 另一個重要插件,它提供了豐富的基礎類型數據處理能力,包括類型轉換,字符串處理和字段處理等
 
#1.類型轉換
 
#類型轉換是logstash-filter-mutate 插件最初誕生時的唯一功能,

#可以設置的轉換類型包括:"integer","float" 和 "string"。示例如下:
 
input {
   stdin {
     }
 }
 
filter {
  grok {
   match =>{
   "message" =>"(?<request_time>\d+(?:\.\d+)?)"
     }
}
}
output {
   stdout {
   codec =>rubydebug
   }
}
 
[elk@Vsftp logstash]$ logstash -f t2.conf 
Settings: Default pipeline workers: 4
Pipeline main started
23.45
{
         "message" => "23.45",
        "@version" => "1",
      "@timestamp" => "2017-01-11T02:07:33.581Z",
            "host" => "Vsftp",
    "request_time" => "23.45"
}
 

#字符串 轉換為float型 [elk@Vsftp logstash]$ cat t2.conf input { stdin { } } filter { grok { match =>{ "message" =>"(?<request_time>\d+(?:\.\d+)?)" } } mutate { convert => ["request_time", "float"] } } output { stdout { codec =>rubydebug } } [elk@Vsftp logstash]$ logstash -f t2.conf Settings: Default pipeline workers: 4 Pipeline main started 23.45 { "message" => "23.45", "@version" => "1", "@timestamp" => "2017-01-11T02:10:07.045Z", "host" => "Vsftp", #字符串轉換成數值型: [elk@Vsftp logstash]$ cat t2.conf input { stdin { } } filter { grok { match =>{ "message" =>"(?<request_time>\d+(?:\.\d+)?)" } } mutate { convert => ["request_time", "integer"] } } output { stdout { codec =>rubydebug } } [elk@Vsftp logstash]$ logstash -f t2.conf Settings: Default pipeline workers: 4 Pipeline main started 23.45 { "message" => "23.45", "@version" => "1", "@timestamp" => "2017-01-11T02:11:21.071Z", "host" => "Vsftp", "request_time" => 23

 

4. JSON插件

JSON插件用於解碼JSON格式的字符串,一般是一堆日志信息中,部分是JSON格式,部分不是的情況下

  • 配置事例
json {
    source => ...
}
## 事例配置,message是JSON格式的字符串:"{\"uid\":3081609001,\"type\":\"signal\"}" ##
filter {
    json {
        source => "message"
        target => "jsoncontent"
    }
}
## 輸出結果 ##
{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "{\"uid\":3081609001,\"type\":\"signal\"}",
    "jsoncontent": {
        "uid": 3081609001,
        "type": "signal"
    }
}
## 如果從事例配置中刪除`target`,輸出結果如下 ##
{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "{\"uid\":3081609001,\"type\":\"signal\"}",
    "uid": 3081609001,
    "type": "signal"
}
  • 常用配置參數(空 => 同上)

 

5. elasticsearch查詢過濾插件

用於查詢Elasticsearch中的事件,可將查詢結果應用於當前事件中

  • 常用配置參數(空 => 同上)

 

 

6. 其他

  還有很多其他有用插件,如:Split、GeoIP、Ruby,這里就不一一寫了,等以后用到再補充

 

0x06 常用輸出插件(Output plugin)

1. ElasticSearch輸出插件

用於將事件信息寫入到Elasticsearch中,官方推薦插件,ELK必備插件

  • 配置事例
output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "filebeat-%{type}-%{+yyyy.MM.dd}"
        template_overwrite => true
    }
}
  • 常用配置參數(空 => 同上)

 

2、Redis輸出插件

  用於將Event寫入Redis中進行緩存,通常情況下Logstash的Filter處理比較吃系統資源,復雜的Filter處理會非常耗時,如果Event產生速度比較快,可以使用Redis作為buffer使用

  • 配置事例
output {
    redis {
        host => "127.0.0.1"
        port => 6379
        data_type => "list"
        key => "logstash-list"
    }
}
  • 常用配置參數(空 => 同上)

 

3. File輸出插件

用於將Event輸出到文件內

  • 配置事例
output {
    file {
        path => ...
        codec => line { format => "custom format: %{message}"}
    }
}
  • 常用配置參數(空 => 同上)

 

4. TCP插件

  Write events over a TCP socket.Each event json is separated by a newline.Can either accept connections from clients or connect to a server, depending on mode.

  • 配置事例
tcp {
    host => ...
    port => ...
}
  • 常用配置參數(空 => 同上)

 

0x07 常用編碼插件(Codec plugin)

1. JSON編碼插件

直接輸入預定義好的 JSON 數據,這樣就可以省略掉 filter/grok 配置

  • 配置事例
json {
}
  • 常用配置參數

 

 

0x08 Logstash實例

1. 接收Filebeat事件,輸出到Redis

input {
    beats {
        port => 5044
    }
}

output {
    redis {
        host => "127.0.0.1"
        port => 6379
        data_type => "list"
        key => "logstash-list"
    }
}

 

2. 讀取Redis數據,根據“type”判斷,分別處理,輸出到ES

input {
    redis {
        host => "127.0.0.1"
        port => 6379
        data_type => "list"
        key => "logstash-list"
    }
}

filter {
    if [type] == "application" {
        grok {
            match => ["message", "(?m)-(?<systemName>.+?):(?<logTime>(?>\d\d){1,2}-(?:0?[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5][0-9]|60)(?:[:.,][0-9]+)?)) \[(?<level>(\b\w+\b)) *\] (?<thread>(\b\w+\b)) \((?<point>.*?)\) - (?<content>.*)"]
        }
        date {
            match => ["logTime", "yyyy-MM-dd HH:mm:ss,SSS"]
        }
        json {
            source => "message"
        }
        date {
            match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
        }
    }
    if [type] == "application_bizz" {
        json {
            source => "message"
        }
        date {
            match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
        }
    }
    mutate {
        remove_field => ["@version", "beat", "logTime"]
    }
}

output {
    stdout{
    }
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "filebeat-%{type}-%{+yyyy.MM.dd}"
        document_type => "%{documentType}"
        template_overwrite => true
    }
}

 

0x09 Logstash注意事項

  • 問題記錄

    啟動logstash慢,輸入./bin/logstash沒有反應,多出現在新安裝的操作系統上

  • 原因

    jruby啟動的時候jdk回去從/dev/random中初始化隨機數熵,新版本的jruby會用RPNG算法產生隨后的隨機數,但是舊版本的jruby會持續從/dev/random中獲取數字。但是不幸的是,random發生器會跟不上生成速度,所以獲取隨機數的過程會被阻塞,直到隨機數池擁有足夠的熵然后恢復。這在某些系統上,尤其是虛擬化系統,熵數池可能會比較小從而會減慢jruby的啟動速度。

    檢查一下系統的熵數池 cat /proc/sys/kernel/random/entropy_avail,正常情況這個數字推薦大於1000,對比了一下獨立主機的這個數值,大約在700-900之間晃悠。

  • 解決

    使用偽隨機,編輯/usr/local/logstash/logstash-5.4.1/config/jvm.options,在最后增加一行:-Djava.security.egd=file:/dev/urandom

  • 參考

    https://github.com/elastic/logstash/issues/5507
    http://www.tuicool.com/articles/jEBBZbb

 

參考

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM