#組件
#a1為定義的name
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource #定義source類型為Kafka Source
a1.sources.r1.batchSize = 5000 #批量寫入通道的最大消息數
a1.sources.r1.batchDurationMillis = 2000 #將批處理寫入通道之前的最長時間(以毫秒為單位)
a1.sources.r1.kafka.bootstrap.servers = 192.168.x.xx:9092,192.168.x.xx:9092,192.168.x.xx:9092 #kafka集群地址
a1.sources.r1.kafka.topics = cp_udplog #監控的kafka topic名稱
a1.sources.r1.kafka.consumer.group.id = custom.g.id #定義消費者組
a1.sources.r1.kafka.consumer.auto.offset.reset = earliest #定義消費者的消費位置
#channel
a1.channels.c1.type = file #定義channel的方式
a1.channels.c1.checkpointDir = /home/programs/flume/checkpoint/behavior1 #設置檢查點目錄(要自己先手動創建,不然會報錯)
a1.channels.c1.dataDirs = /home/programs/flume/data/behavior1/ #設置channel數據的緩存地址,需要手動創建
a1.channels.c1.maxFileSize = 2146435071 # 單個日志文件的最大大小(以字節為單位)
a1.channels.c1.capacity = 1000000 #設置channel中存儲的最大事件數
a1.channels.c1.keep-alive = 60 #添加或刪除事件的超時時間
#sink1
a1.sinks.k1.type = hdfs #sink的類型
a1.sinks.k1.hdfs.path = /kafka_flume/log/%Y-%m-%d #指定HDFS上的位置,可以按照時間進行分區
a1.sinks.k1.hdfs.filePrefix = log- #文件前綴
a1.sinks.k1.hdfs.round = true #按照時間進行滾動
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
#組合
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
#注:此處設置一般針對小文件的處理,按照事件數(events)或者文件的大小進行合並成一個文件
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 133217728
a1.sinks.k1.hdfs.rollCount = 0