不進行認證配置安裝:https://www.cnblogs.com/pzb-shadow/p/13030365.html
安裝kerbers:https://blog.csdn.net/huanqingdong/article/details/84979110
kafka-manager3.0安裝連接:https://cloud.tencent.com/developer/article/1702379
kafka-manager配置SSL監控:https://cloud.tencent.com/developer/article/1702379
詳細解析 kafka manager 的使用:https://www.jianshu.com/p/6a592d558812
1.安裝包地址
使用Git或者直接從Releases中下載,此處從下面的地址下載2.0.0.2版本:https://github.com/yahoo/kafka-manager/releases
注意上面地址下載下來的是源碼包,需要進行編譯,需要注意的是3.x版本需要基於JDK11環境,2.0.0.2
及以下版本才兼容JDK1.8。
2.下載編譯安裝包
[root@kafka1-master opt]#wget https://github.com/yahoo/kafka-manager/archive/2.0.0.2.tar.gz [root@kafka1-master opt]#cd kafka-manager-2.0.0.2/ [root@kafka1-master opt]#curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo [root@kafka1-master opt]#mv bintray-sbt-rpm.repo /etc/yum.repos.d/ [root@kafka1-master opt]#yum install sbt [root@kafka1-master opt]#sbt-version [root@kafka1-master opt]#./sbt clean dist [root@kafka1-master opt]#cd /usr/local/kafka-manager/target/universal/ [root@kafka1-master opt]#mv kafka-manager-2.0.0.2/ /opt/cloudera/
3.認證配置:
修改conf/application.conf文件中zk的地址以及啟用Kafka-Manager使用賬號登錄和消費者的配置
##zkhosts kafka-manager.zkhosts="beta1:2181" kafka-manager.zkhosts=${?ZK_HOSTS} pinned-dispatcher.type="PinnedDispatcher" pinned-dispatcher.executor="thread-pool-executor" application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"] akka { loggers = ["akka.event.slf4j.Slf4jLogger"] loglevel = "INFO" } akka.logger-startup-timeout = 60s #開啟登錄kafka-manager驗證 basicAuthentication.enabled=true basicAuthentication.enabled=${?KAFKA_MANAGER_AUTH_ENABLED} basicAuthentication.username="admin" basicAuthentication.username=${?KAFKA_MANAGER_USERNAME} basicAuthentication.password="admin" basicAuthentication.password=${?KAFKA_MANAGER_PASSWORD} basicAuthentication.realm="Kafka-Manager" basicAuthentication.excluded=["/api/health"] # ping the health of your instance without authentification kafka-manager.consumer.properties.file=/usr/local/kafka-manager-1.3.3.18/kafka-manager-1.3.3.18/conf/consumer.properties #kafka-manager.consumer.properties.file=${?CONSUMER_PROPERTIES_FILE}
修改conf/consumer.properties內容如下:
security.protocol=SASL_PLAINTEXT key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer sasl.mechanism=GSSAPI sasl.kerberos.service.name=kafka
因為做了kerberos認證,所以需要jaas.conf文件指定到啟動命令后面。配置前要保證票據可用!
在kafka-manager/conf的目錄創建jaas.conf內容如下,並將keyTab對應的文件也拷貝到當前目錄
# 其中Clinet段用於連接zookeeper認證,KafkaClient用於連接kafka服務器認證 Client{ com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/root/test.keytab"#注意對應的keytab路徑 principal="test"#對應的keytab前綴 serviceName="kafka" doNotPrompt=true; }; KafkaClient{ com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/root/test.keytab" principal="test" serviceName="kafka" doNotPrompt=true; };
添加啟動腳本,/opt/module/kafka-manager/scripts/rm.sh,內容如下
#!/bin/bash echo '-------------------------------------------------------------------分界線'$(date +%F%t%T)> manager.out # 配置kafka-manager元數據使用的zookeeper,此處必須是用export export ZK_HOSTS=centos610:2181 # kafka-manager 路徑 MANAGER_HOME=/opt/module/kafka-manager # 可執行文件路徑 KAFKA_MANAGER=$MANAGER_HOME/bin/kafka-manager # 日志位置 APP_HOME=-Dapplication.home=$MANAGER_HOME # 端口 HTTP_PORT=-Dhttp.port=9001 # SASL安全認證 JAAS_CONF=-Djava.security.auth.login.config=$MANAGER_HOME/conf/jaas.conf KRB5_CONF=-Djava.security.krb5.conf=$MANAGER_HOME/conf/krb5.conf nohup $KAFKA_MANAGER $JAAS_CONF $KRB5_CONF $APP_HOME $HTTP_PORT >manager.out 2>&1 & echo "$!" tailf manager.out
添加停止腳本,/opt/module/kafka-manager/scripts/sm.sh,內容如下
echo '-------------------------------------------------------------------分界線'$(date +%F%t%T)> manager.out kill `cat RUNNING_PID`
rm -rf RUNNING_PID tailf manager.out
Kafka監控界面介紹
具體參考:https://blog.csdn.net/u011089412/article/details/87895652
配置SSL監控:https://cloud.tencent.com/developer/article/1702379
當我們首次打開CMAK的監控頁面時,是一片空白的。因為此時我們還沒有添加任何需要被監控的集群,所以首先第一步就是要添加集群:

需要注意的是,如果要開啟JMX輪詢,則必須事先在Kafka的啟動腳本中打開JMX的端口號:
[root@localhost ~]# vim /usr/local/kafka/bin/kafka-server-start.sh # 打開JMX端口 export JMX_PORT=9999
然后重啟Kafka:
[root@localhost ~]# kafka-server-stop.sh [root@localhost ~]# nohub kafka-server-start.sh /usr/local/kafka/config/server.properties &
剩下的配置基本保持默認即可,然后點擊“Save”進行保存:

參數解釋
brokerViewUpdatePeriodSeconds Broker視圖周期更新時間/單位(s) 30
clusterManagerThreadPoolSize 集群管理線程池大小 2
clusterManagerThreadPoolQueueSize 集群管理線程池列隊大小 100
KafkaCommandThreadPoolSize Kafka命令線程池大小 2
logkafkaCommandThreadPoolQueueSize logkafka命令線程池列隊大小 100
logkafkaUpdatePeriodSeconds Logkafka周期更新時間/單位(s) 30
partitionOffsetCacheTimeoutSecs Partition Offset緩存過期時間/單位(s) 5
brokerViewThreadPoolSize Broker視圖線程池大小 8 3 number_of_brokers
brokerViewThreadPoolQueue Size Broker視圖線程池隊列大小 1000 3 total # of partitions across all topics
offsetCacheThreadPoolSize Offset緩存線程池大小 8
offsetCacheThreadPoolQueueSize Offset緩存線程池列隊大小 1000
kafkaAdminClientThreadPoolSize Kafka管理客戶端線程池大小 8
kafkaAdminClientTheadPoolQueue Sizec Kafka管理客戶端線程池隊列大小 1000
kafkaManagedOffsetMetadataCheckMillis Offset元數據檢查時間 30000 (這部分解釋屬自己理解)
kafkaManagedOffsetGroupCacheSize Offset組緩存大小 100000 (這部分解釋屬自己理解)
kafkaManagedOffsetGroupExpireDays Offset組緩存保存時間 7 (這部分解釋屬自己理解)
Security Protocol :kafka安全認證機制
SASL Mechanism:SASL機制
SASL JAAS Confing:jaas.conf文件的client配置內容
如
保存成功后,點擊“Go to cluster view”:
就可以查看到我們添加的這個集群信息,在“Cluster Summary”一欄中顯示了該集群的Topic數量和Broker數量:

點擊“Topics”的數字就可以進入到Topic的監控界面:

- Partitions:Topic的Partition數量
- Brokers:Topic的Broker數量
- Replicas:Topic中Partition的副本數量
- Under Replicated:Topic中的Partition副本處於同步失敗或失效狀態的比例,該指標過高則代表副本沒有復制到足夠的Broker上
- Producer Message/Sec:生產者每秒投遞的消息數量
- Summed Recent Offsets:當前總計的消費偏移量
- Brokers Spread:看作broker使用率,如kafka集群9個broker,某topic有7個partition,則broker spread: 7 / 9 = 77%
- Brokers Skew:partition是否存在傾斜,如kafka集群9個broker,某topic有18個partition,正常每個broker應該2個partition。若其中有3個broker上的partition數>2,則broker skew: 3 / 9 = 33%
- Brokers Leader Skew:leader partition是否存在傾斜,如kafka集群9個broker,某topic14個partition,則正常每個broker有2個leader partition。若其中一個broker有0個leader partition,一個有4個leader partition,則broker leader skew: (4 - 2) / 14 = 14%
點擊“Brokers”的數字就則是進入到Broker的監控界面:

- Messages in /sec:每秒流入的消息數
- Bytes in /sec:每秒流入的字節數
- Bytes out /sec:每秒流出的字節數
- Bytes rejected /sec:每秒拒絕的字節數
- Failed fetch request /sec:每秒失敗抓取數據請求數
- Failed produce request /sec:每秒失敗的生產數據請求數
Tips:這些指標的監控需要打開JMX
在Topic的監控界面中,點擊一個具體的Topic可以進入到該Topic的監控頁面,並且提供了對Topic的操作支持:

Topic的監控指標里也有與Broker一樣的監控指標,只不過這里是針對Topic的,指標的含義都一樣:

Kafka SSL簽名庫生成
Kafka的安全措施:
- Kafka提供了SSL或SASL機制來保障集群安全
- Kafka提供了Broker到zk連接的安全機制
- Kafka支持Client的讀寫驗證
值得一提的是通常情況下都不會給Kafka加安全措施,類似的其他中間件也是。因為通常我們都會將這些中間件部署在一個可信的網絡里,例如與外網隔離的內部網絡,並且有防火牆進行保護。
而且給Kafka加上SSL或SASL安全機制也會導致性能有所損耗,通常這個損耗在20~30%左右。但如果你的Kafka是允許在外網進行訪問的話,那么就需要考慮增加安全機制了。
FAQ
日志報錯:
GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
原因:
原因:如果沒有獲得有效的 Kerberos 憑據,可能會發生這種情況。特別是,如果您希望底層機制獲取憑據,但您忘記通過將javax.security.auth.useSubjectCredsOnly 系統屬性值設置為false(例如通過 -Djavax.security.auth.useSubjectCredsOnly=false在您的執行命令中)來指明這一點,則會發生這種情況。
解決方案:如果您希望底層機制獲取憑據,而不是您的應用程序或包裝程序(例如某些教程使用的登錄實用程序)使用 JAAS 執行身份驗證,請確保將javax.security.auth.useSubjectCredsOnly系統屬性值設置為 false。
修改kafka manager啟動配置項添加
-Djavax.security.auth.useSubjectCredsOnly=false
安全
https://cloud.tencent.com/developer/article/1702379
https://blog.csdn.net/huanqingdong/article/details/84979110