Java+Flume实现日志数据传输
如果要在Java之中进行日志的采集处理,很明显应该使用的是log4j组件,现在准备将log4j采集到的数据通过flume进行处理。
1、如果要想将日志数据发送给flume,flume.conf必须改如下配置:
a1.sources.r1.type = avro
2、修改log4j.properties配置文件,实现Flume数据输出:
log4j.logger.cn.mldn.myflume=INFO,flume log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender log4j.appender.flume.layout=org.apache.log4j.PatternLayout ##局域网环境测试通过 ##外网远程环境测试不通过 log4j.appender.flume.Hostname=192.168.0.106 log4j.appender.flume.Port=44444
3、启动Flume
3.1、window启动Flume
cd D:\dev\apache-flume-1.7.0-bin\bin\ flume-ng.cmd agent --conf D:/dev/apache-flume-1.7.0-bin/conf --conf-file D:/dev/apache-flume-1.7.0-bin/conf/flume.conf --name a1 -property "flume.root.logger=INFO,console"
3.2、linux启动Flume
cd /root/soft/apache-flume-1.7.0-bin/bin ./flume-ng agent --conf ../conf/ --conf-file ../conf/flume.conf --name a1 -Dflume.root.logger=INFO,console
4、TestFlumeDemo.java
package cn.lynch.myflume; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TestFlumeDemo { private static final Logger LOGGER = LoggerFactory .getLogger(TestFlumeDemo.class); public static void main(String[] args) { for (int x = 0 ; x < 10 ; x ++) { LOGGER.info("lynch.cn" + x); } } }
执行main方法,在Flume控制台可以看到log4j信息
2020-07-02 00:05:05,628 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504422, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 30 0D 0A lynch.cn0.. } 2020-07-02 00:05:05,628 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504462, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 31 0D 0A lynch.cn1.. } 2020-07-02 00:05:05,629 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504463, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 32 0D 0A lynch.cn2.. } 2020-07-02 00:05:05,630 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504464, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 33 0D 0A lynch.cn3.. } 2020-07-02 00:05:05,630 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504465, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 34 0D 0A lynch.cn4.. } 2020-07-02 00:05:05,630 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504466, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 35 0D 0A lynch.cn5.. } 2020-07-02 00:05:05,631 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504467, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 36 0D 0A lynch.cn6.. } 2020-07-02 00:05:05,631 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504468, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 37 0D 0A lynch.cn7.. } 2020-07-02 00:05:05,631 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504469, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 38 0D 0A lynch.cn8.. } 2020-07-02 00:05:05,631 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504470, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 39 0D 0A lynch.cn9.. }
window启动Flume报了如下异常:
D:\dev\apache-flume-1.7.0-bin\bin\flume-ng.ps1 : 找不到与参数名称“Dflume.root.logger=INFO,console”匹配的参数。 + CategoryInfo : InvalidArgument: (:) [flume-ng.ps1],ParentContainsErrorRecordException + FullyQualifiedErrorId : NamedParameterNotFound,flume-ng.ps1
解决方案:
a、在conf目录下复制flume-env.ps1.template改为flume-env.ps1,改下FLUME_CLASSPATH:
$FLUME_CLASSPATH="D:\dev\apache-flume-1.7.0-bin\lib"
b、flume-env.sh操作同上
c、D:\dev\apache-flume-1.7.0-bin\bin\flume-ng.ps1,如下三个选项注释掉
#$hadoopHome = GetHadoopHome
#$hbaseHome = GetHbaseHome
#$hiveHome = GetHiveHome
d、检查安装是否成功
cd D:\dev\apache-flume-1.7.0-bin\bin\ flume-ng.cmd version
