目錄
安裝
下載與安裝
kafka下載地址:https://kafka.apache.org/downloads
需要說明的是,kafka的安裝依賴於zk,zk的部署可直接參考《Zookeeper介紹與基本部署》。當然,kafka默認也內置了zk的啟動腳本,在kafka安裝路徑的bin目錄下,名稱為zookeeper-server-start.sh
,如果不想獨立安裝zk,可直接使用該腳本。
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
tar xf kafka_2.12-2.2.0.tgz -C /usr/local/
cd /usr/local
ln -s kafka_2.12-2.2.0 kafka
配置
kafka主配置文件為/usr/local/kafka/config/server.properties
,配置示例如下:
broker.id=0
listeners=PLAINTEXT://192.168.0.29:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.0.29:2181,192.168.0.195:2181,192.168.0.27:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable=true
delete.topics.enable=true
配置說明:
- broker.id:每個broker在集群中的唯一標識,正整數。當該服務器的ip地址發生變更,但broker.id未變,則不會影響consumers的消費情況
- listeners:kafka的監聽地址與端口,在實際測試中如果寫0.0.0.0會報錯。
- num.network.threads:kafka用於處理網絡請求的線程數
- num.io.threads:kafka用於處理磁盤io的線程數
- socket.send.buffer.bytes:發送數據的緩沖區
- socket.receive.buffer.bytes:接收數據的緩沖區
- socket.request.max.bytes:允許接收的最大數據包的大小(防止數據包過大導致OOM)
- log.dirs:kakfa用於保存數據的目錄,所有的消息都會存儲在該目錄當中。可以通過逗號來指定多個路徑,kafka會根據最少被使用的原則選擇目錄分配新的partition。需要說明的是,kafka在分配partition的時候選擇的原則不是按照磁盤空間大小來定的,而是根據分配的partition的個數多少而定
- num.partitions:設置新創建的topic的默認分區數
- number.recovery.threads.per.data.dir:用於恢復每個數據目錄時啟動的線程數
- log.retention.hours:配置kafka中消息保存的時間,還支持log.retention.minutes和log.retention.ms。如果多個同時設置會選擇時間最短的配置,默認為7天。
- log.retention.check.interval.ms:用於檢測數據過期的周期
- log.segment.bytes:配置partition中每個segment數據文件的大小。默認為1GB。超出該大小后,會自動創建一個新的segment文件。
- zookeeper.connect:指定連接的zk的地址,zk中存儲了broker的元數據信息。可以通過逗號來設置多個值。格式為:
hostname:port/path
。hostname為zk的主機名或ip,port為zk監聽的端口。/path
表示kafka的元數據存儲到zk上的目錄,如果不設置,默認為根目錄 - zookeeper.connection.timeout:kafka連接zk的超時時間
- group.initial.rebalance.delay.ms:在實際環境當中,當將多個consumer加入到一個空的consumer group中時,每加入一個consumer就會觸發一次對partition消費的重平衡,如果加入100個,就得重平衡100次,這個過程就會變得非常耗時。通過設置該參數,可以延遲重平衡的時間,比如有100個consumer會在10s內全部加入到一個consumer group中,就可以將該值設置為10s,10s之后,只需要做一次重平衡即可。默認為0則代表不開啟該特性。
- auto.create.topics.enable:當有producer向一個不存在的topic中寫入消息時,是否自動創建該topic
- delete.topics.enable:kafka提供了刪除topic的功能,但默認並不會直接將topic數據物理刪除。如果要從物理上刪除(刪除topic后,數據文件也一並刪除),則需要將此項設置為true
需要說明的是,多個kafka節點依賴zk實現集群,所以各節點並不需要作特殊配置,只需要broker.id不同,並接入到同一個zk集群即可。
啟停操作
#啟動
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
#檢查java進程
# jps
1394 QuorumPeerMain
13586 Logstash
27591 Kafka
27693 Jps
#停止
/usr/local/kafka/bin/kafka-server-start.sh
驗證
可以通過zookeeper查看kafka的元數據信息:
#通過zk客戶端連接zookeeper
../zookeeper/bin/zkCli.sh
#查看根下多了很多目錄
[zk: localhost:2181(CONNECTED) 1] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
#查看/brokers/ids,可以看到有三個broker已經加入
[zk: localhost:2181(CONNECTED) 8] ls /brokers/ids
[0, 1, 2]
#查看/brokers/topics,目前為空,說明還沒有創建任何的topic
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics
[]
基本操作
創建topic
上面完成了kafka的部署,通過驗證部署我們發現當前沒有topic,所以創建一個topic如下:
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic myfirsttopic
Created topic myfirsttopic.
參數說明:
- --create:創建一個topic
- --zookeeper: 指定zookeeper集群的主機列表,多個server可使用逗號分隔,這里因為kafka和zk是在同一個server,所以直接連接了本機的2181端口
- --replication-factor:指定創建這個topic的副本數
- --partitions:指定該topic的分區數
- --topic:指定topic的名稱
列出現有的topic
上面通過操作zk就可以看到topic相關信息,接下來我們直接通過kafka命令行來進行相關操作:
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
myfirsttopic
查看topic的詳細信息
#查看myfirsttopic的詳細信息
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic
Topic:myfirsttopic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: myfirsttopic Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0
Topic: myfirsttopic Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: myfirsttopic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
參數說明:
- --describe:查看topic的詳細信息
輸出說明:
- leader:當前負責讀寫的leader broker
- replicas:當前分區所有的副本對應的broker列表
- isr:處於活動狀態的broker
增加topic的partition數量
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 6 --topic myfirsttopic
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic
Topic:myfirsttopic PartitionCount:6 ReplicationFactor:2 Configs:
Topic: myfirsttopic Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0
Topic: myfirsttopic Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: myfirsttopic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: myfirsttopic Partition: 3 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: myfirsttopic Partition: 4 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: myfirsttopic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1
修改一個topic的副本數
#創建一個topic名為mysecondtopic,指定分區為2,副本為1
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 2 --topic mysecondtopic
Created topic mysecondtopic.
#查看新創建的topic詳細信息
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mysecondtopic
Topic:mysecondtopic PartitionCount:2 ReplicationFactor:1 Configs:
Topic: mysecondtopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: mysecondtopic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
#將broker.id為0上的partition的副本由原來的[0]擴充為[0,2],將broker.id為1上的partition的副本由原來的[1]擴充為[1,2]。
#需要先創建一個json文件如下:
# cat partitions-to-move.json
{
"partitions":
[
{
"topic":"mysecondtopic",
"partition": 0,
"replicas": [0,2]
},
{
"topic": "mysecondtopic",
"partition": 1,
"replicas": [1,2]
}
],
"version": 1
}
#執行副本修改
# ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./partitions-to-move.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"mysecondtopic","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"mysecondtopic","partition":0,"replicas":[0],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
#再次查看topic狀態,發現副本數由按照預期發生變更
# ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mysecondtopic
Topic:mysecondtopic PartitionCount:2 ReplicationFactor:2 Configs:
Topic: mysecondtopic Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0
Topic: mysecondtopic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1
刪除一個topic
#執行刪除操作
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myfirsttopic
Topic myfirsttopic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
#查看topic,可以看到myfirsttopic已被刪除
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
mysecondtopic
通過producer生產消息
# ./bin/kafka-console-producer.sh --broker-list 192.168.0.29:9092 --topic mysecondtopic
>hello kafka!
>hello world!
>just a test!
>
>hi world!
>hahahaha!
>
通過consumer消費消息
# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.29:9092 --topic mysecondtopic --from-beginning
hello kafka!
just a test!
hi world!
hello world!
hahahaha!