1、新增kafka_client_jaas.conf,kafka_server_jaas.conf,kafka_zoo_jaas.conf三個文件放入kafka的config文件夾中,文件中配置用戶,admin用戶必須配置。
-
kafka_client_jaas.conf內容如下:
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; }; -
kafka_server_jaas.conf內容如下:
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin" user_admin="admin" user_test1="123456" user_test2="1234567"; }; KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; }; Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; }; -
kafka_zoo_jaas.conf內容如下:
ZKServer{ org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin" user_admin="admin"; };
2、修改kafka的bin文件夾中的zookeeper-server-start.sh,添加以下內容:
export KAFKA_OPTS="-Djava.security.auth.login.config=/app/kafka/kafka_2.11-1.1.0/config/kafka_zoo_jaas.conf -Dzookeeper.sasl.serverconfig=ZKServer"
3、修改kafka的bin文件夾中的kafka-server-start.sh,添加以下內容:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/app/kafka/kafka_2.11-1.1.0/config/kafka_server_jaas.conf"
4、修改kafka的bin文件夾中的kafka-console-producer.sh,添加以下內容:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/app/kafka/kafka_2.11-1.1.0/config/kafka_client_jaas.conf"
5、修改kafka的bin文件夾中的kafka-console-consumer.sh,添加以下內容:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/app/kafka/kafka_2.11-1.1.0/config/kafka_client_jaas.conf"
6、修改kafka的config文件夾中的consumer.properties,添加以下內容:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
7、修改kafka的config文件夾中的producer.properties,添加以下內容:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
8、修改kafka的config文件夾中的zookeeper.properties
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
9、修改kafka的config文件夾中的server.properties
修改:listeners=SASL_PLAINTEXT://192.168.85.13:16667
添加以下配置:
#使用的認證協議
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL機制
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
#完成身份驗證的類
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#如果沒有找到ACL(訪問控制列表)配置,則允許任何操作。
#allow.everyone.if.no.acl.found=true
super.users=User:admin
delete.topic.enable=true
auto.create.topics.enable=false
10、啟動zookeeper服務:
./zookeeper-server-start.sh /app/kafka/kafka_2.1.1-1.1.0/config/zookeeper.properties
11、啟動kafka服務:
./kafka-server-start.sh -daemon /app/kafka/kafka_2.11-1.1.0/config/server.properties &
12、查看topic列表:
./kafka-topics.sh --list --zookeeper localhost:2181
13、創建新的topic:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test
14、給admin用戶授權:
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:admin --group=* --topic=*
15、給用戶test授予某個topic的權限(不加參數默認為all,所有權)
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test1 --topic test --group=*
說明:
可以加這兩個參數控制讀寫:--operationRead --operationWrite
控制消費組:不控制組 --group=*,指定消費組 --grouptest-comsumer-group
以下為測試********
16、在Prometheus的tool目錄下新建一個kafka1.conf用來進行用戶認證:
security.protocol=SASL_PLAINTEXT
sasl.mechanisms=PLAIN
sasl.username=test1
sasl.password=123456
17、驗證,使用Prometheus自帶的工具kafkacat來連接kafka:
kafkacat連接kafka並指定剛才創建的配置文件:
./kafkacat -L -b 192.168.85.13:16667 -F ./kafka1.conf
18、使用Prometheus自帶的工具kafkacat測試生產者發送消息:
./kafkacat -P -t test -b 192.168.85.13:16667 -F ./kafka1.conf
19、使用Prometheus自帶的工具kafkacat測試消費消息:
./kafkacat -C -t test -b 192.168.85.13:16667 -F ./kafka1.conf
