Kafka的安全認證機制SASL/PLAINTEXT


一.背景

kafka提供了多種安全認證機制,主要分為SSL和SASL2大類。其中SASL/PLAIN是基於賬號密碼的認證方式,比較常用。最近做了個kafka的鑒權,發現官網上講的不是很清楚,網上各種博客倒是很多,但是良莠不齊,巨多坑。經過一天的研究,終於搞定了,特在此記錄下。

二.環境

操作系統:linux
kafka版本:kafka_2.12-0.11.0.1
zookeeper版本:zookeeper-3.5.1-alpha

三.認證步驟

3.1.Zookeeper配置和啟動

1.為zookeeper添加SASL支持,在配置文件zoo.cfg添加

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

2.新建zk_server_jaas.conf文件,為Zookeeper添加賬號認證信息
這個文件你放在哪里隨意,只要后面zkEnv配置正確的路徑就好了。我是放在/home路徑下。zk_server_jaas.conf文件的內容如下

Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="cluster"
    password="clusterpasswd"
    user_kafka="kafkapasswd";
};

username和paasword是zk集群之間的認證密碼。
user_kafka="kafkapasswd"定義了一個用戶"kafka",密碼是"kafkapasswd",本次測試用戶是kafka broker。
3.導入kafka的相關jar
由上一步可發現,認證方式使用的是Kafka的認證類org.apache.kafka.common.security.plain.PlainLoginModule。因此zk需要依賴幾個jar包。
在/home下新建zk_sasl_dependency目錄,從kafka/lib目錄下復制以下幾個jar包到該目錄下。根據kafka版本不同,幾個jar包的版本可能不一樣

kafka-clients-0.11.0.1.jar
lz4-1.3.0.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar
snappy-java-1.1.2.6.jar

4.修改zkEnv.sh
在zkEnv.sh添加

for i in /home/zk_sasl_dependency/*.jar; 
do 
    CLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/home/zk_server_jaas.conf "

關於這一步,網上的配置五花八門,但是原理都是jar包導入和認證信息配置。
在zk啟動的時候導入/home/zk_sasl_dependency/的jar包,SERVER_JVMFLAGS配置jvm參數,導入zk的sasl認證信息。
5.啟動zk服務端
執行./zkServer.sh start啟動zk。如果啟動異常查看日志排查問題。

3.2kafka配置和啟動

1.新建kafka_server_jaas.conf,為kafka添加認證信息

KafkaServer {
 org.apache.kafka.common.security.plain.PlainLoginModule required
 username="cluster"
 password="cluster"
 user_cluster=“clusterpasswd”
 user_kafka="kafkapasswd" ;
};
Client{
 org.apache.kafka.common.security.plain.PlainLoginModule required  
 username="kafka"  
 password="kafkapasswd";  
};

KafkaServer,第一行指定了認證方法為PLAIN,usernam和password是kafka的多個broker之間進行認證的賬號密碼。
user_kafka="kafkapasswd"設置了用戶kafka,密碼為kafkapswd,用於客戶端的生產者和消費者連接認證。
網上的說法是 Client,是kafka作為用戶使用zk的認證信息,這里的username和password一定要和zk_server_jaas.conf的配置對的上。
但是我試驗發現 user_cluster=“clusterpasswd”才是真正進行認證的信息,這個Client好像一點用沒有,刪掉也可以正常啟動server,kafka服務也是正常的,費解啊!

2.在kafka的配置文件開啟SASL認證
在server.properties添加如下信息

listeners=SASL_PLAINTEXT://(IP):9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN 
sasl.enabled.mechanisms=PLAIN
allow.everyone.if.no.acl.found=true

3.在server啟動腳本JVM參數
我是直接在

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

添加了認證信息,修改后為

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/kafka_server_jaas.conf"

4.啟動kafka服務端

./kafka-server-start.sh ../config/server.properties

kafka服務端正常啟動后,應該會有類似下面這行的日志信息,說明認證功能開啟成功

Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint((IP),9092,ListenerName(SASL_PLAINTEXT),SASL_PLAINTEXT) (kafka.utils.ZkUtils)

3.3kafka的SASL認證功能認證和使用

1.使用kafka腳本認證

我們使用kafka自帶的腳本進行認證。
1.新建kafka_client_jaas.conf,為客戶端添加認證信息
在/home下新建kafka_client_jaas.conf,添加以下信息

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="kafka"
  password="kafkapasswd";
};

2.修改客戶端配置信息
修改producer.properties和consumer.properties,添加認證機制

security.protocol=SASL_PLAINTEXT 
sasl.mechanism=PLAIN 

3.修改客戶端啟動腳本
修改kafka-console-producer.sh,配置認證文件kafka_client_jaas.conf,將

export KAFKA_HEAP_OPTS="-Xmx512M"
```修改為

export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/home/kafka_client_jaas.conf"

kafka-console-consumer.sh的修改類似。
4.客戶端啟動並認證
啟動consumer

./bin/kafka-console-consumer.sh --bootstrap-server (IP):9092 --topic test --from-beginning --consumer.config config/consumer.properties

啟動producer

./bin/kafka-console-producer.sh --broker-list (IP):9092 --topic test --producer.config configoducer.properties

producer端發送消息,consumer端成功接收到消息。
##2.Java客戶端認證
```java
package com.zte.sdn.oscp.jms.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;

import java.util.Collections;
import java.util.Properties;

public class KafkaTest {

    @Test
    public void testProduct() throws Exception {
        System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "IP:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        Producer<String, String> producer = new KafkaProducer<>(props);
        while (true){
			long startTime = System.currentTimeMillis();
			for (int i = 0; i < 100; i++) {
				producer.send(new ProducerRecord<>("kafkatest", Integer.toString(i), Integer.toString(i)));
			}
			System.out.println(System.currentTimeMillis()-startTime);
			Thread.sleep(5000);
		}
    }

    @Test
    public void testConsumer() throws Exception {
        System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "(IP):9092");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("group.id", "kafka_test_group");
        props.put("session.timeout.ms", "6000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("kafkatest"));
        while (true) {
            long startTime = System.currentTimeMillis();
            ConsumerRecords<String, String> records = consumer.poll(1000);
            System.out.println(System.currentTimeMillis() - startTime);
            System.out.println("recieve message number is " + records.count());
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s, partition = %d %n",
                        record.offset(),
                        record.key(),
                        record.value(),
                        record.partition());
            }
        }
    }
}

3.4客戶端認證時延問題

認證時發現生產者和消費者和kafka的broker建立連接都有一定時延。在生產者的日志發現時延主要發生在

2018-12-17 10:55:46[DEBUG][kafka-producer-network-thread | producer-1]-NetworkClient.java: 762 - Initiating connection to node (IP):9092 (id: 0 rack: null)
2018-12-17 10:55:50[DEBUG][kafka-producer-network-thread | producer-1]-SaslClientAuthenticator.java: 209 - Set SASL client state to SEND_HANDSHAKE_REQUEST

難道客戶端連接服務端時,認證時間需要這么長??


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM