#source的名字 agent.sources = kafkaSource # channels的名字,建議按照type來命名 agent.channels = memoryChannel # sink的名字,建議按照目標來命名 agent.sinks = hdfsSink # 指定source使用的channel名字 agent.sources.kafkaSource.channels = memoryChannel # 指定sink需要使用的channel的名字,注意這里是channel agent.sinks.hdfsSink.channel = memoryChannel #-------- kafkaSource相關配置----------------- # 定義消息源類型 agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource # 定義kafka所在zk的地址 # # 這里特別注意: 是kafka的zookeeper的地址 # agent.sources.kafkaSource.zookeeperConnect = 127.0.0.1:2181 # 配置消費的kafka topic #agent.sources.kafkaSource.topic = testtopic# 配置消費者組的id agent.sources.kafkaSource.groupId = flume # 消費超時時間,參照如下寫法可以配置其他所有kafka的consumer選項。注意格式從kafka.xxx開始是consumer的配置屬性 agent.sources.kafkaSource.kafka.consumer.timeout.ms = 100 #------- memoryChannel相關配置------------------------- # channel類型 agent.channels.memoryChannel.type = memory # channel存儲的事件容量 agent.channels.memoryChannel.capacity=10000 # 事務容量 agent.channels.memoryChannel.transactionCapacity=1000 #---------hdfsSink 相關配置------------------ agent.sinks.hdfsSink.type = hdfs # 注意, 我們輸出到下面一個子文件夾datax中 agent.sinks.hdfsSink.hdfs.path = hdfs://lenovo:9000/user/hive/warehouse/test/%Y%m%d%H agent.sinks.hdfsSink.hdfs.writeFormat = Text agent.sinks.hdfsSink.hdfs.fileType = DataStream agent.sinks.hdfsSink.hdfs.rollSize = 1024 agent.sinks.hdfsSink.hdfs.rollCount = 0 agent.sinks.hdfsSink.hdfs.rollInterval = 60 #配置前綴和后綴 agent.sinks.hdfsSink.hdfs.filePrefix=test agent.sinks.hdfsSink.hdfs.fileSuffix=.data #避免文件在關閉前使用臨時文件 agent.sinks.hdfsSink.hdfs.inUserPrefix=_ agent.sinks.hdfsSink.hdfs.inUserSuffix= #自定義攔截器 agent.sources.kafkaSource.interceptors=i1 agent.sources.kafkaSource.interceptors.i1.type=com.hadoop.flume.FormatInterceptor$Builder