fluentd 是一个实时的数据收集系统,不仅可以收集日志,还可以收集定期执行的命令输出和 HTTP 请求内容。数据被收集后按照用户配置的解析规则,形成一系列 event。每一个 event 包含如下内容:
tag = xxx
time = xxx
record = {
"key1": "value1",
"key2": "value2"
}
其中:
- tag:为数据流的标记。fluentd 中可以具有多个数据源,解析器,过滤器和数据输出。他们之前使用 tag 来对应。类似于数据流按照 tag 分组。数据流向下游的时候只会进入 tag 相匹配的处理器。
- time:event 产生的时间,该字段通常由日志内的时间字段解析出来。
- record:日志的内容,为 JSON 格式。
fluentd 支持多种数据的解析过滤和输出操作。其中常用的有:
- tail 输入:增量读取日志文件作为数据源,支持日志滚动。
- exec 输入:定时执行命令,获取输出解析后作为数据源。
- syslog 输出:解析标准的 syslog 日志作为输入。
- forward 输入:接收其他 fluentd 转发来的数据作为数据源。
- dummy:虚拟数据源,可以定时产生假数据,用于测试。
- regexp 解析器:使用正则表达式命名分组的方式提取出日志内容为 JSON 字段。
- record_transformer 过滤器:人为修改 record 内的字段。
- file 输出:用于将 event 落地为日志文件。
- stdout:将 event 输出到 stdout。如果 fluentd 以 daemon 方式运行,输出到 fluentd 的运行日志中。
- forward:转发 event 到其他 fluentd 节点。
- copy:多路输出,复制 event 到多个输出端。
- kafka:输出 event 到 Kafka。
- webhdfs:输出 event 到 HDFS。
- elasticsearch:输出 event 到 HDFS。
接下来以官网介绍为基础,穿插自己的理解,介绍下 fluentd 的使用方法。
安装启动方法
官网安装步骤链接:https://docs.fluentd.org/installation/install-by-rpm
下面是精简的在 CentOS 下的安装步骤。打开 shell,执行如下命令:
curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent3.sh | sh
systemctl start td-agent
可以安装并启动 fluentd。
配置文件位置
编辑 fluentd 配置文件的方法:
vim /etc/td-agent/td-agent.conf
修改运行用户和组
默认来说 fluentd 使用 td-agent 用户启动。如果需要修改 fluentd 的用户,需要执行:
vim /usr/lib/systemd/system/td-agent.service
文件内容如下所示:
[Unit]
Description=td-agent: Fluentd based data collector for Treasure Data
Documentation=https://docs.treasuredata.com/articles/td-agent
After=network-online.target
Wants=network-online.target
[Service]
User=td-agent
Group=td-agent
LimitNOFILE=65536
Environment=LD_PRELOAD=/opt/td-agent/embedded/lib/libjemalloc.so
Environment=GEM_HOME=/opt/td-agent/embedded/lib/ruby/gems/2.4./
Environment=GEM_PATH=/opt/td-agent/embedded/lib/ruby/gems/2.4./
Environment=FLUENT_CONF=/etc/td-agent/td-agent.conf
Environment=FLUENT_PLUGIN=/etc/td-agent/plugin
Environment=FLUENT_SOCKET=/var/run/td-agent/td-agent.sock
Environment=TD_AGENT_LOG_FILE=/var/log/td-agent/td-agent.log
Environment=TD_AGENT_OPTIONS=
EnvironmentFile=-/etc/sysconfig/td-agent
PIDFile=/var/run/td-agent/td-agent.pid
RuntimeDirectory=td-agent
Type=forking
ExecStart=/opt/td-agent/embedded/bin/fluentd --log $TD_AGENT_LOG_FILE --daemon /var/run/td-agent/td-agent.pid $TD_AGENT_OPTIONS
ExecStop=/bin/kill -TERM ${MAINPID}
ExecReload=/bin/kill -HUP ${MAINPID}
Restart=always
TimeoutStopSec=120
[Install]
WantedBy=multi-user.target
修改Service
部分User
和Group
配置项可以更改 fluentd 进程的用户和组。
检测配置文件是否正确的方法
在 shell 中运行:
/opt/td-agent/embedded/bin/fluentd -c /etc/td-agent/td-agent.conf
观察输出,如果有错误会给出对应提示。
数据流逻辑
fluentd 以 tag 值为基准,决定数据的流经哪些处理器。
数据的流向为:source -> parser -> filter -> output
input 配置
tail
增量读取日志文件。需要提供一个用于标记已经读取到位置的文件(position file)所在的路径。
tail 针对日志滚动的支持:tail 方式采用跟踪文件 inode 的方式进行。比如日志名为app.log
,如果日志发生滚动,被重命名为app.log.1
。文件重命名的时候 inode 是不会改变的。因此发生滚动时写入到旧文件末尾的日志也可以被收集到。tail 会跟踪旧文件的 inode 一段时间(rotate_wait
配置),这段时间过去之后,tail 不再监听app.log.1
,开始监听新的app.log
文件。
tail 方式的示例配置:
<source>
@type tail
path /var/log/httpd-access.log
pos_file /var/log/td-agent/httpd-access.log.pos
tag apache.access
<parse>
@type apache2
</parse>
</source>
注意:如果文件发生修改会输出全量文件内容。
配置项解释
tag:数据源的 tag 值。*
号可以扩展为 path(/
替换为.
)。例如
path /path/to/file
tag foo.*
tag 会被扩展为foo.path.to.file
path:配置读取的路径。可以使用*
或者是strftime
。例如:
path /path/to/%Y/%m/%d/*
如果今天是 2020 年 1 月 2 日,fluentd 会读取/path/to/2020/01/02
目录下的内容。也可以配置多个路径,使用逗号分隔:
path /path/to/a/*,/path/to/b/c.log
exclude_path:排除部分目录或文件,使用数组格式配置。
path /path/to/*
exclude_path ["/path/to/*.gz", "/path/to/*.zip"]
refresh_interval:多长时间刷新一次文件监听列表,配合*
使用才有意义。
pos_file:位置文件地址。这个文件保存了监听的日志文件已经读取到第几行。该项一定要配置。注意,不要在多个 source 之间共用 pos file,否则会出现问题。pos_file_compaction_interval:pos file 文件压缩时间间隔。用于压缩 pos file 中不再监听的记录,不可解析的记录以及重复的记录。
parse 标签:用于指定 log 的解析器(必须的配置项)。例如:
# json
<parse>
@type json
</parse>
# regexp
<parse>
@type regexp
expression ^(?<name>[^ ]*) (?<user>[^ ]*) (?<age>\d*)$
</parse>
path_key:如果配置此项,监控文件的 path 会在 event 中,此项的 key 为path_key
。例如:
path /path/to/access.log
path_key tailed_path
生成的数据如下所示:
{"tailed_path":"/path/to/access.log","k1":"v1",...,"kN":"vN"}
rotate_wait:日志发生滚动的时候,可能会有部分日志仍然输出在旧的日志文件,此时需要保持监听旧日志文件一段时间,这个时间配置就是rotate_wait
。
exec
周期性执行命令,抽取命令输出为 event。
示例配置:
<source>
@type exec
command cmd arg arg
<parse>
keys k1,k2,k3
</parse>
<extract>
tag_key k1
time_key k2
time_format %Y-%m-%d %H:%M:%S
</extract>
run_interval 10s
</source>
以上命令的含义为每 10 秒钟执行cmd arg arg
命令,提取命令执行结果,以空白字符分隔三个字段的值为 k1,k2,k3。其中 k1 的值作为 tag,k2 作为时间字段,使用%Y-%m-%d %H:%M:%S
格式。
一个例子,周期获取系统的平均负载。配置方法如下:
<source>
@type exec
tag system.loadavg
command cat /proc/loadavg | cut -d ' ' -f 1,2,3
run_interval 1m
<parse>
@type tsv
keys avg1,avg5,avg15
delimiter " "
</parse>
</source>
输出的日志格式为:
2018-06-29 17:27:35.115878527 +0900 system.loadavg: {"avg1":"0.30","avg5":"0.20","avg15":"0.05"}
syslog
连接 rsyslog。可以作为 rsyslog 的接收端。
一个配置的例子:
<source>
@type syslog
port 5140
bind 0.0.0.0
tag system
</source>
fluentd 打开 5140 端口监听 rsyslog 发来的 log。
rsyslog 配置文件/etc/rsyslog.conf
设置为:
# Send log messages to Fluentd
*.* @127...1:5140
fluentd 解析到的 event 格式如下:
tag = "#{@tag}.#{facility}.#{priority}"
time = 1353436518,
record = {
"host": "host",
"ident": "ident",
"pid": "12345",
"message": "text"
}
dummy
专用于测试的数据源。周期产生假数据。
配置举例:
<source>
@type dummy
dummy {"hello":"world"}
</source>
dummy 常用参数:
- tag: 标记值
- size:每次发送的 event 数量
- rate:每秒产生多少个 event
- auto_increment_key:自增键名。如果配置了此项,会有一个 key 为该配置项值的自增键
- suspend:重启后自增值是否重新开始
- dummy:测试数据内容
forward
用于接收其他 fluentd forward 过来的 event。
示例配置:
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
output 配置
file
输出 event 为文件。默认每天输出一个日志文件。
示例配置:
<match pattern>
@type file
path /var/log/fluent/myapp
compress gzip
<buffer>
timekey 1d
timekey_use_utc true
timekey_wait 10m
</buffer>
</match>
包含的参数类型:
- path:path 支持 placeholder,可以在日志路径中嵌入时间,tag 和 record 中的字段值。例如:
path /path/to/${tag}/${key1}/file.%Y%m%d
<buffer tag,time,key1>
# buffer parameters
</buffer>
注意:buffer 标签后面的内容为 buffer chunk key。Buffer 根据这些 key 分段。
- append:flush 的 chuck 是否追加到已存在的文件后。默认为 false,便于文件的并行处理。
- format 标签,用来规定文件内容的格式,默认值为 out_file。
- inject 标签,用来为 event 增加 time 和 tag 等字段。
- add_path_suffix:是否增加 path 后缀
- path_suffix:path 后缀内容,默认为
.log
。 - compress:采用什么压缩格式,默认不压缩。
- recompress:是否在 buffer chunk 已经压缩的情况再次压缩,默认为 false。
forward
将 event 转发到其他的 fluentd 节点。如果配置了多个 fluentd 节点,会使用负载均衡和支持容错的方式发送。如果需要发送多份数据,需要使用 copy。
配置示例:
<match pattern>
@type forward
send_timeout 60s
recover_wait 10s
hard_timeout 60s
<server>
name myserver1
host 192.168.1.3
port 24224
weight 60
</server>
<server>
name myserver2
host 192.168.1.4
port 24224
weight 60
</server>
...
<secondary>
@type file
path /var/log/fluent/forward-failed
</secondary>
</match>
server 标签内可以配置如下字段:
- host
- name
- port
- shared_key
- username
- password
- standby 标记 server 为备用,只有其他 node 不可用的时候才会启用 standby 的 node
- weight 负载均衡的权重配置
copy
多路输出(复制 event 到多个输出端)
示例配置
<match pattern>
@type copy
<store>
@type file
path /var/log/fluent/myapp1
...
</store>
<store>
...
</store>
<store>
...
</store>
</match>
其中每一个 store 是一路输出。
重要参数:
- copy_mode:复制模式。可选值有
- no_copy:每路输出共享 event。
- shallow:浅拷贝,如果不修改嵌套字段可以使用。
- deep:深拷贝,使用
msgpack-ruby
方式。 - marshal:深拷贝,使用
marshal
方式。 - store 标签的 ignore_error 参数:如果被标记 ignore_error 的 store 出现错误,不会影响其他的 store。官网的例子为:
<match app.**>
@type copy
<store>
@type plugin1
</store>
<store>
@type plugin2
</store>
</match>
假如 plugin1 出现错误,plugin2 也不会执行。如果在 plugin1 的 store 添加上 ignore_error 参数,如下所示:
<match app.**>
@type copy
<store ignore_error>
@type plugin1
</store>
<store>
@type plugin2
</store>
</match>
上述情况 plugin2 的运行不受影响。通常为不重要的 store 添加 ignore_error 参数。
http
通过 http 请求的方式发送 event。payload 的格式由 format 标签决定。
示例配置:
<match pattern>
@type http
endpoint http://logserver.com:9000/api
open_timeout 2
<format>
@type json
</format>
<buffer>
flush_interval 10s
</buffer>
</match>
该例子使用 http 方式将 event 发送到http://logserver.com:9000/api
,使用 post 方式,连接超时时间为 2 秒。输出格式为 json,每 10 秒钟输出一次。
注意:
如果使用 JSON 的方式发送,HTTP 请求的 content-type 为 application/x-ndjson (newline-delimited JSONs)。如果用 spring mvc 接收会提示不支持。可以使用HTTPServletRequest
接收 request body。
stdout
标准输出的模式,如果使用后台模式运行 fluentd,输出到 fluentd 的日志。多用于 debug 的时候。
配置方法:
<match pattern>
@type stdout
</match>
elasticsearch
输出 event 到 elasticsearch。
示例配置:
<match my.logs>
@type elasticsearch
host localhost
port 9200
logstash_format true
</match>
可选参数:
- host:单个 elasticsearch 节点地址
- port:单个 elasticsearch 节点的端口号
- hosts:elasticsearch 集群地址。格式为 ip1:port1,ip2:port2...
- user 和 password:elasticsearch 的认证信息
- scheme:使用 https 还是 http。默认为 http 模式
- path:REST 接口路径,默认为空
- index_name:index 名称
- logstash_format:index 是否使用 logstash 命名方式(
logstash-%Y.%m.%d
),默认不启用 - logstash_prefix:logstash_format 启用的时候,index 命名前缀是什么。默认为
logstash
kafka
把 event 输出到 kafka。
示例配置如下:
<match pattern>
@type kafka2
# list of seed brokers
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>
use_event_time true
# buffer settings
<buffer topic>
@type file
path /var/log/td-agent/buffer/td
flush_interval 3s
</buffer>
# data type settings
<format>
@type json
</format>
# topic settings
topic_key topic
default_topic messages
# producer settings
required_acks -1
compression_codec gzip
</match>
重要的参数为:
- brokers:Kafka brokers 的地址和端口号
- topic_key:record 中哪个 key 对应的值用作 Kafka 消息的 key
- default_topic:如果没有配置 topic_key,默认使用的 topic 名字
- format 标签:确定发送的数据格式
- use_event_time:是否使用 fluentd event 的时间作为 Kafka 消息的时间。默认为 false。意思为使用当前时间作为发送消息的时间
- required_acks:producer acks 的值
- compression_codec:压缩编码方式
webhdfs
event 通过 REST 方式写入到 HDFS。
HADOOP 启用 webhdfs 的方法
core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://10.180.210.172:9000</value>
</property>
</configuration>
hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.http.address</name>
<value>0.0.0.0:50070</value>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.support.append</name>
<value>true</value>
</property>
<property>
<name>dfs.support.broken.append</name>
<value>true</value>
</property