filebeat-kafka日志收集
由於線上的logstash吃掉大量的CPU,占用較多的系統資源,就想找其它的組件替代.我們的日志需要收集並發送到kafka,生成的日志已經是需要的數據,不用過濾.經過調研發現filebeat也支持發往kafka.
filebeat很輕量,用於轉發和收集日志數據.filebeat作為代理安裝在服務器上,監視指定的日志文件或位置,收集日志事件,並將他們轉發到logstash,elasticsearch,kafka等.架構圖如下:

安裝
獲取安裝包並解壓
# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.5.1-linux-x86_64.tar.gz
# tar -xvf filebeat-6.5.1-linux-x86_64.tar.gz
配置
filebeat支持很多種輸入和輸出.具體可看input,output.
項目中用到的輸入是log,輸出的kafka.在這只講這兩種配置.
輸入配置log
log輸入是從文件中按行讀取.在paths指定需要監視的文件.
例子:
filebeat.inputs:
- type: log
paths:
- /var/log/messages
- /var/log/*.log
主要有以下幾個配置項.
paths
需要監視的文件路徑.支持Go Glab的所有模式.例如: /var/log/*.log.這個配置將監視/var/log文件夾下所有以.log結尾的文件.可以用recursive_glob來遞歸子文件夾.
recursive_glob.enabled
允許擴展 * * 為遞歸的glob模式.啟用此功能后. /foo/* * 擴展到/foo, /foo/* ,/foo/* /* ,等等,它會將單個擴展 * * 為8級深度*模式.
默認情況下啟用此功能.設置false禁用.
exclude_lines
正則表達式列表,用於匹配您希望Filebeat排除的行.Filebeat會刪除與列表中的正則表達式匹配的所有行.默認情況下,不會刪除任何行.空行被忽略.
以下示例將Filebeat配置為刪除任何以DBG開頭的行:
filebeat.inputs:
- type: log
...
exclude_lines: ['^DBG']
include_lines
正則表達式列表,用於匹配您希望Filebeat包含的行.Filebeat僅導出與列表中的正則表達式匹配的行.默認情況下,將導出所有行.空行被忽略.
以下示例將Filebeat配置為導出以ERR或WARN開頭的所有行:
filebeat.inputs:
- type: log
...
include_lines: ['^ERR', '^WARN']
PS: 如果include_lines和exclude_lines兩個配置同時出現,優先執行inlcude_lines再執行exclude_lines.和配置項放的位置沒有關系.
json
filebeat支持json格式的消息日志.它將逐行處理日志,因此只有每行有一個json對象時,json解碼才有效.
配置示例:
json.keys_under_root: true
json.add_error_key: true
json.message_key: log
enabled
輸入開關.默認true打開.
輸出配置kafka
kafka將輸出流發送到Apache Kafka.
配置示例:
output.kafka:
# initial brokers for reading cluster metadata
hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
# message topic selection + partitioning
topic: '%{[fields.log_topic]}'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
主要以下幾個配置項
enabled
是否打開輸出配置項.true打開,false關閉.默認是true.
hosts
kafka的broker地址.
topic
kafka的topic.
worker
並發負載均衡Kafka輸出工作線程的數量.
timeout
kafka返回應答的等待時間.默認30(秒).
keep_alive
連接的存活時間.如果為0,表示短連,發送完就關閉.默認為0秒.
required_acks
ACK的可靠等級.0=無響應,1=等待本地消息,-1=等待所有副本提交.默認1.
PS: 如果設為0,kafka無應答返回時,消息將丟失.
配置例子
#=========================== Filebeat inputs =============================
#------------------------------log-----------------------------------
filebeat.inputs:
- type: log
enabled: true
paths:
- /data/collect_log/info.*
#=========================== Filebeat outputs =============================
#------------------------------kafka-----------------------------------
output.kafka:
hosts: ["test1:9092","test2:9092"]
topic: test_collect
keep_alive: 10s
收集/data/collect_log目錄下以info開頭的文件,發送到kafka,kafka的topic是test_collect.
啟動
# /home/filebeat -c filebeat-kafka.yml
日志在filebeat下的log目錄.想要顯示的看日志啟動時加 -e 參數.
參考文檔: https://www.elastic.co/guide/en/beats/filebeat/current/configuring-howto-filebeat.html
