kafka2.7.1集群搭建以及在SASL/PLAIN下實現動態權限


記錄下在搭建 kafka 2.7.1 版本過程中遇到的一些問題及解決方案
背景
    目前的三套kafka集群版本比較老,0.9,1.0,1.1版本,並且磁盤容量即將到達限制,無法滿足日益增長的產品需求,故計划重新搭建一套新版本的kafka集群
 
本次搭建的目的
    進行測試,計划搭建一套三節點的kafka集群,由於之前版本的kafka集群是很久之前搭建的,版本也比較老,有必要重新踩一遍坑,此外驗證下新版本的權限控制功能,是否能滿足線上需求
 
版本為什么選2.7.1
    此時kafka 2.8版本已發布三個月,2.8版本移除了對zk的依賴,雖然是穩定版本,但是鑒於此集群用於業務,故暫不使用2.8,2.7.1是第二新的版本,選新不選舊。(其實是拍腦袋定的)
 
 
zookeeper的搭建
    zk的版本選擇3.5.9,這是kafka 2.7.1 的配套版本,具體的搭建配置過程不提,說下遇到的問題
 
    問題1 :啟動zk失敗,查看log,報錯如下
    java.lang.UnsupportedClassVersionError: org/springframework/web/SpringServletContainerInitializer : Unsupported major.minor version 52.0 (unable to load class org.springframework.web.SpringServletContainerInitializer)
    原因:是java的版本不對,需要的是jdk 1.8 實際線上默認安裝的是jdk 1.7,安裝1.8后zk正常啟動,安裝可參考: https://www.cnblogs.com/fswhq/p/10713429.html
 
    問題2:zk啟動后,集群狀態不對,參看log,報錯如下
ERROR [/xxxxx:3888:QuorumCnxManager$Listener@958] - Exception while listening
java.net.BindException: Cannot assign requested address (Bind failed)
        at java.net.PlainSocketImpl.socketBind(Native Method)
        at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)
        at java.net.ServerSocket.bind(ServerSocket.java:375)
        at java.net.ServerSocket.bind(ServerSocket.java:329)
    原因:在zoo.cfg文件加上參數quorumListenOnAllIPs=true,貌似物理機上不用加,測試的三台是NVM,可能存在着網絡上的一些設置吧,不懂
 官網原文:quorumListenOnAllIPs:當設置為true時,ZooKeeper服務器將在所有可用IP地址上偵聽來自其對等方的連接,而不僅是在配置文件的服務器列表中配置的地址。它會影響處理ZAB協議和快速領導者選舉協議的連接。默認值為false。
   
   問題3:集群啟動過程中總有個節點無法加入集群,log中看到鏈接失敗和一個warn
WARN  [QuorumConnectionThread-[myid=3]-3:QuorumCnxManager@381] - Cannot open channel to 2 at election address /xxxx:13889
java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at org.apache.zookeeper.server.quorum.QuorumCnxManager.initiateConnection(QuorumCnxManager.java:373)
        at org.apache.zookeeper.server.quorum.QuorumCnxManager$QuorumConnectionReqThread.run(QuorumCnxManager.java:436)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
觀察這個warn發現 Cannot open channel to 2 at election address /xxxx:13889,但是這個節點的myid 是 3,
    原因:zoo.cfg中配置和myid配置,有兩個節點寫反了。。。 server.2服務器的myid配置為3了。
 
kafka集群搭建
    kafka集群的搭建比較簡單,沒有遇到問題,按下不表。接下來重點說下kafka的權限管理(主要是測試,無原理)
    kafka自帶的權限控制,不符合需求,准備嘗試改下kafka的源碼看行不行
 
    1,下載kafka2.7.1源碼。官網下載即可
 
    2,修改權限認證部分的代碼
        路徑:
         /xxxx/kafka-2.7.1-src/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java
        具體的修改內容等能成功運行再貼,免得打臉
 
    3,編譯,修改完代碼后 確認下安裝了java1.8 然后在源碼根目錄執行
    ./gradlew clean build -x test
    然后執行  ./gradlew jar 打jar包,修改一些語法錯誤后,居然就成功了
 
然后執行 生成jar包 ./gradlew srcJar
 
生成jar包后,我們只需要kafka_client.jar 找了下在  xxx/kafka-2.7.1-src/clients/build/libs/ 目錄下發現了 kafka-clients-2.7.1.jar  kafka-clients-2.7.1-sources.jar兩個jar包,應該就是他了!
接下來將kafka-clients-2.7.1.jar copy到kafka/libs 目錄下,覆蓋原有的jar包,至此源碼修改部分完成,下面開始配置SASL/PLAIN權限認證部分。
 
 
SASL/PLAIN權限認證
 
首先配置zookeeper
 
 1,在zoo.cfg加配置,申明權限認證方式,這是指broker和zookeepr的認證
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
 
2,新建zk_server_jaas.conf文件,指定鏈接到zookeeper需要的用戶名和密碼
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-2019"
    user_kafka="kafka-2019"
    user_producer="prod-2019";
};

 

3,從kafka/libs下copy 以下jar包覆蓋到 zk/lib下
kafka-clients-0.10.0.1.jar
lz4-1.3.0.jar
slf4j-api-1.7.21.jar
slf4j-log4j12-1.7.21.jar
snappy-java-1.1.2.6.jar

 

4,修改zkEnv.sh 腳本,最后一行是新建的,指定zk_server_jaas.conf的路徑
#add the zoocfg dir to classpath
CLASSPATH="$ZOOCFGDIR:$CLASSPATH"
for i in "$ZOOBINDIR"/../zookeeper-server/src/main/resources/lib/*.jar
do
    CLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/data1/apache-zookeeper-3.5.9-bin/conf/zk_server_jaas.conf "
 
5,依次重啟所有zk節點,並觀察是否又報錯,無報錯則基本問題了
 
其次配置kafka
1,新建kafka_server_jaas.conf文件,內容如下 KafkaServer 配置的是kafka集群的用戶權限,其中username和password是broker之間通信使用用戶密碼,user_xxx="yyy"是定義的 可以生產消費的用戶,xxx是用戶名,yyy是密碼,原始的權限控制 所使用的用戶必須都在本文件里配置,無法動態增加。 Client 配置的是broker和zk鏈接的用戶密碼,其內容和上文zk的配置對應起來即可
KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin"
        user_admin="admin"
        user_producer="producer"
        user_consumer="consumer";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafka"
        password="kafka-2019";
};

 

 
2,配置 server.properties,如下
 
listeners=SASL_PLAINTEXT://xx.xx.xx.xx:19508
advertised.listeners=SASL_PLAINTEXT://xx.xx.xx.xx:19508
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
allow.everyone.if.no.acl.found=true
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin

 

3,修改kafka-run-class.sh腳本,將jass文件路徑加入啟動參數里,所以如果要動態修改用戶參數,需要重啟。
old
# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
  KAFKA_OPTS=""
fi
new
# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
  KAFKA_OPTS="-Djava.security.auth.login.config=/data1/kafka_2.13-2.7.1/config/kafka_server_jaas.conf"
fi

 

 
至此,zk和kafka配置完畢,下面是測試階段,測試將從 python客戶端 和 命令行 兩個角度進行驗證,命令行比較復雜,先說
 
1.申請topic
bin/kafka-topics.sh --create --zookeeper xxxx:2181,xxxx:2181,xxxx:2181 --topic test10 --partitions 10 --replication-factor 3
 
2,配置producer.properties 和  consumer.properties,這里是在指定權限認證方式
producer.properties 新增
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

 

consumer.properties 新增
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
# consumer group id
group.id=test-group

 

 
3,新增 kafka_client_scram_consumer_jaas.conf,kafka_client_scram_producer_jaas.conf 文件,這里指定的是生產者和消費者使用的用戶名和密碼,注意這里的用戶名和密碼和kafka_server_jaas.conf不一樣
kafka_client_scram_consumer_jaas.conf 內容
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="consumer_test"
password="consumer_test";
};

 

kafka_client_scram_producer_jaas.conf 內容
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="producer_test"
password="producer_test";
};

 

 
4,修改kafka-console-consumer.sh 和 kafka-console-producer.sh 文件,建議copy一份,這里使用的是kafka-console-consumer-scram.sh 和 kafka-console-producer-scram.sh,這里是將相應的jaas文件導入
kafka-console-consumer-scram.sh
old
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
new
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/kafka_2.13-2.7.1/config/kafka_client_scram_consumer_jaas.conf  kafka.tools.ConsoleConsumer "$@"

 

kafka-console-producer-scram.sh
old
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
new
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/kafka_2.13-2.7.1/config/kafka_client_scram_producer_jaas.conf kafka.tools.ConsoleProducer "$@"

 

 
測試下 生產者
這里的用戶名密碼是producer_test : producer_test1
>>bin/kafka-console-producer-scram.sh --bootstrap-server 10.182.13.237:19508,10.182.13.238:19508 --topic test10 --producer.config ./config/producer.properties
 
[2021-07-13 11:54:17,303] ERROR Error when sending message to topic test10 with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password

 

這里的用戶名密碼是producer_test :producer_test
>>bin/kafka-console-producer-scram.sh --bootstrap-server 10.182.13.237:19508,10.182.13.238:19508 --topic test10 --producer.config ./config/producer.properties                     
>test:1
>

 

 
測試下消費者
bin/kafka-console-consumer-acl.sh --bootstrap-server  10.182.13.237:19508 --topic test10  --from-beginning --consumer.config ./config/consumer.properties
 
這里的用戶名密碼是consumer_test : consumer_test1
>>bin/kafka-console-consumer-scram.sh --bootstrap-server  10.182.13.237:19508 --topic test10  --from-beginning --consumer.config ./config/consumer.properties
[2021-07-13 11:58:49,030] ERROR [Consumer clientId=consumer-test-group-1, groupId=test-group] Connection to node -1 (10.182.13.237/10.182.13.237:19508) failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)
[2021-07-13 11:58:49,031] WARN [Consumer clientId=consumer-test-group-1, groupId=test-group] Bootstrap broker 10.182.13.237:19508 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-07-13 11:58:49,032] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password
Processed a total of 0 messages
這里的用戶名密碼是consumer_test : consumer_test
>>bin/kafka-console-consumer-scram.sh --bootstrap-server  10.182.13.237:19508 --topic test10  --from-beginning --consumer.config ./config/consumer.properties      
[2021-07-13 11:57:19,626] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: test-group
Processed a total of 0 messages
看着是group 未授權
 
授權group 
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=10.182.9.145:2181,10.182.13.237:2181,10.182.13.238:2181 --add --allow-principal User:consumer_test --operation Read --group test-group

 

 
再來一次
>>bin/kafka-console-consumer-scram.sh --bootstrap-server  10.182.13.237:19508 --topic test10  --from-beginning --consumer.config ./config/consumer.properties
test:1
test:1

這就有結果了

 
從目前看。用戶名和密碼一致,則認證通過,不一致則不通過,不需要授read / write權限,但是group read 權限需要授予
 
下面使用python客戶端測試,,代碼如下,很簡單,略過

import json
from kafka.errors import KafkaError producer = KafkaProducer(bootstrap_servers=["10.182.9.145:19508"], security_protocol = "SASL_PLAINTEXT", sasl_mechanism = 'PLAIN', sasl_plain_username = "producer_aaa", sasl_plain_password = "producer_aaa") data = json.dumps({ "test": "1" }) for i in range(1,10): producer.send("test10",value=bytes(data)) producer.close()

 

#!/usr/bin/env python
from kafka import KafkaConsumer
 
# To consume messages
consumer = KafkaConsumer('test10',
                         group_id='test-group',
                         bootstrap_servers=['10.182.9.145:19508'],
                         auto_offset_reset="earliest",
                         security_protocol = "SASL_PLAINTEXT",
                         sasl_mechanism = "PLAIN",
                         sasl_plain_username = "consumer_test",
                         sasl_plain_password = "consumer_test"
                         )
for message in consumer:
    # message value is raw byte string -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

 

 
目前的問題是,需要完善的權限控制,但是現在生產者默認有所有topic的寫入權限,消費者可以用group控制,這不符合線上的要求,按理說kafka的認證和授權是分開的,在源碼上只修改了認證部分
 
 查官方文檔 && 測試 && 猜測中。。。。。。。。。。。。。。
 
找到原因了:如果一個topic創建后,不申請任何權限,那么它就是所有用戶(通過認證)都能訪問的,如果對此topic進行任意授權,那就只能讓授權列表中的用戶訪問了,
目標達到了,就貼一下修改細節,修改的函數如下,替換了authenticate 函數的內容,將認證的邏輯修改為,如果用戶名和密碼一致則認證通過
 
//    protected boolean authenticate(String username, char[] password) throws IOException {
//        if (username == null)
//            return false;
//        else {
//            String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries,
//                    JAAS_USER_PREFIX + username,
//                    PlainLoginModule.class.getName());
//            return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray());
//        }
//    }

    protected boolean authenticate(String username, char[] password) throws IOException {
        if (username == null)
            return false;
        else {
            return expectedPassword != null && Arrays.equals(password, username.toCharArray());
        }
    }
 

 

接下來,需要再進一步,將邏輯修改為,讀取本地文件,判斷用戶名和密碼是否在文件內 來決定是否通過認證,此舉是為了便於管理業務的用戶和密碼
 
 
代碼修改如下,邏輯很簡單,是否通過認證是通過查找users.properties文件的內容確定的
protected boolean authenticate(String username, char[] password) throws IOException {
        String pass = String.valueOf(password);
        int flag = readFileContent("/xxxx/kafka_2.13-2.7.1/config/users.properties", username, pass);
        if (flag == 1)
            return true;
        return false;
    }
 
 
    public static int readFileContent(String fileName, String username, String pass) {
        File file = new File(fileName);
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new FileReader(file));
            String tempStr;
            String[] userandpassword;
            while ((tempStr = reader.readLine()) != null) {
                userandpassword = tempStr.split(":");
                if (userandpassword[0].equals(username) && userandpassword[1].equals(pass))
                    return 1;
            }
            reader.close();
            return 0;
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
        return 0;
    }

 

重新打包,生成jar文件,替換kafka-clients-2.7.1.jar文件,然后重啟kafka集群,
此時出現大量報錯:如下
[2021-07-14 16:59:43,312] ERROR [Controller id=145, targetBrokerId=145] Connection to node 145 (xx.xx.xx.xx/xx.xx.xx.xx:19508) failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)
[2021-07-14 16:59:43,413] INFO [SocketServer brokerId=145] Failed authentication with /xx.xx.xx.xx (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector)
[2021-07-14 16:59:43,485] INFO [Controller id=145, targetBrokerId=237] Failed authentication with xx.xx.xx.xx/xx.xx.xx.xx (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector)
[2021-07-14 16:59:43,485] ERROR [Controller id=145, targetBrokerId=237] Connection to node 237 (xx.xx.xx.xx/xx.xx.xx.xx:19508) failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)
[2021-07-14 16:59:43,713] INFO [Controller id=145, targetBrokerId=145] Failed authentication with xx.xx.xx.xx/xx.xx.xx.xx (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector)
[2021-07-14 16:59:43,713] ERROR [Controller id=145, targetBrokerId=145] Connection to node 145 (xx.xx.xx.xx/xx.xx.xx.xx:19508) failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)
[2021-07-14 16:59:43,815] INFO [SocketServer brokerId=145] Failed authentication with /xx.xx.xx.xx (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector)
原因是broker之間,broker和zookeeper之間的認證也是用的authenticate函數,但是目前users.properties文件里為空,所以所有的認證都失敗了,在文件中補全所有的用戶名和密碼后,報錯消失。
 
下面使用python 客戶端進行測試,
1,創建測試topic test11
bin/kafka-topics.sh --create --zookeeper xxxx:2181,xxxx:2181,xxxx:2181 --topic test11 --partitions 10 --replication-factor 3
 
2,此時通過python produce寫入數據報錯 用戶名和密碼為producer_zzz,producer_lll,
D:\python_pycharm\venv\Scripts\python.exe D:/python_pycharm/kafka_demo/kakfa_produce.py
Traceback (most recent call last):
  File "D:/python_pycharm/kafka_demo/kakfa_produce.py", line 10, in <module>
    sasl_plain_password="producer_lll")
  File "D:\python_pycharm\venv\lib\site-packages\kafka\producer\kafka.py", line 347, in __init__
    **self.config)
  File "D:\python_pycharm\venv\lib\site-packages\kafka\client_async.py", line 216, in __init__
    self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
  File "D:\python_pycharm\venv\lib\site-packages\kafka\client_async.py", line 250, in _bootstrap
    bootstrap.connect()
  File "D:\python_pycharm\venv\lib\site-packages\kafka\conn.py", line 374, in connect
    if self._try_authenticate():
  File "D:\python_pycharm\venv\lib\site-packages\kafka\conn.py", line 451, in _try_authenticate
    raise self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type
kafka.errors.AuthenticationFailedError: AuthenticationFailedError: Authentication failed for user producer_zzz
可以看到是 認證失敗,
 
3,然后在users.properties 文件中加上producer_zzz:producer_lll 再試一次
D:\python_pycharm\venv\Scripts\python.exe D:/python_pycharm/kafka_demo/kakfa_produce.py
Process finished with exit code 0
可以發現寫入成功了,這是由於新topic沒有進行授權,默認所有produce都可以寫入,也證實了現在用戶可以熱更新了。
 
4更進一步的測試,進行授權
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=xx.xx.xx.xx:2181 --add --allow-principal User:producer_zzz  --operation Write --topic test11

 

這是其他的用戶就不能隨意寫了,其他用戶的報錯如下。而producer_zzz用戶還是可以的。
D:\python_pycharm\venv\Scripts\python.exe D:/python_pycharm/kafka_demo/kakfa_produce.py
Traceback (most recent call last):
  File "D:/python_pycharm/kafka_demo/kakfa_produce.py", line 37, in <module>
    producer.send("test11",value=bytes(data))
  File "D:\python_pycharm\venv\lib\site-packages\kafka\producer\kafka.py", line 504, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "D:\python_pycharm\venv\lib\site-packages\kafka\producer\kafka.py", line 631, in _wait_on_metadata
    "Failed to update metadata after %.1f secs." % max_wait)
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
 
5 接下來測試consumer
直接運行報錯
D:\python_pycharm\venv\Scripts\python.exe D:/python_pycharm/kafka_demo/kafka_consumer.py
Traceback (most recent call last):
  File "D:/python_pycharm/kafka_demo/kafka_consumer.py", line 16, in <module>
    for message in consumer:
  File "D:\python_pycharm\venv\lib\site-packages\kafka\vendor\six.py", line 561, in next
    return type(self).__next__(self)
  File "D:\python_pycharm\venv\lib\site-packages\kafka\consumer\group.py", line 1075, in __next__
    return next(self._iterator)
  File "D:\python_pycharm\venv\lib\site-packages\kafka\consumer\group.py", line 998, in _message_generator
    self._coordinator.ensure_coordinator_known()
  File "D:\python_pycharm\venv\lib\site-packages\kafka\coordinator\base.py", line 225, in ensure_coordinator_known
    raise future.exception  # pylint: disable-msg=raising-bad-type
kafka.errors.GroupAuthorizationFailedError: [Error 30] GroupAuthorizationFailedError: test-group

 

6,在users.properties 加用戶名和密碼,consumer 授予read 權限,group 授權 一條龍執行
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=xx.xx.xx.xx:2181 --add --allow-principal User:consumer_zzz --operation Read --topic test11
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=xx.xx.xx.xx:2181 --add --allow-principal User:consumer_zzz --operation Read --group test-group
 

 

消費成功!
D:\python_pycharm\venv\Scripts\python.exe D:/python_pycharm/kafka_demo/kafka_consumer.py
test11:8:0: key=None value={"test": "1"}
test11:5:0: key=None value={"test": "1"}
test11:3:0: key=None value={"test": "1"}
test11:0:0: key=None value={"test": "1"}
test11:6:0: key=None value={"test": "1"}
test11:6:1: key=None value={"test": "1"}
test11:10:0: key=None value={"test": "1"}
test11:4:0: key=None value={"test": "1"}
test11:4:1: key=None value={"test": "1"}

 

至此,kafka2.7.1版本搭建以及動態的權限認證問題 基本解決,是否可以上線運行,還需要進行進一步的評估,對源碼的修改也需要充分的review,但基本流程已經跑通,總耗時約一個禮拜,大勝利~
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM