學會使用Kafka(八)Kafka基本客戶端命令操作


主題管理

創建主題

kafka-topics.sh --bootstrap-server 172.16.100.10:9092 --create --topic TestCCC --partitions 3 --replication-factor 3

列出所有主題

kafka-topics.sh --list --bootstrap-server 172.16.100.10:9092

# 對於啟用了sasl權限的需要加上權限文件
kafka-consumer-groups.sh --bootstrap-server 172.16.100.10:9092 --list --command-config ../config/sasl.properties

列出所有消費者組

# 新版客戶端 Kafka版本1.0
./kafka-consumer-groups.sh --new-consumer --bootstrap-server 172.16.100.10:9092 --list | wc -l

# 新版客戶端,Kafka版本2.1
./kafka-consumer-groups.sh --bootstrap-server 172.16.100.10:9092 --list 

# 舊版客戶端
./kafka-consumer-groups.sh --zookeeper 172.16.100.10:2181 --list | wc -l

查看消費者組成員(僅限2.x以上)

kafka-consumer-groups.sh --describe --bootstrap-server 172.31.13.93:9092 --members --group GROUP_NAME

查看所有主題詳情

kafka-topics.sh --describe --bootstrap-server 172.16.100.10:9092

查看主題詳情

kafka-topics.sh --describe --bootstrap-server 172.16.100.10:9092 --topic TestCCC

查看所有ISR列表小於AR列表的主題

kafka-topics.sh --describe --bootstrap-server 192.168.5.138:9092 --under-replicated-partitions

說明:如果沒有返回任何信息則說明同步沒有問題。因為正常情況下Replicats和Isr列表是相同的,如果同步有問題,有些副本落后太多則兩個Isr列表的成員就會少。

查看特定主題的同步是否有問題

kafka-topics.sh --describe --bootstrap-server 192.168.5.138:9092 --under-replicated-partitions --topic Test

查看哪些主題在建立是單獨設置了配置

kafka-topics.sh --describe --bootstrap-server 192.168.5.138:9092 --topics-with-overrides

查看主題參數

kafka-configs.sh --describe --zookeeper 172.16.100.10/kafka --entity-type topics --entity-name Test

刪除主題

kafka-topics.sh --delete --bootstrap-server 172.16.100.10:9092 --topic TestCCC

這只是標記主題為刪除,因為它是一個異步操作,如果發現某些時候刪除了主題但是其ZK中的節點包括磁盤數據還都在,你可以手動清理一下:

  • 刪除ZK中/admin/delete_topics下的需要刪除的主題名稱

  • 手動刪除磁盤上的該主題分區目錄

  • 在ZK中執行 rmr /controller 來觸發Controller的重新選舉,這一步要慎重因為它會造成大規模Leader重新選舉,不過只執行前兩步也行,只是Controller中的緩存沒有更新而已

delete.topic.enable=true 如果這個參數設置為false,那么你用命令刪除了主題,Kafka也不會刪除。只有該參數為true,那么Kafka才會異步刪除相關數據,只有當其他情況kafka無法完成刪除的時候你才需要手動刪除。

修改主題的分區數量

kafka-topics.sh --bootstrap-server 172.16.100.10:9092 --alter --topic TestCCC --partitions 4

 測試消息的生產和消費

啟動生產者

kafka-console-producer.sh --broker-list 172.16.100.10:9092 --topic Test

啟動消費者

kafka-console-consumer.sh --bootstrap-server 172.16.100.10:9092 --topic Test --from-beginning

獲取指定主題當前總的消息數量

# --time -1 表示最大位移;--time -2 表示最早位移,這個通常是0
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.5.134:9092 --topic Test --time -1

說明:--time -1 的每個分區結果 減去 --time -2 的每個分區結果,然后每個分區差值相加就是當前主題有多少條消息

重設消費者位移

查看某個消費者組針對某個主題的位移信息

kafka-consumer-groups.sh --bootstrap-server 192.168.5.134:9092 --describe --group TestGroup

重設位移必須要停止消費者

重設位移有幾種選項:

  • --to-earliest:設置到最早位移處,也就是0
  • --to-latest:設置到最新處,也就是主題分區HW的位置
  • --to-offset NUM:指定具體的位移位置
  • --shift-by NUM:基於當前位移向前回退多少
  • --by-duration:回退到多長時間
# 設置TestGroup消費者組所消費的所有topic位移回退到0

kafka-consumer-groups.sh --bootstrap-server 192.168.5.134:9092 --group TestGroup --reset-offsets --all-topics --to-earliest --execute

# 也可以指定具體主題
kafka-consumer-groups.sh --bootstrap-server 192.168.5.134:9092 --group TestGroup --reset-offsets --topic Test --to-earliest --execute

 

吞吐量測試 

生產

kafka-producer-perf-test.sh --topic Test --num-records 100000 --record-size 150 --throughput -1 --producer-props bootstrap.servers=192.168.5.134:9092 acks=-1

消費

kafka-consumer-perf-test.sh --broker-list 192.168.5.134:9092 --messages 10000 --topic Test

日志查看

我們可以通過命令來查看日志內容以及索引文件內容。

./kafka-run-class.sh kafka.tools.DumpLogSegments --files /work/data/kafka/logs/hellokafka-0/00000000000000000000.log

配置管理

所謂配置就是參數,比如修改主題的默認參數。

主題級別的

# 查看配置
kafka-configs.sh --describe --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BB

這里顯示 Configs for topic 'BBB' are 表示它的配置有哪些,這里沒有表示沒有為該主題單獨設置配置,都是使用的默認配置。

image.png

# 增加一個配置
kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --add-config flush.messages=2

image.png

如果修改的話還是相同的命令,只是把值修改一下

image.png

# 刪除配置
kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --delete-config flush.messages

image.png

客戶端級別

這個主要是設置流控

# 設置指定消費者的流控 --entity-name 是客戶端在創建生產者或者消費者時是指定的client.id名稱
kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=1024' --entity-type clients --entity-name COMSUMER_NAME

image.png

下圖為ZK中對應的信息

image.png

查看當前有多少消費者組

./kafka-consumer-groups.sh --bootstrap-server 172.16.48.171:9092 --list

查看消費者組的消費偏移量

./kafka-consumer-groups.sh --bootstrap-server 172.16.48.171:9092 --describe --group TestGroup

CURRENT-OFFSET:當前消費者位移

LOG-END-OFFSET:分區最新位移

LAG:LOG-END-OFFSET減去CURRENT-OFFSET的值,表示積壓量

CONSUMER-ID:是Kafka自己生成的

CLIENT-ID:是消費者代碼里寫的CLIENT ID,用於區分同消費者組中的不同客戶端

注意:查看偏移量在kafka早期版本(0.9.0.0之前)使用下面的命令

kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect [ZOOKEEPER_IPADDRESS]:[ZOOKEEPER_PORT] --group [CONSUMER_GROUP]

分區管理

分區平衡

Leader副本在集群中應該是均衡分布,因為Leader副本對外提供讀寫服務,盡可能不讓同一個主題的多個Leader副本在同一個代理上,但是隨着時間推移比如故障轉移等情況發送,Leader副本可能不均衡。有兩種方式設置自動平衡,自動和手動。

自動就是在配置文件中增加 auto.leader.rebalance.enable true 如果該項為false,當某個節點故障恢復並重新上線后,它原來的Leader副本也不會轉移回來,只是一個Follower副本。

手動就是通過命令來執行

kafka-preferred-replica-election.sh --zookeeper 172.16.48.171:2181/kafka

分區遷移

當下線一個節點需要將該節點上的分區副本遷移到其他可用節點上,Kafka並不會自動進行分區遷移,如果不遷移就會導致某些主題數據丟失和不可用的情況。當增加新節點時,只有新創建的主題才會分配到新節點上,之前的主題分區不會自動分配到新節點上,因為老的分區在創建時AR列表中沒有這個新節點。

image.png

上面2個主題,每個主題3個分區,每個分區3個副本,我們假設現在代理2要下線,所以我們要把代理2上的這兩個主題的分區數據遷移出來。

# 1. 在KAFKA目錄的config目錄中建立topics-to-move.json文件
{
    "topics":[
        {
            "topic":"AAA"
        },
        {
            "topic":"BBB"
        }
    ],
    "version":1
}
# 2. 生成分區分配方案,只是生成一個方案信息然后輸出
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "1,2" --generate

image.png

這個命令的原理是從zookeeper中讀取主題元數據信息及制定的有效代理,根據分區副本分配算法重新計算指定主題的分區副本分配方案。把【Proposed partition reassignment configuration】下面的分區方案保存到一個JSON文件中,partitions-reassignment.json 文件名無所謂。

# 3. 執行方案
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute
# 4. 查看進度
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --verify

image.png

查看結果,這里已經沒有代理0了。

image.png

集群擴容

上面演示了節點下線的數據遷移,這里演示一下集群擴容的數據遷移。我們還是用上面兩個主題,假設代理0又重新上線了。其實擴容就是上面的反向操作

# 1. 建立JSON文件
# 該文件和之前的相同

# 2. 生成方案並保存到一個JSON文件中
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "0,1,2" --generate

image.png

# 3. 數據遷移,這里通過--throttle做一個限流操作,如果數據過大會把網絡堵塞。
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute --throttle 1024

image.png

查看進度和結果

image.png

實際上上面這種方式也可以用在這種場景下,比如3台kafka集群這時候需要用新的機器替換老的機器,這時候你可以把新機器加入到這個老的機器中變成一個更大的集群,然后通過上面的方式在 --broker-list "新機器的ID" 然后進行執行,這樣的話這個集群中的消息以后就只會發送到新的機器上。然后切換生產者到新機器上,切換一些消費者到新機器上,這樣老機器隊列消費完畢就可以把剩余的消費者也切換到新機器上。老機器就可以下線了。

增加分區

通常在需要提供吞吐量的時候我們會增加分區,然后如果代理數量不擴大,同時生產者和消費者線程不增大,你擴展了分區也沒有用。

image.png

kafka-topics.sh --alter --zookeeper 172.16.48.171:2181/kafka --partitions 3 --topic KafkaTest03

image.png

增加副本

集群規模擴大並且想對所有主題或者指定主題提高可用性,那么可以增加原有主題的副本數量

image.png

上面是3個分區,每個分區1個副本,我們現在把每個分區擴展為3個副本

# 1. 創建JSON文件 replica-extends.json 
{
    "version": 1,
    "partitions": [{
            "topic": "KafkaTest04",
            "partition": 0,
            "replicas": [0,1,2]
        },
        {
            "topic": "KafkaTest04",
            "partition": 1,
            "replicas": [0,1,2]
        },
        {
            "topic": "KafkaTest04",
            "partition": 2,
            "replicas": [0,1,2]
        }
    ]
}
# 2. 執行分區副本重新分配命令
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./replica-extends.json --execute

image.png

查看狀態

image.png

查看結果

image.png

鏡像操作

Kafka有一個鏡像工具kafka-mirror-maker.sh,用於將一個集群數據同步到另外一個集群中,這個非常有用,比如機房搬遷就需要進行數據同步。該工具的本質就是創建一個消費者,在源集群中需要遷移的主題消費數據,然后創建一個生產者,將消費的數據寫入到目標集群中。

首先創建消費者配置文件mirror-consumer.properties(文件路徑和名稱是自定義的)

# 源kafka集群代理地址列表
bootstrap.servers=IP1:9092,IP2:9092,IP3:9092
# 消費者組名
group.id=mirror

其次創建生產者配置文件mirror-producer.properties(文件路徑和名稱是自定義的)

# 目標kafka集群地址列表
bootstrap.servers=IP1:9092,IP2:9092,IP3:9092

運行鏡像命令

# 通過 --whitelist 指定需要鏡像的主題,通過  --blacklist 指定不需要鏡像的主題  
# --new.producer 使用新的生產者 --new.consumer 使用新的消費者
# --num.streams N 消費者線程數量 --num.producers N 生產者線程數量 kafka
-mirror-maker.sh --consumer.config PATH/mirror-consumer.properties --producer.config PATH/mirror-producer.properties --whitelist TOPIC

由於鏡像操作是啟動一個生產者和消費者,所以數據同步完成后這個生產者和消費者並不會關閉,它會依然等待新數據,所以同步完成以后你需要自己查看,確認完成了則關閉生產者和消費者。另外目標集群上並不需要提前建立主題,它會自己建立,但是如果已經建立好了它就會直接使用。

下面是一個我在公司測試環(kafka版本為 0.8.1.1,下面的命令和新版本kafka略有區別)境測試遷移的一個截圖,我這里只測試了 EEE999 這個主題

./kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config ./mirrorConsumerConf.conf --num.streams 10 --producer.config ./mirrorProducerConf.conf -num.producers 10 --whitelist "EEE999"

目標服務器不需要提前建立這個主題,你可以建立也可以不建立。這個命令可以用 nohup 執行放到后台。然后通過下面的命令查看同步進度

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect 192.168.50.162:2181/kafka --gyncer --topic EEE999

注意,它的同步是把原有的數據都同步到目標環境中,所以如果在老版本中兩個卡夫卡集群是完全獨立的那么意味着ZK也是獨立的,所以新環境中的消費者在消費隊列的時候可能會出現重復消費的情況,這就需要你的程序支持冪等原則或者手動設置消費偏移量。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM