Kafka currently supports three authentication mechanisms: SSL, SASL/Kerberos, and SASL/PLAIN.
Scope of Kafka authentication
between a Kafka client and a Kafka server (broker)
between broker and broker
between broker and ZooKeeper
ZooKeeper authentication
In the conf directory under the ZooKeeper installation root, create zk_server_jaas.conf:
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456"
    user_admin="123456"
    user_zk001="123456"
    user_zk002="123456"
    user_zk003="123456"
    user_zk004="123456"
    user_zk005="123456";
};
# username and password are used when brokers authenticate to ZooKeeper; the user_* entries are used when ZooKeeper clients authenticate to the ZooKeeper server
# user_zk001="123456" means the username is zk001 and the password is 123456
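The user_* convention can be read as a small credential table: the PLAIN login module treats each user_<name>=<password> option as one account. The following standalone sketch (illustrative only, not Kafka's actual implementation) mimics that lookup:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PlainUserMap {
    // Mimics how a "user_<name>=<password>" JAAS option is resolved
    // into an account the server will accept.
    static Map<String, String> parse(String... entries) {
        Map<String, String> users = new LinkedHashMap<>();
        for (String e : entries) {
            String[] kv = e.split("=", 2);
            if (kv.length == 2 && kv[0].startsWith("user_")) {
                users.put(kv[0].substring("user_".length()), kv[1]);
            }
        }
        return users;
    }

    public static void main(String[] args) {
        Map<String, String> users = parse("user_admin=123456", "user_zk001=123456");
        // A client logging in as zk001 must present password 123456.
        System.out.println("123456".equals(users.get("zk001"))); // prints "true"
    }
}
```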
Add the following to zoo.cfg:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
Copy JARs from Kafka's libs directory into ZooKeeper's lib directory:
Authentication uses the class org.apache.kafka.common.security.plain.PlainLoginModule, which lives in kafka-clients.jar, so the relevant JARs must be copied into the lib directory under the ZooKeeper installation root. Roughly these JARs are needed:
kafka-clients-2.1.1.jar
lz4-java-1.5.0.jar
osgi-resource-locator-1.0.1.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.7.2.jar
Modify ZooKeeper's startup parameters: edit bin/zkEnv.sh and append at the end of the file:
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf"
Kafka broker authentication configuration
In the config directory, create kafka_server_jaas.conf. The KafkaServer section is used to authenticate clients and other brokers; the Client section is what the broker uses to authenticate to ZooKeeper:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456"
    user_admin="123456"
    user_alice="123456"
    user_write="123456"
    user_read="123456"
    user_kafka001="123456"
    user_kafka002="123456"
    user_kafka003="123456"
    user_kafka004="123456"
    user_kafka005="123456";
};

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456";
};
Modify config/server.properties:
listeners=SASL_PLAINTEXT://192.168.180.128:8123
advertised.listeners=SASL_PLAINTEXT://192.168.180.128:8123
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
#allow.everyone.if.no.acl.found=true
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
zookeeper.set.acl=true
# listeners is the address the server actually binds to
# advertised.listeners is the address published to clients; if unset, listeners is used
Modify bin/kafka-server-start.sh and add:
if [ "x$KAFKA_OPTS" = "x" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/export/ap-comm/server/kafka_2.11-2.1.1/config/kafka_server_jaas.conf"
fi
This completes the authentication configuration.
Verification
Start Kafka:
../bin/kafka-server-start.sh server.properties
In the config directory, create zk_client_jaas.conf:
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456";
};
Modify bin/kafka-topics.sh and add:
if [ "x$KAFKA_OPTS" = "x" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/export/ap-comm/server/kafka_2.11-2.1.1/config/zk_client_jaas.conf"
fi
Create a topic:
../bin/kafka-topics.sh --create --zookeeper localhost:2189 --replication-factor 1 --partitions 1 --topic test015
Modify bin/kafka-acls.sh and add:
if [ "x$KAFKA_OPTS" = "x" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/export/ap-comm/server/kafka_2.11-2.1.1/config/zk_client_jaas.conf"
fi
Grant permissions to the write and read users:
../bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2189 --add --allow-principal User:write --operation Write --topic test015
../bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2189 --add --allow-principal User:read --operation Read --group test-group --topic test015
List all ACLs:
../bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=localhost:2189
Kafka client authentication configuration
Create kafka_write_jaas.conf under config/:
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="write"
    password="123456";
};
Modify bin/kafka-console-producer.sh and add:
if [ "x$KAFKA_OPTS" = "x" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/export/ap-comm/server/kafka_2.11-2.1.1/config/kafka_write_jaas.conf"
fi
Create producer.config under config/:
bootstrap.servers=192.168.180.128:8123
compression.type=none
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
Start the producer to test:
../bin/kafka-console-producer.sh --broker-list 192.168.180.128:8123 --topic test015 --producer.config producer.config
Create kafka_read_jaas.conf under config/:
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="read"
    password="123456";
};
Modify bin/kafka-console-consumer.sh and add:
if [ "x$KAFKA_OPTS" = "x" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/export/ap-comm/server/kafka_2.11-2.1.1/config/kafka_read_jaas.conf"
fi
Create consumer.config under config/:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=test-group
Start the consumer to test:
../bin/kafka-console-consumer.sh --bootstrap-server 192.168.180.128:8123 --topic test015 --from-beginning --consumer.config consumer.config
Java client authentication
Maven dependency (this is the full broker artifact; for a pure client, the lighter org.apache.kafka:kafka-clients artifact also suffices):
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>0.11.0.1</version>
</dependency>
Producing data: KafkaProducerSasl.java:
package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerSasl {
    // Use the topic the write user was granted ACLs on above.
    public final static String TOPIC = "test015";

    private static void producer() throws InterruptedException {
        System.setProperty("java.security.auth.login.config", "E:/work/saslconf/kafka_write_jaas.conf");
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.180.128:8123");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        Producer<String, String> producer = new KafkaProducer<>(props);
        while (true) {
            long startTime = System.currentTimeMillis();
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>(TOPIC, Integer.toString(i), Integer.toString(i)));
            }
            System.out.println(System.currentTimeMillis() - startTime);
            Thread.sleep(5000);
        }
    }

    public static void main(String[] args) {
        try {
            producer();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
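As an alternative to setting java.security.auth.login.config, Kafka clients from 0.10.2 onward can embed the JAAS configuration inline through the sasl.jaas.config client property, which avoids shipping a separate JAAS file. A minimal sketch (broker address and the write user's credentials as used in this article; merge the returned properties into the producer configuration above):

```java
import java.util.Properties;

public class SaslInlineJaas {
    // Builds client properties that carry the JAAS login module inline
    // via sasl.jaas.config, replacing the system-property approach.
    static Properties saslProps(String user, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.180.128:8123");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // The value is one JAAS login-module entry, terminated by a semicolon.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + user + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        // Same credentials as kafka_write_jaas.conf above.
        System.out.println(saslProps("write", "123456").getProperty("sasl.jaas.config"));
    }
}
```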
Consuming data: KafkaConsumerSasl.java:
package kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerSasl {
    public static void consumer() throws Exception {
        System.setProperty("java.security.auth.login.config", "E:/work/saslconf/kafka_read_jaas.conf");
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.180.128:8123");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("group.id", "test-group");
        props.put("session.timeout.ms", "6000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic the read user was granted ACLs on above.
        consumer.subscribe(Collections.singletonList("test015"));
        while (true) {
            long startTime = System.currentTimeMillis();
            ConsumerRecords<String, String> records = consumer.poll(1000);
            System.out.println(System.currentTimeMillis() - startTime);
            System.out.println("receive message number is " + records.count());
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s, partition = %d %n",
                        record.offset(), record.key(), record.value(), record.partition());
            }
        }
    }

    // http://www.open-open.com/lib/view/open1412991579999.html
    public static void main(String[] args) throws Exception {
        consumer();
    }
}
Kafka installation package download:
wget http://mirror.bit.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz
References
http://kafka.apache.org/documentation/#security
https://www.jianshu.com/p/74f84fbd1f3f
https://blog.csdn.net/linux12a/article/details/77375274
https://www.cnblogs.com/smartloli/p/9191929.html
https://blog.csdn.net/bao_since/article/details/89101226
https://stackoverflow.com/questions/40196689/kafka-topic-authorization-failed
https://stackoverflow.com/questions/42635682/kafka-on-cloudera-test-topic-authorization-failed
https://www.cnblogs.com/xiao987334176/p/10077512.html
https://www.cnblogs.com/ilovena/p/10123516.html