Scenarios:
1) datasource -> logstash -> elasticsearch -> kibana
2) datasource -> filebeat -> logstash -> elasticsearch -> kibana
3) datasource -> filebeat -> logstash -> redis/kafka -> logstash -> elasticsearch -> kibana
4) kafka -> logstash -> elasticsearch -> kibana
5) datasource -> filebeat -> kafka -> logstash -> elasticsearch -> kibana (most common)
6) filebeat with SSL-encrypted transport
7) datasource -> logstash -> redis/kafka -> logstash -> elasticsearch -> kibana
8) mysql -> logstash -> elasticsearch -> kibana
The list above summarizes the transport and processing scenarios covered below: starting from the data source, how the data is collected and with which tool, where it is shipped to, how it is processed and filtered along the way, where it ends up, and how it is finally visualized.
Input, output, and filtering are implemented mainly through plugins (of many types); the plugin documentation is on the official site:
https://www.elastic.co/guide/en/logstash/current/index.html
[Installation and deployment are already well covered by the official site and the community, so they are not repeated here; see the official documentation.]
PS: the Redis cluster installation was described in an earlier document; refer to it if needed.
Prerequisites
1) Java environment: JDK 8;
2) the ELK stack is already set up;
3) Elasticsearch, Kibana, and Logstash should ideally be the same version; the current environment is 5.6.10;
4) run Logstash as the root user so that it has enough permissions to read the log files being collected;
5) install Elasticsearch as a non-root user (newer versions refuse to run as root);
6) Filebeat is installed.
Startup commands:
1) Logstash:
nohup ./bin/logstash -f ***.conf --config.reload.automatic >/dev/null 2>/dev/null &
2) Filebeat: nohup ./filebeat -e -c filebeat.yml >/dev/null 2>/dev/null &
3) Elasticsearch: ./elasticsearch -d
4) Kibana: nohup ./bin/kibana &
For Logstash, --config.reload.automatic reloads the configuration file automatically when it changes, so Logstash does not need to be restarted.
For Filebeat, -e sends the log output to stderr and -c specifies the configuration file path.
Scenario walkthrough
1. Simple mode: Logstash as the log collector
Architecture: Logstash collects, processes, and forwards the logs to Elasticsearch for storage, and Kibana displays them.
Characteristics: this layout requires deploying Logstash on every server, and Logstash is fairly heavy on CPU and memory, so it suits servers with plenty of compute resources; otherwise it can noticeably degrade server performance or even keep the applications from working properly.
Demo1:
test1.conf:
Read from the console, apply no processing or transformation (transport only), and write to the console (or to Elasticsearch or a file, as preferred):
# read from the console
input {
  stdin { }
}
output {
  # print to the console with the rubydebug codec
  stdout { codec => rubydebug }
  # write to elasticsearch
  elasticsearch {
    hosts => "node18:9200"
    codec => json
  }
  # write to a file
  file {
    path => "/usr/local/logstash-5.6.10/data/log/logstash/all.log"  # target file path
    flush_interval => 0                                             # flush interval; 0 means write immediately
    codec => json
  }
}
2. Secure mode: a Beat (Filebeat, Metricbeat, Packetbeat, Winlogbeat, etc.) as the log shipper
Packetbeat (collects network traffic data);
Topbeat (collects system-, process-, and filesystem-level CPU and memory usage data);
Filebeat (collects file data) -- the most commonly used;
Winlogbeat (collects Windows event log data).
Architecture:
How it works: the Beats send the collected data to Logstash, which parses and filters it and forwards it to Elasticsearch for storage, and Kibana presents it to the user.
Characteristics: this architecture removes the problem of Logstash consuming a lot of system resources on every server node; compared with Logstash, the CPU and memory footprint of a Beat is almost negligible. In addition, Beats and Logstash support SSL/TLS encrypted transport with mutual client/server authentication, so the communication channel is secured.
This architecture therefore suits scenarios where data security matters and the individual servers are performance-sensitive.
Demo2:
filebeat.yml:
###################### Filebeat Configuration Example #########################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.full.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html

#=========================== Filebeat prospectors =============================

filebeat.prospectors:

# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

- input_type: log

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /home/admin/helloworld/logs/*.log
    #- c:\programdata\elasticsearch\logs\*

  # Exclude lines. A list of regular expressions to match. It drops the lines
  # that are matching any regular expression from the list.
  #exclude_lines: ["^DBG"]

  # Include lines. A list of regular expressions to match. It exports the lines
  # that are matching any regular expression from the list.
  #include_lines: ["^ERR", "^WARN"]

  # Exclude files. A list of regular expressions to match. Filebeat drops the
  # files that are matching any regular expression from the list.
  # By default, no files are dropped.
  #exclude_files: [".gz$"]

  # Optional additional fields. These fields can be freely picked to add
  # additional information to the crawled log files for filtering.
  #fields:
  #  level: debug
  #  review: 1

  ### Multiline options

  # Multiline can be used for log messages spanning multiple lines. This is
  # common for Java stack traces or C-line continuation.

  # The regexp pattern that has to be matched. The example pattern matches all
  # lines starting with [
  #multiline.pattern: ^\[

  # Defines if the pattern set under pattern should be negated or not.
  # Default is false.
  #multiline.negate: false

  # Match can be set to "after" or "before". It is used to define if lines
  # should be appended to a pattern that was (not) matched before or after,
  # or as long as a pattern is not matched based on negate.
  # Note: After is the equivalent to previous and before is the equivalent
  # to next in Logstash.
  #multiline.match: after

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to
# group all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

#================================ Outputs =====================================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.

#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  # hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["192.168.80.34:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

#================================ Logging =====================================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]
On server 192.168.80.34 -- test2.conf:
input {
  beats {
    port => 5044
    codec => "json"
  }
}
#filter {
#  ... (covered later)
#}
output {
  # print to the console
  # stdout { }

  # write to redis
  redis {
    host => "192.168.80.32"        # redis host
    port => 6379                   # redis port
    password => "123456"           # redis password
    #db => 8                       # redis database number
    data_type => "channel"         # use publish/subscribe mode
    key => "logstash_list_0"       # channel name to publish to
  }
  # write to kafka
  kafka {
    bootstrap_servers => "192.168.80.42:9092"
    topic_id => "test"
  }
  # write to elasticsearch
  elasticsearch {
    hosts => "node18:9200"
    codec => json
  }
}
3. Message-queue mode: Beats could not write to a message queue directly (except in newer versions, 5.0 and above), so both sides of the queue had to be Logstash instances. Logstash collects data from the various sources and, without any processing or transformation, forwards it to a message queue (Kafka, Redis, RabbitMQ, etc.); a second Logstash then reads from the queue, transforms, analyzes, and filters the data, and writes it to Elasticsearch, where Kibana provides the graphical display.
Architecture (the server where Logstash does the log parsing must have good enough performance in every respect):
Characteristics: this architecture suits very large log volumes. Because the Logstash parsing nodes and Elasticsearch carry a heavy load, both can be run as clusters to share it. Introducing a message queue evens out network transfer, reducing network congestion and, above all, the chance of losing data; Logstash, however, still consumes a lot of system resources.
Workflow: Filebeat collects -> Logstash forwards to Kafka -> Logstash reads the data buffered in Kafka and analyzes it -> output to Elasticsearch -> displayed in Kibana.
Msg1.conf:
input {
  beats {
    port => 5044
    codec => "json"
  }
  syslog { }
}
#filter {
#
#}
output {
  # print to the console
  # stdout { }

  # write to redis
  redis {
    host => "192.168.80.32"        # redis host
    port => 6379                   # redis port
    password => "123456"           # redis password
    #db => 8                       # redis database number
    data_type => "channel"         # use publish/subscribe mode
    key => "logstash_list_0"       # channel name to publish to
  }
  # write to kafka
  kafka {
    bootstrap_servers => "192.168.80.42:9092"
    topic_id => "test"
  }
}
Msg2.conf:
input {
  kafka {
    bootstrap_servers => "192.168.80.42:9092"
    topics => ["test"]
    #decorate_events => true
    group_id => "consumer-test"        # consumer group
    auto_offset_reset => "earliest"    # consume from the beginning; if unset, only messages produced after startup are read
  }
}
#filter {
#}
output {
  elasticsearch {
    hosts => "192.168.80.18:9200"
    codec => json
  }
}
4. Logstash reads directly from the Kafka message queue, processes the data, and outputs it to Elasticsearch (the data read from Kafka is effectively already buffered, so Logstash only has to process it and write it out, e.g. to a file or to Elasticsearch).
How it works: [the data already exists in the relevant Kafka topic] a standalone Logstash reads from Kafka, processes the data, outputs it to Elasticsearch, and Kibana displays it.
input {
  kafka {
    bootstrap_servers => "192.168.80.42:9092"
    topics => ["test"]
    group_id => "consumer-test"
    #decorate_events => true
    auto_offset_reset => "earliest"
  }
}
#filter {
#
#}
output {
  elasticsearch {
    hosts => "192.168.80.18:9200"
    codec => json
  }
}
5. Newer Filebeat versions (5.0 and above) can write directly to Kafka, so the data no longer has to pass through a Logstash instance on its way into Kafka.
Filebeat writes the collected data straight into the Kafka message queue; Logstash then pulls it out, processes and analyzes it, outputs it to Elasticsearch, and Kibana displays it.
filebeat.yml:
###################### Filebeat Configuration Example #########################
# (The prospector, general and logging sections are the same as in the
#  filebeat.yml of Demo2; only the output section differs.)

#=========================== Filebeat prospectors =============================

filebeat.prospectors:

- input_type: log

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /home/admin/helloworld/logs/*.log
    #- c:\programdata\elasticsearch\logs\*

#================================ Outputs =====================================

#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # hosts: ["localhost:9200"]

#----------------------------- Logstash output --------------------------------
#output.logstash:
  # The Logstash hosts
  # hosts: ["192.168.80.34:5044"]

#------------------------------- Kafka output ---------------------------------
#output.kafka:
#  enabled: true
#  hosts: ["192.168.80.42:9092,192.168.80.43:9092,192.168.80.44:9092"]
#  topics: 'test'
output.kafka:
  hosts: ["192.168.80.42:9092"]
  topic: test
  required_acks: 1

#================================ Logging =====================================

# Sets log level. The default log level is info.
#logging.level: debug
#logging.selectors: ["*"]
logstash.conf:
input {
  kafka {
    bootstrap_servers => "192.168.80.42:9092"
    topics => ["test"]
    group_id => "consumer-test"
    #decorate_events => true
    auto_offset_reset => "earliest"
  }
}
#filter {
#
#}
output {
  elasticsearch {
    hosts => "192.168.80.18:9200"
    codec => json
  }
}
6. SSL-encrypted transport (improves security: only Filebeat and Logstash servers that have the key and certificate configured can exchange log data):
Reference: https://blog.csdn.net/zsq12138/article/details/78753369
Reference: https://blog.csdn.net/Gamer_gyt/article/details/69280693?locationNum=5&fps=1
Logstash configuration file:
Notes:
ssl_certificate_authorities: location of the certificate coming from the Filebeat side
ssl_certificate => location of the certificate generated on this (Logstash) side
ssl_key => location of the key generated on this (Logstash) side
ssl_verify_mode => "force_peer"
How such certificates can be generated is sketched below.
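A minimal way to produce the self-signed certificate/key pairs referenced above is openssl; this is only a sketch, and the CN values and validity period are assumptions that must match your own hosts (the Logstash CN should be the address Filebeat connects to, 192.168.80.18 in this example; "filebeat-host" is a placeholder):

# on the Logstash server
openssl req -subj '/CN=192.168.80.18/' -x509 -days 365 -batch -nodes -newkey rsa:2048 \
  -keyout /usr/local/logstash-5.6.10/pki/tls/private/logstash.key \
  -out /usr/local/logstash-5.6.10/pki/tls/certs/logstash.crt

# on the Filebeat server
openssl req -subj '/CN=filebeat-host/' -x509 -days 365 -batch -nodes -newkey rsa:2048 \
  -keyout /usr/local/filebeat-5.6.10/pki/tls/private/filebeat.key \
  -out /usr/local/filebeat-5.6.10/pki/tls/certs/filebeat.crt

# then exchange the public certificates: copy filebeat.crt to the Logstash server
# and logstash.crt to the Filebeat server, into the paths used in the configs below.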
beat.conf:
input {
  beats {
    port => 5044
    codec => "json"
    ssl => true
    ssl_certificate_authorities => ["/usr/local/logstash-5.6.10/pki/tls/certs/filebeat.crt"]
    ssl_certificate => "/usr/local/logstash-5.6.10/pki/tls/certs/logstash.crt"
    ssl_key => "/usr/local/logstash-5.6.10/pki/tls/private/logstash.key"
    ssl_verify_mode => "force_peer"   # must be consistent with the Filebeat side
  }
  syslog { }
}
output {
  # print to the console
  # stdout { }

  # write to redis
  redis {
    host => "192.168.80.32"        # redis host
    port => 6379                   # redis port
    password => "123456"           # redis password
    #db => 8                       # redis database number
    data_type => "channel"         # use publish/subscribe mode
    key => "logstash_list_0"       # channel name to publish to
  }
  # write to kafka
  kafka {
    bootstrap_servers => "192.168.80.42:9092"
    topic_id => "test"
  }
  # write to elasticsearch
  elasticsearch {
    hosts => "node18:9200"
    codec => json
  }
}
Filebeat configuration file:
filebeat.yml:
###################### Filebeat Configuration Example #########################
# (The prospector, general and logging sections are the same as in the
#  filebeat.yml of Demo2; only the output section differs.)

#=========================== Filebeat prospectors =============================

filebeat.prospectors:

- input_type: log

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /home/admin/helloworld/logs/*.log
    #- c:\programdata\elasticsearch\logs\*

#================================ Outputs =====================================

#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # hosts: ["localhost:9200"]

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["192.168.80.18:5044"]
  # encrypted transport
  ssl.certificate_authorities: ["/usr/local/filebeat-5.6.10/pki/tls/certs/logstash.crt"]
  ssl.certificate: "/usr/local/filebeat-5.6.10/pki/tls/certs/filebeat.crt"
  ssl.key: "/usr/local/filebeat-5.6.10/pki/tls/private/filebeat.key"

#------------------------------- Kafka output ---------------------------------
#output.kafka:
#  hosts: ["192.168.80.42:9092"]
#  topic: test
#  required_acks: 1

#================================ Logging =====================================

# Sets log level. The default log level is info.
#logging.level: debug
#logging.selectors: ["*"]
7. Logstash (instead of Filebeat) collects the files, writes them to Kafka as a buffer, then reads the data back from Kafka, processes it, and outputs it to a file or to Elasticsearch.
Collecting the data (into Kafka):
kafkaput.conf:
input {
  file {
    # files to monitor
    path => [
      "/home/admin/helloworld/logs/catalina.out"
    ]
  }
}
output {
  # print to the console
  # stdout { }

  # write to kafka
  kafka {
    bootstrap_servers => "192.168.80.42:9092"
    topic_id => "test"
  }
}
Consuming the data (from Kafka):
indexer.conf
input {
  # read from redis
  redis {
    host => "192.168.80.32"        # redis host
    port => 6379                   # redis port
    password => "123456"           # redis password
    #db => 8                       # redis database number
    data_type => "channel"         # use publish/subscribe mode
    key => "logstash_list_0"       # channel name to subscribe to
  }
  # read from kafka
  kafka {
    bootstrap_servers => "192.168.80.42:9092"
    topics => ["test"]
    auto_offset_reset => "earliest"
  }
}
output {
  # write to a file
  file {
    path => "/usr/local/logstash-5.6.10/data/log/logstash/all1.log"  # target file path
    # message_format => "%{host} %{message}"                         # output format
    flush_interval => 0                                              # flush interval; 0 means write immediately
    codec => json
  }
  # write to elasticsearch
  elasticsearch {
    hosts => "node18:9200"
    codec => json
  }
}
8. Logstash syncs MySQL data into Elasticsearch (Logstash 5 and above ships with the JDBC input plugin, so nothing extra needs to be downloaded or installed).
mysql2es.conf:
input {
  stdin { }
  jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.80.18:3306/fyyq-mysql"
    jdbc_user => "fyyq"
    jdbc_password => "fyyq@2017"
    jdbc_driver_library => "/usr/local/logstash-5.6.10/mysql-connector-java-5.1.46.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    statement_filepath => "/usr/local/logstash-5.6.10/mysql2es.sql"
    #schedule => "* * * * *"
  }
}
output {
  stdout {
    codec => json_lines
  }
  elasticsearch {
    hosts => "node18:9200"
    #index => "mainIndex"
    #document_type => "user"
    #document_id => "%{id}"
  }
}
mysql2es.sql:
select * from sys_log
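The query above reloads the whole sys_log table on every run. If incremental sync is needed, the jdbc input can track a column and refer to it in the SQL through :sql_last_value; the sketch below assumes a timestamp column named update_time, which is purely illustrative:

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.80.18:3306/fyyq-mysql"
    jdbc_user => "fyyq"
    jdbc_password => "fyyq@2017"
    jdbc_driver_library => "/usr/local/logstash-5.6.10/mysql-connector-java-5.1.46.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    schedule => "* * * * *"                 # run every minute
    use_column_value => true
    tracking_column => "update_time"        # hypothetical timestamp column in sys_log
    tracking_column_type => "timestamp"
    statement => "select * from sys_log where update_time > :sql_last_value"
  }
}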
9. Logstash output to HDFS
input {
  beats {
    port => 5044
    #codec => "json"
    ssl => true
    ssl_certificate_authorities => ["/usr/local/logstash-5.6.10/pki/tls/certs/filebeat.crt"]
    ssl_certificate => "/usr/local/logstash-5.6.10/pki/tls/certs/logstash.crt"
    ssl_key => "/usr/local/logstash-5.6.10/pki/tls/private/logstash.key"
    ssl_verify_mode => "force_peer"
  }
}
filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}
output {
  # print to the console
  # stdout { }

  # write to redis
  redis {
    host => "192.168.80.32"        # redis host
    port => 6379                   # redis port
    password => "123456"           # redis password
    #db => 8                       # redis database number
    data_type => "channel"         # use publish/subscribe mode
    key => "logstash_list_0"       # channel name to publish to
  }
  # write to kafka
  kafka {
    bootstrap_servers => "192.168.80.42:9092"
    topic_id => "test"
  }
  # write to elasticsearch
  elasticsearch {
    hosts => "node18:9200"
    codec => json
  }
  # write to hdfs
  webhdfs {
    host => "192.168.80.42"
    port => 50070
    path => "/user/logstash/dt=%{+YYYY-MM-dd}/%{@source_host}-%{+HH}.log"
    user => "hadoop"
  }
}
10. Overview of Logstash input plugins and their parameters
Only the beats plugin is described here; the remaining plugins are given as links (all of them point to the standard descriptions on the official site).
All input plugins support the following configuration options:
Setting          Input type    Required
add_field        hash          No (default {})
codec            codec         No (codec for the input data; default "plain")
enable_metric    boolean       No (default true)
id               string        No (generated automatically, but better to define it yourself)
tags             array         No
type             string        No
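As a quick illustration, these common options can be attached to any input; a minimal sketch on the stdin input (all values are arbitrary examples):

input {
  stdin {
    id        => "stdin-demo"           # explicit id, easier to spot in monitoring output
    type      => "test"                 # sets the event's type field
    tags      => ["demo"]               # tags added to every event
    add_field => { "env" => "dev" }     # extra field added to every event
    codec     => "plain"                # the default codec
  }
}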
codec: optional, one of
json (JSON codec)
msgpack (msgpack codec)
plain (plain-text codec)
multiline (merges multiple lines of text into a single event, e.g. merging a Java exception stack trace into one message; see the sketch below)
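A minimal sketch of the multiline codec on a file input, assuming stack-trace continuation lines start with whitespace and should be appended to the line before them:

input {
  file {
    path => ["/home/admin/helloworld/logs/catalina.out"]
    codec => multiline {
      pattern => "^\s"        # lines starting with whitespace...
      what    => "previous"   # ...are merged into the previous line (typical Java stack traces)
    }
  }
}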
Commonly used input plugins:
1. beats input: receives events from the Elastic Beats framework.
Settings:
(The full settings table is at https://www.elastic.co/guide/en/logstash/current/plugins-inputs-beats.html.)
port is the only required setting (Yes); ssl_certificate and ssl_key expect a valid filesystem path; ssl_verify_mode is a string, one of ["none", "peer", "force_peer"]; all remaining settings are optional (No).
2. file input: streams events from files (the path field is required)
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
3. stdin input: reads events from standard input
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-stdin.html
4. syslog input: reads syslog messages as events
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-syslog.html
5. tcp input: reads events over TCP (the port field is required)
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html
6. udp input: reads events over UDP (the port field is required)
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-udp.html
7. twitter input: reads events from the Twitter Streaming API (a relatively common scenario)
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-twitter.html
(consumer_key, consumer_secret, oauth_token, and oauth_token_secret are required)
8. redis input: reads events from a Redis instance
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-redis.html
(data_type, one of ["list", "channel", "pattern_channel"], and key are required)
9. kafka input: reads events from a Kafka topic
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
(too many parameters to list here; see the documentation)
10. jdbc input: creates events from JDBC data
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
(jdbc_connection_string, jdbc_driver_class, and jdbc_user are required)
11. http input: receives events over HTTP or HTTPS
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-http.html
12. elasticsearch input: reads query results from an Elasticsearch cluster
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-elasticsearch.html
13. exec input: captures the output of a shell command as an event (the command field is required); a small sketch follows the link below.
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-exec.html
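A minimal sketch of the exec input, running a command on a fixed interval (the command and interval are arbitrary examples):

input {
  exec {
    command  => "uptime"    # the command's output becomes the event message
    interval => 30          # run every 30 seconds
  }
}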
Less commonly used input plugins:
Browse the Logstash plugin reference and configure them as needed.
Full list: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
11. Overview of Logstash filter plugins and their parameters
All filter plugins support the following configuration options:
Setting           Input type    Required
add_field         hash          No
add_tag           array         No
enable_metric     boolean       No
id                string        No
periodic_flush    boolean       No
remove_field      array         No
remove_tag        array         No
Commonly used filter plugins:
1. grok filter: parses unstructured log data into something structured and queryable.
https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html#_grok_basics
The syntax of a grok pattern is %{SYNTAX:SEMANTIC}:
SYNTAX is the name of the pattern that matches your text;
SEMANTIC is the identifier you give to the piece of text being matched.
grok matches the individual values in a log line either against the regular expressions predefined by the system or against regular expressions you define yourself.
Regular expressions are easy to get wrong, so it is best to debug them first:
grok debugger: http://grokdebug.herokuapp.com/
grok ships with many predefined regular-expression patterns; the pattern files are stored under:
/usr/local/logstash-5.6.10/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.2/patterns
and so on; browse the directory to see them all.
Example 1:
filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}
The original input message is:
55.3.244.1 GET /index.html 15824 0.043
After grok parses it:
client:   55.3.244.1    (IP address)
method:   GET           (HTTP method)
request:  /index.html   (requested path)
bytes:    15824         (byte count)
duration: 0.043         (request duration)
Example 2:
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}
The definition of COMBINEDAPACHELOG is at:
https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/httpd
The original input message is:
192.168.80.183 - - [04/Jan/2018:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
After grok parses it:
"clientip"  => "192.168.80.183",
"timestamp" => "04/Jan/2018:05:13:42 +0000",
"verb"      => "GET",
"request"   => "/presentations/logstash-monitorama-2013/images/kibana-search.png",
"referrer"  => "\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"",
"response"  => "200",
"bytes"     => "203023",
"agent"     => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
Example 3 (a custom grok pattern, mypattern, matching [A-Z]):
filter {
  grok {
    # inline custom pattern equivalent to "mypattern [A-Z]+"
    match => { "message" => "%{IP:clientip}\s+(?<mypattern>[A-Z]+)" }
  }
}
The original input message is:
12.12.12.12 ABC
After grok parses it:
"clientip" => "12.12.12.12", |
Example 4 (removing duplicate fields):
filter {
  grok {
    #match => { "message" => "%{COMBINEDAPACHELOG}" }
    match => { "message" => "%{IP:clientip}\s+%{IP:clientip1}" }
  }
  mutate {
    remove_field => ["message"]
    remove_field => ["host"]
  }
}
The original input message is:
1.1.1.1 2.2.2.2
After grok parses it (as JSON):
{
  "_index": "logstash-2018.07.31",
  "_type": "log",
  "_id": "AWTuNdzp6Wkp4mVEj3Fh",
  "_version": 1,
  "_score": null,
  "_source": {
    "@timestamp": "2018-07-31T02:41:00.014Z",
    "offset": 1114,
    "clientip": "1.1.1.1",
    "@version": "1",
    "input_type": "log",
    "beat": { "name": "node183", "hostname": "node183", "version": "5.6.10" },
    "source": "/home/usieip/bdp-datashare/logs/a.log",
    "type": "log",
    "clientip1": "2.2.2.2",
    "tags": [ "beats_input_codec_plain_applied" ]
  },
  "fields": { "@timestamp": [ 1533004860014 ] },
  "sort": [ 1533004860014 ]
}
Example 5 (extracting the information of interest from catalina.out; the message field has already been removed):
filter {
  grok {
    match => { "message" => "%{DATA:ymd} %{DATA:sfm} %{DATA:http} %{DATA:info} %{GREEDYDATA:index}" }
  }
}
[In the pattern files, DATA is defined as .*? and GREEDYDATA as .*]
The original input message is:
2018-07-30 17:04:31.317 [http-bio-8080-exec-19] INFO c.u.i.b.m.s.i.LogInterceptor - ViewName: modules/datashare/front/index
After grok parses it (as JSON):
{
  "_index": "logstash-2018.07.31",
  "_type": "log",
  "_id": "AWTvhiPD6Wkp4mVEj3GU",
  "_version": 1,
  "_score": null,
  "_source": {
    "offset": 125,
    "input_type": "log",
    "index": "c.u.i.b.m.s.i.LogInterceptor - ViewName: modules/datashare/front/index",
    "source": "/home/usieip/bdp-datashare/logs/b.log",
    "type": "log",
    "tags": [],
    "ymd": "2018-07-30",
    "@timestamp": "2018-07-31T08:48:17.948Z",
    "@version": "1",
    "beat": { "name": "node183", "hostname": "node183", "version": "5.6.10" },
    "http": "[http-bio-8080-exec-19]",
    "sfm": "17:04:31.317",
    "info": "INFO"
  },
  "fields": { "ymd": [ 1532908800000 ], "@timestamp": [ 1533026897948 ] },
  "sort": [ 1533026897948 ]
}
Commonly used parameters:
1) match: matches a field against a pattern.
2) patterns_dir: directories containing custom pattern files; it is not needed when only Logstash's built-in patterns are used. patterns_dir can point to several pattern directories at once:
patterns_dir => ["/opt/logstash/patterns", "/opt/logstash/extra_patterns"]
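A pattern file in one of those directories is simply a list of "NAME regexp" lines; a minimal sketch (the file name and pattern name are illustrative):

# /opt/logstash/patterns/extra
MYPATTERN [A-Z]+

Such a pattern can then be referenced in a match just like the built-in ones, e.g. %{MYPATTERN:mypattern}.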
3) remove_field: if a field is matched, remove it from the event (separate multiple fields with commas):
remove_field => ["foo_%{somefield}"]
2. clone filter: duplicates events.
3. drop filter: drops every event that reaches it.
4. json filter: parses JSON events.
5. kv filter: parses key-value pairs (the json and kv filters are both illustrated in the sketch below).
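A minimal sketch of the json and kv filters, assuming the event's message field holds a JSON string or key=value text respectively:

filter {
  # parse a JSON string held in the message field into top-level event fields
  json {
    source => "message"
  }
  # parse key=value pairs such as "user=tom action=login" into fields
  kv {
    source      => "message"
    field_split => " "
    value_split => "="
  }
}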
Less commonly used filter plugins:
Reference: https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
12. Overview of Logstash output plugins and their parameters
All output plugins support the following configuration options:
Setting          Input type    Required
codec            codec         No (default "plain")
enable_metric    boolean       No (default true)
id               string        No
Commonly used output plugins:
1. elasticsearch output: the recommended way to store logs in Elasticsearch; if you plan to use the Kibana web interface, use this output.
2. file output: writes events to a file on disk (the path field is required).
3. kafka output: writes events to a Kafka topic (topic_id is required).
4. redis output: sends events to a Redis queue using RPUSH.
5. stdout output: a simple output that prints to the STDOUT of the shell running Logstash.
Less commonly used output plugins:
Official reference: https://www.elastic.co/guide/en/logstash/current/output-plugins.html
13. A brief comparison of Logstash and Flume
1) Structure:
Logstash: Shipper, Broker, Indexer (the broker is Redis or Kafka deployed as a buffer).
Flume: Source, Channel, Sink.
Logstash is an integrated whole: the broker is optional, and data can be read, processed, and output directly without buffering.
Flume has to be configured piece by piece, and none of its three components can be omitted.
2) Configuration:
Logstash: the configuration is concise and clear; the options of all three stages are predefined and can simply be picked, and if something is missing a plugin can be developed, which keeps it convenient and easy to use. Its filter plugins are quite complete; grok, for example, can parse and structure arbitrary text with regular expressions and is currently the best way in Logstash to turn unstructured log data into something structured and queryable. Logstash can also rename, delete, replace, and modify event fields, and of course drop events entirely (such as debug events); many more advanced features are available as well.
Flume: configuration is tedious; source, channel, and sink each have to be configured by hand, and a complex collection environment needs many of them. Flume has plenty of plugins, but in practice only the memory and file channels are commonly used.
3) Design focus:
Flume focuses on transporting the data, and its users must understand the whole data route very clearly. It is comparatively more reliable: the channel exists for persistence, and data is only deleted once delivery to the next hop has been confirmed.
Logstash focuses on pre-processing the data: log fields are pre-processed before being parsed further.
4) Ecosystem:
Logstash can be used and extended together with the other ELK components; it is simple to apply and covers a wide range of scenarios.
Newer Flume versions are lightweight and suit users with some programming background; they target narrower scenarios and need many other tools alongside them, which is less convenient.
5) An analogy:
Logstash is a desktop PC that arrives with the motherboard, power supply, disk, and case already assembled; you can use it straight away.
Flume hands you the full set of parts (motherboard, power supply, disk, case) to assemble yourself; only after assembly can you use it.