kafka-常用腳本


Kafka常用腳本
在Kafka安裝目錄下($KAFKA_HOME/bin),提供了很多內置的腳本供我們使用。使用腳本可以測試Kafka的大多數功能,下面我們就腳本的使用作出說明。

  1. 啟動broker
    bin/kafka-server-start.sh腳本提供了啟動broker的功能。

前台啟動:

> bin/kafka-server-start.sh config/server.properties

后台啟動:

> bin/kafka-server-start.sh -daemon config/server.properties
  1. 停止broker
    bin/kafka-server-stop.sh腳本提供了停止broker的功能。
> bin/kafka-server-stop.sh
  1. 創建topic
    bin/kafka-topic.sh腳本提供了創建topic的功能。創建topic時,需要指定兩個參數:
    • 分區數量
    • 副本數量

注意,副本數量最多不能超過當前集群中broker節點的數量。

下面創建一個名為test的topic,具有3個分區,副本數量為2。

> bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 3 --replication-factor 2 --topic test
  1. 查看topic列表
    bin/kafka-topic.sh腳本提供了查看topic列表的功能。通過–list參數即可查看當前集群所有的topic列表。
> bin/kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
test
  1. 查看某個topic的詳細信息
    bin/kafka-topic.sh腳本提供了查看某個topic詳細信息的功能。通過–describe參數和–topic參數指定topic名稱即可。
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
Topic:test  PartitionCount:3    ReplicationFactor:2 Configs:
    Topic: test Partition: 0    Leader: 2   Replicas: 2,0   Isr: 2,0
    Topic: test Partition: 1    Leader: 0   Replicas: 0,1   Isr: 0,1
    Topic: test Partition: 2    Leader: 1   Replicas: 1,2   Isr: 1,2
  1. 刪除topic
    bin/kafka-topic.sh腳本提供了刪除topic的功能。通過–delete參數和–topic參數指定topic名稱即可。

將名稱為test的topic刪除:

> bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

注意,broker配置中必須設置delete.topic.enable=true,否則刪除操作不會生效。

  1. 命令行生產者
    bin/kafka-console-producer.sh腳本能夠通過命令行輸入向指定的topic中發送數據。

向名稱為test的topic中發送3條數據:

> bin/kafka-console-producer.sh --broker-list hostname:9092 --topic test
This is a message
This is another message
hello kafka
^C
  1. 命令行消費者
    bin/kafka-console-consumer.sh腳本能夠消費topic中的數據並打印到控制台。

消費名稱為test的topic中的數據:

> bin/kafka-console-consumer.sh --bootstrap-server hostname:9092 --topic test --from-beginning
This is a message
This is another message
hello kafka
^CProcessed a total of 3 messages

注意,–from-beginning參數表示從topic的最開始位置進行消費,如果沒有指定該參數,表示從末尾開始消費,只有當啟動消費者后,有新的數據寫入,才能夠顯示到控制台。

  1. 查看消費組
    bin/kafka-consumer-groups.sh腳本能夠查看集群中消費組的信息。通過–list參數能夠列出當前消費組列表。
> bin/kafka-consumer-groups.sh --bootstrap-server hostname:9092 --new-consumer --list
console-consumer-54336

通過–describe參數可以查看消費組的消費詳情:

> bin/kafka-consumer-groups.sh --bootstrap-server company01:9092 --new-consumer --describe --group console-consumer-54336
 
TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test                           0          1               1               0          consumer-1-860f59b3-ebe9-4cda-a074-d108b1fec7bf   /192.168.100.109               consumer-1
test                           1          1               1               0          consumer-1-860f59b3-ebe9-4cda-a074-d108b1fec7bf   /192.168.100.109               consumer-1
test                           2          1               1               0          consumer-1-860f59b3-ebe9-4cda-a074-d108b1fec7bf   /192.168.100.109               consumer-1

CURRENT-OFFSET表示消費者當前消費到該分區的偏移量。

LOG-END-OFFSET表示當前分區最后的偏移量。

LAG表示消費者“落后”的消費進度。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM