1. Create an agent whose sink type is set to the custom sink class
vi /usr/local/flume/conf/agent3.conf
agent3.sources=as1
agent3.channels=c1
agent3.sinks=s1
agent3.sources.as1.type=avro
agent3.sources.as1.bind=0.0.0.0
agent3.sources.as1.port=41414
agent3.sources.as1.channels=c1
agent3.channels.c1.type=memory
agent3.sinks.s1.type=storm.test.kafka.TestKafkaSink
agent3.sinks.s1.channel=c1
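If events arrive faster than the sink can drain them, the memory channel's defaults can be raised. A minimal sketch using the standard Flume memory-channel properties; the values below are assumptions, not taken from the original setup:

agent3.channels.c1.capacity=10000
agent3.channels.c1.transactionCapacity=100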
2. Create the custom Kafka sink (the custom sink wraps a Kafka producer); the code is as follows
//See the official Flume developer guide: http://flume.apache.org/FlumeDeveloperGuide.html#sink
//A custom Kafka sink extends the AbstractSink class and implements the Configurable interface
//The Kafka topic used by this sink (test111) must already exist (see the creation command after the code below)
package storm.test.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class TestKafkaSink extends AbstractSink implements Configurable {

    Producer<String, String> producer;
    String topic = "test111";

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            Event event = channel.take();
            if (event == null) {
                // Nothing in the channel: roll back and tell Flume to back off
                transaction.rollback();
                status = Status.BACKOFF;
                return status;
            }
            // Forward the event body to Kafka as a string message
            byte[] body = event.getBody();
            final String msg = new String(body);
            final KeyedMessage<String, String> message = new KeyedMessage<String, String>(topic, msg);
            producer.send(message);
            transaction.commit();
            status = Status.READY;
        } catch (Exception e) {
            transaction.rollback();
            status = Status.BACKOFF;
        } finally {
            transaction.close();
        }

        return status;
    }

    @Override
    public void configure(Context arg0) {
        // Build the Kafka producer once, when Flume configures the sink
        Properties prop = new Properties();
        prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
        prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
        prop.put("serializer.class", StringEncoder.class.getName());
        producer = new Producer<String, String>(new ProducerConfig(prop));
    }

}
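Since the sink assumes the test111 topic already exists (see the comment above the code), it can be created up front with the standard Kafka 0.8 topic script; the partition count and replication factor below are assumptions:

[root@h7 kafka]# bin/kafka-topics.sh --create --zookeeper h5:2181 --replication-factor 1 --partitions 1 --topic test111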
Package the code as kafkasink.jar and copy it into the flume/lib directory on the Flume node. You also need to copy these 4 jars into the same flume/lib directory: kafka_2.10-0.8.2.0.jar, kafka-clients-0.8.2.0.jar, metrics-core-2.2.0.jar, and scala-library-2.10.4.jar.
3. Start the Flume agent that uses the custom Kafka sink
[root@h5 ~]# cd /usr/local/flume/
[root@h5 flume]# bin/flume-ng agent --conf conf/ --conf-file conf/agent3.conf --name agent3 -Dflume.root.logger=INFO,console
4. Write log output into the Flume agent; the configuration and code are as follows
log4j.properties
log4j.rootLogger=INFO,flume
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.1.35
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
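Optionally, a console appender can run alongside the Flume appender so the lines being shipped are also visible locally. A sketch using standard log4j classes; it replaces the rootLogger line above, and the conversion pattern is an assumption:

log4j.rootLogger=INFO,flume,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%t] %-5p %c - %m%n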
The code that writes logs to Flume is as follows
package com.mengyao.flume;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;

import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;

public class FlumeProducer {

    private static List<String> getLines() {
        List<String> lines = null;
        try {
            // Read the lines of the first file found directly under D:/
            final Collection<File> listFiles = FileUtils.listFiles(new File("D:/"), null, false);
            for (File file : listFiles) {
                lines = FileUtils.readLines(file);
                break;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        return lines;
    }

    public static void main(String[] args) throws Exception {
        final List<String> lines = getLines();
        final Logger logger = Logger.getLogger(FlumeProducer.class);
        for (String line : lines) {
            // Each log call is shipped to the Flume agent by the log4j appender
            logger.info(line + "\t" + System.currentTimeMillis());
            Thread.sleep(1000);
        }
    }
}
The flume-ng-log4jappender-1.5.0-cdh5.1.3-jar-with-dependencies.jar dependency must be on the classpath.
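If the project is built with Maven, the same appender can be declared as a dependency instead of copying the jar by hand; a sketch assuming the Cloudera Maven repository is available and that the version matches the jar named above:

<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.5.0-cdh5.1.3</version>
</dependency>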
5. Use a Kafka consumer to consume the data produced by Flume (the custom Kafka sink wraps a Kafka producer)
1. Consumer shell command
[root@h7 kafka]# bin/kafka-console-consumer.sh --zookeeper h7:2181 --topic test111 --from-beginning ##The Kafka cluster is h5, h6, h7 and the ZooKeeper cluster is h5, h6, h7; the consumer behaves the same when run from any Kafka node
2. Consumer Java code
package storm.test.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringEncoder;

public class TestConsumer {

    static final String topic = "test111";

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
        prop.put("serializer.class", StringEncoder.class.getName());
        prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
        prop.put("group.id", "group1");
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
        // Ask for one stream for the topic and iterate over it forever
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        final KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
        while (iterator.hasNext()) {
            String msg = new String(iterator.next().message());
            System.out.println("Received message: " + msg);
        }
    }

}
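The read loop above never returns on its own. If the connector should release its ZooKeeper and broker connections on exit, a JVM shutdown hook can call shutdown() on it; a minimal sketch, assuming the consumer variable above is declared final so the hook can capture it:

// Register before entering the read loop
Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        // ConsumerConnector.shutdown() closes the old high-level consumer cleanly
        consumer.shutdown();
    }
});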