Integrating Flume with Kafka (Kerberos Authentication): Real-Time Log Collection


Suppose we now want to set Flume's sink to Kafka. In real development there may be many subsystems or clients whose logs are collected through Flume, and only Kafka can absorb a collection workload of that size. One issue needs attention, though: this Kafka cluster has authentication enabled (the broker is secured with SASL; the steps below use the PLAIN mechanism configured through JAAS), so before Kafka can be used from inside Flume we have to take care of both the client libraries and the JAAS configuration.

1. Copy the Kafka client jar into Flume's lib directory:

mv kafka-clients-0.10.2.1.jar D:\dev\apache-flume-1.7.0-bin\lib

 

2. Create the JAAS configuration file in the "D:\" directory:
vim D:\kafka_client_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="alice"
    password="alice-pwd";
};

 

3. Modify the flume.conf file to append the Kafka settings:
vim D:\dev\apache-flume-1.7.0-bin\conf\flume.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
#a1.sources.r1.type = netcat
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.0.106
a1.sources.r1.port = 44444

# Describe the sink
# a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

# Use a channel which buffers events in memory
# a1.channels.c1.type = memory
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = 203.195.205.63:9092
a1.channels.c1.kafka.topic = mldn-topic
a1.channels.c1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.channels.c1.kafka.producer.sasl.mechanism = PLAIN
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
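Before starting Flume, it can be worth confirming that the SASL_PLAINTEXT settings are actually accepted by the broker. Below is a minimal standalone check, not part of the original steps: the class name QuickSaslCheckProducer and the test message are hypothetical, while the server address, topic, and JAAS path are the ones used above.

package cn.mldn.mykafka.producer;

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

/** Hypothetical smoke test: sends one message over SASL_PLAINTEXT. */
public class QuickSaslCheckProducer {
    public static void main(String[] args) {
        // Same JAAS file created in step 2
        System.setProperty("java.security.auth.login.config", "d:/kafka_client_jaas.conf");
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "203.195.205.63:9092");
        props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            // If authentication fails, the error surfaces here on get()
            producer.send(new ProducerRecord<String, String>("mldn-topic", "ping")).get();
            System.out.println("SASL connection OK");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}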

4. Start Flume on Windows:

cd D:\dev\apache-flume-1.7.0-bin\bin\
d:
flume-ng.cmd agent --conf D:/dev/apache-flume-1.7.0-bin/conf --conf-file D:/dev/apache-flume-1.7.0-bin/conf/flume.conf --name a1 -property "flume.root.logger=INFO,console;java.security.auth.login.config=D:/kafka_client_jaas.conf"

 

5. Start the Kafka consumer: FlumeReceiveMessageConsumer.java

package cn.mldn.mykafka.consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * Flume integration with Kafka -- Kafka consumer
 *
 * @author hp
 */
public class FlumeReceiveMessageConsumer {
    public static final String SERVERS = "203.195.205.63:9092";
    public static final String TOPIC = "mldn-topic";
    static {
        // System property pointing at the JAAS file created in step 2
        System.setProperty("java.security.auth.login.config", "d:/kafka_client_jaas.conf");
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        // Address of the Kafka servers the consumer connects to
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        // The consumer must configure deserializers that match the producer's serializers
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        // Create the consumer and subscribe it to the topic it reads from
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(TOPIC));
        boolean flag = true; // the consumer keeps reading and processing data
        while (flag) { // read messages continuously
            ConsumerRecords<String, String> allRecorders = consumer.poll(200);
            for (ConsumerRecord<String, String> record : allRecorders) {
                System.out.println("flume.key = " + record.key()
                        + ",flume.value = " + record.value());
            }
        }
        consumer.close();
    }
}

 

6. Start the business application to simulate log messages: TestFlumeDemo.java

package cn.mldn.myflume;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestFlumeDemo {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestFlumeDemo.class);

    public static void main(String[] args) {
        for (int x = 0; x < 10; x++) {
            LOGGER.info("lynch.cn" + x);
        }
    }
}
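For these LOGGER.info(...) calls to reach the Avro source from step 3, the demo project's Log4j setup must route log events to Flume's Log4jAppender (shipped in the flume-ng-log4jappender artifact). This file is not shown above; what follows is a minimal sketch, assuming an appender named "flume" and reusing the host and port from flume.conf:

# Hypothetical log4j.properties for the TestFlumeDemo project
log4j.rootLogger=INFO, flume

# Flume Log4jAppender: forwards events to the Avro source from step 3
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=192.168.0.106
log4j.appender.flume.Port=44444
# Don't fail the application if the Flume agent is unreachable
log4j.appender.flume.UnsafeMode=true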

 

7. The FlumeReceiveMessageConsumer.java consumer receives the log data collected by Flume:

flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705707577<flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level 20000Fflume.client.log4j.message.encodingUTF8
flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705716934<flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level 20000Fflume.client.log4j.message.encodingUTF8
flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705717194<flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level 20000Fflume.client.log4j.message.encodingUTF8
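The values print as garbled text because the KafkaChannel, with its default parseAsFlumeEvent=true, stores events in the topic as Avro-serialized Flume events, so the StringDeserializer is showing raw Avro bytes (the Log4j headers such as flume.client.log4j.timestamp are visible inside them). To recover the clean message body, one option is to switch the consumer's value deserializer to org.apache.kafka.common.serialization.ByteArrayDeserializer and decode each value with the AvroFlumeEvent class from flume-ng-sdk. A minimal sketch under those assumptions (the helper class FlumeEventDecoder is hypothetical):

package cn.mldn.mykafka.consumer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flume.source.avro.AvroFlumeEvent;

/** Hypothetical helper: decodes an Avro-serialized Flume event read from the KafkaChannel topic. */
public class FlumeEventDecoder {

    public static String decodeBody(byte[] value) throws IOException {
        // Parse the raw Kafka record value as an AvroFlumeEvent
        SpecificDatumReader<AvroFlumeEvent> reader =
                new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(value, null);
        AvroFlumeEvent event = reader.read(null, decoder);
        // The original log message is carried in the event body
        ByteBuffer body = event.getBody();
        byte[] bytes = new byte[body.remaining()];
        body.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

With this in place, the loop in step 5 would pass record.value() through decodeBody(...) instead of printing it directly.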

 

 

