Kafka——常用命令


 

一、啟停Kafka

1. 啟動Kafka

后台常駐方式,帶上參數 -daemon,如:

bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &

指定 JMX port 端口啟動,指定 jmx,可以方便監控 Kafka 集群

JMX_PORT=9991 bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

2. 停止Kafka

bin/kafka-server-stop.sh

kill -s TERM $PIDS

注:kill停止不要強制-9,不是通過強制kill:

  • 會自動同步日志到磁盤里,當重啟時,以避免需要做任何的日志恢復。日志恢復需要時間,所以這樣可以加快有意啟動;
  • 它將所有leader分區服務器移動到其他的副本,但是leader遷移需要使用特殊的設置(controlled.shutdown.enable=true)。

 

二、Topic管理 

1. 創建Topic

# bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic ops_coffee --partitions 3 --replication-factor 2
Created topic "ops_coffee".

--partitions: 指定分區數,如果不指定默認會使用配置文件中 num.partitions配置的數量

2. 增加分區數

# bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic ops_coffee --partitions 5
Adding partitions succeeded!

注意:partition的數量只能增加不能減少

3. 查看Topic列表

kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

4. 查看Topic信息

# bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic ops_coffee
Topic:ops_coffee    PartitionCount:3    ReplicationFactor:2 Configs:
    Topic: ops_coffee   Partition: 0    Leader: 1   Replicas: 1,2   Isr: 1,2
    Topic: ops_coffee   Partition: 1    Leader: 2   Replicas: 2,3   Isr: 2,3
    Topic: ops_coffee   Partition: 2    Leader: 3   Replicas: 3,1   Isr: 3,1

5. 刪除Topic

kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic test

6. 查看Topic消費進度

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.16.10.91:9092,172.16.10.92:9092,172.16.10.93:9092 --topic test --time -1

time參數說明:

  • -1:表示查詢test各個分區當前最大的消息位移
  • -2:表示查詢test各個分區的最小位移

 

三、Consumer管理

1. 查看Group列表

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.88.108:9092 --list

2. 查看Group消費情況

bin/kafka-consumer-groups.sh --bootstrap-server 172.16.10.91:9092 --group logstash --describe

  • CURRENT-OFFSET:當前消費偏移量
  • LOG-END-OFFSET:末尾偏移量
  • LAG:為未消費的記錄,如果有很多,說明消費延遲很嚴重

3. 刪除Group 中 Topic

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --topic test --delete

4. 刪除Group

bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test-1 --delete

5. 重置offset

1、要求修改的group不能active,查看是否active

[root@izj6c46svwddzpu0evy0vbz kafka_2.11-2.0.1]# bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test_4 --describe
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Consumer group 'test_4' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
consumer-send   0          5697            5697            0               -               -               -
producer-syn    0          4125            4125            0               -               -               -

2、重置命令

bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test_4 --reset-offsets -to-offset 100 --topic consumer-send --execute

3、導出offset

bin/kafka-consumer-groups.sh --bootstrap-server 47.52.199.51:9092 --group test_4 --reset-offsets -to-offset 100 --topic consumer-send --export > 1.txt

 

四、生產消費者

1. 創建生成者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

2. 創建消費者

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic middleware --from-beginning

 

五、Topic數據過期

kafka 默認存放7天的臨時數據,如果遇到磁盤空間小,存放數據量大,可以設置縮短這個時間。

1. 全局設置

修改 server.properties

log.retention.hours=72
log.cleanup.policy=delete

2.單獨對某一個topic設置過期時間

./kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name mytopic --entity-type topics --add-config retention.ms=86400000

retention.ms=86400000 為一天,單位是毫秒。

查看設置:

$ ./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name mytopic --entity-type topics
Configs for topics:wordcounttopic are retention.ms=86400000

3.立即刪除某個topic下的數據

./kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --config cleanup.policy=delete

 

六、動態配置

1. 平衡Leader

bin/kafka-preferred-replica-election.sh --bootstrap-server localhost:9092

 

七、壓力測試

1. 生產者

kafka-producer-perf-test.sh  --topic test --record-size 100 --num-records 100000 --throughput 1000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

參數說明:

  • record-size:是一條信息有多大,單位是字節
  • num-records:是總共發送多少條信息
  • throughput :是每秒多少條信息

生產壓力測試過程如下:

5003 records sent, 1000.2 records/sec (0.10 MB/sec), 0.8 ms avg latency, 22.0 max latency.
5002 records sent, 1000.2 records/sec (0.10 MB/sec), 0.6 ms avg latency, 7.0 max latency.
5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.7 ms avg latency, 31.0 max latency.
5002 records sent, 1000.0 records/sec (0.10 MB/sec), 0.7 ms avg latency, 15.0 max latency.
5003 records sent, 1000.6 records/sec (0.10 MB/sec), 0.8 ms avg latency, 15.0 max latency.
5002 records sent, 1000.4 records/sec (0.10 MB/sec), 0.8 ms avg latency, 14.0 max latency.
5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.6 ms avg latency, 15.0 max latency.
5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.8 ms avg latency, 18.0 max latency.
5003 records sent, 1000.4 records/sec (0.10 MB/sec), 0.8 ms avg latency, 13.0 max latency.
5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.9 ms avg latency, 31.0 max latency.
100000 records sent, 999.970001 records/sec (0.10 MB/sec), 0.94 ms avg latency, 165.00 ms max latency, 1 ms 50th, 2 ms 95th, 7 ms 99th, 42 ms 99.9th.

測試結果說明:

本例中一共寫入10萬條消息,平均是999.970001條消息/秒,每秒向Kafka寫入了0.10MB的數據,每次寫入的平均延遲為0.94毫秒,最大的延遲為165毫秒

2. 消費者

kafka-consumer-perf-test.sh --zookeeper hadoop111:2181 --topic test --fetch-size 10000 --messages 10000000 --threads 1

參數說明:

  • --zookeeper :指定zookeeper的鏈接信息,集群中任意一台kafka服務器的地址和端口號
  • --topic :指定topic的名稱
  • --fetch-size :指定每次拉取的數據的大小
  • --messages :總共要消費的消息個數

消費壓力測試過程如下:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2020-05-15 17:41:57:339, 2020-05-15 17:41:59:243, 9.5367, 5.0088, 100000, 52521.0084

測試結果說明:

開始測試時間,測試結束數據,最大吞吐率9.5367MB/s,平均每秒消費5.0088MB/s,最大每秒消費100000條,平均每秒消費52521.0084條。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM