Filebeat6.31整合Kafka集群消息隊列(三)


wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.3.2-linux-x86_64.tar.gz

[root@es-node1 ~]# tar zxvf filebeat-6.3.2-linux-x86_64.tar.gz -C /usr/local/

# mv /usr/local/filebeat-6.3.2-linux-x86_64/ /usr/local/filebeat

# egrep -v "#|^$" filebeat.yml

filebeat.prospectors         #用於定義數據原型,檢測日志或是發現日志
- input_type: log            #指定數據的輸入類型,默認為log,另外還可以指定stdin
  paths:                      #自定需要監控的日志文件路徑;可以是完整的日志路徑也可以是模糊的匹配格式
   - /var/log/messages        #指定系統日志位置
  fields:                      #定義日志來源,可以添加自定義字段,其實就是定義Kafka消息隊列的topic主題名稱,如果kafka消息隊列中沒有該名稱,會自動生成
    log_topic: test
  paths:                    #與上述一樣定義需要監控的日志文件路徑,不夠這次是定義apache-web服務的日志
   - /etc/httpd/logs/*
  fields:                    #定義日志來源,生成kafka消息隊列topic主題
    log_topic: webapache
processors:            #這個地方需要注意,此配置是將日志輸出格式過濾掉,一般情況下,一些無用的日志字段我們可以刪除,只看關鍵性的信息
- drop_fields:
   fields: ["beat", "input_type", "source", "offset",]
name: "192.168.37.134"         #設置filebeat收集日志中對應的主機名稱,,如果設置為空,這使用該機器的主機名稱,這里這是本地IP,便於區分多台主機的日志信息
output.kafka:                  #多種輸出類型,可支持想kafka,logstash,elasticsearch輸出日志信,在這里是將日志信息輸出到Kafka中,
  enabled: true                 啟動該模塊
  hosts: ["192.168.37.134:9092", "192.168.37.135:9092", "192.168.37.136:9092"]        #指定輸出數據到kafka集群上,地址與端口號想對應
  version: "0.10"          
  topic: '%{[fields][log_topic]}'    #指定要發送數據到kafka集群的哪個topic,與上述的"fields: log_topic:"相對應,這是6.x的配置
  partition.round_robin:         #開啟kafka的partition分區
    reachable_only: true   
  worker: 2
  required_acks: 1
  compression: gzip      #壓縮格式
  max_message_bytes: 10000000    #壓縮格式字節大小
logging.level: debug        #日志類型為debug

root@es-node1 bin]#nohup ./filebeat -e -c filebeat.yml &

[root@es-node1 bin]# ./kafka-topics.sh --zookeeper 192.168.37.129:2181,192.168.37.133,192.168.37.133:2181 --list
osmessages
test
webapache

【Kafka節點 】啟動消費,本次消費是

[root@es-node3 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.37.134:9092,192.168.37.135:9092,192.168.37.136:9092 --topic test --from-beginning

{
    "@timestamp": "2018-08-16T04:24:19.871Z",
    "@metadata": {
        "beat": "filebeat",
        "type": "doc",
        "version": "6.3.2",
        "topic": "test"
    },
    "message": "Aug 16 12:24:13 es-node1 dbus[623]: [system] Successfully activated service 'org.freedesktop.nm_dispatcher'",
    "fields": {
        "log_topic": "test"
    },
    "beat": {
        "name": "192.168.37.134",
        "hostname": "es-node1",
        "version": "6.3.2"
    },
    "host": {
        "name": "192.168.37.134"
    },
    "source": "/var/log/messages",
    "offset": 290635
}

 

以下是apache是通過json校驗的日志輸出信息

 1 {
 2     "@timestamp": "2018-08-16T04:19:34.153Z",
 3     "@metadata": {
 4         "beat": "filebeat",
 5         "type": "doc",
 6         "version": "6.3.2",
 7         "topic": "webapache"
 8     },
 9     "beat": {
10         "name": "192.168.37.129",
11         "hostname": "es-node1",
12         "version": "6.3.2"
13     },
14     "host": {
15         "name": "192.168.37.129"
16     },
17     "source": "/etc/httpd/logs/access_log",
18     "offset": 17968,
19     "message": "192.168.37.1 - - [16/Aug/2018:12:19:33 +0800] \"GET /noindex/css/fonts/Bold/OpenSans-Bold.ttf HTTP/1.1\" 404 238 \"http://192.168.37.129/noindex/css/open-sans.css\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36\"",
20     "fields": {
21         "log_topic": "webapache"
22     }
23 }

 

 

上述Filebeat收集到的日志格式信息量過大,我們只需要收集關鍵性的日志信息即可,無用的可以過濾掉,配置如下

processors:
- drop_fields:
   fields: ["beat", "input_type", "source", "offset",]

 

過濾到之后 的apache日志輸出如下所示

 1 {
 2     "@timestamp": "2018-08-16T05:10:02.261Z",
 3     "@metadata": {
 4         "beat": "filebeat",
 5         "type": "doc",
 6         "version": "6.3.2",
 7         "topic": "webapache"
 8     },
 9     "message": "192.168.37.1 - - [16/Aug/2018:13:09:53 +0800] \"GET /noindex/css/fonts/Bold/OpenSans-Bold.ttf HTTP/1.1\" 404 238 \"http://192.168.37.129/noindex/css/open-sans.css\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36\"",
10     "fields": {
11         "log_topic": "webapache"
12     },
13     "host": {
14         "name": "192.168.37.129"
15     }
16 }

【filebeat.yml配置文件】

文章最后結尾是filebeat的過濾和未過濾的配置,方便直接復制粘貼;

為什么這我們再次將filebeat的配置文件粘貼出來呢?因為我被這個坑了整整一天的時間,啟動fiebeat一直報錯,網上關於6.x版本和kafka整合的博客和資料幾乎沒有,出現報錯,也沒有解決方案,當時直接氣暈,對外尋求幫助,也沒有解決方案,最后上層技術大佬,才得以解決,這個filebeat,yml實在是太多坑了,都是因為JSON格式,下面是我具體的報錯信息

 error initializing publisher: missing required field accessing 'output.kafka.hosts'

提示缺少訪問“輸出. Kafka . hosts”的必需字段,就這個,我糾結了一天,好在問題解決了,心累~

[root@es-node1 filebeat]# egrep -v "#|^$" filebeat.yml
filebeat.prospectors:
- input_type: log
  paths: 
   - /var/log/messages
  fields: 
    log_topic: test
  paths: 
   - /etc/httpd/logs/*
  fields: 
    log_topic: webapache
processors:
- drop_fields:
   fields: ["beat", "input_type", "source", "offset",]
name: "192.168.37.134"
output.kafka:
  enabled: true
  hosts: ["192.168.37.134:9092", "192.168.37.135:9092", "192.168.37.136:9092"]
  version: "0.10"
  topic: '%{[fields][log_topic]}'
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000
logging.level: debug

 

###################### Filebeat Configuration Example #########################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.full.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html

#=========================== Filebeat prospectors =============================

filebeat.prospectors:

# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

- input_type: log

  # Paths that should be crawled and fetched. Glob based paths.
  paths: 
   - /var/log/messages
  fields: 
    log_topic: test
  paths: 
   - /etc/httpd/logs/*
  fields: 
    log_topic: webapache
    #- c:\programdata\elasticsearch\logs\*

  # Exclude lines. A list of regular expressions to match. It drops the lines that are
  # matching any regular expression from the list.
  #exclude_lines: ["^DBG"]

  # Include lines. A list of regular expressions to match. It exports the lines that are
  # matching any regular expression from the list.
  #include_lines: ["^ERR", "^WARN"]

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that
  # are matching any regular expression from the list. By default, no files are dropped.
  #exclude_files: [".gz$"]

  # Optional additional fields. These field can be freely picked
  # to add additional information to the crawled log files for filtering
  #fields:
  #  level: debug
  #  review: 1

  ### Multiline options

  # Mutiline can be used for log messages spanning multiple lines. This is common
  # for Java Stack Traces or C-Line Continuation

  # The regexp Pattern that has to be matched. The example pattern matches all lines starting with [
  #multiline.pattern: ^\[

  # Defines if the pattern set under pattern should be negated or not. Default is false.
  #multiline.negate: false

  # Match can be set to "after" or "before". It is used to define if lines should be append to a pattern
  # that was (not) matched before or after or as long as a pattern is not matched based on negate.
  # Note: After is the equivalent to previous and before is the equivalent to to next in Logstash
  #multiline.match: after

processors:
- drop_fields:
   fields: ["beat", "input_type", "source", "offset",]

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
name: "192.168.37.134"

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

#================================ Outputs =====================================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.
output.kafka:
  enabled: true
  hosts: ["192.168.37.134:9092", "192.168.37.135:9092", "192.168.37.136:9092"]
  version: "0.10"
  topic: '%{[fields][log_topic]}'
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000


#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  #hosts: []
  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
#output.logstash:
  # The Logstash hosts
  #hosts: ["172.16.213.51:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

#================================ Logging =====================================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM