filebeat&logstash配置文件詳解


filebeat配置文件詳解

filebeat.prospectors:
#日志類型
- type: log enabled: True
# 日志路徑可以寫多個,支持通配符 paths:
- /tmp/test.log
#設置字符集編碼 encoding: utf
-8
#文檔類型(6.x已經開始棄用) document_type: my-nginx-log
#每十秒掃描一次,
如果設置為0s,則Filebeat會盡可能快地感知更新(占用的CPU會變高)。默認是10s
scan_frequency: 10s
# 實際讀取文件時,每次讀取 16384 字節
  harverster_buffer_size: 16384
# 一次發生的log大小值; max_bytes: 10485760
#給日志加上tags方便在logstash過濾日志的時候做判斷 tags: ["nginx-access"]
#剔除以.gz 結尾的文件 exclude_files: [
".gz$"]
# 包含輸入中符合正則表達式列表的那些行(默認包含所有行),include_lines執行完畢之后會執行exclude_lines
  include_lines: ["^ERR", "^WARN"]
# 在輸入中排除符合正則表達式列表的那些行。
 exclude_lines: ["^DBG"]
# 這個得意思就是會在es中多添加一個字段,格式為 "filelds":{"level":"debug"}
  fields:
    level: debug
    review: 1
 # 如果該選項設置為true,則新增fields成為頂級目錄,而不是將其放在fields目錄下。自定義的field會覆蓋filebeat默認的field 
#如果設置為true,則在es中新增的字段格式為:"level":"debug" fields_under_root: false
# 可以指定Filebeat忽略指定時間段以外修改的日志內容,比如2h(兩個小時)或者5m(5分鍾)。
 ignore_older: 0
# 如果一個文件在某個時間段內沒有發生過更新,則關閉監控的文件handle。默認1h
 close_older: 1h
# for Java Stack Traces or C-Line Continuation
# 適用於日志中每一條日志占據多行的情況,比如各種語言的報錯信息調用棧
multiline:
# 多行日志開始的那一行匹配的pattern
 pattern: ^\[
# 是否需要對pattern條件轉置使用,不翻轉設為true,反轉設置為false。  【建議設置為true】
 negate: false
# 匹配pattern后,與前面(before)還是后面(after)的內容合並為一條日志
 match: after
# 合並的最多行數(包含匹配pattern的那一行)
max_lines: 500
# 到了timeout之后,即使沒有匹配一個新的pattern(發生一個新的事件),也把已經匹配的日志事件發送出去
timeout: 5s
# 如果設置為true,Filebeat從文件尾開始監控文件新增內容,把新增的每一行文件作為一個事件依次發送,而不是從文件開始處重新發送所有內容
 tail_files: false
# Filebeat檢測到某個文件到了EOF(文件結尾)之后,每次等待多久再去檢測文件是否有更新,默認為1s
 backoff: 1s
# 如果要在本配置文件中引入其他位置的配置文件,可以寫在這里(需要寫完整路徑),但是只處理prospector的部分
 config_dir:

#output 部分的配置文檔

#配置輸出為kafka,無論你是tar包還是rpm安裝,目錄里邊會看到官方提供的filebeat.full.yml OR filebeat.reference.yml 這里邊有filebeat 所有的input 方法和output 方法供你參考; output.kafka: enabled:
true
#kafka 的server,可以配置集群,例子如下: hosts:["ip:9092","ip2:9092","ip3:9092"]
#這個非常重要filebeat作為provider,把數據輸入到kafka里邊,logstash 作為消費者去消費這些信息,logstash的input 中需要這個topic,不然logstash沒有辦法取到數據。 topic: elk
-%{[type]}
#
kafka 的並發運行進程 worker: 2
#當傳輸給kafka 有問題的時候,重試的次數; max_retries: 3
#單個kafka請求里面的最大事件數,默認2048 bulk_max_size: 2048
#等待kafka broker響應的時間,默認30s timeout: 30s
#kafka broker等待請求的最大時長,默認10s broker_timeout: 10s
#每個kafka broker在輸出管道中的消息緩存數,默認256 channel_buffer_size:
256
#網絡連接的保活時間,默認為0,不開啟保活機制 keep_alive: 60
#輸出壓縮碼,可選項有none, snappy, lz4 and gzip,默認為gzip (kafka支持的壓縮,數據會先被壓縮,然后被生產者發送,並且在服務端也是保持壓縮狀態,只有在最終的消費者端才會被解壓縮 compression: gzip
#允許的最大json消息大小,默認為1000000,超出的會被丟棄,應該小於broker的  message.max.bytes(broker能接收消息的最大字節數) max_message_bytes:
1000000
#kafka的響應返回值,0位無等待響應返回,繼續發送下一條消息;1表示等待本地提交(leader broker已經成功寫入,但follower未寫入),-1表示等待所有副本的提交,默認為1 required_acks: 0
#客戶端ID 用於日志排錯,審計等,默認是beats。 client_id: beats
#測試配置文件
/opt/elk/filebeat/filebeat -c /opt/elk/filebeat/filebeat.yml test config
#如果配置文件沒有問題的話,會出現config ok ,如果有問題會提示具體問題在哪里。
#啟動filebeat 可以先通過
/filebeat -c -e filebeat.yml 查看一下輸入filebeat是否工作正常,會有很多信息打印到屏幕上; nohup filebeat -c filebeat.yml >>/dev/null 2>&1&

 

logstash

kafka配置詳解

input {
#數據來源
  kafka {
   #這個對應filebeat的output 的index
    topics_pattern => "elk-.*"
#kafka 的配置 bootstrap_servers => "IP1:9092,IP2:9092,IP3:9092"
#kafka 中的group ID group_id => "logstash-g1" } } #過濾器,http://grokdebug.herokuapp.com/ 是在線調試logstash filter的工具,你可以在線調試你的過濾規則。 filter { grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"}
   #因為我們取出了字段,所以不需要原來這個message字段,這個字段里邊包含之前beat 輸入的所有字段。
remove_field
=> ["message"] } } output { elasticsearch { hosts => ["IP:9200"]
#這個是在kibana上的index Patterns 的索引,建議什么服務就用干什么名字,因為beat 提供了些kibana的模板可以導入,導入的模板匹配的時候用的索引是對應服務的名稱開頭 。 index
=> "nginx-%{+YYYY.MM.dd}" document_type => "nginx"
#每次20000 發送一次數據到elasticsearch flush_size => 20000
#如果不夠20000,沒10秒會發送一次數據;、 idle_flush_time =>10 } }

 

file配置文件詳解

input {
#file可以多次使用,也可以只寫一個file而設置它的path屬性配置多個文件實現多文件監控 file {
#type是給結果增加了一個屬性叫type值為
"<xxx>"的條目這里的type,對應了ES中index中的type即如果輸入ES時,沒有指定type,那么這里的type將作為ES中index的type。 type => "apache-access" path => "/apphome/ptc/Windchill_10.0/Apache/logs/access_log*"
#start_position可以設置為beginning或者endbeginning表示從頭開始讀取文件,end表示讀取最新的,這個也要和ignore_older一起使用。 start_position => beginning
#sincedb_path表示文件讀取進度的記錄每行表示一個文件,每行有兩個數字第一個表示文件的inode第二個表示文件讀取到的位置(byteoffset)默認為$HOME
/.sincedb* sincedb_path => "/opt/logstash-2.3.1/sincedb_path/access_progress"
#ignore_older表示了針對多久的文件進行監控,默認一天,單位為秒,可以自己定制,比如默認只讀取一天內被修改的文件。 ignore_older => 604800
#add_field增加屬性。這里使用了${HOSTNAME},即本機的環境變量如果要使用本機的環境變量,那么需要在啟動命令上加--alow-env。 add_field => {"log_hostname"=>"${HOSTNAME}"}
#這個值默認是\n 換行符,如果設置為空
"",那么后果是每個字符代表一個event delimiter => ""
#這個表示關閉超過(默認)3600秒后追蹤文件。這個對於multiline來說特別有用。... 這個參數和logstash對文件的讀取方式有關,兩種方式read tail,如果是read close_older => 3600 coodec => multiline { pattern => "^\s"
#這個negate是否定的意思,意思跟pattern相反,也就是不滿足patter的意思。 # negate => ""
#what有兩個值可選 previous和next,舉例說明,java的異常從第二行以空格開始,這里就可以pattern匹配空格開始,what設置為previous意思是空格開頭這行跟上一行屬於同一event。
    #另一個例子,有時候一條命令太長,當以\結尾時表示這行屬於跟下一行屬於同一event,這時需要使用negate=>true,what=>'next'
what => "previous"
auto_flush_interval => 60 } } file { type => "methodserver-log" path => "/apphome/ptc/Windchill_10.0/Windchill/logs/MethodServer-1604221021-32380.log" start_position => beginning sincedb_path => "/opt/logstash-2.3.1/sincedb_path/methodserver_process" # ignore_older => 604800 } } filter{ #執行ruby程序,下面例子是將日期轉化為字符串賦予daytag ruby { code => "event['daytag'] = event.timestamp.time.localtime.strftime('%Y-%m-%d')" }
#
if [path] =~ "access" {} else if [path] =~ "methodserver" {} else if [path] =~ "servermanager" {} else {} 注意語句結構 if [path] =~ "MethodServer" {             #這里的=~是匹配正則表達式 grok { patterns_dir => ["/opt/logstash-2.3.1/patterns"]       #自定義正則匹配 # Tue 4/12/16 14:24:17: TP-Processor2: hirecode---->77LS match => { "message" => "%{DAY:log_weekday} %{DATE_US:log_date} %{TIME:log_time}: %{GREEDYDATA:log_data}"} } #mutage是做轉換用的 mutate { replace => { "type" => "apache" }       #替換屬性值 convert => {                 #類型轉換 "bytes" => "integer"         #例如還有float "duration" => "integer" "state" => "integer" } #date主要是用來處理文件內容中的日期的。內容中讀取的是字符串,通過date將它轉換為@timestamp。
   #參考https:
//www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-match # date { # match => [ "logTime" , "dd/MMM/yyyy:HH:mm:ss Z" ] # } }else if [type] in ['tbg_qas','mbg_pre'] { # if ... else if ... else if ... else結構 }else { drop{} # 將event丟棄 } } output { stdout{ codec=>rubydebug}                   # 直接輸出,調試用起來方便

# 輸出到redis redis { host
=> '10.120.20.208' data_type => 'list' key => '10.99.201.34:access_log_2016-04' }
# 輸出到ES elasticsearch { hosts
=>"192.168.0.15:9200" index => "%{sysid}_%{type}" document_type => "%{daytag}" } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM