A simple test project:
1. Create a new Java project with the following structure:
The test class FlumeTest is as follows:
package com.demo.flume;

import org.apache.log4j.Logger;

public class FlumeTest {

    private static final Logger LOGGER = Logger.getLogger(FlumeTest.class);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 20; i < 100; i++) {
            LOGGER.info("Info [" + i + "]");
            Thread.sleep(1000);
        }
    }
}
The Consumer that listens to Kafka and receives the messages is as follows:
package com.demo.flume;

/**
 * INFO: info
 * User: zhaokai
 * Date: 2017/3/17
 * Version: 1.0
 * History: <p>Record any modifications here.</p>
 */

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {

    public static void main(String[] args) {
        System.out.println("begin consumer");
        connectionKafka();
        System.out.println("finish consumer");
    }

    @SuppressWarnings("resource")
    public static void connectionKafka() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.163:9092");
        props.put("group.id", "testConsumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("flumeTest"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (ConsumerRecord<String, String> record : records) {
                // %n added so each record prints on its own line
                System.out.printf("===================offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
The log4j configuration file is as follows:
log4j.rootLogger=INFO,console

# For package com.demo.flume, logs are also sent to the flume appender.
log4j.logger.com.demo.flume=info,flume

log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=192.168.1.163
log4j.appender.flume.Port=4141
log4j.appender.flume.UnsafeMode=true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m%n

# console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
Note: Hostname is the IP of the server where Flume is installed, and Port must match the listening port in the Flume configuration shown below.
Add the following dependencies to pom.xml:
<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.10</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flume.flume-ng-clients</groupId>
        <artifactId>flume-ng-log4jappender</artifactId>
        <version>1.5.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-log4j-appender</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>18.0</version>
    </dependency>
</dependencies>
2. Configure Flume
Under flume/conf:
Create a new file avro.conf with the following content:
Of course the sink can be of any type; here I use the Kafka sink, and the other sink types are described in the official documentation (a logger-sink variant for quick local debugging is sketched after the configuration notes below).
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1

a1.sources.source1.type = avro
a1.sources.source1.bind = 192.168.1.163
a1.sources.source1.port = 4141
a1.sources.source1.channels = channel1

a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 10000
a1.channels.channel1.transactionCapacity = 1000
a1.channels.channel1.keep-alive = 30

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.topic = flumeTest
a1.sinks.sink1.brokerList = 192.168.1.163:9092
a1.sinks.sink1.requiredAcks = 0
a1.sinks.sink1.sink.batchSize = 20
a1.sinks.sink1.channel = channel1
With the configuration above, the Flume agent runs on 192.168.1.163 and its avro source listens on port 4141; log4j only needs to send its log events to port 4141 on 192.168.1.163 for them to reach Flume. Flume listens on that port, collects the incoming data, converts it into Kafka events, and sends them to the flumeTest topic of the Kafka cluster.
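A side note on the sink choice: if you only want to confirm that log events are reaching the Flume agent before bringing Kafka into the picture, the Kafka sink can be swapped for Flume's built-in logger sink, which prints each event to the agent's console output. This is a minimal debugging sketch, not part of the setup above; the source and channel definitions stay unchanged:

# Debug-only variant: print events in the agent's log instead of sending them to Kafka
a1.sinks.sink1.type = logger
a1.sinks.sink1.channel = channel1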
3. Start Flume and test
- Start Flume with: bin/flume-ng agent --conf conf --conf-file conf/avro.conf --name a1 -Dflume.root.logger=INFO,console
- Run the main method of FlumeTest to generate the log output.
- Run the main method of Consumer to print the data received from Kafka.
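If the Consumer prints nothing, a quick way to check whether events are actually arriving on the topic is Kafka's console consumer. This assumes the standard Kafka scripts are available under the broker installation's bin directory; adjust the path to your environment:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.163:9092 --topic flumeTest --from-beginning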