kafka運維命令大全


轉自:【kafka運維】Kafka全網最全最詳細運維命令合集(精品強烈建議收藏!!!)

https://mp.weixin.qq.com/s?__biz=Mzg4ODY1NTcxNg==&mid=2247484656&idx=3&sn=cf8b22b7631453b34cfb5c748056f41e&chksm=cff69efbf88117ed240376b6e2f2131718116716ec9a358068d7cdab19006b725443fa779d96&scene=21#wechat_redirect

用於查閱,侵權請聯系刪除

1.TopicCommand

1.1.Topic創建

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test


相關可選參數

參數 描述 例子
--bootstrap-server 指定kafka服務 指定連接到的kafka服務; 如果有這個參數,則 --zookeeper可以不需要 --bootstrap-server localhost:9092
--zookeeper 棄用, 通過zk的連接方式連接到kafka集群; --zookeeper localhost:2181 或者localhost:2181/kafka
--replication-factor 副本數量,注意不能大於broker數量;如果不提供,則會用集群中默認配置 --replication-factor 3
--partitions 分區數量,當創建或者修改topic的時候,用這個來指定分區數;如果創建的時候沒有提供參數,則用集群中默認值; 注意如果是修改的時候,分區比之前小會有問題 --partitions 3
--replica-assignment 副本分區分配方式;創建topic的時候可以自己指定副本分配情況; --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 這個意思是有三個分區和三個副本,對應分配的Broker; 逗號隔開標識分區;冒號隔開表示副本
--config<String: name=value> 用來設置topic級別的配置以覆蓋默認配置;只在--create 和--bootstrap-server 同時使用時候生效; 可以配置的參數列表請看文末附件 例如覆蓋兩個配置 --config retention.bytes=123455 --config retention.ms=600001
--command-config <String: command 文件路徑> 用來配置客戶端Admin Client啟動配置,只在--bootstrap-server 同時使用時候生效; 例如:設置請求的超時時間 --command-config config/producer.proterties; 然后在文件中配置 request.timeout.ms=300000

1.2.刪除Topic

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test


支持正則表達式匹配Topic來進行刪除,只需要將topic 用雙引號包裹起來 例如: 刪除以create_topic_byhand_zk為開頭的topic;

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic "create_topic_byhand_zk.*" .表示任意匹配除換行符 \n 之外的任何單字符。要匹配 . ,請使用 . 。 ·*·:匹配前面的子表達式零次或多次。要匹配 * 字符,請使用 *。 .* : 任意字符

刪除任意Topic (慎用)

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic ".*?"

更多的用法請參考正則表達式

1.3.Topic分區擴容

zk方式(不推薦)

>bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2

kafka版本 >= 2.2 支持下面方式(推薦)

單個Topic擴容

bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4

批量擴容 (將所有正則表達式匹配到的Topic分區擴容到4個)

sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4

".*?" 正則表達式的意思是匹配所有; 您可按需匹配

PS: 當某個Topic的分區少於指定的分區數時候,他會拋出異常;但是不會影響其他Topic正常進行;


相關可選參數 | 參數 |描述 |例子| |--|--|--| |--replica-assignment|副本分區分配方式;創建topic的時候可以自己指定副本分配情況; |--replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 這個意思是有三個分區和三個副本,對應分配的Broker; 逗號隔開標識分區;冒號隔開表示副本|

PS: 雖然這里配置的是全部的分區副本分配配置,但是正在生效的是新增的分區; 比如: 以前3分區1副本是這樣的

Broker-1 Broker-2 Broker-3 Broker-4
0 1 2  

現在新增一個分區,--replica-assignment 2,1,3,4 ; 看這個意思好像是把0,1號分區互相換個Broker

Broker-1 Broker-2 Broker-3 Broker-4
1 0 2 3

但是實際上不會這樣做,Controller在處理的時候會把前面3個截掉; 只取新增的分區分配方式,原來的還是不會變

Broker-1 Broker-2 Broker-3 Broker-4
0 1 2 3

1.4.查詢Topic描述

1.查詢單個Topic

sh bin/kafka-topics.sh --topic test --bootstrap-server xxxx:9092 --describe --exclude-internal

2.批量查詢Topic(正則表達式匹配,下面是查詢所有Topic)

sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server xxxx:9092 --describe --exclude-internal

支持正則表達式匹配Topic,只需要將topic 用雙引號包裹起來


相關可選參數

參數 描述 例子
--bootstrap-server 指定kafka服務 指定連接到的kafka服務; 如果有這個參數,則 --zookeeper可以不需要 --bootstrap-server localhost:9092
--at-min-isr-partitions 查詢的時候省略一些計數和配置信息 --at-min-isr-partitions
--exclude-internal 排除kafka內部topic,比如__consumer_offsets-* --exclude-internal
--topics-with-overrides 僅顯示已覆蓋配置的主題,也就是單獨針對Topic設置的配置覆蓋默認配置;不展示分區信息 --topics-with-overrides

5.查詢Topic列表

1.查詢所有Topic列表

sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal

2.查詢匹配Topic列表(正則表達式)

查詢test_create_開頭的所有Topic列表 sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal --topic "test_create_.*"


相關可選參數

參數 描述 例子
--exclude-internal 排除kafka內部topic,比如__consumer_offsets-* --exclude-internal
--topic 可以正則表達式進行匹配,展示topic名稱 --topic

2.ConfigCommand

Config相關操作; 動態配置可以覆蓋默認的靜態配置;

2.1 查詢配置

Topic配置查詢

展示關於Topic的動靜態配置

1.查詢單個Topic配置(只列舉動態配置)

sh bin/kafka-configs.sh --describe --bootstrap-server xxxxx:9092 --topic test_create_topic 或者 sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics --entity-name test_create_topic

2.查詢所有Topic配置(包括內部Topic)(只列舉動態配置)

sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics

3.查詢Topic的詳細配置(動態+靜態)

只需要加上一個參數--all

其他配置/clients/users/brokers/broker-loggers 的查詢

同理 ;只需要將--entity-type 改成對應的類型就行了 (topics/clients/users/brokers/broker-loggers)

查詢kafka版本信息

sh bin/kafka-configs.sh --describe --bootstrap-server xxxx:9092 --version

所有可配置的動態配置 請看最后面的 *附件* 部分

2.2 增刪改 配置 --alter

--alter

刪除配置--delete-config k1=v1,k2=v2 添加/修改配置: --add-config k1,k2 選擇類型--entity-type (topics/clients/users/brokers/broker- loggers) 類型名稱--entity-name

Topic添加/修改動態配置

--add-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999

Topic刪除動態配置

--delete-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms

其他配置同理,只需要類型改下--entity-type

類型有: (topics/clients/users/brokers/broker- loggers)

哪些配置可以修改 請看最后面的附件:ConfigCommand 的一些可選配置

3.副本擴縮、分區遷移、跨路徑遷移 kafka-reassign-partitions

請戳 【kafka運維】副本擴縮容、數據遷移、副本重分配、副本跨路徑遷移 (如果點不出來,表示文章暫未發表,請耐心等待)

https://mp.weixin.qq.com/s/fQ03wpctV1dGnmk1r-xEWA

(3.1)腳本的使用介紹

關鍵參數--generate

1=》構造文件

  cd /data/kafka

  vim move.json

{
  "topics": [
    {"topic": "test"}
],
  "version": 1
}

運行 generate 參數生成 當前副本的配置介紹 json,以及建議修改的 json

kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file ./move.json --broker-list "0,1,2"  --generate

Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[2,1,0],"log_dirs":["any","any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[0,2,1],"log_dirs":["any","any","any"]}]}

我找json 在線格式化看看

https://www.sojson.com/

對比一下發現,partition 1 和 2 的replicas 不一樣了;

  

 

 

 

 

 

 

4.Topic的發送kafka-console-producer.sh

4.1 生產無key消息

## 生產者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties

4.2 生產有key消息 加上屬性--property parse.key=true

## 生產者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties --property parse.key=true

默認消息key與消息value間使用“Tab鍵”進行分隔,所以消息key以及value中切勿使用轉義字符(\t)


可選參數

參數 值類型 說明 有效值
--bootstrap-server String 要連接的服務器必需(除非指定--broker-list) 如:host1:prot1,host2:prot2
--topic String (必需)接收消息的主題名稱  
--batch-size Integer 單個批處理中發送的消息數 200(默認值)
--compression-codec String 壓縮編解碼器 none、gzip(默認值)snappy、lz4、zstd
--max-block-ms Long 在發送請求期間,生產者將阻止的最長時間 60000(默認值)
--max-memory-bytes Long 生產者用來緩沖等待發送到服務器的總內存 33554432(默認值)
--max-partition-memory-bytes Long 為分區分配的緩沖區大小 16384
--message-send-max-retries Integer 最大的重試發送次數 3
--metadata-expiry-ms Long 強制更新元數據的時間閾值(ms) 300000
--producer-property String 將自定義屬性傳遞給生成器的機制 如:key=value
--producer.config String 生產者配置屬性文件[--producer-property]優先於此配置 配置文件完整路徑  
--property String 自定義消息讀取器 parse.key=true/false key.separator=<key.separator>ignore.error=true/false
--request-required-acks String 生產者請求的確認方式 0、1(默認值)、all
--request-timeout-ms Integer 生產者請求的確認超時時間 1500(默認值)
--retry-backoff-ms Integer 生產者重試前,刷新元數據的等待時間閾值 100(默認值)
--socket-buffer-size Integer TCP接收緩沖大小 102400(默認值)
--timeout Integer 消息排隊異步等待處理的時間閾值 1000(默認值)
--sync 同步發送消息    
--version 顯示 Kafka 版本 不配合其他參數時,顯示為本地Kafka版本  
--help 打印幫助信息    

5. Topic的消費kafka-console-consumer.sh

1. 新客戶端從頭消費--from-beginning (注意這里是新客戶端,如果之前已經消費過了是不會從頭消費的) 下面沒有指定客戶端名稱,所以每次執行都是新客戶端都會從頭消費

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2. 正則表達式匹配topic進行消費--whitelist 消費所有的topic

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist '.*'

消費所有的topic,並且還從頭消費

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist '.*' --from-beginning

3.顯示key進行消費--property print.key=true

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true

4. 指定分區消費--partition 指定起始偏移量消費--offset

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100

5. 給客戶端命名--group

注意給客戶端命名之后,如果之前有過消費,那么--from-beginning就不會再從頭消費了

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group

6. 添加客戶端屬性--consumer-property

這個參數也可以給客戶端添加屬性,但是注意 不能多個地方配置同一個屬性,他們是互斥的;比如在下面的基礎上還加上屬性--group test-group 那肯定不行

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-consumer-group

7. 添加客戶端屬性--consumer.config

--consumer-property 一樣的性質,都是添加客戶端的屬性,不過這里是指定一個文件,把屬性寫在文件里面, --consumer-property 的優先級大於 --consumer.config

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties


參數 描述 例子
--group 指定消費者所屬組的ID  
--topic 被消費的topic  
--partition 指定分區 ;除非指定–offset,否則從分區結束(latest)開始消費 --partition 0
--offset 執行消費的起始offset位置 ;默認值: latest; /latest /earliest /偏移量 --offset 10
--whitelist 正則表達式匹配topic;--topic就不用指定了; 匹配到的所有topic都會消費; 當然用了這個參數,--partition --offset等就不能使用了  
--consumer-property 將用戶定義的屬性以key=value的形式傳遞給使用者 --consumer-propertygroup.id=test-consumer-group
--consumer.config 消費者配置屬性文件請注意,[consumer-property]優先於此配置 --consumer.config config/consumer.properties
--property 初始化消息格式化程序的屬性 print.timestamp=true,false 、print.key=true,false 、print.value=true,false 、key.separator=<key.separator> 、line.separator=<line.separator>、key.deserializer=<key.deserializer>、value.deserializer=<value.deserializer>
--from-beginning 從存在的最早消息開始,而不是從最新消息開始,注意如果配置了客戶端名稱並且之前消費過,那就不會從頭消費了  
--max-messages 消費的最大數據量,若不指定,則持續消費下去 --max-messages 100
--skip-message-on-error 如果處理消息時出錯,請跳過它而不是暫停  
--isolation-level 設置為read_committed以過濾掉未提交的事務性消息,設置為read_uncommitted以讀取所有消息,默認值:read_uncommitted  
--formatter kafka.tools.DefaultMessageFormatter、kafka.tools.LoggingMessageFormatter、kafka.tools.NoOpMessageFormatter、kafka.tools.ChecksumMessageFormatter  

6.kafka-leader-election Leader重新選舉

6.1 指定Topic指定分區用重新PREFERRED:優先副本策略 進行Leader重選舉


> sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic test_create_topic4 --election-type PREFERRED --partition 0

6.2 所有Topic所有分區用重新PREFERRED:優先副本策略 進行Leader重選舉

sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --election-type preferred  --all-topic-partitions

6.3 設置配置文件批量指定topic和分區進行Leader重選舉

先配置leader-election.json文件


{
"partitions": [
{
"topic": "test_create_topic4",
"partition": 1
},
{
"topic": "test_create_topic4",
"partition": 2
}
]
}

sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred --path-to-json-file config/leader-election.json

相關可選參數

參數 描述 例子
--bootstrap-server 指定kafka服務 指定連接到的kafka服務 --bootstrap-server localhost:9092
--topic 指定Topic,此參數跟--all-topic-partitionspath-to-json-file 三者互斥  
--partition 指定分區,跟--topic搭配使用  
--election-type 兩個選舉策略(PREFERRED:優先副本選舉,如果第一個副本不在線的話會失敗;UNCLEAN: 策略)  
--all-topic-partitions 所有topic所有分區執行Leader重選舉; 此參數跟--topicpath-to-json-file 三者互斥  
--path-to-json-file 配置文件批量選舉,此參數跟--topicall-topic-partitions 三者互斥  

7. 持續批量推送消息kafka-verifiable-producer.sh

單次發送100條消息--max-messages 100

一共要推送多少條,默認為-1,-1表示一直推送到進程關閉位置

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --max-messages 100

每秒發送最大吞吐量不超過消息 --throughput 100

推送消息時的吞吐量,單位messages/sec。默認為-1,表示沒有限制

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --throughput 100

發送的消息體帶前綴--value-prefix

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --value-prefix 666

注意--value-prefix 666必須是整數,發送的消息體的格式是加上一個 點號. 例如: 666.

其他參數: --producer.config CONFIG_FILE 指定producer的配置文件 --acks ACKS 每次推送消息的ack值,默認是-1

8. 持續批量拉取消息kafka-verifiable-consumer

持續消費

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4

單次最大消費10條消息--max-messages 10

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4 --max-messages 10


相關可選參數

參數 描述 例子
--bootstrap-server 指定kafka服務 指定連接到的kafka服務; --bootstrap-server localhost:9092
--topic 指定消費的topic  
--group-id 消費者id;不指定的話每次都是新的組id  
group-instance-id 消費組實例ID,唯一值  
--max-messages 單次最大消費的消息數量  
--enable-autocommit 是否開啟offset自動提交;默認為false  
--reset-policy 當以前沒有消費記錄時,選擇要拉取offset的策略,可以是earliestlatest,none。默認是earliest  
--assignment-strategy consumer分配分區策略,默認是org.apache.kafka.clients.consumer.RangeAssignor  
--consumer.config 指定consumer的配置文件  

9.生產者壓力測試kafka-producer-perf-test.sh

1. 發送1024條消息--num-records 100並且每條消息大小為1KB--record-size 1024 最大吞吐量每秒10000條--throughput 100

sh bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9092 --record-size 1024

你可以通過LogIKM查看分區是否增加了對應的數據大小圖片從LogIKM 可以看到發送了1024條消息; 並且總數據量=1M; 1024條*1024byte = 1M;

2. 用指定消息文件--payload-file發送100條消息最大吞吐量每秒100條--throughput 100

  1. 先配置好消息文件batchmessage.txt圖片

  2. 然后執行命令 發送的消息會從batchmessage.txt里面隨機選擇; 注意這里我們沒有用參數--payload-delimeter指定分隔符,默認分隔符是\n換行;

    bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100 --producer-props bootstrap.servers=localhost:9090 --payload-file config/batchmessage.txt

  1. 驗證消息,可以通過 LogIKM 查看發送的消息

    圖片


相關可選參數

參數 描述 例子
--topic 指定消費的topic  
--num-records 發送多少條消息  
--throughput 每秒消息最大吞吐量  
--producer-props 生產者配置, k1=v1,k2=v2 --producer-props bootstrap.servers= localhost:9092,client.id=test_client
--producer.config 生產者配置文件 --producer.config config/producer.propeties
--print-metrics 在test結束的時候打印監控信息,默認false --print-metrics true
--transactional-id 指定事務 ID,測試並發事務的性能時需要,只有在 --transaction-duration-ms > 0 時生效,默認值為 performance-producer-default-transactional-id  
--transaction-duration-ms 指定事務持續的最長時間,超過這段時間后就會調用 commitTransaction 來提交事務,只有指定了 > 0 的值才會開啟事務,默認值為 0  
--record-size 一條消息的大小byte; 和 --payload-file 兩個中必須指定一個,但不能同時指定  
--payload-file 指定消息的來源文件,只支持 UTF-8 編碼的文本文件,文件的消息分隔符通過 --payload-delimeter指定,默認是用換行\nl來分割的,和 --record-size 兩個中必須指定一個,但不能同時指定 ; 如果提供的消息  
--payload-delimeter 如果通過 --payload-file 指定了從文件中獲取消息內容,那么這個參數的意義是指定文件的消息分隔符,默認值為 \n,即文件的每一行視為一條消息;如果未指定--payload-file則此參數不生效;發送消息的時候是隨機送文件里面選擇消息發送的;  

10.消費者壓力測試kafka-consumer-perf-test.sh

消費100條消息--messages 100

sh bin/kafka-consumer-perf-test.sh -topic test_create_topic4 --bootstrap-server localhost:9090 --messages 100


相關可選參數

參數 描述 例子
--bootstrap-server    
--consumer.config 消費者配置文件  
--date-format 結果打印出來的時間格式化 默認:yyyy-MM-dd HH:mm:ss:SSS
--fetch-size 單次請求獲取數據的大小 默認1048576
--topic 指定消費的topic  
--from-latest    
--group 消費組ID  
--hide-header 如果設置了,則不打印header信息  
--messages 需要消費的數量  
--num-fetch-threads feth 數據的線程數 默認:1
--print-metrics 結束的時候打印監控數據  
--show-detailed-stats    
--threads 消費線程數; 默認 10

11.刪除指定分區的消息kafka-delete-records.sh

刪除指定topic的某個分區的消息刪除至offset為1024

先配置json文件offset-json-file.json

{"partitions":
[{"topic": "test1", "partition": 0,
"offset": 1024}],
"version":1
}

在執行命令

sh bin/kafka-delete-records.sh --bootstrap-server 172.23.250.249:9090 --offset-json-file config/offset-json-file.json

驗證 通過 LogIKM 查看發送的消息

圖片從這里可以看出來,配置"offset": 1024 的意思是從最開始的地方刪除消息到 1024的offset; 是從最前面開始刪除的

12. 查看Broker磁盤信息

查詢指定topic磁盤信息--topic-list topic1,topic2

sh bin/kafka-log-dirs.sh --bootstrap-server xxxx:9090 --describe --topic-list test2

查詢指定Broker磁盤信息--broker-list 0 broker1,broker2

sh bin/kafka-log-dirs.sh --bootstrap-server xxxxx:9090 --describe --topic-list test2 --broker-list 0

例如我一個3分區3副本的Topic的查出來的信息 logDir Broker中配置的log.dir

{
"version": 1,
"brokers": [{
"broker": 0,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 1,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 2,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 3,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3",
"error": null,
"partitions": []
}]
}]
}

如果你覺得通過命令查詢磁盤信息比較麻煩,你也可以通過 LogIKM 查看圖片

12. 消費者組管理 kafka-consumer-groups.sh

1. 查看消費者列表--list

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list圖片

先調用MetadataRequest拿到所有在線Broker列表 再給每個Broker發送ListGroupsRequest請求獲取 消費者組數據

2. 查看消費者組詳情--describe

DescribeGroupsRequest

查看消費組詳情--group 或 --all-groups

查看指定消費組詳情--group sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group


查看所有消費組詳情--all-groups sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups 查看該消費組 消費的所有Topic、及所在分區、最新消費offset、Log最新數據offset、Lag還未消費數量、消費者ID等等信息圖片

查詢消費者成員信息--members

所有消費組成員信息 sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090 指定消費組成員信息 sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090圖片

查詢消費者狀態信息--state

所有消費組狀態信息 sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090 指定消費組狀態信息 sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9090圖片

3. 刪除消費者組--delete

DeleteGroupsRequest

刪除消費組--delete

刪除指定消費組--group sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090 刪除所有消費組--all-groups sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090

PS: 想要刪除消費組前提是這個消費組的所有客戶端都停止消費/不在線才能夠成功刪除;否則會報下面異常

Error: Deletion of some consumer groups failed:
* Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.

4. 重置消費組的偏移量 --reset-offsets

能夠執行成功的一個前提是 消費組這會是不可用狀態;

下面的示例使用的參數是: --dry-run ;這個參數表示預執行,會打印出來將要處理的結果; 等你想真正執行的時候請換成參數--excute ;

下面示例 重置模式都是 --to-earliest 重置到最早的;

請根據需要參考下面 相關重置Offset的模式 換成其他模式;

重置指定消費組的偏移量 --group

重置指定消費組的所有Topic的偏移量--all-topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topic 重置指定消費組的指定Topic的偏移量--topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2

重置所有消費組的偏移量 --all-group

重置所有消費組的所有Topic的偏移量--all-topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --all-topic 重置所有消費組中指定Topic的偏移量--topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --topic test2

--reset-offsets 后面需要接重置的模式

相關重置Offset的模式

參數 描述 例子
--to-earliest : 重置offset到最開始的那條offset(找到還未被刪除最早的那個offset)  
--to-current: 直接重置offset到當前的offset,也就是LOE  
--to-latest 重置到最后一個offset  
--to-datetime: 重置到指定時間的offset;格式為:YYYY-MM-DDTHH:mm:SS.sss; --to-datetime "2021-6-26T00:00:00.000"
--to-offset 重置到指定的offset,但是通常情況下,匹配到多個分區,這里是將匹配到的所有分區都重置到這一個值; 如果 1.目標最大offset<--to-offset, 這個時候重置為目標最大offset;2.目標最小offset>--to-offset ,則重置為最小; 3.否則的話才會重置為--to-offset的目標值; 一般不用這個 --to-offset 3465圖片
--shift-by 按照偏移量增加或者減少多少個offset;正的為往前增加;負的往后退;當然這里也是匹配所有的; --shift-by 100 、--shift-by -100
--from-file 根據CVS文檔來重置; 這里下面單獨講解  

--from-file着重講解一下

上面其他的一些模式重置的都是匹配到的所有分區; 不能夠每個分區重置到不同的offset;不過--from-file可以讓我們更靈活一點;

  1. 先配置cvs文檔 格式為: Topic:分區號: 重置目標偏移量

     test2,0,100
    test2,1,200
    test2,2,300
  2. 執行命令

    sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv

5. 刪除偏移量delete-offsets

能夠執行成功的一個前提是 消費組這會是不可用狀態;

偏移量被刪除了之后,Consumer Group下次啟動的時候,會從頭消費;

sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server XXXX:9090 --topic test2


相關可選參數

參數 描述 例子
--bootstrap-server 指定連接到的kafka服務; --bootstrap-server localhost:9092
--list 列出所有消費組名稱 --list
--describe 查詢消費者描述信息 --describe
--group 指定消費組  
--all-groups 指定所有消費組  
--members 查詢消費組的成員信息  
--state 查詢消費者的狀態信息  
--offsets 在查詢消費組描述信息的時候,這個參數會列出消息的偏移量信息; 默認就會有這個參數的;  
dry-run 重置偏移量的時候,使用這個參數可以讓你預先看到重置情況,這個時候還沒有真正的執行,真正執行換成--excute;默認為dry-run  
--excute 真正的執行重置偏移量的操作;  
--to-earliest 將offset重置到最早  
to-latest 將offset重置到最近  

附件

ConfigCommand 的一些可選配置


Topic相關可選配置

key value 示例
cleanup.policy 清理策略  
compression.type 壓縮類型(通常建議在produce端控制)  
delete.retention.ms 壓縮日志的保留時間  
file.delete.delay.ms    
flush.messages 持久化message限制  
flush.ms 持久化頻率  
follower.replication.throttled.replicas flowwer副本限流 格式:分區號:副本follower號,分區號:副本follower號 0:1,1:1
index.interval.bytes    
leader.replication.throttled.replicas leader副本限流 格式:分區號:副本Leader號 0:0
max.compaction.lag.ms    
max.message.bytes 最大的batch的message大小  
message.downconversion.enable message是否向下兼容  
message.format.version message格式版本  
message.timestamp.difference.max.ms    
message.timestamp.type    
min.cleanable.dirty.ratio    
min.compaction.lag.ms    
min.insync.replicas 最小的ISR  
preallocate    
retention.bytes 日志保留大小(通常按照時間限制)  
retention.ms 日志保留時間  
segment.bytes segment的大小限制  
segment.index.bytes    
segment.jitter.ms    
segment.ms segment的切割時間  
unclean.leader.election.enable 是否允許非同步副本選主  

Broker相關可選配置

key value 示例
advertised.listeners    
background.threads    
compression.type    
follower.replication.throttled.rate    
leader.replication.throttled.rate    
listener.security.protocol.map    
listeners    
log.cleaner.backoff.ms    
log.cleaner.dedupe.buffer.size    
log.cleaner.delete.retention.ms    
log.cleaner.io.buffer.load.factor    
log.cleaner.io.buffer.size    
log.cleaner.io.max.bytes.per.second    
log.cleaner.max.compaction.lag.ms    
log.cleaner.min.cleanable.ratio    
log.cleaner.min.compaction.lag.ms    
log.cleaner.threads    
log.cleanup.policy    
log.flush.interval.messages    
log.flush.interval.ms    
log.index.interval.bytes    
log.index.size.max.bytes    
log.message.downconversion.enable    
log.message.timestamp.difference.max.ms    
log.message.timestamp.type    
log.preallocate    
log.retention.bytes    
log.retention.ms    
log.roll.jitter.ms    
log.roll.ms    
log.segment.bytes    
log.segment.delete.delay.ms    
max.connections    
max.connections.per.ip    
max.connections.per.ip.overrides    
message.max.bytes    
metric.reporters    
min.insync.replicas    
num.io.threads    
num.network.threads    
num.recovery.threads.per.data.dir    
num.replica.fetchers    
principal.builder.class    
replica.alter.log.dirs.io.max.bytes.per.second    
sasl.enabled.mechanisms    
sasl.jaas.config    
sasl.kerberos.kinit.cmd    
sasl.kerberos.min.time.before.relogin    
sasl.kerberos.principal.to.local.rules    
sasl.kerberos.service.name    
sasl.kerberos.ticket.renew.jitter    
sasl.kerberos.ticket.renew.window.factor    
sasl.login.refresh.buffer.seconds    
sasl.login.refresh.min.period.seconds    
sasl.login.refresh.window.factor    
sasl.login.refresh.window.jitter    
sasl.mechanism.inter.broker.protocol    
ssl.cipher.suites    
ssl.client.auth    
ssl.enabled.protocols    
ssl.endpoint.identification.algorithm    
ssl.key.password    
ssl.keymanager.algorithm    
ssl.keystore.location    
ssl.keystore.password    
ssl.keystore.type    
ssl.protocol    
ssl.provider    
ssl.secure.random.implementation    
ssl.trustmanager.algorithm    
ssl.truststore.location    
ssl.truststore.password    
ssl.truststore.type    
unclean.leader.election.enable    

Users相關可選配置

key value 示例
SCRAM-SHA-256    
SCRAM-SHA-512    
consumer_byte_rate 針對消費者user進行限流  
producer_byte_rate 針對生產者進行限流  
request_percentage 請求百分比  

clients相關可選配置

key value 示例
consumer_byte_rate    
producer_byte_rate    
request_percentage    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM