如果要想在java客戶端進行Kerberos認證,則一定需要有一個與之匹配的Kerberos配置文件存在。現在在D盤上建立一個客戶端的訪問程序文件:kafka_client_jaas.conf
vim d:/kafka_client_jaas.conf
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="bob" password="bob-pwd"; };
如果要想在Java程序里面配置Kerberos認證處理操作,則需要將上面配置文件的路勁引入到項目之中:
static { // 表示系統環境屬性
System.setProperty("java.security.auth.login.config", "d:/kafka_client_jaas.conf"); }
生產者程序代碼——KerberosSendMessageProducer.java
package cn.lynch.mykafka.producer; import java.util.Properties; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; public class KerberosSendMessageProducer { public static final String SERVERS = "203.195.205.63:9092"; public static final String TOPIC = "lynch-topic"; static { System.setProperty("java.security.auth.login.config", "d:/kafka_client_jaas.conf"); // 表示系統環境屬性
} public static void main(String[] args) { // 如果要想進行Kafka消息發送需要使用Properties定義一些環境屬性
Properties props = new Properties(); props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS); // 定義Kafka服務地址 // Kafka之中是以key和value的形式進行消息的發送處理, 所以為了保證Kafka的性能,專門提供有統一類型 // props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") ;
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) ; props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()) ; long start = System.currentTimeMillis() ; // 定義消息發送者對象,依靠此對象可以進行消息的傳遞
Producer<String,Integer> producer = new KafkaProducer<String,Integer>(props) ; for (int x = 0 ; x < 100 ; x ++) { producer.send(new ProducerRecord<String,Integer>(TOPIC,"mldn-" + x,x)) ; } long end = System.currentTimeMillis() ; System.out.println("*** 消息發送完成:" + (end - start)); producer.close(); } }
消費端程序代碼——KerberosReceiveMessageConsumer
package cn.lynch.mykafka.consumer; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; public class KerberosReceiveMessageConsumer { public static final String SERVERS = "203.195.205.63:9092"; public static final String TOPIC = "lynch-topic"; static { System.setProperty("java.security.auth.login.config", "d:/kafka_client_jaas.conf"); // 表示系統環境屬性
} public static void main(String[] args) { Properties props = new Properties(); props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); // 定義消息消費者的連接服務器地址
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS); // 消息消費者一定要設置反序列化的程序類,與消息生產者完全對應
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-5"); // 定義消費者處理對象
Consumer<String, Integer> consumer = new KafkaConsumer<String, Integer>( props); consumer.subscribe(Arrays.asList(TOPIC)); // 設置消費者讀取的主題名稱
boolean flag = true; // 消費者需要一直進行數據的讀取處理操作
while (flag) { // 一直讀取消息
ConsumerRecords<String, Integer> allRecorders = consumer.poll(200); for (ConsumerRecord<String, Integer> record : allRecorders) { System.out.println( "key = " + record.key() + "、value = " + record.value()); } } consumer.close(); } }
如果使用的是SSL認證,發現認證失敗之后實際上不會立刻斷掉鏈接,因為SSL是基於jvm的認證處理操作,而Kerberos認證處理操作的性能一定要比ssl更好,所以新時代的kafka處理基本上都以sasl認證為主,sasl認證就是Kerberos認證。