A record of the problems encountered, and their solutions, while setting up a Kafka 2.7.1 cluster.
Background
Our three existing Kafka clusters run fairly old versions (0.9, 1.0, and 1.1), and their disk capacity is approaching its limit, so they can no longer keep up with growing product demand. We therefore plan to build a new cluster on a newer version.
Goals of this setup
This is a test run: the plan is to build a three-node Kafka cluster. The existing clusters were set up long ago on old versions, so it is worth stepping through the pitfalls again. In addition, we want to verify whether the new version's access-control features can meet our production requirements.
Why version 2.7.1?
At this point Kafka 2.8 had been out for three months. 2.8 removes the ZooKeeper dependency, and although it is a stable release, this cluster will serve real business traffic, so we held off on 2.8. 2.7.1 was the next-newest version, and newer beats older. (Honestly, it was a gut call.)
Setting up ZooKeeper
We chose ZooKeeper 3.5.9, the version paired with Kafka 2.7.1. The setup and configuration steps are standard, so let's go straight to the problems.
Problem 1: ZooKeeper fails to start. The log shows the following error:
java.lang.UnsupportedClassVersionError: org/springframework/web/SpringServletContainerInitializer : Unsupported major.minor version 52.0 (unable to load class org.springframework.web.SpringServletContainerInitializer)
Cause: wrong Java version. JDK 1.8 is required, but the machines had JDK 1.7 installed by default. After installing 1.8, ZooKeeper started normally. For installation steps, see:
https://www.cnblogs.com/fswhq/p/10713429.html
Problem 2: ZooKeeper starts, but the cluster state is wrong. The log shows the following error:
ERROR [/xxxxx:3888:QuorumCnxManager$Listener@958] - Exception while listening
java.net.BindException: Cannot assign requested address (Bind failed)
at java.net.PlainSocketImpl.socketBind(Native Method)
at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)
at java.net.ServerSocket.bind(ServerSocket.java:375)
at java.net.ServerSocket.bind(ServerSocket.java:329)
Fix: add quorumListenOnAllIPs=true to zoo.cfg. Apparently this isn't needed on physical machines; the three test machines are VMs, so presumably some network setting on them is the culprit.
From the official docs: quorumListenOnAllIPs — when set to true, the ZooKeeper server will listen for connections from its peers on all available IP addresses, and not only the address configured in the server list of the configuration file. It affects the connections handling the ZAB protocol and the Fast Leader Election protocol. The default value is false.
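For reference, a minimal three-node zoo.cfg with this flag set might look like the sketch below (hostnames, ports, and paths are placeholders, not the actual test configuration):

```
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data1/zookeeper
clientPort=2181
# listen for peer connections on all interfaces (the fix for the BindException above)
quorumListenOnAllIPs=true
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888
```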
Problem 3: during cluster startup, one node kept failing to join the cluster. The log shows connection failures and a warning:
WARN [QuorumConnectionThread-[myid=3]-3:QuorumCnxManager@381] - Cannot open channel to 2 at election address /xxxx:13889
java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.zookeeper.server.quorum.QuorumCnxManager.initiateConnection(QuorumCnxManager.java:373)
at org.apache.zookeeper.server.quorum.QuorumCnxManager$QuorumConnectionReqThread.run(QuorumCnxManager.java:436)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Looking at the warning: Cannot open channel to 2 at election address /xxxx:13889 — yet this node's myid is 3.
Cause: the zoo.cfg server list and the myid files were swapped on two nodes... the server.2 machine had its myid set to 3.
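A small sanity check would have caught this. The sketch below (a hypothetical helper, assuming the plain `server.N=host:port1:port2` zoo.cfg format) cross-checks a node's myid file against the zoo.cfg server list:

```python
import re

def expected_myid(zoo_cfg_text, local_addresses):
    """Return the server id that zoo.cfg assigns to this host, or None."""
    for line in zoo_cfg_text.splitlines():
        m = re.match(r"server\.(\d+)=([^:]+):", line.strip())
        if m and m.group(2) in local_addresses:
            return int(m.group(1))
    return None

def myid_matches(zoo_cfg_text, local_addresses, myid_file_content):
    """True when the myid file agrees with the zoo.cfg server list."""
    return expected_myid(zoo_cfg_text, local_addresses) == int(myid_file_content.strip())
```

Running this on each node, with the node's own addresses and the contents of its myid file, would have flagged the swapped entries immediately.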
Setting up the Kafka cluster
Setting up the Kafka cluster itself was straightforward, with no problems, so we'll skip it. The focus here is Kafka's permission management (testing only, no theory).
Kafka's built-in access control does not meet our requirements, so let's try modifying the Kafka source.
1. Download the Kafka 2.7.1 source from the official site.
2. Modify the authentication code.
Path:
/xxxx/kafka-2.7.1-src/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java
The actual changes will be posted once they run successfully, to avoid embarrassment.
3. Build. After modifying the code, confirm that Java 1.8 is installed, then run in the source root:
./gradlew clean build -x test
Then run
./gradlew jar
to build the jar packages. After fixing a few syntax errors, it actually succeeded. Then generate the sources jar:
./gradlew srcJar
Once the jars are built, we only need the client jar. Looking in the xxx/kafka-2.7.1-src/clients/build/libs/ directory, there are two jars, kafka-clients-2.7.1.jar and kafka-clients-2.7.1-sources.jar; the first one should be it.
Next, copy kafka-clients-2.7.1.jar into the kafka/libs directory, overwriting the original jar. That completes the source-modification part; next comes configuring SASL/PLAIN authentication.
SASL/PLAIN authentication
First, configure ZooKeeper.
1. Add the following to zoo.cfg to declare the authentication scheme; this covers authentication between the brokers and ZooKeeper:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
2. Create a zk_server_jaas.conf file, specifying the usernames and passwords needed to connect to ZooKeeper:
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-2019"
    user_kafka="kafka-2019"
    user_producer="prod-2019";
};
3. Copy the following jars from kafka/libs into zk/lib:
kafka-clients-0.10.0.1.jar
lz4-1.3.0.jar
slf4j-api-1.7.21.jar
slf4j-log4j12-1.7.21.jar
snappy-java-1.1.2.6.jar
4. Modify the zkEnv.sh script; the last line below is new, and points at the zk_server_jaas.conf path:
#add the zoocfg dir to classpath
CLASSPATH="$ZOOCFGDIR:$CLASSPATH"

for i in "$ZOOBINDIR"/../zookeeper-server/src/main/resources/lib/*.jar
do
    CLASSPATH="$i:$CLASSPATH"
done

SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/data1/apache-zookeeper-3.5.9-bin/conf/zk_server_jaas.conf "
5. Restart all ZooKeeper nodes one by one and watch the logs for errors; if there are none, this part is basically done.
Next, configure Kafka.
1. Create a kafka_server_jaas.conf file with the contents below. The KafkaServer section defines the Kafka cluster's users: username and password are the credentials the brokers use to talk to each other, and each user_xxx="yyy" entry defines a user that may produce and consume, where xxx is the username and yyy the password. With the stock access control, every user must be listed in this file; users cannot be added dynamically. The Client section holds the username and password the brokers use to connect to ZooKeeper, and it must match the ZooKeeper configuration above.
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_producer="producer"
    user_consumer="consumer";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="kafka-2019";
};
2. Configure server.properties as follows:
listeners=SASL_PLAINTEXT://xx.xx.xx.xx:19508
advertised.listeners=SASL_PLAINTEXT://xx.xx.xx.xx:19508
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
allow.everyone.if.no.acl.found=true
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
3. Modify the kafka-run-class.sh script to add the JAAS file path to the startup parameters; as a result, changing user credentials requires a restart.
old
# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
  KAFKA_OPTS=""
fi
new
# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
  KAFKA_OPTS="-Djava.security.auth.login.config=/data1/kafka_2.13-2.7.1/config/kafka_server_jaas.conf"
fi
That completes the ZooKeeper and Kafka configuration. Next is testing, from two angles: the Python client and the command line. The command-line side is more involved, so it goes first.
1. Create a topic:
bin/kafka-topics.sh --create --zookeeper xxxx:2181,xxxx:2181,xxxx:2181 --topic test10 --partitions 10 --replication-factor 3
2. Configure producer.properties and consumer.properties; this specifies the authentication scheme on the client side.
Add to producer.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
Add to consumer.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
# consumer group id
group.id=test-group
3. Create kafka_client_scram_consumer_jaas.conf and kafka_client_scram_producer_jaas.conf. They specify the usernames and passwords used by the producer and the consumer; note that these are not the same as the ones in kafka_server_jaas.conf.
kafka_client_scram_consumer_jaas.conf contents:
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="consumer_test"
    password="consumer_test";
};
kafka_client_scram_producer_jaas.conf contents:
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="producer_test"
    password="producer_test";
};
4. Modify kafka-console-consumer.sh and kafka-console-producer.sh; making copies first is recommended. Here the copies are kafka-console-consumer-scram.sh and kafka-console-producer-scram.sh, and the change passes in the corresponding JAAS file:
kafka-console-consumer-scram.sh
old
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
new
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/kafka_2.13-2.7.1/config/kafka_client_scram_consumer_jaas.conf kafka.tools.ConsoleConsumer "$@"
kafka-console-producer-scram.sh
old
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
new
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/kafka_2.13-2.7.1/config/kafka_client_scram_producer_jaas.conf kafka.tools.ConsoleProducer "$@"
Testing the producer
With username/password producer_test : producer_test1 (not identical):
>>bin/kafka-console-producer-scram.sh --bootstrap-server 10.182.13.237:19508,10.182.13.238:19508 --topic test10 --producer.config ./config/producer.properties
[2021-07-13 11:54:17,303] ERROR Error when sending message to topic test10 with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password
With username/password producer_test : producer_test (identical):
>>bin/kafka-console-producer-scram.sh --bootstrap-server 10.182.13.237:19508,10.182.13.238:19508 --topic test10 --producer.config ./config/producer.properties
>test:1
>
Testing the consumer
bin/kafka-console-consumer-acl.sh --bootstrap-server 10.182.13.237:19508 --topic test10 --from-beginning --consumer.config ./config/consumer.properties
With username/password consumer_test : consumer_test1 (not identical):
>>bin/kafka-console-consumer-scram.sh --bootstrap-server 10.182.13.237:19508 --topic test10 --from-beginning --consumer.config ./config/consumer.properties
[2021-07-13 11:58:49,030] ERROR [Consumer clientId=consumer-test-group-1, groupId=test-group] Connection to node -1 (10.182.13.237/10.182.13.237:19508) failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)
[2021-07-13 11:58:49,031] WARN [Consumer clientId=consumer-test-group-1, groupId=test-group] Bootstrap broker 10.182.13.237:19508 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-07-13 11:58:49,032] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password
Processed a total of 0 messages
With username/password consumer_test : consumer_test (identical):
>>bin/kafka-console-consumer-scram.sh --bootstrap-server 10.182.13.237:19508 --topic test10 --from-beginning --consumer.config ./config/consumer.properties
[2021-07-13 11:57:19,626] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: test-group
Processed a total of 0 messages
That looks like the group is not authorized.
Grant the group permission:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=10.182.9.145:2181,10.182.13.237:2181,10.182.13.238:2181 --add --allow-principal User:consumer_test --operation Read --group test-group
Try again:
>>bin/kafka-console-consumer-scram.sh --bootstrap-server 10.182.13.237:19508 --topic test10 --from-beginning --consumer.config ./config/consumer.properties
test:1
test:1
Now there are results.
From what we've seen so far: when the username and password are identical, authentication passes; otherwise it fails. Topic Read/Write ACLs do not need to be granted, but group Read permission does.
Next, testing with the Python client. The code is simple:
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=["10.182.9.145:19508"],
                         security_protocol="SASL_PLAINTEXT",
                         sasl_mechanism='PLAIN',
                         sasl_plain_username="producer_aaa",
                         sasl_plain_password="producer_aaa")

data = json.dumps({"test": "1"})
for i in range(1, 10):
    producer.send("test10", value=bytes(data))
producer.close()
#!/usr/bin/env python
from kafka import KafkaConsumer

# To consume messages
consumer = KafkaConsumer('test10',
                         group_id='test-group',
                         bootstrap_servers=['10.182.9.145:19508'],
                         auto_offset_reset="earliest",
                         security_protocol="SASL_PLAINTEXT",
                         sasl_mechanism="PLAIN",
                         sasl_plain_username="consumer_test",
                         sasl_plain_password="consumer_test")

for message in consumer:
    # message value is raw byte string -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
The remaining problem: we need proper access control, but right now producers have write access to every topic by default, while consumers can at least be controlled through the group. That does not meet production requirements. In principle Kafka's authentication and authorization are separate, and only the authentication part of the source was changed.
Reading the official docs && testing && guessing......
Found the cause: if no ACLs are set on a topic after creation, every (authenticated) user can access it; once any ACL is granted on the topic, only the users on its allow list can access it.
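This matches the allow.everyone.if.no.acl.found=true setting in server.properties. As a rough model of the behavior (a sketch, not Kafka's actual authorizer code):

```python
def is_authorized(user, topic_acl_users, allow_everyone_if_no_acl=True):
    """Rough model: a topic with no ACLs is open to every authenticated
    user; once any ACL exists, only the listed users are allowed."""
    if not topic_acl_users:
        return allow_everyone_if_no_acl
    return user in topic_acl_users
```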
Goal achieved, so here are the modification details. The body of the authenticate function was replaced, changing the logic so that authentication passes when the password is identical to the username:
// Original implementation, kept commented out for reference:
// protected boolean authenticate(String username, char[] password) throws IOException {
//     if (username == null)
//         return false;
//     else {
//         String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries,
//                 JAAS_USER_PREFIX + username,
//                 PlainLoginModule.class.getName());
//         return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray());
//     }
// }
protected boolean authenticate(String username, char[] password) throws IOException {
    if (username == null)
        return false;
    else {
        // pass when the password is identical to the username
        return Arrays.equals(password, username.toCharArray());
    }
}
The next step goes further: change the logic to read a local file and decide whether authentication passes based on whether the username and password appear in it. This makes it easier to manage business users and passwords.
The code change is below. The logic is simple: authentication passes only if the pair is found in the users.properties file.
protected boolean authenticate(String username, char[] password) throws IOException {
    String pass = String.valueOf(password);
    int flag = readFileContent("/xxxx/kafka_2.13-2.7.1/config/users.properties", username, pass);
    return flag == 1;
}

public static int readFileContent(String fileName, String username, String pass) {
    File file = new File(fileName);
    BufferedReader reader = null;
    try {
        reader = new BufferedReader(new FileReader(file));
        String tempStr;
        String[] userandpassword;
        while ((tempStr = reader.readLine()) != null) {
            userandpassword = tempStr.split(":");
            if (userandpassword.length == 2
                    && userandpassword[0].equals(username)
                    && userandpassword[1].equals(pass))
                return 1;
        }
        return 0;
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }
    return 0;
}
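To sanity-check the lookup logic without rebuilding the jar, here is an equivalent sketch in Python (the one-`user:password`-pair-per-line file format is the same assumption the Java code makes):

```python
def file_authenticate(username, password, users_file):
    """Mirror of the modified authenticate(): succeed only when the
    username:password pair appears in the users file."""
    try:
        with open(users_file) as f:
            for line in f:
                parts = line.strip().split(":")
                if len(parts) == 2 and parts[0] == username and parts[1] == password:
                    return True
    except IOError:  # missing/unreadable file -> authentication fails
        pass
    return False
```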
Rebuild, generate the jar, replace kafka-clients-2.7.1.jar, and restart the Kafka cluster.
At this point a flood of errors appears, like these:
[2021-07-14 16:59:43,312] ERROR [Controller id=145, targetBrokerId=145] Connection to node 145 (xx.xx.xx.xx/xx.xx.xx.xx:19508) failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)
[2021-07-14 16:59:43,413] INFO [SocketServer brokerId=145] Failed authentication with /xx.xx.xx.xx (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector)
[2021-07-14 16:59:43,485] INFO [Controller id=145, targetBrokerId=237] Failed authentication with xx.xx.xx.xx/xx.xx.xx.xx (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector)
[2021-07-14 16:59:43,485] ERROR [Controller id=145, targetBrokerId=237] Connection to node 237 (xx.xx.xx.xx/xx.xx.xx.xx:19508) failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)
[2021-07-14 16:59:43,713] INFO [Controller id=145, targetBrokerId=145] Failed authentication with xx.xx.xx.xx/xx.xx.xx.xx (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector)
[2021-07-14 16:59:43,713] ERROR [Controller id=145, targetBrokerId=145] Connection to node 145 (xx.xx.xx.xx/xx.xx.xx.xx:19508) failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)
[2021-07-14 16:59:43,815] INFO [SocketServer brokerId=145] Failed authentication with /xx.xx.xx.xx (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector)
Cause: authentication between brokers, and between the brokers and ZooKeeper, also goes through the same authenticate function, but at that point users.properties was empty, so every authentication failed. After adding all the usernames and passwords to the file, the errors disappeared.
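In other words, users.properties must contain at least the inter-broker and client accounts, one user:password pair per line. Based on the JAAS configs above it would look roughly like this (the entries are illustrative; use the accounts your own JAAS files define):

```
admin:admin
kafka:kafka-2019
producer_test:producer_test
consumer_test:consumer_test
```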
Now test with the Python client.
1. Create a test topic, test11:
bin/kafka-topics.sh --create --zookeeper xxxx:2181,xxxx:2181,xxxx:2181 --topic test11 --partitions 10 --replication-factor 3
2. Producing from Python now fails; the username and password used are producer_zzz / producer_lll:
D:\python_pycharm\venv\Scripts\python.exe D:/python_pycharm/kafka_demo/kakfa_produce.py
Traceback (most recent call last):
  File "D:/python_pycharm/kafka_demo/kakfa_produce.py", line 10, in <module>
    sasl_plain_password="producer_lll")
  File "D:\python_pycharm\venv\lib\site-packages\kafka\producer\kafka.py", line 347, in __init__
    **self.config)
  File "D:\python_pycharm\venv\lib\site-packages\kafka\client_async.py", line 216, in __init__
    self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
  File "D:\python_pycharm\venv\lib\site-packages\kafka\client_async.py", line 250, in _bootstrap
    bootstrap.connect()
  File "D:\python_pycharm\venv\lib\site-packages\kafka\conn.py", line 374, in connect
    if self._try_authenticate():
  File "D:\python_pycharm\venv\lib\site-packages\kafka\conn.py", line 451, in _try_authenticate
    raise self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type
kafka.errors.AuthenticationFailedError: AuthenticationFailedError: Authentication failed for user producer_zzz
As expected, authentication failed.
3. Add producer_zzz:producer_lll to users.properties and try again:
D:\python_pycharm\venv\Scripts\python.exe D:/python_pycharm/kafka_demo/kakfa_produce.py

Process finished with exit code 0
The write succeeded. Because the new topic has no ACLs, every producer can write to it by default; this also confirms that users can now be hot-updated.
4. Going one step further, grant an ACL:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=xx.xx.xx.xx:2181 --add --allow-principal User:producer_zzz --operation Write --topic test11
Now other users can no longer write at will; their error is below. producer_zzz can still write.
D:\python_pycharm\venv\Scripts\python.exe D:/python_pycharm/kafka_demo/kakfa_produce.py
Traceback (most recent call last):
  File "D:/python_pycharm/kafka_demo/kakfa_produce.py", line 37, in <module>
    producer.send("test11",value=bytes(data))
  File "D:\python_pycharm\venv\lib\site-packages\kafka\producer\kafka.py", line 504, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "D:\python_pycharm\venv\lib\site-packages\kafka\producer\kafka.py", line 631, in _wait_on_metadata
    "Failed to update metadata after %.1f secs." % max_wait)
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
5. Next, test the consumer.
Running it directly fails:
D:\python_pycharm\venv\Scripts\python.exe D:/python_pycharm/kafka_demo/kafka_consumer.py
Traceback (most recent call last):
  File "D:/python_pycharm/kafka_demo/kafka_consumer.py", line 16, in <module>
    for message in consumer:
  File "D:\python_pycharm\venv\lib\site-packages\kafka\vendor\six.py", line 561, in next
    return type(self).__next__(self)
  File "D:\python_pycharm\venv\lib\site-packages\kafka\consumer\group.py", line 1075, in __next__
    return next(self._iterator)
  File "D:\python_pycharm\venv\lib\site-packages\kafka\consumer\group.py", line 998, in _message_generator
    self._coordinator.ensure_coordinator_known()
  File "D:\python_pycharm\venv\lib\site-packages\kafka\coordinator\base.py", line 225, in ensure_coordinator_known
    raise future.exception # pylint: disable-msg=raising-bad-type
kafka.errors.GroupAuthorizationFailedError: [Error 30] GroupAuthorizationFailedError: test-group
6. Add the username and password to users.properties, grant the consumer Read on the topic, and grant Read on the group, all in one go:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=xx.xx.xx.xx:2181 --add --allow-principal User:consumer_zzz --operation Read --topic test11
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=xx.xx.xx.xx:2181 --add --allow-principal User:consumer_zzz --operation Read --group test-group
Consumption succeeds!
D:\python_pycharm\venv\Scripts\python.exe D:/python_pycharm/kafka_demo/kafka_consumer.py
test11:8:0: key=None value={"test": "1"}
test11:5:0: key=None value={"test": "1"}
test11:3:0: key=None value={"test": "1"}
test11:0:0: key=None value={"test": "1"}
test11:6:0: key=None value={"test": "1"}
test11:6:1: key=None value={"test": "1"}
test11:10:0: key=None value={"test": "1"}
test11:4:0: key=None value={"test": "1"}
test11:4:1: key=None value={"test": "1"}
With that, the Kafka 2.7.1 setup and the dynamic-authentication problem are basically solved. Whether it can go to production still needs further evaluation, and the source changes need a thorough review, but the basic flow works end to end. Total time: about a week. A big win~
