java程序連接到一個需要Kerberos認證的kafka集群上,消費生產者生產的信息,kafka版本是2.10-0.10.0.1;
Java程序以maven構建,(怎么構建maven工程,可去問下度娘:“maven工程入門示例”)
先上pom.xml文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.ht</groupId> <artifactId>kafkaTest</artifactId> <version>1.0</version> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> </plugins> </build> </project>
然后是Jave代碼,先上圖,一一解釋圖中標識:

注釋:
1:可以將所需的配置文件加載到程序;(參見:度娘--“JDK 運行參數 JAVA -Dxxx與System.setProperty()的關系”)
2:新版本的Producter和Consumer都可以直接連接brocker,不用再配置zookeeper的相關信息,所以這里是要連接的kafka的主機ip和端口號
3:設置的topic的組Id
4:設置偏移量
5:設置認證配置
6:設置所要讀取的主題Topic
import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class ConsumerTest { public static void main(String[] args) { // System.setProperty("java.security.auth.login.config", "/home/kafka/kafka_client_jaas.conf"); // System.setProperty("java.security.krb5.conf", "/home/kafka/krb5.conf"); // 環境變量添加,需要輸入配置文件的路徑System.out.println("===================配置文件地址"+fsPath+"\\conf\\cons_client_jaas.conf"); Properties props = new Properties(); props.put("bootstrap.servers", "192.168.132.130:9092"); props.put("group.id", "group-1111"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "earliest"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "GSSAPI"); props.put("sasl.kerberos.service.name", "kafka"); KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props); kafkaConsumer.subscribe(Arrays.asList("cust_info")); while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(1); for (ConsumerRecord<String, String> record : records) System.out.println("Partition: " + record.partition() + " Offset: " + record.offset() + " Value: " + record.value() + " ThreadID: " + Thread.currentThread().getId()); } } }
以上就是所有配置,將工程通過導出為Runnable JAR file 導出為jar文件
直接運行 java -jar jar包名.jar 即可;
如果程序里沒有設置1相關的配置文件,也可以運行下列命令:
java -Djava.security.auth.login.config=/home/kafka/kafka_client_jaas.conf -Djava.security.krb5.conf=/home/kafka/krb5.conf -jar jar包名.jar
