Zookeeper version: zookeeper-3.4.13, installed at /usr/local/zookeeper-3.4.13/
Kafka version: kafka_2.13-2.6.0.tgz
I. Zookeeper Configuration
Install Zookeeper.
1. Copy the following jars from Kafka's libs directory into Zookeeper's lib directory
kafka-clients-2.6.0.jar lz4-java-1.7.1.jar slf4j-api-1.7.25.jar slf4j-log4j12-1.7.25.jar snappy-java-1.1.7.3.jar
2. Configure zoo.cfg
Add the following lines:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
3. Create a JAAS file under the conf directory
/usr/local/zookeeper-3.4.13/zk_server_jaas.conf
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin123456"
    user_kafka="kafka123456"
    user_producer="prod123456";
};
This defines two users, kafka and producer; any user declared with the user_ prefix can be used by producer and consumer programs for authentication.
The remaining two properties, username and password, configure the credentials used for internal authentication between Zookeeper nodes.
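Note that the Zookeeper JVM also has to be told where this JAAS file lives, or the Server section above is never loaded. One common way to do this (an assumption, not part of the original steps; adjust the path to your installation) is a conf/java.env file that zkEnv.sh picks up automatically:

```
# /usr/local/zookeeper-3.4.13/conf/java.env  (assumed location)
# Pass the JAAS file to the Zookeeper server JVM
SERVER_JVMFLAGS="-Djava.security.auth.login.config=/usr/local/zookeeper-3.4.13/zk_server_jaas.conf"
```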
Start Zookeeper on each node:
cd /usr/local/zookeeper-3.4.13/bin
./zkServer.sh start
Note: if Zookeeper runs as a cluster, apply the same configuration on every node.
II. Kafka Configuration
1. Create a kafka_server_jaas.conf file under the Kafka config directory
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin123456"
    user_admin="admin123456"
    user_producer="prod123456"
    user_consumer="cons123456";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="kafka123456";
};
KafkaServer configures the broker-side accounts and passwords. Client configures the username and password the broker uses to connect to Zookeeper; these must match the user_kafka entry in zk_server_jaas.conf from the previous section.
2. Edit server.properties
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://118.xx.xx.101:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
3. Edit the startup script
bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M -Djava.security.auth.login.config=/xxx/kafka/config/kafka_server_jaas.conf"
fi
Set -Djava.security.auth.login.config to the path of the JAAS file created in step 1.
4. Start Kafka
./kafka-server-start.sh ../config/server.properties &
III. Spring Boot Integration
Spring Boot version: 2.4.10
1. Add the dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Leave the version element out: Spring Boot 2.4.10's dependency management resolves a compatible spring-kafka (2.6.x), whereas pinning an old release such as 1.1.7.RELEASE is not compatible with Boot 2.4.x.
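When the clients are auto-configured by spring-kafka, the same SASL settings can be supplied in application.yml instead of code. A sketch, assuming the broker address and admin credentials from the earlier steps (spring.kafka.properties entries are passed through to every client):

```yaml
spring:
  kafka:
    bootstrap-servers: 118.xx.xx.101:9092
    properties:
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: PLAIN
      sasl.jaas.config: >-
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin" password="admin123456";
```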
2. Create kafka_client_jaas.conf
The file is stored under E:\study\xxstudy\kafkademo\config\
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin123456";
};
The username and password here must match an account configured on the Kafka broker; otherwise the client has no access.
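As an alternative to the java.security.auth.login.config system property used below, Kafka clients since 0.10.2 also accept the JAAS entry inline through the sasl.jaas.config client property, which removes the need for a separate file. A minimal sketch using only plain string keys and the same assumed credentials:

```java
import java.util.Properties;

public class JaasInline {
    // Build client properties that carry the JAAS login module inline,
    // instead of relying on -Djava.security.auth.login.config.
    static Properties saslProps(String user, String pass) {
        Properties p = new Properties();
        p.put("security.protocol", "SASL_PLAINTEXT");
        p.put("sasl.mechanism", "PLAIN");
        p.put("sasl.jaas.config",
              "org.apache.kafka.common.security.plain.PlainLoginModule required "
              + "username=\"" + user + "\" password=\"" + pass + "\";");
        return p;
    }

    public static void main(String[] args) {
        // Print the assembled JAAS entry for inspection
        System.out.println(saslProps("admin", "admin123456").getProperty("sasl.jaas.config"));
    }
}
```

These entries can simply be merged into the Properties objects of the producer and consumer below, making their static System.setProperty blocks unnecessary.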
3. Producer
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;

import java.util.Properties;

public class JaasProducerDemo {
    private final static String TOPIC_NAME = "test5";

    static {
        // Point the JVM at the client JAAS file
        System.setProperty("java.security.auth.login.config",
                "E:\\study\\xxstudy\\kafkademo\\config\\kafka_client_jaas.conf");
    }

    public static void main(String[] args) throws Exception {
        producerSendWithJaas();
    }

    public static void producerSendWithJaas() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "118.xx.xx.101:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        Producer<String, String> producer = new KafkaProducer<>(properties);
        // Send 100 messages with a delivery callback
        for (int i = 0; i < 100; i++) {
            String key = "key-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, key, "value-" + i);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    System.out.println("key:" + key + " , recordMetadata ,partition:" + recordMetadata.partition()
                            + ",offset: " + recordMetadata.offset());
                }
            });
        }
        // Flush pending sends and release resources
        producer.close();
    }
}
4. Consumer
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class JaasConsumerDemo {
    private final static String TOPIC_NAME = "test5";

    static {
        // Point the JVM at the client JAAS file
        System.setProperty("java.security.auth.login.config",
                "E:\\study\\xxstudy\\kafkademo\\config\\kafka_client_jaas.conf");
    }

    public static void main(String[] args) {
        consumerWithJaas();
    }

    private static void consumerWithJaas() {
        Properties prop = new Properties();
        // Use ConsumerConfig constants here (the original used ProducerConfig by mistake)
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "118.xx.xx.101:9092");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        prop.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        // Subscribe to one or more topics
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition - %d, offset - %d, key - %s, value - %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}
5. Test
Start the consumer first, then run the producer.
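The setup can also be smoke-tested with Kafka's console clients, assuming a small client.properties file carrying the same SASL settings (address and credentials taken from the steps above):

```
# client.properties (assumed contents)
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin123456";

# produce and consume against the secured broker
./kafka-console-producer.sh --bootstrap-server 118.xx.xx.101:9092 --topic test5 --producer.config client.properties
./kafka-console-consumer.sh --bootstrap-server 118.xx.xx.101:9092 --topic test5 --from-beginning --consumer.config client.properties
```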