I: Flume overview and installation
(I) Connecting Flume to a Hadoop cluster
1. Start the Hadoop cluster
(1) For deploying the distributed cluster in HA mode, see: https://www.cnblogs.com/ssyfj/p/12369486.html
(2) If the startup log repeats errors like retry.RetryInvocationHandler (RetryInvocationHandler.java:invoke, restart zkfc: hadoop-daemon.sh start zkfc
2. Configure Flume
(1) Copy the jars under share/hadoop/common/ and share/hadoop/common/lib/ in the Hadoop root directory into the lib directory under the Flume root directory.
(2) Copy the jars under share/hadoop/hdfs/ and share/hadoop/hdfs/lib/ in the Hadoop root directory into the lib directory under the Flume root directory.
(3) Copy core-site.xml and hdfs-site.xml from etc/hadoop/ in the Hadoop root directory into the conf directory under the Flume root directory.
These copies are sketched as shell commands below.
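A minimal sketch of steps (1)-(3), assuming HADOOP_HOME points at the Hadoop root and FLUME_HOME at the Flume root (both variable names are assumptions; adjust to your layout):

# jars needed by Flume's HDFS sink
cp $HADOOP_HOME/share/hadoop/common/*.jar     $FLUME_HOME/lib/
cp $HADOOP_HOME/share/hadoop/common/lib/*.jar $FLUME_HOME/lib/
cp $HADOOP_HOME/share/hadoop/hdfs/*.jar       $FLUME_HOME/lib/
cp $HADOOP_HOME/share/hadoop/hdfs/lib/*.jar   $FLUME_HOME/lib/
# client configs so Flume can resolve the ns1 nameservice
cp $HADOOP_HOME/etc/hadoop/core-site.xml $FLUME_HOME/conf/
cp $HADOOP_HOME/etc/hadoop/hdfs-site.xml $FLUME_HOME/conf/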
3. Write the Flume configuration file
#agent1
#name the component on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = s1

#config source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/App/apache-flume-1.6.0-bin/SourceData
a1.sources.r1.channels = c1

#config channel --- file
a1.channels.c1.type = file
a1.channels.c1.dataDirs = /home/hadoop/App/apache-flume-1.6.0-bin/ChannelData

#config sink
a1.sinks.s1.type = hdfs
a1.sinks.s1.channel = c1
a1.sinks.s1.hdfs.path = hdfs://ns1/flume/logdfs/%y-%m-%d/%H%M/%S
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundValue = 10
a1.sinks.s1.hdfs.roundUnit = minute
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.fileType = DataStream
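With these rounding settings the HDFS sink rounds each event's timestamp down to the nearest 10 minutes before expanding the path escapes, and useLocalTimeStamp = true stamps events with the agent's local clock, so no timestamp header or interceptor is needed. An event written at, say, 14:37:22 on 2020-03-15 would land under a path like (illustrative date):

/flume/logdfs/20-03-15/1430/00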
4. Start Flume
flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agentdfs.conf --name a1 -Dflume.root.logger=INFO,console

5. Upload a file to the spoolDir
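For example (test.log is a hypothetical sample file):

cp ~/test.log /home/hadoop/App/apache-flume-1.6.0-bin/SourceData/

Once the spooling directory source has ingested a file, it renames it with a .COMPLETED suffix, which is an easy way to confirm the pickup.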


6. View the file in HDFS (this is the data Flume collected)
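To list what the sink wrote, use the output path from the configuration above (the ns1 nameservice resolves through the copied hdfs-site.xml):

hdfs dfs -ls -R /flume/logdfs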

7. Note: because the file channel persists its data to disk, we can inspect it under the configured directory.
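A quick look at the configured data directory shows the channel's on-disk log files (named log-N). Note that since checkpointDir was not set, the channel's checkpoint goes to Flume's default location, ~/.flume/file-channel/checkpoint:

ls /home/hadoop/App/apache-flume-1.6.0-bin/ChannelData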

(II) Connecting Flume to a Kafka cluster
1. Start the ZooKeeper cluster first
2. From the Kafka root directory, start the Kafka broker process (on each broker node)
bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

3. Create a topic that will receive the data from Flume
bin/kafka-topics.sh --create --zookeeper hadoopH5:2181 --replication-factor 3 --partitions 1 --topic flume
The topic only needs to be created on one node; it is automatically propagated to the rest of the cluster.
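To verify the topic and see which brokers hold its partitions and replicas (same ZooKeeper connection string as above):

bin/kafka-topics.sh --describe --zookeeper hadoopH5:2181 --topic flume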

4. Write the Flume configuration file
#agent1
#name the component on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = s1

#config source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/App/apache-flume-1.6.0-bin/SourceData
a1.sources.r1.channels = c1

#config channel --- memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#config sink --- kafka
a1.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.s1.topic = flume
#note: these hostnames are mapped to IPs in the hosts file
a1.sinks.s1.brokerList = hadoopH5:9092,hadoopH6:9092,hadoopH7:9092
a1.sinks.s1.channel = c1
5. Start Flume
flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agentkafka.conf --name a1 -Dflume.root.logger=INFO,console

6. Copy a file into the spoolDir

7. Start a Kafka console consumer to read the data in the flume topic
bin/kafka-console-consumer.sh --zookeeper hadoopH5:2181 --from-beginning --topic flume

II: Distributed Flume installation
(I) A simple distributed Flume setup

The first node is agent1 and the second is agent2: agent1 reads files from a spooling directory and forwards the events over Avro RPC to agent2, which writes them to its log.
(II) Configuration file changes
agent1.conf
#agent1
#name the component on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = s1

#config source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/App/apache-flume-1.6.0-bin/data
a1.sources.r1.channels = c1

#config channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#config sink
a1.sinks.s1.type = avro
a1.sinks.s1.channel = c1
a1.sinks.s1.hostname = hadoopH2
a1.sinks.s1.port = 4141
agent2.conf
#agent2
#name the component on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = s1

#config source
a1.sources.r1.type = avro
#bind to this machine's address and port and listen for incoming events
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1

#config channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#config sink
a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1
(III) Start Flume
1. Start the downstream node, agent2, first (its Avro source must be listening before agent1's Avro sink can connect)
flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agent2.conf --name a1 -Dflume.root.logger=INFO,console

2. Then start agent1
flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agent1.conf --name a1 -Dflume.root.logger=INFO,console
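Before the file-based test in the next section, the Avro hop can be smoke-tested with Flume's bundled avro-client, which sends the lines of a local file to an Avro source (/etc/hosts is just a convenient sample file):

flume-ng avro-client --host hadoopH2 --port 4141 --filename /etc/hosts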

(IV) Data collection test
1. Copy a file into the monitored directory

2. View the collected data on agent2
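With the logger sink, each collected event appears on agent2's console roughly as follows (illustrative output; the logger sink prints a hex dump plus the printable body, truncated to 16 bytes by default):

Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65  hello flume }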

III: Distributed Flume cluster comparison (NG vs. OG)
(I) Flume overview
Flume NG is a distributed, reliable, and available system for efficiently collecting and aggregating large volumes of log data from many different sources and moving it into a centralized data store for analysis. In fact, Flume can collect almost any kind of event data, not just logs. In the move from Flume OG to Flume NG the architecture was rebuilt, and NG is completely incompatible with OG; in exchange, Flume NG is simpler and easier to operate and manage.
In short: Flume NG is simpler and easier to run than Flume OG, at the cost of backward compatibility.
(II) Flume OG cluster components --- closer to the classic distributed systems covered earlier (for understanding only; not deployed here)
OG has three components: agent, collector, and master. Agents collect the logs from the individual log servers and aggregate them at a collector (several collectors can be configured); the master manages the agents and collectors; finally the collectors write the collected logs to HDFS, or alternatively to local disk, to Storm, or to HBase.

(III) Flume NG --- no master node; load balancing is achieved across the system as a whole
NG's biggest change is that the fixed roles are gone: every node is an agent, and agents can be chained together. When several agents feed into one agent, that agent effectively plays the collector role. NG also supports load balancing.
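For reference, NG's load balancing is configured through the same sink-group mechanism that the failover setup below uses; a minimal sketch (the sink names k1/k2 are assumptions, and round_robin is the default selector):

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.backoff = true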

IV: Installing a highly available distributed Flume cluster (NG)
Prevent the failure of a single agent from causing data collection to fail.

Limit the number of connections into each Source: a source cannot sustain a very large number of concurrent connections, so the sources are arranged in groups.

Limit the number of connections out of the Sinks: each agent's sink keeps one connection open to the backing store, and as agents multiply the total eventually exceeds the store's connection limit, so the agents are arranged in tiers.

The cluster here is built from the topology shown above. Note that data travels along only one link at a time; if a node fails, an alternative path is selected by priority, which avoids duplicating data across links.
(I) Cluster node planning

(II) Start the ZooKeeper cluster (as above)
(III) Start HDFS (as above)
(IV) Writing the Flume configuration --- data-source agents (note: do not leave trailing spaces or inline comments after property values)
1. Write agentdata.conf on hadoopH3
#data-source agent; runs on hadoopH3
ag.sources = r1
ag.channels = c1
ag.sinks = k1 k2 k3 k4

#set group
ag.sinkgroups = g1

#set source
ag.sources.r1.channels = c1
ag.sources.r1.type = spooldir
ag.sources.r1.spoolDir = /home/hadoop/App/apache-flume-1.6.0-bin/data

#set channel
ag.channels.c1.type = memory
ag.channels.c1.capacity = 1000
ag.channels.c1.transactionCapacity = 100

#set sink1 --- agent1 host
ag.sinks.k1.channel = c1
ag.sinks.k1.type = avro
ag.sinks.k1.hostname = hadoopH1
ag.sinks.k1.port = 6666

#set sink2 --- agent2 host
ag.sinks.k2.channel = c1
ag.sinks.k2.type = avro
ag.sinks.k2.hostname = hadoopH2
ag.sinks.k2.port = 6666

#set sink3 --- agent3 host
ag.sinks.k3.channel = c1
ag.sinks.k3.type = avro
ag.sinks.k3.hostname = hadoopH3
ag.sinks.k3.port = 6666

#set sink4 --- agent4 host
ag.sinks.k4.channel = c1
ag.sinks.k4.type = avro
ag.sinks.k4.hostname = hadoopH4
ag.sinks.k4.port = 6666

#set sink group: failover by priority (higher value = higher priority)
ag.sinkgroups.g1.sinks = k1 k2 k3 k4
ag.sinkgroups.g1.processor.type = failover
ag.sinkgroups.g1.processor.priority.k1 = 10
ag.sinkgroups.g1.processor.priority.k2 = 9
ag.sinkgroups.g1.processor.priority.k3 = 5
ag.sinkgroups.g1.processor.priority.k4 = 4
#maximum backoff period (ms) for a failed sink
ag.sinkgroups.g1.processor.maxpenalty = 10000
2. Write agentdata.conf on hadoopH4
#data-source agent; runs on hadoopH4
ag.sources = r1
ag.channels = c1
ag.sinks = k1 k2 k3 k4

#set group
ag.sinkgroups = g1

#set source
ag.sources.r1.channels = c1
ag.sources.r1.type = spooldir
ag.sources.r1.spoolDir = /home/hadoop/App/apache-flume-1.6.0-bin/data

#set channel
ag.channels.c1.type = memory
ag.channels.c1.capacity = 1000
ag.channels.c1.transactionCapacity = 100

#set sink1 --- agent1 host
ag.sinks.k1.channel = c1
ag.sinks.k1.type = avro
ag.sinks.k1.hostname = hadoopH1
ag.sinks.k1.port = 6666

#set sink2 --- agent2 host
ag.sinks.k2.channel = c1
ag.sinks.k2.type = avro
ag.sinks.k2.hostname = hadoopH2
ag.sinks.k2.port = 6666

#set sink3 --- agent3 host
ag.sinks.k3.channel = c1
ag.sinks.k3.type = avro
ag.sinks.k3.hostname = hadoopH3
ag.sinks.k3.port = 6666

#set sink4 --- agent4 host
ag.sinks.k4.channel = c1
ag.sinks.k4.type = avro
ag.sinks.k4.hostname = hadoopH4
ag.sinks.k4.port = 6666

#set sink group: same group, but with the priorities flipped so this data source prefers agent3/agent4
ag.sinkgroups.g1.sinks = k1 k2 k3 k4
ag.sinkgroups.g1.processor.type = failover
ag.sinkgroups.g1.processor.priority.k1 = 5
ag.sinkgroups.g1.processor.priority.k2 = 4
ag.sinkgroups.g1.processor.priority.k3 = 10
ag.sinkgroups.g1.processor.priority.k4 = 9
ag.sinkgroups.g1.processor.maxpenalty = 10000
(V) Writing the Flume configuration --- collection agents
1. Configure agent1.conf on hadoopH1 and hadoopH2
#collection agent; runs as agent1/agent2 on hadoopH1 and hadoopH2
ag.sources = r1
ag.channels = c1
ag.sinks = k1

#set source
ag.sources.r1.channels = c1
ag.sources.r1.type = avro
ag.sources.r1.bind = 0.0.0.0
ag.sources.r1.port = 6666

#set channel
ag.channels.c1.type = memory
ag.channels.c1.capacity = 1000
ag.channels.c1.transactionCapacity = 100

#set sink1
ag.sinks.k1.channel = c1
ag.sinks.k1.type = avro
#controller host
ag.sinks.k1.hostname = hadoopH3
ag.sinks.k1.port = 8888
2. Configure agent1.conf on hadoopH3 and hadoopH4
#collection agent; runs as agent3/agent4 on hadoopH3 and hadoopH4
ag.sources = r1
ag.channels = c1
ag.sinks = k1

#set source
ag.sources.r1.channels = c1
ag.sources.r1.type = avro
ag.sources.r1.bind = 0.0.0.0
ag.sources.r1.port = 6666

#set channel
ag.channels.c1.type = memory
ag.channels.c1.capacity = 1000
ag.channels.c1.transactionCapacity = 100

#set sink1
ag.sinks.k1.channel = c1
ag.sinks.k1.type = avro
#controller host
ag.sinks.k1.hostname = hadoopH4
ag.sinks.k1.port = 8888
(VI) Writing the Flume configuration --- the controller
1. Configure controller.conf on hadoopH3 and hadoopH4
#controller node: receives events and writes them to HDFS
ag.sources = r1
ag.channels = c1
ag.sinks = k1

#set source
ag.sources.r1.channels = c1
ag.sources.r1.type = avro
ag.sources.r1.bind = 0.0.0.0
ag.sources.r1.port = 8888

#set channel
ag.channels.c1.type = memory
ag.channels.c1.capacity = 1000
ag.channels.c1.transactionCapacity = 100

#set sink1
ag.sinks.k1.type = hdfs
ag.sinks.k1.channel = c1
ag.sinks.k1.hdfs.path = hdfs://ns1/flume/logdfs/%y-%m-%d/%H%M/%S
ag.sinks.k1.hdfs.round = true
ag.sinks.k1.hdfs.roundValue = 10
ag.sinks.k1.hdfs.roundUnit = minute
ag.sinks.k1.hdfs.useLocalTimeStamp = true
ag.sinks.k1.hdfs.fileType = DataStream
(VII) Start the Flume cluster
1. Start the controller nodes on hadoopH3 and hadoopH4
flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./controller.conf --name ag -Dflume.root.logger=INFO,console
2. Start the collection agents on hadoopH1, hadoopH2, hadoopH3, and hadoopH4
flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agent1.conf --name ag -Dflume.root.logger=INFO,console
3. Start the data-source agents (agentdata.conf) on hadoopH3 and hadoopH4
flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agentdata.conf --name ag -Dflume.root.logger=INFO,console
(VIII) Data collection test
1. Copy data into the spoolDir directory
2. Check the HDFS output directory
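As in part I, the output can be listed using the path from controller.conf:

hdfs dfs -ls -R /flume/logdfs

To exercise the failover path, stop the highest-priority hop and repeat the test: for hadoopH3's data source that means killing the collection agent process on hadoopH1 and confirming that new files still reach HDFS, now routed through hadoopH2 (the priority-9 sink).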


