KAFKA的SASL认证


kafka认证官方的相关文档参见:https://docs.confluent.io/current/security/index.html

这里我们选用了sasl/plain进行认证。下面介绍一下配置流程。

配置步骤

主要分为两部分,ZooKeeper配置、kafka配置。

ZooKeeper配置

1.在zookeeper的conf目录下为zoo.cfg添加如下配置,可添加到最后。

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

requireClientAuthScheme=sasl

jaasLoginRenew=3600000

2.在zookeeper的conf目录下创建文件zk_server_jaas.conf,注意下文#号以及后面的文字是注释,配置的时候请删除

Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"                                                     #这里是注释,这个账号是给kafka用的
password="admin-secret"                                           #这里是注释,这个账号是上面admin对应的密码
user_admin="admin-secret";                                       #这里是注释,这个是定义的用户,可提供给生产者和消费者使用,这一行的意思是用户名是”admin“,密码是"admin-secret",还可继续定义多个用户。
};

3.拷贝相关的jar到zookeeper下,在zookeeper的conf目录下创建一个sasl_jars目录,放置这些包。

kafka-clients-0.10.2.1.jar
lz4-1.3.0.jar
org.osgi.core-4.3.0.jar
slf4j-api-1.7.21.jar
snappy-java-1.1.2.6.jar

4.更改zookeeper的bin/zkEnv.sh脚本,让其启动的时候加载jar包和jaas,conf文件,在添加jar包路径的部分添加如下代码。

for i in "$ZOOBINDIR"/../conf/sasl_jars/*.jar; do
CLASSPATH="$i:$CLASSPATH"
done

SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf "

kafka配置

1.创建kafka_server_jaas.conf文件存储在kafka的config目录下,注意下文#号以及后面的文字是注释,配置的时候请删除

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"                                                           #这里是注释,这就是上面zookeeper配置的提供给kafka的账号
password="admin-secret"                                                 #这里是注释,这就是上面zookeeper配置的提供给kafka的账号密码
user_admin="admin-secret"                                              #这里是注释,这是提供给生产者和消费者使用的账号和密码,前面的key”user_amin"中admin就是用户,后面的value admin-secret就是密码
user_alice="alice";                                                              #这里是注释,这是另外一个用户
};

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice";
};

Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};

kafka_server_jaas

2.配置config目录下的server.properties,添加如下配置,如果有,就修改,所有有SASL的地方都要设置成SASL_PLAINTEXT

# server.properties

advertised.listeners=SASL_PLAINTEXT://10.226.133.81:9092
listeners=SASL_PLAINTEXT://ip(kafkaserver的机器):port(9092) 
security.inter.broker.protocol=SASL_PLAINTEXT 
sasl.enabled.mechanisms=PLAIN 
sasl.mechanism.inter.broker.protocol=PLAIN 
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer 
# default false | true to accept all the users to use it.
#allow.everyone.if.no.acl.found=true 
super.users=User:admin;User:alice

3.在bin/kafka-run-class.sh中添加java.security.auth.login.config的环境变量。

KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/root/dhl/kafka/kafka_2.11-0.10.2.1/config/kafka_server_jaas.conf'
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi

4.为特定用户添加特定topic的acl授权,如下表示为用户alice添加topic nginx-log的读写权限。

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.62:2181 --add --allow-principal User:alice --operation Read --operation Write --topic nginx-log

5.验证授权

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.100.62:2181 --list --topic nginx-log

生产者消费者验证

生产者

1.以用户alice为例,创建JAAS认证文件alice_jaas.conf放在config目录下,内容如下:

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice";
};

2.拷贝bin/kafka-console-producer.sh为bin/alice-kafka-console-producer.sh,并将JAAS文件作为一个JVM参数传给console producer

cat bin/alice-kafka-console-producer.sh

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/root/dhl/kafka/kafka_2.11-0.10.2.1/config/alice_jaas.conf kafka.tools.ConsoleProducer "$@"

3.创建文件producer.config指定如下属性:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

4.启动producer

./bin/alice-kafka-console-producer.sh --broker-list 10.226.133.81:9092 --topic testalice --producer.config producer.config

消费者

1.以用户alice为例,创建JAAS认证文件alice_jaas.conf放在config目录下,如果用户跟生产者是同一个,可以复用上面生产者的JAAS文件,内容如下:

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice";
};

2.拷贝bin/kafka-console-consumer.sh为bin/alice-kafka-console-consumer.sh,并将JAAS文件作为一个JVM参数传给console consumer

cat bin/alice-kafka-console-consumer.sh

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/root/dhl/kafka/kafka_2.11-0.10.2.1/config/alice_jaas.conf kafka.tools.ConsoleConsumer "$@"

3.创建文件consumer.config指定如下属性:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=testli-bamboocc

4.启动consumer

./bin/alice-kafka-console-consumer.sh --bootstrap-server 10.226.133.81:9092 --topic testalice --from-beginning --consumer.config consumer.config

接下来如果produer发送数据,可以在consumer展示出来,那就证明配置成功了。



 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM