一、fluentd簡介
fluentd是一個針對日志的收集、處理、轉發系統。通過豐富的插件系統, 可以收集來自於各種系統或應用的日志,轉化為用戶指定的格式后,轉發到用戶所指定的日志存儲系統之中。
通過 fluentd,你可以非常輕易的實現像追蹤日志文件並將其過濾后轉存到 MongoDB 這樣的操作。fluentd 可以徹底的將你從繁瑣的日志處理中解放出來。
用圖來說明的話,沒有使用fluentd以前,系統是這樣的:
使用fluentd之后,系統是這樣的:
本篇博文將對fluentd的安裝、配置、使用等各方面做一個簡單的介紹。
fluentd 既可以作為日志收集器安裝到每一個結點上, 也可以作為一個服務端收集各個結點上報的日志流。 你甚至也可以在各個結點上都部署 fluentd 收集日志,然后上報到一個 fluentd 集群做統一處理, 然后再轉發到最終的日志存儲服務器。
所以在一個完整的日志收集、處理系統里,你可以構建一個這樣的日志處理流:
Apps (with fluentd/fluent-bit) -> broker (kafka) -> fluentd cluster -> elasticsearch -> kibana
其中提到的 fluent-bit 是一個極簡版的 fluentd,專門用作日志的收集和轉發, 可以在應用結點上取代 fluentd 收集日志,滿足極端的資源要求。
1.1 與 logstash 的對比
通過上述描述,你也許會覺得和 ELK 中的 Logstash 高度相似。事實上也確實如此,你完全可以用 fluentd 來替換掉 ELK 中的 Logstash。
有兩篇文章對這兩個工具做了很好的對比:
概括一下的話,有以下區別:
- fluentd 比 logstash 更省資源;
- 更輕量級的 fluent-bid 對應 filebeat,作為部署在結點上的日志收集器;
- fluentd 有更多強大、開放的插件數量和社區;
二、fluentd安裝
2017 年 12 月的時候,fluentd 發布了 v1.0 版本,也就是 td-agent v3 版。
從 gem 安裝和從 rpm、yum 安裝的名字不一樣,連配置文件的路徑都不一樣,需要記住的是:
- 從 gem 安裝的,配置文件和執行程序都叫做 fluent;
- 從 rpm 安裝的,配置文件和執行程序都叫做 td-agent3;
td-agent 和 fluentd 是同一個軟件,區別在於 td-agent 更注重於穩定性,在更新上會稍晚於 fluentd,而且依賴的一些庫也會有不同(如 jemalloc),更適用於用於生產環境。
2.1 安裝fluentd
詳細可參考官方文檔!
以 CentOS 為例:
# 安裝
$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent3.sh | sh
# 通過 systemd 啟動
$ systemctl start td-agent.service
$ systemctl status td-agent.service
# 或者也可以手動啟動
$ /etc/init.d/td-agent start
$ /etc/init.d/td-agent stop
$ /etc/init.d/td-agent restart
$ /etc/init.d/td-agent status
2.2 安裝插件
# 從 rpm 安裝的話,
# 比如要使用下例的 mongo,需要安裝
$ td-agent-gem install fluent-plugin-mongo
$ td-agent-gem <PLUGIN_NAME>
# 從 gem 安裝的話
$ gem install <PLUGIN_NAME>
三、配置文件
3.1 路徑
- 如果是通過 gem 安裝的,那么可以通過下列命令生成和編輯配置文件:
$ fluentd --setup /etc/fluent
$ vi /etc/fluent/fluent.conf
- 如果是通過 RPM, Deb 或 DMG 安裝的,那么配置文件在:
$ vi /etc/td-agent/td-agent.conf
3.2 常用
你可以在配置文件里使用 @include
來切分你的配置文件,include 支持多種寫法:
# 絕對路徑
include /path/to/config.conf
# 相對路徑
@include conf.d/*.conf
# 甚至 URL
@include http://example.com/fluent.conf
3.3 數據格式
在配置文件里你需要為很多參數賦值,這些值必須使用 fluentd 支持的數據格式,有下列這些:
string
:字符串,最常見的格式,詳細支持語法見文檔;integer
:整數;float
:浮點數;size
:大小,僅支持整數:<INTEGER>k
或<INTERGER>K
;<INTEGER>m
或<INTERGER>M
;<INTEGER>g
或<INTERGER>G
;<INTEGER>t
或<INTERGER>T
;
time
:時間,也支持整數:<INTEGER>s
或<INTERGER>S
;<INTEGER>m
或<INTERGER>M
;<INTEGER>h
或<INTERGER>H
;<INTEGER>d
或<INTERGER>D
;
array
:按照 JSON array 解析;hash
:按照 JSON object 解析;
四、命令
配置文件的核心是各種命令塊(directives),每一種命令都是為了完成某種處理,命令與命令之間還可以組成串聯關系,以 pipline 的形式流式的處理和分發日志。
命令的主要組成部分有:
- source
- filter
- match
- label
- error
最常見的方式就是 source 收集日志,然后由串聯的 filter 做流式的處理,最后交給 match 進行分發。match 是日志流程的終點,一旦匹配了某一個 match,就不會再繼續往下匹配了。
同時你還可以用 label 將任務分組,用 error 處理異常,用 system 修改運行參數。
不同的命令中,都可以通過 @type
指定想要使用的插件名字,而且還可以傳入各式各樣的插件參數, 由豐富的插件提供強大的功能,下面是詳細一些的說明。
4.1 source
source 是 fluentd 的一切數據的來源,每一個 source 內都包含一個輸入模塊,比如原生集成的包含 http
和 forward
兩個模塊,分別用來接收 HTTP 請求和 TCP 請求:
# Receive events from 24224/tcp
# This is used by log forwarding and the fluent-cat command
<source>
@type forward
port 24224
</source>
# http://this.host:9880/myapp.access?json={"event":"data"}
<source>
@type http
port 9880
</source>
當然,除了這兩個外,fluentd 還有大量的支持各種協議或方式的 source 插件,比如最常用的 tail
就可以幫你追蹤文件。
每一個具體的插件都包含其特有的參數,比如上例中 port
就是一個參數,當你要使用一個 source
插件的時候,注意看看有哪些參數是需要配置的,然后將其寫到 source directive
內。
source dirctive
在獲取到輸入后,會向 fluent 的路由拋出一個事件,這個事件包含三個要素:
- tag
- time
- record
那上例代碼中的第二個 source 舉例,當我們發起一個http://this.host:9880/myapp.access?json={"event":"data"}
的請求時,這個 source 會拋出:
# generated by http://this.host:9880/myapp.access?json={"event":"data"}
tag: myapp.access
time: (current time)
record: {"event":"data"}
4.2 match
match 用來指定動作,通過 tag 匹配 source,然后執行指定的命令來分發日志,最常見的用法就是將 source 收集的日志轉存到數據庫。
# http://this.host:9880/myapp.access?json={"event":"data"}
<source>
@type http
port 9880
</source>
# 將標記為 myapp.access 的日志轉存到文件
<match myapp.access>
@type file
path /var/log/fluent/access
</match>
上例中的 myapp.access 就是 tag,tag 有好幾種匹配模式:
*
:匹配任意一個 tag;**
:匹配任意數量個 tag;a b
:匹配 a 或 b;{X,Y,Z}
:匹配 X, Y, Z 中的一個;
比如可以寫成這樣:
<match a.*>
<match **>
<match a.{b,c}>
<match a.* b.*>
match 是從上往下依次匹配的,一旦一個日志流被匹配上,就不會再繼續匹配剩下的 match 了。 所以如果有 <match **>
這樣的全匹配,一定要放到配置文件的最后。
用法和 source 幾乎一模一樣,不過 source 是拋出事件,match 是接收並處理事件。
而且 match 不僅僅用來處理輸出,還可以對日志事件進行一些處理后重新拋出,當成一個新的事件從新走一遍流程,比如可以用rewrite_tag_filter
插件為日志流重新打上 tag,實現通過正則來對日志進行分流的需求:
<match app>
# 捕獲被打上了 app tag 的日志
...
</match>
<match cp>
# 捕獲被打上了 cp tag 的日志
...
</match>
<match **>
# https://docs.fluentd.org/v0.12/articles/out_rewrite_tag_filter
# 被打上 tag 的日志會被從頭處理,從而被上面的 match 捕獲,實現了日志的分流
@type rewrite_tag_filter
<rule>
key log # 指定要處理的 field
pattern ^.*\ c\.p\.\ .* # 匹配條件
tag cp # 打上 tag `cp`
</rule>
<rule>
key log
pattern ^.*
tag app # 其余日志打上 tag `app`
</rule>
</match>
4.3 filter
filter 和 match 的語法幾乎完全一樣,但是 filter 可以串聯成 pipeline,對數據進行串行處理,最終再交給 match 輸出。
# http://this.host:9880/myapp.access?json={"event":"data"}
<source>
@type http
port 9880
</source>
<filter myapp.access>
@type record_transformer
<record>
host_param "#{Socket.gethostname}"
</record>
</filter>
<match myapp.access>
@type file
path /var/log/fluent/access
</match>
這個例子里,filter 獲取數據后,調用原生的 @type record_transformer
插件,在事件的 record 里插入了新的字段 host_param,然后再交給 match 輸出。
4.4 system
fluentd 的相關設置,可以在啟動時設置,也可以在配置文件里設置,包含:
- log_level
- suppress_repeated_stacktrace
- emit_error_log_interval
- suppress_config_dump
- without_source
五、插件介紹Plugins
Fluentd 有一個非常活躍社區,提供了大量的插件,你可以在這里看到大多數常見插件的列表!
Fluentd 支持 7 種類型的插件:
- Input:事件流入口;
- Parser:修改 Input 插件中事件格式,用於 Source;
- Filter: 修改事件流,用於 Filter;
- Output:輸出插件,用於 Match;
- Formatter:修改 Output 插件中事件流的格式,用於 Match;
- Buffer:在 Output 插件中指定 buffer,用於 Match;
- Storage:將插件狀態存入內存或數據庫,可用於 Source、Filter 和 Match,需要插件支持 storage 命令;
六、插件參數Parameters
不同的插件都可以設定不同的參數,拿最簡單的 forward 舉個例子:
<source>
@type http
port 9880
</source>
其中 @type
、port
都是參數,一個指明了插件的名字,另一個指明了監聽的端口。
fluentd 里有兩種類型的參數:
- 默認參數:以
@
開頭的都是默認參數; - 插件參數:其余的參數都是插件參數,為插件做配置,可以在插件文檔里查閱。
6.1 默認參數 Common plugin parameter
fluentd 里只有四個默認參數:
@type
:用於指定插件類型;@id
:指定插件 id,在輸出監控信息的時候有用;@label
:指定分組標簽,可以對日志流做批處理;@log_level
:為每一組命令設定日志級別。
6.1.1 label
label 用於將任務進行分組,方便復雜任務的管理。
你可以在 source 里指定 @label @<LABEL_NAME>
, 這個 source 所觸發的事件就會被發送給指定的 label 所包含的任務, 而不會被后續的其他任務獲取到。
需要注意的是,label 一旦被聲明了,就必須在后面被用到,否則會報錯。
看個例子:
<source>
@type forward
</source>
<source>
# 這個任務指定了 label 為 @SYSTEM
# 會被發送給 <label @SYSTEM>
# 而不會被發送給下面緊跟的 filter 和 match
@type tail
@label @SYSTEM
</source>
<filter access.**>
@type record_transformer
<record>
# ...
</record>
</filter>
<match **>
@type elasticsearch
# ...
</match>
<label @SYSTEM>
# 將會接收到上面 @type tail 的 source event
<filter var.log.middleware.**>
@type grep
# ...
</filter>
<match **>
@type s3
# ...
</match>
</label>
6.1.2 error
用來接收插件通過調用 emit_error_event
API 拋出的異常,使用方法和 label 一樣,通過設定 <label @ERROR>
就可以接收到相關的異常。
6.1.3 log_level
目前支持的日志級別參數值有:
fatal
error
warn
info
debug
trace
從上往下依次遞減,當你指定了一個級別后,會捕獲大於等於該級別的所有日志。
比如如果你指定 @log_level info
,就會獲取到 info, warn, error, fatal
級別的日志。
6.2 其他插件參數
除了默認參數外,各個插件還可以定制自己的參數,這個就需要查閱你所用插件的文檔頁面了。
拿 tail
舉個例子,我們可以查閱 文檔, 可以看到它有 tag, path, exclude_path, ...
等一系列的參數,比如其中 tag
就可以為日志流打上供 match
使用的 tag
。
七、高可用
內容來源於官方文檔:Fluentd High Availability Configuration。
7.1 Message Delivery Semantics
任何消息傳遞系統,都需要考慮消息遞交語義(delivery semantics):
- At most once:最多傳遞一次,有可能會丟消息,但是不會重復;
- At least once:最少傳遞一次,不會丟消息,但是可能重復;
- Exactly once:確切的只傳遞一次,需要多次確認消息狀態,會極大的犧牲性能。
一般來說,我們會根據業務場景,在前兩種中選擇一種,第三種因為性能較差,只適合在小型內部系統上玩玩。
7.2 網絡拓撲
一個日志收集系統由兩個角色組成:
- log forwarders:負責日志采集和轉發;
- log aggregators:負責日志收集和匯總處理。
fluentd 可以扮演上述兩個角色(或者由 fluent-bit 扮演 forwarders 角色),為了保證高可用, 對 aggregators 做多點備份:
我們需要在 log forwarders 里配置多個 aggregators:
# Log Forwarding
<match mytag.**>
@type forward
# 主 aggregator
<server>
host 192.168.0.1
port 24224
</server>
# 備用 aggregators
<server>
host 192.168.0.2
port 24224
standby # 聲明為備用
</server>
# 所有的日志流都會存入磁盤,定期 flush 到 aggregators
# 較長的 flush 可以減少 CPU
<buffer>
flush_interval 60s
</buffer>
</match>
7.3 數據丟失的場景
Forwarder 會把所有數據存放在 buffer 中,假如你在 match 中配置了 buffer_type file
,則會將數據都存放在磁盤中,然后按照 flush_interval
定期將數據發送到 aggregator。
但是,如果 forwarder 進程在將數據寫入 buffer 前死掉了,或者存放 buffer 的磁盤壞掉了,就會導致數據丟失。
7.4 監控
7.4.1 插件監控
fluentd 內置了一個 HTTP 接口,可以用來獲取插件信息,只需要在配置文件里加上:
<source>
@type monitor_agent
bind 0.0.0.0
port 24220
</source>
然后訪問:http://localhost:24220/api/plugins.json
就可以拿到插件的信息。
八、性能調優
一般來說,fluentd 單節點的吞吐量大概是 10w/sec 左右。
要想提高性能的話,可以在輸出端(match)指定 num_threads
來提高並發,在輸入端安裝 fluent-plugin-multiprocess
插件來提高 CPU 的利用率(Ruby 也有 GIL 問題)。
8.1 負載均衡
fluentd 的 multiprocess 插件非常的雞肋,只是幫你多啟動幾個 fluentd 進程,然后每個進程執行自己的配置文件。這個你使用進程管理器(如 supervisor 或 systemd)都能做到。
后來又引入了 multi worker 的參數,但是簡單看了下后發現需要插件做適配,而我並沒有精力去一個個的排查插件的兼容性,所以也就不考慮了。
為了提高 fluentd 的吞吐量,你有幾個辦法:
- 拆分 fluentd 的配置文件,然后各自啟動新的進程,缺點是各自監聽不同的端口;
- 啟動 multi worker,利用多核提高性能;
- 增加一個負載均衡,將流量分配到后端不同的 fluentd 進程上。
我采用了最后一種方法,使用 haproxy 分發 tcp 到后端的 fluentd,寫了一個 docker-compose 文件,開箱即用:
https://github.com/Laisky/HelloWorld/tree/master/docker/docker_log/multi-process
不過在做拆分的時候,要考慮到當前的處理流程是否是無狀態的,比如兩個典型的場景:
- 日志多行合並;
- 日志解析;
其中多行合並就是有狀態的,不能很好的進行並行。而日志解析是無狀態的,可以根據需求開任意多的進程來處理。為了分擔壓力,建議將 fluentd 的處理拆為幾個不同的步驟,其中第一個步驟僅進行多行合並等有狀態的請求,然后第二層再並行的進行較重的解析等操作,最大程度的提高 fluentd 集群的吞吐量。
九、Demo
9.1 Nginx Log
一個監聽 Nginx 日志的例子:
<source>
@type tail
@id nginx-access
@label @nginx
path /var/log/nginx/access.log
pos_file /var/lib/fluentd/nginx-access.log.posg
tag nginx.access
format /^(?<remote>[^ ]*) (?<host>[^ ]*) \[(?<time>[^\]]*)\] (?<code>[^ ]*) "(?<method>\S+)(?: +(?<path>[^\"]*) +\S*)?" (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$/
time_format %d/%b/%Y:%H:%M:%S %z
</source>
<source>
@type tail
@id nginx-error
@label @nginx
path /var/log/nginx/error.log
pos_file /var/lib/fluentd/nginx-error.log.posg
tag nginx.error
format /^(?<time>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(?<log_level>\w+)\] (?<pid>\d+).(?<tid>\d+): (?<message>.*)$/
</source>
<label @nginx>
<match nginx.access>
@type mongo
database nginx
collection access
host 10.47.12.119
port 27016
time_key time
flush_interval 10s
</match>
<match nginx.error>
@type mongo
database nginx
collection error
host 10.47.12.119
port 27016
time_key time
flush_interval 10s
</match>
</label>
為了匹配,你也需要修改 Nginx 的 log_format
為:
log_format main '$remote_addr $host [$time_local] $status "$request" $body_bytes_sent "$http_referer" "$http_user_agent"';
9.2 Docker Log
如果你在啟動 docker 時配置了 --log_driver=fluentd
的話,就可以用 fluentd 來接受 docker 的日志。
但是 docker 默認會按照換行符將日志拆成一條條的 json,所以你需要合並多行日志,並提取日志信息。 下面是一個例子,拆分成兩層,先做合並,再做解析:
# 這一層只做合並,做完后就轉發給下一層
<filter geely.sit>
@type concat
timeout_label @NORMAL # concat 需要處理好 timeout flush,否則會丟數據
flush_interval 5s
key log
stream_identity_key container_id
multiline_start_regexp /^\d{4}-\d{2}-\d{2} +\d{2}:\d{2}:\d{2}.\d{3} +\|/
</filter>
<match **>
@type relabel
@label @NORMAL
</match>
<label @NORMAL>
<match **.sit>
@type copy
<store>
@type forward
send_timeout 30s
recover_wait 10s
hard_timeout 30s
<server>
host lb
port 24225
</server>
</store>
</match>
</label>
第二層做解析,因為上一層拼合的日志包含 \n
,所以要用 multiline 來做解析:
<filter geely.sit>
@type parser
key_name log
reserve_data true
<parse>
@type multiline
format_firstline /^\d{4}-\d{2}-\d{2} +\d{2}:\d{2}:\d{2}.\d{3} +\|/
format1 /^(?<time>.{23}) {0,}\| {0,}(?<project>[^ ]+) {0,}\| {0,}(?<level>[^ ]+) {0,}\| {0,}(?<thread>[^\|]+) {0,}\| {0,}(?<class>[^\:]+)\:(?<line>\d+) {0,}- {0,}(?<message>.+)/
keep_time_key true
</parse>
</filter>
9.3 Docker 化
一個例子,執行的時候需要把 fluent.conf 掛載到 /fluentd/etc/fluent.conf
,才能執行:
FROM fluent/fluentd:v1.1.3
RUN apk add --update --virtual .build-deps \
sudo build-base ruby-dev
RUN sudo gem install fluent-plugin-elasticsearch -v 2.8.6 \
&& sudo gem install fluent-plugin-concat -v 2.1.0 \
&& sudo gem install fluent-plugin-rewrite-tag-filter -v 2.0.2 \
&& sudo gem install fluent-plugin-kafka -v 0.6.3 \
&& sudo gem install fluent-plugin-cadvisor -v 0.3.1 \
&& sudo gem install fluent-plugin-flowcounter -v 1.3 \
&& sudo gem install fluent-plugin-ignore-filter -v 2.0.0 \
&& sudo gem sources --clear-all \
&& apk del .build-deps \
&& rm -rf /var/cache/apk/* \
/home/fluent/.gem/ruby/2.3.0/cache/*.gem
RUN mkdir -p /data/log/td-agent/buffer/
ENV FLUENTD_CONF="fluent.conf"
ENTRYPOINT exec fluentd -c /fluentd/etc/${FLUENTD_CONF} -p /fluentd/plugins $FLUENTD_OPT
參考博文: