原文地址:https://www.2cto.com/kf/201610/560348.html
Logstash的使用
logstash支持把配置寫入文件 xxx.conf,然后通過讀取配置文件來采集數據
./bin/logstash –f xxx.conf
logstash最終會把數據封裝成json類型,默認會添加@timestamp時間字段、host主機字段、type字段。原消息數據會整個封裝進message字段。如果數據處理過程中,用戶解析添加了多個字段,則最終結果又會多出多個字段。也可以在數據處理過程中移除多個字段,總之,logstash最終輸出的數據格式是json格式。
Logstash的結構
Logstash由 input,filter,output三個組件去完成采集數據
如下是一個logstash的配置實例:
input { file { type => "log" path => "/log/*/*.log" discover_interval => 10 start_position => "beginning" } } filter { } output { elasticsearch { index => "log-%{+YYYY.MM.dd}" hosts => ["172.16.0.14:9200", "172.16.0.15:9200", "172.16.0.16:9200"] } stdout {codec => rubydebug} }
input
input組件負責讀取數據,可以采用file插件讀取本地文本文件,stdin插件讀取標准輸入數據,tcp插件讀取網絡數據,log4j插件讀取log4j發送過來的數據等等。
filter
filter插件負責過濾解析input讀取的數據,可以用grok插件正則解析數據,date插件解析日期,json插件解析json等等。
output
output插件負責將filter處理過的數據輸出。可以用elasticsearch插件輸出到es,rediss插件輸出到redis,stdout插件標准輸出,kafka插件輸出到kafka等等
trade.log日志采集。
trade.log日志采集
配置內容如下:
input { file { type => "tradelog" path => "/home/elk/his/trade.log*" discover_interval => 5 start_position => "beginning" sincedb_path => "/home/elk/myconf/sincedb_trade.txt" sincedb_write_interval => 15 codec => plain { charset => "GB2312" } } } filter { grok { match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|%{GREEDYDATA:exception}\|" } match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|" } match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|" } match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|" } match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|" } remove_field => "message" } date { match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"] } } output { if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] { stdout {codec => rubydebug} elasticsearch { index => "log4j-tradelog" hosts => ["168.7.1.67:9200"] manage_template => true template_overwrite => true template_name => "log4j-tradelog" template => "/home/elk/myconf/tradelog_template.json" } } }
input
1. start_position:設置beginning保證從文件開頭讀取數據。
2. path:填入文件路徑。
3. type:自定義類型為tradelog,由用戶任意填寫。
4. codec:設置讀取文件的編碼為GB2312,用戶也可以設置為UTF-8等等
5. discover_interval:每隔多久去檢查一次被監聽的 path 下是否有新文件,默認值是15秒
6. sincedb_path:設置記錄源文件讀取位置的文件,默認為文件所在位置的隱藏文件。
7. sincedb_write_interval:每隔15秒記錄一下文件讀取位置
filter
日志格式如下:
2016-05-09 09:49:13,817 [] [ACTIVE] ExecuteThread: '1' for queue: 'weblogic.kernel.Default (self-tuning)' [INFO ] com.c.command.StartLogCommand.execute(StartLogCommand.java:46) - FrontPageFinProdListQry|IP: 192.168.1.105|MAC: A1345C05-26C1-4263-8845-01CFCA6EC4FD| 2016-05-09 09:49:13,928 [] [ACTIVE] ExecuteThread: '2' for queue: 'weblogic.kernel.Default (self-tuning)' [INFO ] com.c.command.EndLogCommand.execute(EndLogCommand.java:44) - FrontPageAdvertListQry|IP: 192.168.1.105|MAC: A1245C05-26C1-4263-8845-01CFCA6EC4FD|Success|
grok插件
因為該日志中有5種格式如下,以最后幾個我需要的字段為例說明:
交易名|登錄名|編號|ip地址|mac地址|返回結果|異常信息 交易名|登錄名|編號|ip地址|mac地址|返回結果| 交易名|登錄名|編號|ip地址|mac地址| 交易名|ip地址|mac地址|返回結果| 交易名|ip地址|mac地址|
所以采用5種正則規則去匹配,logstash默認會從上到下按規則去匹配,直到匹配上為止。(日志中的多行錯誤信息,匹配不上,logstash會在tags字段添加”_ grokparsefailure”,所以后面輸出的時候會用if條件判斷過濾掉解析失敗的行消息)
注意:5種正則規則的上下順序,下面的規則放在上面會導致可能內容解析不全,比如源數據是:請求交易名|操作員登錄名|操作員編號|ip地址|mac地址|返回結果|異常信息,如果按照“請求交易名|ip地址|mac地址|”規則去匹配,只能識別出3個字段,而且匹配成功,不繼續往下執行,這樣識別的內容就不全。
logstash內置了很多正則匹配規則,用戶可以直接調用這些規則來解析,例如%{WORD:result} 表示調用WORD規則(即識別字符串規則)來解析並最后賦值給result字段(result字段會自動創建)。
下面以第一條match規則為例來說明:
match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:opeType}\|%{WORD:name}\|Oid: %{WORD:oid}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|%{WORD:result}\|%{GREEDYDATA:exception}\|" }
首先行首使用DATESTAMP_CN規則來識別時間,並賦值給logdate字段名;然后.*識別任意字符串(.代表任意一個字符,包括特殊字符,*代表個數是任意個);然后使用WORD規則(即匹配字符串規則,不包含特殊字符)識別到字符串並賦值給opeType字段;后面同理。這些WORD、IP、GREEDYDATA規則都是logstash內部grok-patterns文件已經定義好了的規則。用戶可以直接拿來使用。
注意:[@metadata]表示logdate這個字段在數據處理過程中只是一個臨時字段,最后不會真的輸出。避免了使用remove_field手動移除字段。
注意:logstash默認不支持”YYYY-MM-dd HH:mm:ss,SSS”格式的時間匹配,需要自己定義正則表達式到logstash-2.3.1/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns/grok-patterns文件中。grok-patterns文件中追加2行內容:
DATE_CN %{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}
DATESTAMP_CN %{DATE_CN} %{TIME}
注意:logstash的正則表達式采用ruby語言正則表達式,具體語法可以參考網上。
remove_field => "message"表示解析完成之后刪除原來的 message字段,避免重復。
date插件
match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]
logstash默認的時間字段是@timestamp,如果不設置的話,默認是數據采集時候的時間,這里我們將日志打印的時間(即解析出的logdate字段的內容)設置為@timestamp內容,方便之后kibana根據時間檢索。
注意:解析出來的@timestamp會比實際時間早8個小時,這是內置utc時間格式問題,kibana頁面展示的時候會根據瀏覽器當前時區自動轉換回來,這里不作處理。
output
if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] { stdout {codec => rubydebug} elasticsearch { index => "log4j-tradelog" hosts => ["134.7.1.67:9200"] manage_template => true template_overwrite => true template => "/home/elk/myconf/tradelog_template.json" } }
前面提到過,如果grok解析失敗,會在tags字段自動添加_grokparsefailure值,如果date解析失敗,會在tags字段自動添加_dateparsefailure值。所以最后的輸出,我們采用條件過濾掉解析失敗的行內容。最終的每一行內容解析成json,一路存入elasticsearch,另一路進行標准輸出。
elasticsearch插件
index:要導入的es索引
host:es地址,有多個節點配置多個節點
template:指定elasticsearch的mapping模板文件,如果該索引不存在,logstash會根據這個mapping模板去自動創建索引。
stdout插件
rubydebug標准輸出,便於調試,可以不使用該插件。
最終解析出結果示例如下:
{ "@version" => "1", "@timestamp" => "2016-05-09T01:44:48.366Z", "path" => "/home/elk/e.log", "host" => "ccc7", "type" => "tradelog", "opeType" => "WZQry", "name" => "lhcsssz2", "oid" => "abzzak", "ip" => "192.168.44.105", "mac" => "A1345C05-26C1-4253-8845-01CFCA8EC4FD", "result" => "Success" }
error.log采集
日志實例:
2016-09-29 17:13:24,184|ncid=1100343164|oid=acaatv|loginName=zhenglw1|transId=Withdraw|traceId=N/A-_A-88C4D-043|exceptType=com.intenft.exception.AppRTException|exceptCode=CORESYST_TXN_NATIVE_89042|exceptMsg=對不起!記錄沒有找到
配置文件如下:
input { file { path => "/home/elk/his/error.log*" type => "errorlog" start_position => "beginning" discover_interval => 5 codec => multiline { charset => "GB2312" pattern => "^%{DATESTAMP_CN}" negate => true what => "next" } sincedb_path => "/home/elk/myconf/sincedb_error.txt" sincedb_write_interval => 15 } } filter { grok { match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]}%{GREEDYDATA:[@metadata][keyvalue]}" } remove_field => "message" } date { match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"] } kv { source => "[@metadata][keyvalue]" field_split => "\|" value_split => "=" } } output { if "multiline" in [tags] { stdout {codec => rubydebug} elasticsearch { index => "log4j-errorlog-3" hosts => ["168.7.1.67:9200"] manage_template => true template_overwrite => true template => "/home/elk/myconf/errorlog_template.json" } } }
input
8. start_position:設置beginning保證從文件開頭讀取數據。
9. path:填入文件路徑。
10. type:自定義類型為tradelog,由用戶任意填寫。
11. codec:multiline插件
12. discover_interval:每隔多久去檢查一次被監聽的 path 下是否有新文件,默認值是15秒
13. sincedb_path:設置記錄源文件讀取位置的文件,默認為文件所在位置的隱藏文件。
14. sincedb_write_interval:每隔15秒記錄一下文件讀取位置
multiline插件
logstash默認讀取一行內容為一個消息,因為錯誤日志包含堆棧信息,多行對應一個消息,所以使用該插件合並多行為一條消息。
pattern:以”YYYY-MM-dd HH:mm:ss,SSS”格式開頭的匹配為一條消息。
negate:true 表示正向使用該patttern
what:匹配到的日期屬於下一條消息
charset:設置文件編碼
filter
grok插件
匹配日期到logdata字段,匹配剩下的所有字符串到keyvalue臨時字段,”GREEDYDATA”正則表達式為”.*”
date插件
match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]
logstash默認的時間字段是@timestamp,如果不設置的話,默認是數據采集時候的時間,這里我們將日志打印的時間(即解析出的logdate字段的內容)設置為@timestamp內容,方便之后kibana根據時間檢索。
注意:解析出來的@timestamp會比實際時間早8個小時,這是內置utc時間格式問題,kibana頁面展示的時候會根據瀏覽器當前時區自動轉換回來,這里不作處理。
kv插件
source:解析前面grok獲取的keyvalue字段
(比如:|ncid=1100783164|oid=acaatv|loginName=zhew1|transId=Withdraw|traceId=N/A-_A-88C4D-043|exceptType=com.inteft.exception.AppRTException|exceptCode=CORESYST_TXN_NATIVE_89042|exceptMsg=對不起!記錄沒有找到)
field_split:按”|”切分key-value對
value_split:按”=”切分key 和 value,最終切分出來key作為字段名,value作為字段值
output
output { if "multiline" in [tags] { stdout {codec => rubydebug} elasticsearch { index => "log4j-errorlog-3" hosts => ["168.7.1.67:9200"] manage_template => true template_overwrite => true template => "/home/elk/myconf/errorlog_template.json" } } }
該日志有2種格式的日志,一種是單行的錯誤信息日志,一種是多行的包含堆棧信息的日志,這2種日志內容重復,那么只需要解析單行格式的日志。kv插件解析多行格式的日志時, tags字段里沒有”multipline”值(原因是因為grok解析的時候不能解析換行符),所以可以通過if條件判斷tags字段是否有”multipline”值,來過濾掉多行格式的日志。
elasticsearch插件
index:要導入的es索引
host:es地址,有多個節點配置多個節點
template:指定elasticsearch的mapping模板文件,如果該索引不存在,logstash會根據這個mapping模板去自動創建索引。
最終解析的結果示例如下:
{ "@timestamp" => "2016-09-29T09:14:22.194Z", "@version" => "1", "tags" => [ [0] "multiline" ], "path" => "/home/elk/stst.log", "host" => "ci7", "type" => "sttlog", "ncid" => "1143164", "oid" => "acav", "loginName" => "zhew1", "transId" => "MyQuery", "traceId" => "N/A8C4E-047", "exceptType" => "com.exception.AppRTException", "exceptCode" => "CORESYNATIVE_82243", "exceptMsg" => "對不起!根據賬號獲取客戶信息錯誤" }
總結:
注意:
logstash filter中的每一個插件都有add_field,remove_field,add_tag,remove_tag 4個功能。
附錄:
mapping模板文件
tradelog:
{ "template": "log4j-tradelog*", "settings": { "index.number_of_shards": 3, "number_of_replicas": 0 }, "mappings": { "tradelog": { "_all": { "enabled": false }, "properties": { "@timestamp": { "type": "date", "format": "strict_date_optional_time||epoch_millis", "doc_values": true }, "@version": { "type": "string", "index": "not_analyzed" }, "exception": { "type": "string", "index": "analyzed" }, "path": { "type": "string", "index": "not_analyzed" }, "host": { "type": "string", "index": "not_analyzed" }, "ip": { "type": "ip", "index": "not_analyzed" }, "logger_name": { "type": "string", "index": "not_analyzed" }, "mac": { "type": "string", "index": "not_analyzed" }, "name": { "type": "string", "index": "not_analyzed" }, "oid": { "type": "string", "index": "not_analyzed" }, "opeType": { "type": "string", "index": "not_analyzed" }, "priority": { "type": "string", "index": "not_analyzed" }, "result": { "type": "string", "index": "not_analyzed" }, "type": { "type": "string", "index": "not_analyzed" } } } } }
errorlog:
{ "template": "log4j-errorlog*", "settings": { "index.number_of_shards": 3, "number_of_replicas": 0 }, "mappings": { "errorlog": { "_all": { "enabled": false }, "properties": { "host": { "type": "string", "index": "not_analyzed" }, "ncid": { "type": "string", "index": "not_analyzed" }, "type": { "type": "string", "index": "not_analyzed" }, "@version": { "type": "string", "index": "not_analyzed" }, "exceptType": { "type": "string", "index": "not_analyzed" }, "@timestamp": { "format": "strict_date_optional_time||epoch_millis", "type": "date" }, "exceptCode": { "type": "string", "index": "not_analyzed" }, "transId": { "type": "string", "index": "not_analyzed" }, "priority": { "type": "string", "index": "not_analyzed" }, "oid": { "type": "string", "index": "not_analyzed" }, "traceId": { "type": "string", "index": "not_analyzed" }, "exceptMsg": { "type": "string", "index": "analyzed" }, "path": { "type": "string", "index": "not_analyzed" }, "logger_name": { "type": "string", "index": "not_analyzed" }, "loginName": { "type": "string", "index": "not_analyzed" } } } } }