kafka認證官方的相關文檔參見:https://docs.confluent.io/current/security/index.html
這里我們選用了sasl/plain進行認證。下面介紹一下配置流程。
配置步驟
主要分為兩部分,ZooKeeper配置、kafka配置。
ZooKeeper配置
1.在zookeeper的conf目錄下為zoo.cfg添加如下配置,可添加到最后。
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
2.在zookeeper的conf目錄下創建文件zk_server_jaas.conf,注意下文#號以及后面的文字是注釋,配置的時候請刪除
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin" #這里是注釋,這個賬號是給kafka用的。
password="admin-secret" #這里是注釋,這個賬號是上面admin對應的密碼
user_admin="admin-secret"; #這里是注釋,這個是定義的用戶,可提供給生產者和消費者使用,這一行的意思是用戶名是”admin“,密碼是"admin-secret",還可繼續定義多個用戶。
};
3.拷貝相關的jar到zookeeper下,在zookeeper的conf目錄下創建一個sasl_jars目錄,放置這些包。
kafka-clients-0.10.2.1.jar
lz4-1.3.0.jar
org.osgi.core-4.3.0.jar
slf4j-api-1.7.21.jar
snappy-java-1.1.2.6.jar
4.更改zookeeper的bin/zkEnv.sh腳本,讓其啟動的時候加載jar包和jaas,conf文件,在添加jar包路徑的部分添加如下代碼。
for i in "$ZOOBINDIR"/../conf/sasl_jars/*.jar; do
CLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf "
kafka配置
1.創建kafka_server_jaas.conf文件存儲在kafka的config目錄下,注意下文#號以及后面的文字是注釋,配置的時候請刪除
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin" #這里是注釋,這就是上面zookeeper配置的提供給kafka的賬號
password="admin-secret" #這里是注釋,這就是上面zookeeper配置的提供給kafka的賬號密碼
user_admin="admin-secret" #這里是注釋,這是提供給生產者和消費者使用的賬號和密碼,前面的key”user_amin"中admin就是用戶,后面的value admin-secret就是密碼
user_alice="alice"; #這里是注釋,這是另外一個用戶
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
kafka_server_jaas
2.配置config目錄下的server.properties,添加如下配置,如果有,就修改,所有有SASL的地方都要設置成SASL_PLAINTEXT
# server.properties
advertised.listeners=SASL_PLAINTEXT://10.226.133.81:9092
listeners=SASL_PLAINTEXT://ip(kafkaserver的機器):port(9092)
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# default false | true to accept all the users to use it.
#allow.everyone.if.no.acl.found=true
super.users=User:admin;User:alice
3.在bin/kafka-run-class.sh中添加java.security.auth.login.config的環境變量。
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/root/dhl/kafka/kafka_2.11-0.10.2.1/config/kafka_server_jaas.conf'
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi
4.為特定用戶添加特定topic的acl授權,如下表示為用戶alice添加topic nginx-log的讀寫權限。
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.62:2181 --add --allow-principal User:alice --operation Read --operation Write --topic nginx-log
5.驗證授權
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.62:2181 --list --topic nginx-log
生產者消費者驗證
生產者
1.以用戶alice為例,創建JAAS認證文件alice_jaas.conf放在config目錄下,內容如下:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice";
};
2.拷貝bin/kafka-console-producer.sh為bin/alice-kafka-console-producer.sh,並將JAAS文件作為一個JVM參數傳給console producer
cat bin/alice-kafka-console-producer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/root/dhl/kafka/kafka_2.11-0.10.2.1/config/alice_jaas.conf kafka.tools.ConsoleProducer "$@"
3.創建文件producer.config指定如下屬性:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
4.啟動producer
./bin/alice-kafka-console-producer.sh --broker-list 10.226.133.81:9092 --topic testalice --producer.config producer.config
消費者
1.以用戶alice為例,創建JAAS認證文件alice_jaas.conf放在config目錄下,如果用戶跟生產者是同一個,可以復用上面生產者的JAAS文件,內容如下:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice";
};
2.拷貝bin/kafka-console-consumer.sh為bin/alice-kafka-console-consumer.sh,並將JAAS文件作為一個JVM參數傳給console consumer
cat bin/alice-kafka-console-consumer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/root/dhl/kafka/kafka_2.11-0.10.2.1/config/alice_jaas.conf kafka.tools.ConsoleConsumer "$@"
3.創建文件consumer.config指定如下屬性:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=testli-bamboocc
4.啟動consumer
./bin/alice-kafka-console-consumer.sh --bootstrap-server 10.226.133.81:9092 --topic testalice --from-beginning --consumer.config consumer.config
接下來如果produer發送數據,可以在consumer展示出來,那就證明配置成功了。