Kafka-offset查看與變更
一、環境介紹
主機名 |
IP |
Mysql角色 |
kerberbos |
dcdl-test-namenode1 |
10.2.98.128 |
Kafka Broker Default Group |
非kdc認證 |
dcdl-test-datanode1 |
10.2.96.129 |
Kafka Broker Default Group |
kdc認證 |
dcdl-test-datanode2 |
10.2.96.130 |
Kafka Broker Group 1 |
kdc認證 |
dcdl-test-datanode3 |
10.2.96.131 |
Kafka Broker Default Group |
kdc認證 |
二、在非kerberbos認證下kafka offset變更
1、創建測試kafka topic
kafka-topics --create --zookeeper dcdl-test-datanode1.essence.com:2181 --replication-factor 1 --partitions 1 --topic test2 |
2、查看創建kafka topic
kafka-topics --zookeeper 10.2.98.128:2181 --list |
|
3、向kafka topic 寫入數據
kafka-console-producer --broker-list 10.2.98.128:9092 --topic test2 |
|
4、消費kafka topic 數據信息
kafka-console-consumer --bootstrap-server 10.2.98.128:9092 --group offset-group --topic test2 |
|
5、查看消費者消費偏移量
kafka-consumer-groups --bootstrap-server 10.2.98.128:9092 --group offset-group --describe |
Current-offset與log-end-offset 相等表示kafka topic 消息已全部消費完畢 |
6、設置kafka topic offset為最初偏移量
kafka-consumer-groups --bootstrap-server 10.2.98.128:9092 --group offset-group --topic test2 --reset-offsets --to-earliest --execute |
注意:Offset-group必須是非活躍狀態,退出消費。 |
查看topic test2的offset 與第5步驟對比已回到最初的偏移量 |
7、設置kafka topic offset任意偏移量
kafka-consumer-groups --bootstrap-server 10.2.98.128:9092 --group offset-group --topic test2 --reset-offsets --to-offset 8 --execute |
|
查看topic test2的offset與第7步驟對比current-offset向前偏移8 |
三、Kafka在kerberbos認證下查看與修改topic offset
1、以kafka用戶為測試背景,導入kafka用戶變量
配置kafka用戶 kerberbos認證相關文件
修改client.properties文件
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/jaas-keytab.conf" |
2、創建kafka topic
kafka-topics --create --zookeeper 10.2.98.129:2181/kafka2 --replication-factor 1 --partitions 1 --topic offsettest |
|
3、向kafka topice offsettest寫入數據
kafka-console-producer --broker-list 10.2.98.129:9092,10.2.98.130:9092,10.2.98.131:9092 --topic offsettest --producer.config /home/kafka/client.properties |
|
4、消費kafka topic offsettest消息
kafka-console-consumer --bootstrap-server 10.2.98.129:9092,10.2.98.130:9092,10.2.98.131:9092 --group offset-group --topic offsettest --consumer.config /home/kafka/client.properties |
|
5、查看kafka topic offsettest的消費者消費偏移量
kafka-consumer-groups --bootstrap-server 10.2.98.129:9092,10.2.98.130:9092,10.2.98.131:9092 --group offset-group --describe --command-config /home/kafka/client.properties
|
提示:current-offset與log-end-offset相等,及lag等於0,表示topic 已全部消費 |
6、設置kafka topic offsettest 最初偏移量
kafka-consumer-groups --bootstrap-server 10.2.98.129:9092,10.2.98.130:9092,10.2.98.131:9092 --group offset-group --topic offsettest --reset-offsets --to-earliest --execute --command-config /home/kafka/client.properties |
注意:Offset-group必須是非活躍狀態,退出消費。 |
提示:查看topic offsettest的current-offset更改0, LAG變更為19,與第5步相比已回至最初偏移量 |
7、設置kafka topic偏移量任意值
kafka-consumer-groups --bootstrap-server 10.2.98.129:9092,10.2.98.130:9092,10.2.98.131:9092 --group offset-group --topic offsettest --reset-offsets --to-offset 8 --execute --command-config /home/kafka/client.properties |
|
提示:查看topic offsettest的CURRENT-OFFSET變更為8,與第6步驟相比LAG已減少8變更為11. |
四、普通用戶查看,修改offset所需最小權限
1、以用戶kafka_test,topic: offsettest; 消費者組:testgroup 測試背景
2、kerberos認證用戶,導入kafka_test 變量
$ kinit -kt /**/**/kafka_test.keytab $ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka_test/jaas-keytab.conf" $ vi client.properties
|
3、在管理用戶kafka下授於用戶kafka_test Topic讀/寫權限
kafka-sentry -gpr -r risk_role -p "Topic=offsettest->action=read" kafka-sentry -gpr -r risk_role -p "Topic=offsettest->action=read" kafka-sentry -arg -r risk_role -g kafka_test |
4、向topic offsettest寫入數據
kafka-console-producer --broker-list dcdl-test-datanode2.essence.com:9092,dcdl-test-datanode1.essence.com:9092,dcdl-test-datanode3.essence.com:9092 --topic offsettest --producer.config client.properties |
|
提示:沒有權限寫入數據 |
5、在kafka_test用戶下消費topic offsettest數據
kafka-console-consumer --bootstrap-server dcdl-test-datanode2.essence.com:9092,dcdl-test-datanode1.essence.com:9092,dcdl-test-datanode3.essence.com:9092 --group testgroup --topic offsettest --consumer.config client.properties |
提示:沒有權限讀取topic數據 |
6、在管理用戶kafka下授於普通用戶kafka_test topic offsettest的describe權限
kafka-sentry -gpr -r risk_role -p "Topic=offsettest->action=describe" kafka-sentry -arg -r risk_role -g kafka_test |
7、在kafka_test用戶下topic offsettest再次生產,消費數據
kafka-console-producer --broker-list dcdl-test-datanode2.essence.com:9092,dcdl-test-datanode1.essence.com:9092,dcdl-test-datanode3.essence.com:9092 --topic offsettest --producer.config client.properties |
提示:授於topic的describe權限后,已可以生產數據 |
kafka-console-consumer --bootstrap-server dcdl-test-datanode2.essence.com:9092,dcdl-test-datanode1.essence.com:9092,dcdl-test-datanode3.essence.com:9092 --group testgroup --topic offsettest --consumer.config client.properties |
提示:授於topic的describe權限后,已可以消費數據 |
8、在topic offsettest有read,write,describe權限,查看topic offset
kafka-consumer-groups --bootstrap-server dcdl-test-datanode2.essence.com:9092,dcdl-test-datanode1.essence.com:9092,dcdl-test-datanode3.essence.com:9092 --group testgroup --describe --command-config client.properties |
提示:此步說明無需授權消費者組testgroup,可查看offset |
9、授權消費者組testgroup write,describe權限
kafka-sentry -gpr -r risk_role -p "CONSUMERGROUP=testgroup->action=write" kafka-sentry -gpr -r risk_role -p "CONSUMERGROUP=testgroup->action=describe" kafka-sentry -arg -r risk_role -g kafka_test |
10、修改topic offsettest的最初偏移量
kafka-consumer-groups --bootstrap-server dcdl-test-datanode2.essence.com:9092,dcdl-test-datanode1.essence.com:9092,dcdl-test-datanode3.essence.com:9092 --group testgroup --topic offsettest --reset-offsets --to-earliest --execute --command-config client.properties |
11、提示:消費者組testgroup是從offset 23起始消費,修改最初偏移量到23.
|
12、由以上步驟得出普通用戶訪問kerberos 認證kafka topic以及修改offset的權限如下
(1) 在給Topic賦權read或者write權限時,務必同時帶上describe權限,否則權限不生效。當然你也可以將權限設置為ALL.
(2) 在kafka 消費者組授權write權限時,也務必同時帶上decribe權限,否則權限不生效。
(3) 修改kafka offset時,消費者組必須是無成員狀態下,否則無法修改offset.