The full configuration file, flume-conf.properties, is as follows:
############################################
# producer config
############################################

# agent section
producer.sources = s
producer.channels = c c1
producer.sinks = r r1

# source section
#producer.sources.s.type = exec
#producer.sources.s.command = tail -f -n+1 /usr/local/test.log
producer.sources.s.type = spooldir
producer.sources.s.spoolDir = /usr/local/testlog
producer.sources.s.fileHeader = true
producer.sources.s.batchSize = 100
producer.sources.s.channels = c c1

# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list = 127.0.0.1:9092
producer.sinks.r.partition.key = 0
producer.sinks.r.partitioner.class = org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class = kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks = 0
producer.sinks.r.max.message.size = 1000000
producer.sinks.r.producer.type = sync
producer.sinks.r.custom.encoding = UTF-8
producer.sinks.r.custom.topic.name = topcar

# store in HDFS
producer.sinks.r1.type = hdfs
producer.sinks.r1.channel = c1
producer.sinks.r1.hdfs.path = hdfs://node2:9000/user/flume/events/%Y-%m-%d-%H
producer.sinks.r1.hdfs.filePrefix = events-
#producer.sinks.r1.hdfs.fileSuffix = .log    # sets the file suffix
producer.sinks.r1.hdfs.round = true
producer.sinks.r1.hdfs.roundValue = 10
producer.sinks.r1.hdfs.roundUnit = minute
# File format: SequenceFile by default; DataStream and CompressedStream are also available
producer.sinks.r1.hdfs.fileType = DataStream
# Format for sequence file records: "Text" or "Writable"
producer.sinks.r1.hdfs.writeFormat = Text
producer.sinks.r1.hdfs.rollInterval = 0
# File size that triggers a roll, in bytes (0: never roll based on file size)
producer.sinks.r1.hdfs.rollSize = 128000000
# Number of events written before a roll (0: never roll based on number of events)
producer.sinks.r1.hdfs.rollCount = 0
producer.sinks.r1.hdfs.idleTimeout = 60
# Use local time for the escape sequences (instead of the timestamp in the event header)
producer.sinks.r1.hdfs.useLocalTimeStamp = true

producer.channels.c1.type = memory
producer.channels.c1.capacity = 1000
producer.channels.c1.transactionCapacity = 1000
producer.channels.c1.keep-alive = 30

# Specify the channel the sink should use
producer.sinks.r.channel = c

# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000

############################################
# consumer config
############################################
consumer.sources = s
consumer.channels = c
consumer.sinks = r

consumer.sources.s.type = seq
consumer.sources.s.channels = c
consumer.sinks.r.type = logger
consumer.sinks.r.channel = c
consumer.channels.c.type = memory
consumer.channels.c.capacity = 100

# Overrides the seq source type declared above: read events from Kafka instead
consumer.sources.s.type = org.apache.flume.plugins.KafkaSource
consumer.sources.s.zookeeper.connect = 127.0.0.1:2181
consumer.sources.s.group.id = testGroup
consumer.sources.s.zookeeper.session.timeout.ms = 400
consumer.sources.s.zookeeper.sync.time.ms = 200
consumer.sources.s.auto.commit.interval.ms = 1000
consumer.sources.s.custom.topic.name = topcar
consumer.sources.s.custom.thread.per.consumer = 4
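The escape sequences in hdfs.path above (%Y-%m-%d-%H) are expanded from the event timestamp (here the local clock, since useLocalTimeStamp = true), so events are bucketed into one HDFS directory per hour. The expansion follows strftime-style patterns; a small illustration with a made-up timestamp:

```python
from datetime import datetime

# Hypothetical timestamp, for illustration only
ts = datetime(2014, 5, 6, 13, 45)

# Flume expands %Y-%m-%d-%H in hdfs.path the same way strftime does,
# yielding one directory per hour of event time.
path = "hdfs://node2:9000/user/flume/events/" + ts.strftime("%Y-%m-%d-%H")
print(path)  # hdfs://node2:9000/user/flume/events/2014-05-06-13
```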
The command to start Flume is:
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console
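The command above starts only the producer agent; the --name flag selects which agent's properties are read from the file. Since the same properties file also defines a consumer agent (KafkaSource to logger sink), it could presumably be started the same way, switching only --name:

```shell
# Sketch: start the consumer agent defined in the same properties file;
# --name consumer selects the consumer.* properties instead of producer.*
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name consumer -Dflume.root.logger=INFO,console
```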