一、agent
第一步是定義agent(代理)及agent下的sources、channels、sinks的簡稱,如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
二、sources
第二步是定義sources(接收數據源),以下是常見的sources類型:
1.type = netcat =>監視一個端口,需要端口名稱、端口號:
bind = localhost
port = 44444
2.type = exec =>監視一個文件,需要命令行、命令行使用的腳本
command = tail -F /opt/jars/calllog.csv
shell = /usr/bin/bash -c
3.type = spooldir =>監視一個文件夾,需要文件夾路徑
可以添加進入文件夾文件的后綴名、可以添加絕對路徑的文件名、通過正則表達式過濾以tmp結尾的文件
spoolDir = /root/spooldir
fileSuffix = .COMPLETED
fileHeader = true
ignorePattern = ([^]*\.tmp)
4.selector.type = replicating =>將數據量復制給多個channel
5.type = avro =>通過端口接收數據,需要端口名稱、端口號
bind = hd1-1
port = 4141
三、channels
第三步是設置channel(管道)的類型等
1.tpye = memory =>使用內存為管道,設置內存總容量、每次傳輸的容量
capacity =1000
transactionCapacity =100
2.使用磁盤作為管道
四、sink
第四步是設置sink(下沉)的類型和細節設置
1.type = logger =>輸出日志文件,用於監控端口直接在端口輸出接受的數據
2.type = hdfs =>輸出到hdfs,
hdfs.path = hdfs://hd1-1:9000/flume/%Y%m%d/%H =>設置hdfs的路徑
hdfs.filePrefix = logs- =>設置文件的前綴
hdfs.round = true =>按照時間滾動文件夾
hdfs.roundValue = 1 =>多長時間創建一個新文件夾
hdfs.roundUnit = minute =>定義時間的單位
hdfs.useLocalTimeStamp = true =>使用本地時間戳
hdfs.batchSize = 500 =>積攢到少event后flush到hdfs一次
hdfs.fileType = DataStream =>設置文件類型,可支持壓縮
hdfs.rollInterval = 30 =>多久生成一個新文件
hdfs.rollSize = 134217700 =>設置每個文件的滾動大小
hdfs.rollCount = 0 =>滾動與Event無關
hdfs.minBlockReplicas = 1 =>最小冗余數(及備份數,hdfs自帶無需配置)
3.type = avro =>將數據發送到端口,需要設置端口名稱、端口號
hostname = hd1-1
port = 4141
4.type = file_roll =>將數據傳輸到本地文件,需要設置文件路徑
sink.directory = /root/flume2 注意flume2文件夾需要自己創建
5.type = org.apache.flume.sink.kafka.KafkaSink =>將數據傳輸到kafka
需要設置集群的機器名稱和端口號、主題、batchSize、Ack機制
brokerList = hd1-1:9092,hd1-2:9092,hd1-3:9092
topic = calllog
batchSize = 20
requiredAcks =1 ACK機制(1、0、-1,1是最安全的)
五、bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1