1、新增kafka_client_jaas.conf,kafka_server_jaas.conf,kafka_zoo_jaas.conf三个文件放入kafka的config文件夹中,文件中配置用户,admin用户必须配置。
-
kafka_client_jaas.conf内容如下:
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; };
-
kafka_server_jaas.conf内容如下:
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin" user_admin="admin" user_test1="123456" user_test2="1234567"; }; KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; }; Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; };
-
kafka_zoo_jaas.conf内容如下:
ZKServer{ org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin" user_admin="admin"; };
2、修改kafka的bin文件夹中的zookeeper-server-start.sh,添加以下内容:
export KAFKA_OPTS="-Djava.security.auth.login.config=/app/kafka/kafka_2.11-1.1.0/config/kafka_zoo_jaas.conf -Dzookeeper.sasl.serverconfig=ZKServer"
3、修改kafka的bin文件夹中的kafka-server-start.sh,添加以下内容:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/app/kafka/kafka_2.11-1.1.0/config/kafka_server_jaas.conf"
4、修改kafka的bin文件夹中的kafka-console-producer.sh,添加以下内容:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/app/kafka/kafka_2.11-1.1.0/config/kafka_client_jaas.conf"
5、修改kafka的bin文件夹中的kafka-console-consumer.sh,添加以下内容:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/app/kafka/kafka_2.11-1.1.0/config/kafka_client_jaas.conf"
6、修改kafka的config文件夹中的consumer.properties,添加以下内容:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
7、修改kafka的config文件夹中的producer.properties,添加以下内容:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
8、修改kafka的config文件夹中的zookeeper.properties
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
9、修改kafka的config文件夹中的server.properties
修改:listeners=SASL_PLAINTEXT://192.168.85.13:16667
添加以下配置:
#使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL机制
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
#完成身份验证的类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#如果没有找到ACL(访问控制列表)配置,则允许任何操作。
#allow.everyone.if.no.acl.found=true
super.users=User:admin
delete.topic.enable=true
auto.create.topics.enable=false
10、启动zookeeper服务:
./zookeeper-server-start.sh /app/kafka/kafka_2.1.1-1.1.0/config/zookeeper.properties
11、启动kafka服务:
./kafka-server-start.sh -daemon /app/kafka/kafka_2.11-1.1.0/config/server.properties &
12、查看topic列表:
./kafka-topics.sh --list --zookeeper localhost:2181
13、创建新的topic:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test
14、给admin用户授权:
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:admin --group=* --topic=*
15、给用户test授予某个topic的权限(不加参数默认为all,所有权)
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test1 --topic test --group=*
说明:
可以加这两个参数控制读写:--operationRead --operationWrite
控制消费组:不控制组 --group=*,指定消费组 --grouptest-comsumer-group
以下为测试********
16、在Prometheus的tool目录下新建一个kafka1.conf用来进行用户认证:
security.protocol=SASL_PLAINTEXT
sasl.mechanisms=PLAIN
sasl.username=test1
sasl.password=123456
17、验证,使用Prometheus自带的工具kafkacat来连接kafka:
kafkacat连接kafka并指定刚才创建的配置文件:
./kafkacat -L -b 192.168.85.13:16667 -F ./kafka1.conf
18、使用Prometheus自带的工具kafkacat测试生产者发送消息:
./kafkacat -P -t test -b 192.168.85.13:16667 -F ./kafka1.conf
19、使用Prometheus自带的工具kafkacat测试消费消息:
./kafkacat -C -t test -b 192.168.85.13:16667 -F ./kafka1.conf