配置語法: Logstash必須有一個 input 和一個 output
1, 處理輸入的input
1), 從文件錄入
logstash使用一個名為 filewatch的 ruby gem庫來監聽文件變化, 這個庫記錄一個 .sincedb的數據文件跟蹤監聽日志文件的當前位置
input { file { path => ["/var/log/*.log", "/var/log/message"] type => "system" start_position => "beginning" } }
output {
stdout{
codec=>rubydebug } }
其他配置
discover_interval: 每隔多久檢查path下是否有新文件, 默認15s
exclude: 不行唄監聽的文件排除
close_older: 被監聽的文件多久沒更新就關閉監聽, 默認3600s
ignore_older: 檢查文件列表時, 如果最后修改時間超過這個值, 就虎烈
2) 標准輸入: (Stdin)
logstash最簡單最基本的輸入方式
在 {LH}/下, 新建 stdin.conf, 並輸入以下內容:
input{ stdin{ add_field=>{"key"=>"value"} codec=>"plain" tags=>["add"] type=>"std" } } output { stdout{ codec=>rubydebug } }
使用命令運行
./bin/logstash -f ./stdin.conf
啟動后輸入 helloworld, 可以看到如下輸出
這兒的 type 和tags 是logstash的兩個特俗字段, 通常會在輸入區域通過type標記事件類型, tags則在數據處理階段, 由具體的插件來添加或刪除的
3) syslog
從設備上收集日志的時候可用
input { syslog { port => "514" } }
output {
stdout{
codec=>rubydebug } }
此時, 系統的日志都會到logstash中來, 建議使用使用LogStash::Inputs::TCP和 LogStash::Filters::Grok 配合實現同樣的 syslog 功能! 具體可見: https://kibana.logstash.es/content/logstash/plugins/input/syslog.html
input { tcp { port => "8514" } } filter { grok { match => ["message", "%{SYSLOGLINE}" ] } syslog_pri { } }
4) 網絡數據讀取, tcp
可被redis,等替代作為 logstash broker 的角色, 但logstash又自己的tcp插件
input { tcp { port => 8888 mode => "server" ssl_enable => false } }
最佳使用是: 配合 nc 命令導入就數據
# nc 127.0.0.1 8888 < olddata
導入完畢后, nc命令會結束, 如果使用file會一直監聽新數據
2 編碼插件 codec
使用codec可以處理不同類型的數據, 使得logstash 形成 input | decode | filter | encode | output 的數據流, codec就是用來 encode 和 decode的
1), json格式
將nginx的日志導入為json格式: nginx需要配置 conf, 在 http{} 下進行配置, 所有server共享
logformat json '{"@timestamp":"$time_iso8601",' '"@version":"1",' '"host":"$server_addr",' '"client":"$remote_addr",' '"size":$body_bytes_sent,' '"responsetime":$request_time,' '"domain":"$host",' '"url":"$uri",' '"status":"$status"}'; access_log /var/log/nginx/access.log_json json;
修改stdin.conf
input { file { path => "/var/log/nginx/access.log_json" codec => "json" } }
然后訪問本地nginx, 可以看到logstash輸出:

2), multiline 合並多行數據
一個事件打印多行內容, 很難通過命令行解析分析, 因此需要:
input { stdin { codec => multiline { pattern => "^\[" negate => true what => "previous" } } }
將當前的數據添加到下一行后面, 知道新匹配 ^[ 位置
3, filter插件:
擴展了進入過濾器的原始數據,進行復雜的邏輯處理,甚至可以無中生有的添加新的 logstash 事件到后續的流程中去!
1) 時間處理
logstash內部使用了java的 joda 時間庫來處理時間
filter { grok { match => ["message", "%{HTTPDATE:logdate}"] } date { match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"] } }
2) grok, 正則捕獲
可以將輸入的文本匹配到字段中去:
input {stdin{}} filter { grok { match => { "message" => "\s+(?<request_time>\d+(?:\.\d+)?)\s+" } } } output {stdout{codec => rubydebug}}
然后輸入 begin 123.456 end

grok支持預定義的grok表達式: (自己的變量)
%{PATTERN_NAME:capture_name:data_type}
所以上例可改成:
filter { grok { match => { "message" => "%{WORD} %{NUMBER:request_time:float} %{WORD}" } } }
重新運行后, request_time的值變為float類型的,
實際使用中: 建議把所有的fork表達式統一寫在一個地方, 然后patterns_dir指明. 如果將message中的所有信息都grok到不通字段了, 數據就存儲重復了, 因此可以用remove_filed或者 overwrite來重寫message
filter { grok { patterns_dir => ["/path/to/your/own/patterns"] match => { "message" => "%{SYSLOGBASE} %{DATA:message}" }
data => {
"match" => ["date1", "YYYY-MM-dd HH:mm:ss.SSS" ]
} overwrite => ["message"] } }
冒號(:) 可以重新命名
附: grok 正則變量類型: https://github.com/wenbronk/elasticsearch-elasticsearch-learn/blob/master/grok%E5%86%85%E9%83%A8%E5%8F%98%E9%87%8F.txt
3) dissect
跟grok類似, 但資源消耗較小. 當日志格式有比較簡明的分隔標志位,而且重復性較大的時候,我們可以使用 dissect 插件更快的完成解析工作
filter { dissect { mapping => { "message" => "%{ts} %{+ts} %{+ts} %{src} %{} %{prog}[%{pid}]: %{msg}" } convert_datatype => { pid => "int" } } }
比如配置: http://rizhiyi.com/index.do?id=123
http://%{domain}/%{?url}?%{?arg1}=%{&arg1} 匹配后 { domain => "rizhiyi.com", id => "123" }
解釋
%{+key} 這個 + 表示,前面已經捕獲到一個 key 字段了,而這次捕獲的內容,自動添補到之前 key 字段內容的后面。
%{+key/2} 這個 /2 表示,在有多次捕獲內容都填到 key 字段里的時候,拼接字符串的順序誰前誰后。/2 表示排第 2 位。
%{?string} 這個 ? 表示,這塊只是一個占位,並不會實際生成捕獲字段存到 Event 里面。
%{?string} %{&string} 當同樣捕獲名稱都是 string,但是一個 ? 一個 & 的時候,表示這是一個鍵值對。
4) geoip: 免費的ip地址歸類查詢庫, 可根據ip提供對應的低於信息, 包括省,市,經緯度,, 可視化地圖統計等
input {stdin{}} filter { geoip { source => "message" } } output {stdout{codec => rubydebug}}
運行結果

如果只想要其中某些字段, 可以通過fileds來指定
geoip { fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"] }
5, metrics, filters/metrics 插件是使用 Ruby 的 Metriks 模塊來實現在內存里實時的計數和采樣分析
最近一分鍾 504 請求的個數超過 100 個就報警:
filter {
metrics {
timer => {"rt" => "%{request_time}"}
percentiles => [25, 75]
add_tag => "percentile"
}
if "percentile" in [tags] {
ruby {
code => "l=event.get('[rt][p75]')-event.get('[rt][p25]');event.set('[rt][low]', event.get('[rt][p25]')-l);event.set('[rt][high]',event.get('[rt][p75]')+l)"
}
}
}
output {
if "percentile" in [tags] and ([rt][last] > [rt][high] or [rt][last] < [rt][low]) {
exec {
command => "echo \"Anomaly: %{[rt][last]}\""
}
}
}
6, mutate, 類型轉換
可轉換的類型包括 integer, float, string
filter { mutate { convert => ["request_time", "float"] } }
字符串處理: , sub
gsub => ["urlparams", "[\\?#]", "_"]
split:
filter { mutate { split => ["message", "|"] } }
join, 將split切分的在join回去
filter { mutate { split => ["message", "|"] } mutate { join => ["message", ","] } }
rename: 字段重命名:
filter { mutate { rename => ["syslog_host", "host"] } }
7, split切分
是multiline插件的反向, 將一行數據切分到多個事件中去
filter { split { field => "message" terminator => "#" } }
然后輸入 "test1#test2", 可以看到被輸出到2個事件中

4, output
1, 標准輸出 (Stdout)
output { stdout { codec => rubydebug workers => 2 } }
2, 輸出到es
output { elasticsearch { hosts => ["192.168.0.2:9200"] # 有多個用逗號隔開 index => "logstash-%{type}-%{+YYYY.MM.dd}" document_type => "%{type}" flush_size => 20000 idle_flush_time => 10 sniffing => true template_overwrite => true } }
注意索引名中不能有大寫字母,否則 ES 在日志中會報 InvalidIndexNameException,但是 Logstash 不會報錯,這個錯誤比較隱晦,也容易掉進這個坑中。
3), email
126郵箱發送到 qq郵箱的示例
output { email { port => "25" address => "smtp.126.com" username => "test@126.com" password => "" authentication => "plain" use_tls => true from => "test@126.com" subject => "Warning: %{title}" to => "test@qq.com" via => "smtp" body => "%{message}" } }
