Kafka shell :運維常用命令


一、通過kafka-topics.sh來執行topic相關的命令

1、用法

[root@kafka-node1 kafka]# bin/kafka-topics.sh
Create, delete, describe, or change a topic.
Option                                   Description
------                                   -----------
--alter                                  Alter the number of partitions,
                                           replica assignment, and/or
                                           configuration for the topic.
--config <String: name=value>            A topic configuration override for the
                                           topic being created or altered.The
                                           following is a list of valid
                                           configurations:
                                         	cleanup.policy
                                         	compression.type
                                         	delete.retention.ms
                                         	file.delete.delay.ms
                                         	flush.messages
                                         	flush.ms
                                         	follower.replication.throttled.
                                           replicas
                                         	index.interval.bytes
                                         	leader.replication.throttled.replicas
                                         	max.message.bytes
                                         	message.format.version
                                         	message.timestamp.difference.max.ms
                                         	message.timestamp.type
                                         	min.cleanable.dirty.ratio
                                         	min.compaction.lag.ms
                                         	min.insync.replicas
                                         	preallocate
                                         	retention.bytes
                                         	retention.ms
                                         	segment.bytes
                                         	segment.index.bytes
                                         	segment.jitter.ms
                                         	segment.ms
                                         	unclean.leader.election.enable
                                         See the Kafka documentation for full
                                           details on the topic configs.
--create                                 Create a new topic.
--delete                                 Delete a topic
--delete-config <String: name>           A topic configuration override to be
                                           removed for an existing topic (see
                                           the list of configurations under the
                                           --config option).
--describe                               List details for the given topics.
--disable-rack-aware                     Disable rack aware replica assignment
--force                                  Suppress console prompts
--help                                   Print usage information.
--if-exists                              if set when altering or deleting
                                           topics, the action will only execute
                                           if the topic exists
--if-not-exists                          if set when creating topics, the
                                           action will only execute if the
                                           topic does not already exist
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic
                                           being created or altered (WARNING:
                                           If partitions are increased for a
                                           topic that has a key, the partition
                                           logic or ordering of the messages
                                           will be affected
--replica-assignment <String:            A list of manual partition-to-broker
  broker_id_for_part1_replica1 :           assignments for the topic being
  broker_id_for_part1_replica2 ,           created or altered.
  broker_id_for_part2_replica1 :
  broker_id_for_part2_replica2 , ...>
--replication-factor <Integer:           The replication factor for each
  replication factor>                      partition in the topic being created.
--topic <String: topic>                  The topic to be create, alter or
                                           describe. Can also accept a regular
                                           expression except for --create option
--topics-with-overrides                  if set when describing topics, only
                                           show topics that have overridden
                                           configs
--unavailable-partitions                 if set when describing topics, only
                                           show partitions whose leader is not
                                           available
--under-replicated-partitions            if set when describing topics, only
                                           show under replicated partitions
--zookeeper <String: urls>               REQUIRED: The connection string for
                                           the zookeeper connection in the form
                                           host:port. Multiple URLS can be
                                           given to allow fail-over.

啟動zookeeper

使用安裝包中的腳本啟動單節點Zookeeper實例:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

啟動Kafka服務

使用kafka-server-start.sh啟動kafka服務:

bin/kafka-server-start.sh config/server.properties

創建Topic

使用kafka-topics.sh 創建但分區單副本的topic test

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看Topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

產生消息

使用kafka-console-producer.sh 發送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

消費消息

使用kafka-console-consumer.sh 接收消息並在終端打印

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

刪除Topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

注意事項:當執行刪除命令之后。topic不是物理刪除,而是一個標記刪除的操作

問題1:標記刪除之后的主題是否還可以繼續生產數據?

不會又影響

注意:當服務器重啟就會刪除已經標記的topic,這個需要和版本有關系

如果是0.10的話,注意,有如果刪除topic的需求,需要在server.partitions文件中加入delete.topic.enable=true

查看Topic的詳細信息

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

第一行給出了所有分區的摘要,每個附加行給出了關於一個分區的信息。 由於我們只有一個分區,所以只有一行。

“Leader”: 是負責給定分區的所有讀取和寫入的節點。 每個節點將成為分區隨機選擇部分的領導者。

“Replicas”: 是復制此分區日志的節點列表,無論它們是否是領導者,或者即使他們當前處於活動狀態。

“Isr”: 是一組“同步”副本。這是復制品列表的子集,當前活着並被引導到領導者。

更改tipic的信息

 更改topic信息、topic名稱、zookeeper地址是必須的參數、其他參數還包括了partition個數等

1、修改

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic5 --partitions 40

2、修改增加配置參數

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic4 --config flush.messages=1

3、刪除配置參數

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic4 --delete-config flush.messages

二、python調用

1、生產消息

#!/usr/bin/env python
#-*- coding:utf-8 -*-
# Author:ahong luo
from kafka import KafkaProducer
# 創建生產者
producer = KafkaProducer(bootstrap_servers="192.168.118.95")
for _ in range(80000):
    producer.send('test',b'192.168.118.95')
producer.send('test',b' 192.168.118.95')

2、消費消息

#-*- coding:utf-8 -*-
# Author:ahong luo
#https://pypi.org/project/kafka-python/
from kafka import KafkaConsumer

#connect to Kafka server and pass the topic we want to consume
consumer = KafkaConsumer('logs', bootstrap_servers="192.168.118.100:2181")

for msg in consumer:
    print(msg)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM