Suppose we now want to set Flume's sink to Kafka. In real development there may be many subsystems or clients whose logs Flume has to collect, and Kafka is the only component that can absorb that collection volume. One thing needs attention, though: the Kafka cluster here has SASL authentication enabled (the PLAIN mechanism in this setup), so using Kafka from inside Flume requires taking care of both the client library and the JAAS configuration.
1. Copy the Kafka client jar into Flume's lib directory:
copy kafka-clients-0.10.2.1.jar D:\dev\apache-flume-1.7.0-bin\lib
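The same client library is also published to Maven Central under the coordinates org.apache.kafka:kafka-clients:0.10.2.1, so the jar can equally be taken from a local Maven repository.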
2. Create the JAAS configuration file under D:\:
vim D:\kafka_client_jaas.conf
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="alice"
    password="alice-pwd";
};
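For the connection to succeed, the alice account must of course exist on the broker side. With SASL/PLAIN the broker is typically started with its own JAAS file that lists the accepted accounts as user_<name>="<password>" entries; a sketch of such a file follows (the admin account here is illustrative, not taken from this setup):

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-pwd"
    user_admin="admin-pwd"
    user_alice="alice-pwd";
};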
3. Edit the flume.conf file and append the Kafka settings:
vim D:\dev\apache-flume-1.7.0-bin\conf\flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
#a1.sources.r1.type = netcat
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.0.106
a1.sources.r1.port = 44444

# Describe the sink
# a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

# Use a channel which buffers events in memory
# a1.channels.c1.type = memory
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = 203.195.205.63:9092
a1.channels.c1.kafka.topic = mldn-topic
a1.channels.c1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.channels.c1.kafka.producer.sasl.mechanism = PLAIN
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
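Before starting the agent it is worth verifying that these SASL settings can actually reach the broker. Below is a minimal smoke-test producer written against the same kafka-clients 0.10.2.1 API; the class name is hypothetical and not part of the Flume setup, while the server, topic, and JAAS path reuse the values above:

package cn.mldn.mykafka.producer;

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

public class SaslSmokeTestProducer { // hypothetical test helper
    public static void main(String[] args) {
        // Same JAAS file the Flume agent and the consumer use
        System.setProperty("java.security.auth.login.config", "d:/kafka_client_jaas.conf");
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "203.195.205.63:9092");
        props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // An authentication failure surfaces as an exception during send()/close()
        producer.send(new ProducerRecord<String, String>("mldn-topic", "test-key", "test-value"));
        producer.close();
    }
}

If the test message shows up in the consumer from step 5, the JAAS and SASL configuration is correct and any remaining problem is on the Flume side.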
4. Start Flume on Windows:
cd D:\dev\apache-flume-1.7.0-bin\bin\
d:
flume-ng.cmd agent --conf D:/dev/apache-flume-1.7.0-bin/conf --conf-file D:/dev/apache-flume-1.7.0-bin/conf/flume.conf --name a1 -property "flume.root.logger=INFO,console;java.security.auth.login.config=D:/kafka_client_jaas.conf"
5. Start the Kafka consumer, FlumeReceiveMessageConsumer.java:
package cn.mldn.mykafka.consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * Flume/Kafka integration -- the Kafka consumer side
 *
 * @author hp
 */
public class FlumeReceiveMessageConsumer {
    public static final String SERVERS = "203.195.205.63:9092";
    public static final String TOPIC = "mldn-topic";

    static {
        // JVM system property pointing at the JAAS configuration
        System.setProperty("java.security.auth.login.config", "d:/kafka_client_jaas.conf");
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        // Address of the broker the consumer connects to
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        // The consumer's deserializers must correspond exactly to the producer's serializers
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        // Create the consumer and subscribe to the topic
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(TOPIC));
        boolean flag = true; // the consumer keeps polling for new records
        while (flag) {
            ConsumerRecords<String, String> allRecorders = consumer.poll(200);
            for (ConsumerRecord<String, String> record : allRecorders) {
                System.out.println("flume.key = " + record.key()
                        + ", flume.value = " + record.value());
            }
        }
        consumer.close();
    }
}
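Note that poll(200) is the kafka-clients 0.10.x signature, taking a timeout in milliseconds; later client versions deprecate it in favor of poll(Duration).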
6. Run the business program to simulate log output, TestFlumeDemo.java:
package cn.mldn.myflume;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestFlumeDemo {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestFlumeDemo.class);

    public static void main(String[] args) {
        for (int x = 0; x < 10; x++) {
            LOGGER.info("lynch.cn" + x);
        }
    }
}
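For step 6 to reach Flume at all, the application's log4j must be pointed at the avro source from step 3; the flume.client.log4j.* headers in step 7's output show that Flume's Log4jAppender is in use. The document does not include the log4j configuration, but assuming the flume-ng-log4jappender jar is on the classpath, it would look roughly like this:

log4j.rootLogger=INFO, flume
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=192.168.0.106
log4j.appender.flume.Port=44444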
7. The FlumeReceiveMessageConsumer.java consumer receives the log data collected by Flume:
flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705707577<flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level 20000Fflume.client.log4j.message.encodingUTF8
flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705716934<flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level 20000Fflume.client.log4j.message.encodingUTF8
flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705717194<flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level 20000Fflume.client.log4j.message.encodingUTF8
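The values print as partly unreadable text because the KafkaChannel stores each record as a serialized Flume event (headers plus body) rather than a plain string: the flume.client.log4j.timestamp, logger-name, log-level, and message-encoding fragments visible in each value are the Log4jAppender's metadata headers, and the String deserializer renders the binary framing around them as stray characters.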