創建topic,指定備份分區數
bin/kafka-topics.sh --create --zookeeper zk:2181 --replication-factor 2 --partitions 4 --topic test-topic
查看topic
查看topic列表
bin/kafka-topics.sh --zookeeper zk:2181–list
查看topic分區情況
bin/kafka-topics.sh --zookeeper zk:2181 --describe
指定topic
bin/kafka-topics.sh --zookeeper zk:2181 --describe --topic test-topic
創建producer
bin/kafka-console-producer.sh --broker-list broker:9092 --topic test-topic
創建consumer
bin/kafka-console-consumer.sh --zookeeper zk:2181 --topic test-topic
bin/kafka-console-consumer.sh --zookeeper zk:2181 --topic test-topic --from-beginning
新生產者(支持0.9版本+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
新消費者(支持0.9版本+)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
修改主題分區
./kafka-topics.sh --zookeeper zk:2181 --alter --topic TEST_16 --partitions 16
修改主題參數
修改保存時間
bin/kafka-topics.sh --zookeeper zk:2181 -topic test--alter --config retention.ms=86400000
基於0.8.0版本。
重新分配分區kafka-reassign-partitions.sh
這個命令可以分區指定到想要的--broker-list上 bin/kafka-reassign-partitions.sh --topics-to-move-json-file topics-to-move.json --broker-list "171" --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --execute cat topic-to-move.json {"topics": [{"topic": "test2"}], "version":1 }
為Topic增加 partition數目kafka-add-partitions.sh
bin/kafka-add-partitions.sh --topic test --partition 2 --zookeeper 192.168.197.170:2181,192.168.197.171:2181 (為topic test增加2個分區)
手動均衡topic
bin/kafka-preferred-replica-election.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --path-to-json-file preferred-click.json
cat preferred-click.json
{ "partitions": [ {"topic": "click", "partition": 0}, {"topic": "click", "partition": 1}, {"topic": "click", "partition": 2}, {"topic": "click", "partition": 3}, {"topic": "click", "partition": 4}, {"topic": "click", "partition": 5}, {"topic": "click", "partition": 6}, {"topic": "click", "partition": 7}, {"topic": "play", "partition": 0}, {"topic": "play", "partition": 1}, {"topic": "play", "partition": 2}, {"topic": "play", "partition": 3}, {"topic": "play", "partition": 4}, {"topic": "play", "partition": 5}, {"topic": "play", "partition": 6}, {"topic": "play", "partition": 7}
] }
分區遷移,擴容
cat > increase-replication-factor.json <<EOF {"version":1, "partitions":[ {"topic":"__consumer_offsets","partition":0,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":1,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":2,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":3,"replicas":[0,1]}] } EOF
執行
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
驗證
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
0.8版本查詢offset信息
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk:2181 --group test--topic test
新消費者列表查詢(支持0.9版本+)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
顯示某個消費組的消費詳情(僅支持offset存儲在zookeeper上的)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
顯示某個消費組的消費詳情(支持0.9版本+)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
平衡leader
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
kafka自帶壓測命令
bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092
broker配置項 | 默認值 | 推薦值 | 示例 | 說明 |
broker.id | null | null | broker.id=1 | 當前broker的id,集群內互異 |
host.name | null | null | host.name=10.10.10.10 | broker的主機地址,若有配置,會綁定到該地址,若沒有,會綁定到所有的地址 0.10版本引入listeners配置,若listeners有配置,該項不生效 |
port | 9092 | 9092 | port=9092 | broker監聽端口 0.10版本引入listeners配置,若listeners有配置,該項不生效 |
listeners | "" | PLAINTEXT://10.10.10.10:9092 | listeners=PLAINTEXT://10.10.10.10:9092 | 服務監聽配置 |
auto.create.topics.enable | true | false | auto.create.topics.enable=false | 允許自動創建主題 |
auto.leader.rebalance.enable | true | true | auto.leader.rebalance.enable=true | 后台定期自動重分配leader |
default.replication.factor | 1 | 3 | default.replication.factor=3 | 默認副本數,同時也決定了集群所有主題均可以正常使用的允許的宕機台數為N-1(N為配置值) |
delete.topic.enable | true | true | delete.topic.enable=true | 允許刪除主題 |
group.initial.rebalance.delay.ms | 3000 | 3000 | group.initial.rebalance.delay.ms=3000 | 消費組內各節點第一次啟動的負載均衡分配等待時間,配置越大意味着重新平衡次數越少,但會增加等待時間 |
log.cleaner.enable | true | true | log.cleaner.enable=true | 清理策略定義的過期數據日志 |
log.dirs | null | pathtokafka/kafka-instance-logs | log.dirs=/app/kafka/kafka-test-logs | kafka數據日志目錄 |
log.flush.interval.messages | 9223372036854775807 | 20000(根據實際業務情況配置) | log.flush.interval.messages=20000 | 將分區消息刷到磁盤的消息量 |
log.flush.interval.ms | =log.flush.scheduler.interval.ms=9223372036854775807 | 300000 | log.flush.interval.ms=300000 | 將消息刷到磁盤的時間間隔 |
log.retention.check.interval.ms | 300000 | 600000 | log.retention.check.interval.ms=600000 | 失效數據日志檢查間隔 |
log.retention.hours | 168 | 72 | log.retention.hours=72 | 數據保留時間 |
log.segment.bytes | 1073741824 | 1073741824 | log.segment.bytes=1073741824 | 單個數據文件大小 |
num.io.threads | 8 | 8 | num.io.threads=8 | 服務器用於處理請求的線程數,包括磁盤I/O,可根據核數適當增加 |
num.network.threads | 3 | 8 | num.network.threads=8 | 處理網絡請求的線程數,主要用於讀寫緩沖區數據,一般不需要太大,可根據核數適當增加 |
num.partitions | 1 | 4 | num.partitions=4 | 默認主題分區數 |
num.recovery.threads.per.data.dir | 1 | 16(各目錄總和核數一半左右) | num.recovery.threads.per.data.dir=16 | 宕機后恢復數據,清理數據的線程數,比較消耗CPU |
num.replica.fetchers | 1 | 8 | num.replica.fetchers=8 | 數據同步線程數 |
socket.receive.buffer.bytes | 102400 | 1048576 | socket.receive.buffer.bytes=1048576 | socket的接收緩沖區 (SO_RCVBUF) |
socket.send.buffer.bytes | 102400 | 1048576 | socket.send.buffer.bytes=1048576 | socket的發送緩沖區(SO_SNDBUF) |
socket.request.max.bytes | 104857600 | 104857600 | socket.request.max.bytes=104857600 | socket請求的最大字節數 |
transaction.state.log.min.isr | 1 | 1 | transaction.state.log.min.isr=1 | 消息提交后返回成功的最小副本數,ISR中副本數 |
zookeeper.connect | "" | "" | zookeeper.connect=10.10.10.10:2181/kafka/instance | zk連接串 |
zookeeper.connection.timeout.ms | =zookeeper.session.timeout.ms=6000 | 6000 | zookeeper.connection.timeout.ms | zk連接超時時間 |