flume簡介安裝及高可用集群部署


一:flume簡介及安裝

Flume學習筆記:Flume的安裝與基礎應用

Flume學習之路 (一)Flume的基礎介紹

Flume學習筆記:Flume集群的Avro RPC實現

Flume學習之路 (二)Flume的Source類型

Flume學習之路 (三)Flume的配置方式

flume攔截器

Flume學習系列(四)---- Interceptors(攔截器)

Flume中的HDFS Sink配置

Flume常見錯誤整理

(一)flume連接Hadoop集群

1.啟動Hadoop集群

1)分布式集群HA模式部署:https://www.cnblogs.com/ssyfj/p/12369486.html
2)若是啟動過程出現下述問題:retry.RetryInvocationHandler (RetryInvocationHandler.java:invoke則重啟zkfc  ----   hadoop-daemon.sh start zkfc

2.配置flume

1)將hadoop根目錄下的share/hadoop/common/下的jar包、以及share/hadoop/common/lib下的jar包拷貝到flume根目錄的lib目錄下
(2)將hadoop根目錄下的share/hadoop/hdfs/下的jar包、以及share/hadoop/hdfs/lib下的jar包拷貝到flume根目錄的lib目錄下
(3)將hadoop根目錄下的etc/hadoop/下的core-site.xml和hdfs-site.xml拷貝到flume根目錄下的conf目錄下

3.編寫flume配置文件

#agent1
#name the component on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = s1
#config source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/App/apache-flume-1.6.0-bin/SourceData
a1.sources.r1.channels = c1
#config channel --- file
a1.channels.c1.type = file
a1.channels.c1.dataDirs = /home/hadoop/App/apache-flume-1.6.0-bin/ChannelData
#config sink
a1.sinks.s1.type = hdfs
a1.sinks.s1.channel = c1
a1.sinks.s1.hdfs.path = hdfs://ns1/flume/logdfs/%y-%m-%d/%H%M/%S
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundValue = 10
a1.sinks.s1.hdfs.roundUnit = minute
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.fileType = DataStream

4.啟動flume

flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agentdfs.conf --name a1 -Dflume.root.logger=INFO,console

5.上傳文件到spoolDir

6.hdfs系統中查看文件(發現是flume采集的數據)

7.補充:采用的文件系統存儲channel數據,是將數據持久化到磁盤中,所以我們可以在指定的目錄下查看信息

(二)flume連接Kafka集群

1.先啟動zookeeper集群

2.進入Kafka根目錄,啟動Kafka進程

bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

3.生成一個topic,用於從flume獲取數據

bin/kafka-topics.sh --create --zookeeper hadoopH5:2181 --replication-factor 3 --partitions 1 --topic flume

只需要在任意一台中創建,后面會自動同步

4.編寫flume配置文件 

#agent1
#name the component on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = s1
#config source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/App/apache-flume-1.6.0-bin/SourceData
a1.sources.r1.channels = c1
#config channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity = 100
#config sink
a1.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.s1.topic = flume
a1.sinks.s1.brokerList =hadoopH5:9092,hadoopH6:9092,hadoopH7:9092  #注意:主機名我們是在hosts文件中添加映射過的
a1.sinks.s1.channel = c1

5.啟動flume

flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agentkafka.conf --name a1 -Dflume.root.logger=INFO,console

6.拷貝文件到spoolDir中

7.開啟Kafka消費者端,讀取topic:flume數據

bin/kafka-console-consumer.sh --zookeeper hadoopH5:2181 --from-beginning --topic flume

二:flume分布式安裝

flume高可用集群安裝

Flume NG高可用集群搭建詳解(基於flume-1.7.0)

flume集群高可用連接kafka集群

Flume架構

基於Flume的日志收集系統(一)架構和設計

(一)flume分布式簡單實現

第一個是agent1,第二個是agent2

(二)配置文件修改

agent1.conf

#agent1
#name the component on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = s1
#config source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/App/apache-flume-1.6.0-bin/data
a1.sources.r1.channels = c1
#config channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity = 100
#config sink
a1.sinks.s1.type = avro
a1.sinks.s1.channel = c1
a1.sinks.s1.hostname = hadoopH2
a1.sinks.s1.port = 4141

agent2.conf

#agent1
#name the component on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = s1
#config source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0  #綁定本機的ip和端口,進行監聽
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1
#config channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity = 100
#config sink
a1.sinks.s1.type = logger
a1.sinks.s1.channel = c1

(三)啟動flume

1.先啟動后面的節點agent2

flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agent2.conf --name a1 -Dflume.root.logger=INFO,console

2.后啟動agent1

flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agent1.conf --name a1 -Dflume.root.logger=INFO,console

(四)flume數據采集測試

 1.向指定目錄下傳送文件

2.agent2查看采集的數據

三:分布式flume集群對比(NG、OG)

(一)Flume簡介

Flume NG是一個分布式、可靠、可用的系統,它能夠將不同數據源的海量日志數據進行高效收集、聚合,最后存儲到一個中心化數據存儲系統中,方便進行數據分析。事實上flume也可以收集其他信息,不僅限於日志。由原來的Flume OG到現在的Flume NG,進行了架構重構,並且現在NG版本完全不兼容原來的OG版本。相比較而言,flume NG更簡單更易於管理操作。

總之:Flume NG相比於Flume OG更好

(二)flume OG分布式集群組件---更符合我們之前學的分布式系統(用於理解,不進行實現)

OG有三個組件agent、collector、master,agent主要負責收集各個日志服務器上的日志,將日志聚合到collector,可設置多個collector,master主要負責管理agent和collector,最后由collector把收集的日志寫的HDFS中,當然也可以寫到本地、給storm、給Hbase。

(三)flume NG---不需要master節點,系統整體實現負載均衡

NG最大的改動就是不再有分工角色設置,所有的都是agent,可以彼此之間相連,多個agent連到一個agent,此agent也就相當於collector了,NG也支持負載均衡.

四:高可用flume分布式系統集群安裝(NG)

防止一個agent失敗,導致數據采集失敗:

控制Source的鏈接數:Source無法同時保持很大的連接數。分組實現

 控制Sink的鏈接數:每個Agent Sink都會和數據庫保持一個連接,當Agent變多時,連接數最終會超過數據庫的限制。分層實現

這里采用該拓撲圖進行集群構建。注意:數據只是在一條鏈路中傳送,如果某個節點出現故障,會按照優先級選擇一條其他路徑傳遞數據,避免了鏈路數據冗余 

(一)集群節點規划

(二)啟動zookeeper集群(同上)

(三)啟動hdfs系統(同上)

(四)flume配置文件編寫---數據源agent(注意:編寫的代碼后面不要加空格和注釋)

1.在hadoopH3中編寫agentdata.conf

#agent 在數據源節點上,這里是在hadoopH3上
ag.sources = r1
ag.channels = c1
ag.sinks = k1 k2 k3 k4
#set group
ag.sinkgroups = g1
#set source
ag.sources.r1.channels = c1
ag.sources.r1.type = spooldir
ag.sources.r1.spoolDir = /home/hadoop/App/apache-flume-1.6.0-bin/data
#set channel
ag.channels.c1.type = memory
ag.channels.c1.capacity = 1000
ag.channels.c1.transactionCapacity = 100
#set sink1  #配置agent1主機
ag.sinks.k1.channel = c1
ag.sinks.k1.type = avro
ag.sinks.k1.hostname = hadoopH1
ag.sinks.k1.port = 6666
#set sink2  #配置agent2主機
ag.sinks.k2.channel = c1
ag.sinks.k2.type = avro
ag.sinks.k2.hostname = hadoopH2
ag.sinks.k2.port = 6666
#set sink2  #配置agent3主機
ag.sinks.k3.channel = c1
ag.sinks.k3.type = avro
ag.sinks.k3.hostname = hadoopH3
ag.sinks.k3.port = 6666
#set sink2  #配置agent4主機
ag.sinks.k4.channel = c1
ag.sinks.k4.type = avro
ag.sinks.k4.hostname = hadoopH4
ag.sinks.k4.port = 6666
#set sink group
ag.sinkgroups.g1.sinks = k1 k2 k3 k4
ag.sinkgroups.g1.processor.type = failover
ag.sinkgroups.g1.processor.priority.k1 = 10
ag.sinkgroups.g1.processor.priority.k2 = 9
ag.sinkgroups.g1.processor.priority.k3 = 5
ag.sinkgroups.g1.processor.priority.k4 = 4
ag.sinkgroups.g1.processor.maxpenalty = 10000

2.在hadoopH4中編寫agentdata.conf

#agent 在數據源節點上,這里是在hadoopH4上
ag.sources = r1
ag.channels = c1
ag.sinks = k1 k2 k3 k4
#set group
ag.sinkgroups = g1
#agent 在數據源節點上,這里是在hadoopH4上
ag.sources = r1
ag.channels = c1
ag.sinks = k1 k2 k3 k4
#set group
ag.sinkgroups = g1
#set source
ag.sources.r1.channels = c1
ag.sources.r1.type = spooldir
ag.sources.r1.spoolDir = /home/hadoop/App/apache-flume-1.6.0-bin/data
#set channel
ag.channels.c1.type = memory
ag.channels.c1.capacity = 1000
ag.channels.c1.transactionCapacity = 100
#set sink1  #設置agent1主機
ag.sinks.k1.channel = c1
ag.sinks.k1.type = avro
ag.sinks.k1.hostname = hadoopH1
ag.sinks.k1.port = 6666
#set sink2  #設置agent2主機
ag.sinks.k2.channel = c1
ag.sinks.k2.type = avro
ag.sinks.k2.hostname = hadoopH2
ag.sinks.k2.port = 6666
#set sink2  #設置agent3主機
ag.sinks.k3.channel = c1
ag.sinks.k3.type = avro
ag.sinks.k3.hostname = hadoopH3
ag.sinks.k3.port = 6666
#set sink2  #設置agent4主機
ag.sinks.k4.channel = c1
ag.sinks.k4.type = avro
ag.sinks.k4.hostname = hadoopH4
ag.sinks.k4.port = 6666
#set sink group
ag.sinkgroups.g1.sinks = k1 k2 k3 k4
ag.sinkgroups.g1.processor.type = failover
ag.sinkgroups.g1.processor.priority.k1 = 5
ag.sinkgroups.g1.processor.priority.k2 = 4
ag.sinkgroups.g1.processor.priority.k3 = 10
ag.sinkgroups.g1.processor.priority.k4 = 9
ag.sinkgroups.g1.processor.maxpenalty = 10000

(五)flume配置文件編寫---數據采集agent

1.在hadoopH1、hadoopH2中配置agent1.conf

#agent信息采集節點 agent1 agent2
ag.sources = r1
ag.channels = c1
ag.sinks = k1
#set source
ag.sources.r1.channels = c1
ag.sources.r1.type = avro
ag.sources.r1.bind = 0.0.0.0
ag.sources.r1.port = 6666
#set channel
ag.channels.c1.type = memory
ag.channels.c1.capacity = 1000
ag.channels.c1.transactionCapacity = 100
#set sink1
ag.sinks.k1.channel = c1
ag.sinks.k1.type = avro
#設置controller主機 ag.sinks.k1.hostname
= hadoopH3 ag.sinks.k1.port = 8888

2.在hadoopH1、hadoopH2中配置agent1.conf

#agent信息采集節點 agent3 agent4
ag.sources = r1
ag.channels = c1
ag.sinks = k1
#set source
ag.sources.r1.channels = c1
ag.sources.r1.type = avro
ag.sources.r1.bind = 0.0.0.0
ag.sources.r1.port = 6666
#set channel
ag.channels.c1.type = memory
ag.channels.c1.capacity = 1000
ag.channels.c1.transactionCapacity = 100
#set sink1
ag.sinks.k1.channel = c1
ag.sinks.k1.type = avro
#設置controller主機
ag.sinks.k1.hostname = hadoopH4
ag.sinks.k1.port = 8888

(六)flume配置文件編寫---數據控制controller

1.在hadoopH3、hadoopH4中配置controller.conf

#controller節點,進行數據讀取和寫入
ag.sources = r1
ag.channels = c1
ag.sinks = k1
#set source
ag.sources.r1.channels = c1
ag.sources.r1.type = avro
ag.sources.r1.bind = 0.0.0.0
ag.sources.r1.port = 8888
#set channel
ag.channels.c1.type = memory
ag.channels.c1.capacity = 1000
ag.channels.c1.transactionCapacity = 100
#set sink1
ag.sinks.k1.type = hdfs
ag.sinks.k1.channel = c1
ag.sinks.k1.hdfs.path = hdfs://ns1/flume/logdfs/%y-%m-%d/%H%M/%S
ag.sinks.k1.hdfs.round = true
ag.sinks.k1.hdfs.roundValue = 10
ag.sinks.k1.hdfs.roundUnit = minute
ag.sinks.k1.hdfs.useLocalTimeStamp = true
ag.sinks.k1.hdfs.fileType = DataStream

(七)啟動flume集群

1.啟動hadoopH3、hadoopH4主機中controller節點

flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./controller.conf --name ag -Dflume.root.logger=INFO,console

2.啟動hadoopH1、hadoopH2、hadoopH3、hadoopH4主機中的agent采集節點

flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agent1.conf --name ag -Dflume.root.logger=INFO,console

3.啟動hadoopH3、hadoopH4主機的數據源點采集節點agentdata.conf

flume-ng agent --conf $FLUME_HOME/conf/ --conf-file ./agentdata.conf --name ag -Dflume.root.logger=INFO,console

(八)數據采集測試

1.拷貝數據到spoolDir目錄下

2.查看hdfs系統目錄


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM