Kafka Security Authentication with SASL/PLAIN


ZooKeeper version: zookeeper-3.4.13, installed at /usr/local/zookeeper-3.4.13/

Kafka version: kafka_2.13-2.6.0.tgz

 

 

 

 

I. ZooKeeper Configuration

Install ZooKeeper

References: Downloading, installing, and starting ZooKeeper

ZooKeeper cluster setup: single-machine pseudo-distributed cluster

 

1. Copy the following JARs from Kafka's lib directory into ZooKeeper's lib directory

kafka-clients-2.6.0.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar
snappy-java-1.1.7.3.jar

  

2. Configure zoo.cfg

Add the following settings:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

  

3. Write the JAAS file and place it in the conf folder

/usr/local/zookeeper-3.4.13/zk_server_jaas.conf

Server {
org.apache.kafka.common.security.plain.PlainLoginModule required 
    username="admin" 
    password="admin123456" 
    user_kafka="kafka123456" 
    user_producer="prod123456";
};

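The user_ entries define two accounts, kafka and producer. These are the accounts that clients connecting to ZooKeeper can authenticate with; the Kafka broker uses the kafka account through the Client section of its own JAAS file.

The other two properties, username and password, are meant as the credentials for internal authentication between ZooKeeper nodes.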

 

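Start ZooKeeper on each node. The ZooKeeper JVM has to be told where the JAAS file is, for example by adding -Djava.security.auth.login.config to SERVER_JVMFLAGS, otherwise the Server section is never loaded.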

cd /usr/local/zookeeper-3.4.13/bin

./zkServer.sh  start

 

Note: if ZooKeeper runs as a cluster, apply the same configuration to every ZooKeeper node.

 

II. Kafka Configuration

1. Create kafka_server_jaas.conf in the config directory of the Kafka installation

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
            username="admin"
            password="admin123456"
            user_admin="admin123456"
            user_producer="prod123456"
            user_consumer="cons123456";
};



Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafka"
        password="kafka123456";
};

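The KafkaServer section configures Kafka's own accounts: username and password are the credentials the broker uses when it connects to other brokers, and each user_<name> entry defines the password of an account that may connect to the broker. The Client section holds the username and password the broker uses to connect to ZooKeeper. These must match the user_kafka entry in the ZooKeeper file zk_server_jaas.conf configured earlier.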
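The matching rule between the broker's Client section and ZooKeeper's Server section can be checked mechanically. The following is a minimal sketch, not part of the original setup: it uses only the JDK's own JAAS parser (the JavaLoginConfig configuration type), and the class and method names (JaasCrossCheck, sectionOptions, clientMatchesServer) are hypothetical. The JAAS text is inlined here for illustration; in practice you would read the two files from disk.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

class JaasCrossCheck {

    /** Parses JAAS text and returns the options of the first login module in the given section. */
    static Map<String, ?> sectionOptions(String jaasText, String section)
            throws IOException, NoSuchAlgorithmException {
        // The JDK parser reads from a URI, so the text goes through a temporary file.
        Path tmp = Files.createTempFile("jaas", ".conf");
        try {
            Files.write(tmp, jaasText.getBytes(StandardCharsets.UTF_8));
            Configuration cfg =
                    Configuration.getInstance("JavaLoginConfig", new URIParameter(tmp.toUri()));
            AppConfigurationEntry[] entries = cfg.getAppConfigurationEntry(section);
            if (entries == null || entries.length == 0) {
                throw new IllegalArgumentException("No JAAS section named " + section);
            }
            return entries[0].getOptions();
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    /** True if the client's username/password match a user_<username> entry on the server side. */
    static boolean clientMatchesServer(Map<String, ?> client, Map<String, ?> server) {
        Object user = client.get("username");
        Object password = client.get("password");
        return user != null && password != null && password.equals(server.get("user_" + user));
    }

    public static void main(String[] args) throws Exception {
        String module = "org.apache.kafka.common.security.plain.PlainLoginModule";
        // Server section as in zk_server_jaas.conf
        String zk = "Server {\n  " + module + " required\n"
                + "  username=\"admin\" password=\"admin123456\"\n"
                + "  user_kafka=\"kafka123456\" user_producer=\"prod123456\";\n};\n";
        // Client section as in kafka_server_jaas.conf
        String broker = "Client {\n  " + module + " required\n"
                + "  username=\"kafka\" password=\"kafka123456\";\n};\n";

        boolean ok = clientMatchesServer(
                sectionOptions(broker, "Client"), sectionOptions(zk, "Server"));
        System.out.println("Client credentials match ZooKeeper user entry: " + ok);
    }
}
```

The parser only records the login module's class name, so this check does not need the Kafka JARs on the classpath.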

  

2. Modify server.properties

listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://118.xx.xx.101:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
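These settings depend on each other: the inter-broker protocol needs a listener that serves it, and the inter-broker SASL mechanism has to be one of the enabled mechanisms. The sketch below is a hypothetical pre-flight check (the class BrokerSaslConfigCheck and its methods are not part of Kafka) that reads the properties with java.util.Properties. It assumes listener names equal protocol names, that is, no listener.security.protocol.map is configured, and it falls back to Kafka's documented defaults (PLAINTEXT and GSSAPI) for missing keys.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class BrokerSaslConfigCheck {

    /** Splits a comma-separated property value into trimmed, non-empty items. */
    static List<String> csv(String value) {
        List<String> items = new ArrayList<>();
        for (String item : value.split(",")) {
            if (!item.trim().isEmpty()) {
                items.add(item.trim());
            }
        }
        return items;
    }

    /** Returns human-readable descriptions of inconsistencies; empty if none were found. */
    static List<String> findProblems(Properties p) {
        List<String> problems = new ArrayList<>();

        // Listener names are the part before "://", e.g. SASL_PLAINTEXT.
        List<String> listenerNames = new ArrayList<>();
        for (String listener : csv(p.getProperty("listeners", ""))) {
            listenerNames.add(listener.split("://", 2)[0]);
        }
        String interBroker = p.getProperty("security.inter.broker.protocol", "PLAINTEXT").trim();
        if (!listenerNames.contains(interBroker)) {
            problems.add("no listener for inter-broker protocol " + interBroker);
        }

        // The mechanism brokers use among themselves must also be enabled on the broker.
        String mechanism = p.getProperty("sasl.mechanism.inter.broker.protocol", "GSSAPI").trim();
        if (interBroker.startsWith("SASL_")
                && !csv(p.getProperty("sasl.enabled.mechanisms", "GSSAPI")).contains(mechanism)) {
            problems.add("inter-broker mechanism " + mechanism + " is not in sasl.enabled.mechanisms");
        }
        return problems;
    }

    /** Parses properties-file text, as found in server.properties. */
    static Properties parse(String text) throws IOException {
        Properties p = new Properties();
        p.load(new StringReader(text));
        return p;
    }

    public static void main(String[] args) throws IOException {
        Properties p = parse(
                "listeners=SASL_PLAINTEXT://0.0.0.0:9092\n"
              + "security.inter.broker.protocol=SASL_PLAINTEXT\n"
              + "sasl.enabled.mechanisms=PLAIN\n"
              + "sasl.mechanism.inter.broker.protocol=PLAIN\n");
        System.out.println("Problems found: " + findProblems(p));
    }
}
```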

  

3. Modify the startup script

bin/kafka-server-start.sh

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M -Djava.security.auth.login.config=/xxx/kafka/config/kafka_server_jaas.conf "
fi 

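Set -Djava.security.auth.login.config to the path of kafka_server_jaas.conf. Because the option is added inside this if block, it only takes effect when KAFKA_HEAP_OPTS is not already set in the environment.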

 

4. Start Kafka

./kafka-server-start.sh  ../config/server.properties &

 

 

III. Spring Boot Integration

Spring Boot version: 2.4.10

1. Add the dependency

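        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <!-- Version omitted so that Spring Boot 2.4.10's dependency management picks a compatible release.
                 1.1.7.RELEASE targets the much older 0.10.x Kafka clients, which lack the poll(Duration) call used by the consumer. -->
        </dependency>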

  

2. Create kafka_client_jaas.conf

The file is stored under E:\\study\\xxstudy\\kafkademo\\config\\

KafkaClient {
 org.apache.kafka.common.security.plain.PlainLoginModule required
 username="admin"
 password="admin123456";
}; 

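The username and password here must match an account defined in the KafkaServer section of kafka_server_jaas.conf (here admin, declared there as user_admin); otherwise the client cannot authenticate and is denied access.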
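As an alternative to a separate JAAS file plus the java.security.auth.login.config system property, Kafka clients also accept the login module inline through the sasl.jaas.config client property, which takes precedence over the static file when both are set. The sketch below builds such properties using plain string keys so that it compiles without the Kafka JARs; the class and method names (SaslPlainClientProps, saslPlainProps) are hypothetical, and it assumes the username and password contain no double quotes or backslashes that would need escaping.

```java
import java.util.Properties;

class SaslPlainClientProps {

    /** Builds client properties for SASL/PLAIN over an unencrypted connection. */
    static Properties saslPlainProps(String bootstrapServers, String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Same content as the KafkaClient section of kafka_client_jaas.conf, on one line.
        props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"%s\" password=\"%s\";",
                username, password));
        return props;
    }

    public static void main(String[] args) {
        Properties props = saslPlainProps("118.xx.xx.101:9092", "admin", "admin123456");
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

The returned Properties can then be extended with serializer or deserializer settings and passed to a KafkaProducer or KafkaConsumer constructor, which keeps credentials per client instead of per JVM.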

 

3. Producer

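import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;

public class JaasProducerDemo {

    private final static String TOPIC_NAME = "test5";

    static {
        // Point the JVM at the client JAAS file before any Kafka client is created
        System.setProperty("java.security.auth.login.config", "E:\\study\\xxstudy\\kafkademo\\config\\kafka_client_jaas.conf");
    }

    public static void main(String[] args) throws Exception {
        producerSendWithJaas();
    }

    public static void producerSendWithJaas() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "118.xx.xx.101:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // SASL/PLAIN over an unencrypted connection, matching the broker listener
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        Producer<String, String> producer = new KafkaProducer<>(properties);
        // Build and send the messages
        for (int i = 0; i < 100; i++) {
            String key = "key-" + i;
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(TOPIC_NAME, key, "value-" + i);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // A non-null exception means the send failed (for example, authentication was rejected)
                    if (e != null) {
                        System.err.println("key:" + key + " , send failed: " + e);
                        return;
                    }
                    System.out.println("key:" + key + " , recordMetadata ,partition:" + recordMetadata.partition()
                            + ",offset: " + recordMetadata.offset());
                }
            });
        }

        // Close the producer; this waits for pending sends to complete
        producer.close();
    }

}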

  

 

4. Consumer

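import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;

public class JaasConsumerDemo {

    private final static String TOPIC_NAME = "test5";

    static {
        // Point the JVM at the client JAAS file before any Kafka client is created
        System.setProperty("java.security.auth.login.config", "E:\\study\\xxstudy\\kafkademo\\config\\kafka_client_jaas.conf");
    }

    public static void main(String[] args) {
        consumerWithJaas();
    }

    private static void consumerWithJaas() {
        Properties prop = new Properties();
        // Use ConsumerConfig (not ProducerConfig) for consumer settings
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "118.xx.xx.101:9092");
        prop.put("group.id", "test");
        prop.put("enable.auto.commit", "true");
        prop.put("auto.commit.interval.ms", "1000");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // SASL/PLAIN over an unencrypted connection, matching the broker listener
        prop.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        prop.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        // Subscribe to one or more topics
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        // Poll forever; stop the demo by terminating the process
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition - %d, offset - %d, key - %s, value - %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }

}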

  

 

5. Test

Run the consumer first, then run the producer.

 

