Java+Flume實現日志數據傳輸
如果要在Java之中進行日志的采集處理,很明顯應該使用的是log4j組件,現在准備將log4j采集到的數據通過flume進行處理。
1、如果要想將日志數據發送給flume,flume.conf必須改如下配置:
a1.sources.r1.type = avro
2、修改log4j.properties配置文件,實現Flume數據輸出:
log4j.logger.cn.mldn.myflume=INFO,flume log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender log4j.appender.flume.layout=org.apache.log4j.PatternLayout ##局域網環境測試通過 ##外網遠程環境測試不通過 log4j.appender.flume.Hostname=192.168.0.106 log4j.appender.flume.Port=44444
3、啟動Flume
3.1、window啟動Flume
cd D:\dev\apache-flume-1.7.0-bin\bin\ flume-ng.cmd agent --conf D:/dev/apache-flume-1.7.0-bin/conf --conf-file D:/dev/apache-flume-1.7.0-bin/conf/flume.conf --name a1 -property "flume.root.logger=INFO,console"
3.2、linux啟動Flume
cd /root/soft/apache-flume-1.7.0-bin/bin ./flume-ng agent --conf ../conf/ --conf-file ../conf/flume.conf --name a1 -Dflume.root.logger=INFO,console
4、TestFlumeDemo.java
package cn.lynch.myflume; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TestFlumeDemo { private static final Logger LOGGER = LoggerFactory .getLogger(TestFlumeDemo.class); public static void main(String[] args) { for (int x = 0 ; x < 10 ; x ++) { LOGGER.info("lynch.cn" + x); } } }
執行main方法,在Flume控制台可以看到log4j信息
2020-07-02 00:05:05,628 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504422, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 30 0D 0A lynch.cn0.. } 2020-07-02 00:05:05,628 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504462, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 31 0D 0A lynch.cn1.. } 2020-07-02 00:05:05,629 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504463, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 32 0D 0A lynch.cn2.. } 2020-07-02 00:05:05,630 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504464, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 33 0D 0A lynch.cn3.. } 2020-07-02 00:05:05,630 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504465, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 34 0D 0A lynch.cn4.. } 2020-07-02 00:05:05,630 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504466, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 35 0D 0A lynch.cn5.. } 2020-07-02 00:05:05,631 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504467, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 36 0D 0A lynch.cn6.. } 2020-07-02 00:05:05,631 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504468, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 37 0D 0A lynch.cn7.. } 2020-07-02 00:05:05,631 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504469, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 38 0D 0A lynch.cn8.. } 2020-07-02 00:05:05,631 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{flume.client.log4j.timestamp=1593619504470, flume.client.log4j.logger.name=cn.mldn.myflume.TestFlumeDemo, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 6C 79 6E 63 68 2E 63 6E 39 0D 0A lynch.cn9.. }
window啟動Flume報了如下異常:
D:\dev\apache-flume-1.7.0-bin\bin\flume-ng.ps1 : 找不到與參數名稱“Dflume.root.logger=INFO,console”匹配的參數。 + CategoryInfo : InvalidArgument: (:) [flume-ng.ps1],ParentContainsErrorRecordException + FullyQualifiedErrorId : NamedParameterNotFound,flume-ng.ps1
解決方案:
a、在conf目錄下復制flume-env.ps1.template改為flume-env.ps1,改下FLUME_CLASSPATH:
$FLUME_CLASSPATH="D:\dev\apache-flume-1.7.0-bin\lib"
b、flume-env.sh操作同上
c、D:\dev\apache-flume-1.7.0-bin\bin\flume-ng.ps1,如下三個選項注釋掉
#$hadoopHome = GetHadoopHome
#$hbaseHome = GetHbaseHome
#$hiveHome = GetHiveHome
d、檢查安裝是否成功
cd D:\dev\apache-flume-1.7.0-bin\bin\ flume-ng.cmd version