flume使用详解


1.    Flume简介

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。

这篇文章介绍的是Flume 1.7版本,flume v1.7新增了tailDir数据源。

1.1 系统要求

Flume1.7运行系统要求:jdk1.7,linux

由于taildir的实现是基于jdk1.7的,所以要求jdk版本在1.7以上。

Flume也可以运行的windows上。但是在启动及管理比较繁琐。在官方的文档介绍中启动命令等都是linux基础上。另外部分flume组件的运行只有linux系统支持,比如taildir source中对文件按照inode来唯一标识,然而windows系统中文件没有inode的概念。所以本篇也是基于linux系统。

1.2  资料整理

在搜索引擎中输入flume将会得到很多资料。官方文档如下。查看官方资料对于学习新事物非常重要。

Flume介绍:http://flume.apache.org/

可以在这个网站下载flume。不过关于flume其他的原理或入门例子等,建议查看flume用户手册

 

Flume用户手册:http://flume.apache.org/FlumeUserGuide.html

Flume开发者手册:http://flume.apache.org/FlumeDeveloperGuide.html

Flume github源码:https://github.com/apache/flume

1.3 flume 原理介绍

                       

图 1 flume agent 组成结构

 

一个flume由三个部分组成:source,channel,sink。根据官方的介绍原文,我整理如下:

  1. Source:A source consumes events delivered to it by external source.
  2. Channel: when a source receive an event, it stores it into one or more channels.The channel is a passive store that keeps the event until it’s consumed by a flume sink
  3. Sink: The sink remove the event from the channel and puts it into an exteral repository like HDFS.
  4. The source and sink within the given agent run asynchronously with the events staged in the channel.

1.4 flume agent 示例

  1. 配置文件

下载好flume解压后,在conf文件夹下存放着配置文件模板,可以复制一份重命名后在此基础上进行修改。

# example.conf: A single-node Flume configuration
 

# 指定flume组件的名称,agent名为a1,source为r1,sink为k1,channel为c1

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# 配置source

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444


# 配置sink,logger表示接受到的event将直接展示到console,这个类型经常在调试时使用

a1.sinks.k1.type = logger


#配置 channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# 给source及sink指定channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

 

  1. 启动

使用flume-ng shell脚本进行启动,如下:

$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template -Dflume.root.logger=INFO,console

启动命令由4部分组成:

-n $agent_name:这里指定启动的agent 名,按照配置文件中的命名这里应该替换成a1

-c conf: 指定配置文件目录,可以是相对路径或绝对路径

-f conf/flume-conf.properties.template :指定具体配置文件名

-Dflume.root.logger=INFO,console:将flume运行日志展示到console台,这个是可选的,但是一般都需要加上,便于查看flume运行情况。

 

  1. 运行结果

在另外一个终端,使用telnet命令发送Hello world!

因为根据配置文件我们指定了netcat类型的source是监听在本机的44444端口上。

$ telnet localhost 44444

Trying 127.0.0.1...

Connected to localhost.localdomain (127.0.0.1).

Escape character is '^]'.

Hello world! <ENTER>

OK

 

将在flume运行的控制台查看到sink已经将接受到的event打印到控制台。

12/06/19 15:32:19 INFO source.NetcatSource: Source starting

12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]

12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D          Hello world!. }

至此,一个完整的flume运行过程完成。

 

2.    Flume Source

Flume不仅提供了丰富的source类型,可以直接使用,目前已经覆盖了很多应用场景。同时也支持自定义source。

在这里简单介绍下exec,spooldir,taildir三种source。其他类型及具体详情请查看官方文档Flume Source章节

2.1  Exec

使用exec作为数据源,需要指定执行的shell命令。经常使用到的命令tail -F [file],来读取新增到日志文件的内容。

缺点:数据可能丢失,官方推荐spooldir作为数据源。

 

2.2  Spooldir

Spooldir将从指定的文件夹中读取文件,并且是按行读取文件中的内容。如果指定的文件夹中出现新文件,也将会被识别并读取。Spooldir将读取完的文件进行重命名(默认添加.COMPLETE)或永久删除。

优点:Spooldir不会出现丢失数据的情况,即使flume重启或停止。

缺点:1. 放置在spooldir目录中的文件不允许进行修改,否则flume会报错并停止工作

                2. 在spooldir目录中的文件名不可重复使用,否则flume会报错并停止工作

2.3  Taildir

Taildir可以说是exec和spooldir两种source的优点集合。在车载OBD的日志服务功能就是使用此作为数据源。

注意:taildir目前不支持windows系统。

查看源码可以看到在ReliableTaildirEventReader.java实现代码中获取文件的inode,其中“unix”表明仅在linux系统生效:

276   private long getInode(File file) throws IOException { 
277     long inode = (long) Files.getAttribute(file.toPath(), "unix:ino"); 
278     return inode; 
279   } 

 

Taildir数据源将会监控指定目录下所有文件,实时获取新附加到各个文件末尾的内容。它将定时保存各个文件最后读取位置记录到一个json格式的文件。Flume重新启动后将按照此json文件保存的位置开始读取。

如果需要监控多个文件源,并且对各个不同读取到的数据文件进行区别处理,可以使用提供的headerkey。

配置文件举例:

# Describe/configure source1

agent1.sources.s1.type = TAILDIR

agent1.sources.s1.positionFile = ./bin/taildir_position.json

agent1.sources.s1.filegroups = f1 f2

agent1.sources.s1.filegroups.f1 = /home/neoway/apache-flume-1.7.0-bin/log1/.*

agent1.sources.s1.headers.f1.componentName = mqtt

agent1.sources.s1.filegroups.f2 = /home/neoway/apache-flume-1.7.0-bin/log2/.*

agent1.sources.s1.headers.f2.componentName = mybatis

agent1.sources.s1.fileHeader = true

#agent1.sources.s1.channels = c1

agent1.sources.s1.channels = c1

 

读取到event:

2017-08-24 09:48:06:INFO SinkRunner-PollingRunner-DefaultSinkProcessor org.apache.flume.sink.LoggerSink - Event: { headers:{ componentName = mqtt, file=/home/neoway/apache-flume-1.7.0-bin/log/mylineDeserializer.log} body: 32 30 31 37 2D 30 38 2D 32 33 54 31 34 3A 33 30 2017-08-23T14:30 }

2017-08-24 09:48:06:INFO SinkRunner-PollingRunner-DefaultSinkProcessor org.apache.flume.sink.LoggerSink - Event: { headers:{ componentName = mqtt, file=/home/neoway/apache-flume-1.7.0-bin/log/mylineDeserializer.log} body: 32 30 31 37 2D 30 38 2D 32 33 54 31 34 3A 33 30 2017-08-23T14:30 }

 

可以看到在读取到的event中与header部分,在sink部分处理时,可以获取envent的header,从而判断出属于哪个文件源并依此做对应处理。

 

3.    Flume Sink

Flume提供了很多类型的sink,详情可参考flume用户手册的flume sink章节

在车载的日志服务的需求是将读取到的内容保存到mysql数据库中。这里需要使用自定义sink。我参考了这篇文章:http://blog.csdn.net/poisions/article/details/51695372

  1. 自定义mysqlSink类,继承 AbstractSink 并实现 Configurable 。重写start()方法,stop()方法,process()方法
  2. 将编译好的jar包及连接mysql的驱动jar包存放到flume的lib目录下
  3. 在配置文件中配置sink,为自定义mysqlsink的包路径。
agent1.sinks.k1.type = org.flume.mysql.sink.MysqlSink

agent1.sinks.k1.hostname = 192.168.10.136

agent1.sinks.k1.port=3306

agent1.sinks.k1.databaseName=carcloud

agent1.sinks.k1.recordTableName=log_record

agent1.sinks.k1.configTableName=log_config

agent1.sinks.k1.projectName= carcloud

#the string that joint all componentNames by ',' and each componentName come from filegroups's fileHeader;

agent1.sinks.k1.componentNames = mqtt,mybatis

agent1.sinks.k1.user=root

agent1.sinks.k1.password=123

agent1.sinks.k1.channel = c1

 

增加说明:

在实际应用中,我参考的http://blog.csdn.net/poisions/article/details/51695372示例不能满足商用需求。比如使用原生的jdbc连接mysql会出现wait_timeout的情况,报错 No operations allowed after connection closed。这个问题我最后放弃原生jdbc,使用spring的JdbcTemplate代替完成。瞬间感觉代码健壮了不少。原谅我之前懒。

同时这个文章中bathsize=100,每100个event才提交一次,非常容易出现错误:The channel is full or unexpected failure导致flume停止工作。我修改为不使用bathsize,每次处理一个event并提交。

4. Flume探索路上遇到的问题

4.1  在windows系统运行flume

在除接触flume时,一直在windows上尝试启动flume,碰到很多问题。慢慢查多资料发现flume设计的命令都是linux的,从而转战到linux系统。这也是对linux系统不熟悉造成的坑。

 

4.2  安装路径有空格

在linux安装路径上的目录有空格,也会出现问题。在文件及文件夹命名时使用空格是个坏习惯,可以使用‘-’代替空格。

 

4.3  Taildir重复读取

在taildir测试的时候,遇到了往taildir监控的文件中追加内容时,总是会从头读取文件的内容,而不是仅读取新添加的这一行内容。

测试环境是这样的:

  1. 使用 sed命令往目标文件追加内容
  2. 查询数据库,数据库表中增加了目标文件所有行内容,而非仅仅是上一步sed的行内容。

一度怀疑taildir是否能读取追加的内容。并且检查了所有的配置,均无效。

查询资料也完全没有提到过使用taildir会重复读取的问题。

最后将源码拷贝下来,自定义为myTaildirSource。并且在运行的关键部分打印日志,顺便了解下tailDir的运行过程。

根据日志发现每次往目标文件中sed内容后,taildir显示目标文件的inode发送了变化,从而被识别为新文件,难怪会从头读取。

接下来查阅资料关于linux系统的inode机制,什么情况下会导致inode发送变化。根据查阅的资料inode仅在重命名后,或者删除后再次新建一个同名的文件时 inode发送变化。

最后无意去查看了车载项目产生的日志文件的inode,在往日志文件中追加内容时inode不会发送变化。至此问题解决。

附:使用ls –i 可查看文件的inode

 

4.4  Flume后台启动

前面介绍的启动命令是在前台直接运行的,这时不能关闭这个界面,否则flume也被停止。

在后台启动flume命令:

./bin/flume-ng agent --conf ../conf --conf-file ../conf/x1_dir_to_db_flume.conf --name a1 -Dflume.root.logger=INFO,console > x1nohup.out 2>&1 &

在原来的启动命令上增加> x1nohup.out 2>&1 &即可实现后台启动。并且flume的运行日志都将打印到x1nohup.out文件中。

为了方便减少每次都复制启动代码。可以将启动代码前添加

#!/bin/sh

,然后将此内容保存在新建的文件例如start.sh中。注意start.sh的路径,要放在./bin对应的路径上。最后chmod +x start.sh 给这个文件增加执行权限,即可通过执行start.sh来启动flume。

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM