一、通過kafka-topics.sh來執行topic相關的命令
1、用法
[root@kafka-node1 kafka]# bin/kafka-topics.sh
Create, delete, describe, or change a topic.
Option Description
------ -----------
--alter Alter the number of partitions,
replica assignment, and/or
configuration for the topic.
--config <String: name=value> A topic configuration override for the
topic being created or altered.The
following is a list of valid
configurations:
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
max.message.bytes
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
See the Kafka documentation for full
details on the topic configs.
--create Create a new topic.
--delete Delete a topic
--delete-config <String: name> A topic configuration override to be
removed for an existing topic (see
the list of configurations under the
--config option).
--describe List details for the given topics.
--disable-rack-aware Disable rack aware replica assignment
--force Suppress console prompts
--help Print usage information.
--if-exists if set when altering or deleting
topics, the action will only execute
if the topic exists
--if-not-exists if set when creating topics, the
action will only execute if the
topic does not already exist
--list List all available topics.
--partitions <Integer: # of partitions> The number of partitions for the topic
being created or altered (WARNING:
If partitions are increased for a
topic that has a key, the partition
logic or ordering of the messages
will be affected
--replica-assignment <String: A list of manual partition-to-broker
broker_id_for_part1_replica1 : assignments for the topic being
broker_id_for_part1_replica2 , created or altered.
broker_id_for_part2_replica1 :
broker_id_for_part2_replica2 , ...>
--replication-factor <Integer: The replication factor for each
replication factor> partition in the topic being created.
--topic <String: topic> The topic to be create, alter or
describe. Can also accept a regular
expression except for --create option
--topics-with-overrides if set when describing topics, only
show topics that have overridden
configs
--unavailable-partitions if set when describing topics, only
show partitions whose leader is not
available
--under-replicated-partitions if set when describing topics, only
show under replicated partitions
--zookeeper <String: urls> REQUIRED: The connection string for
the zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over.
啟動zookeeper
使用安裝包中的腳本啟動單節點Zookeeper實例:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
啟動Kafka服務
使用kafka-server-start.sh啟動kafka服務:
bin/kafka-server-start.sh config/server.properties
創建Topic
使用kafka-topics.sh 創建但分區單副本的topic test
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看Topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
產生消息
使用kafka-console-producer.sh 發送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
消費消息
使用kafka-console-consumer.sh 接收消息並在終端打印
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
刪除Topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
注意事項:當執行刪除命令之后。topic不是物理刪除,而是一個標記刪除的操作
問題1:標記刪除之后的主題是否還可以繼續生產數據?
不會又影響
注意:當服務器重啟就會刪除已經標記的topic,這個需要和版本有關系
如果是0.10的話,注意,有如果刪除topic的需求,需要在server.partitions文件中加入delete.topic.enable=true
查看Topic的詳細信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
第一行給出了所有分區的摘要,每個附加行給出了關於一個分區的信息。 由於我們只有一個分區,所以只有一行。
“Leader”: 是負責給定分區的所有讀取和寫入的節點。 每個節點將成為分區隨機選擇部分的領導者。
“Replicas”: 是復制此分區日志的節點列表,無論它們是否是領導者,或者即使他們當前處於活動狀態。
“Isr”: 是一組“同步”副本。這是復制品列表的子集,當前活着並被引導到領導者。
更改tipic的信息
更改topic信息、topic名稱、zookeeper地址是必須的參數、其他參數還包括了partition個數等
1、修改
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic5 --partitions 40
2、修改增加配置參數
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic4 --config flush.messages=1
3、刪除配置參數
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic4 --delete-config flush.messages
二、python調用
1、生產消息
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# Author:ahong luo
from kafka import KafkaProducer
# 創建生產者
producer = KafkaProducer(bootstrap_servers="192.168.118.95")
for _ in range(80000):
producer.send('test',b'192.168.118.95')
producer.send('test',b' 192.168.118.95')
2、消費消息
#-*- coding:utf-8 -*-
# Author:ahong luo
#https://pypi.org/project/kafka-python/
from kafka import KafkaConsumer
#connect to Kafka server and pass the topic we want to consume
consumer = KafkaConsumer('logs', bootstrap_servers="192.168.118.100:2181")
for msg in consumer:
print(msg)
