Kafka部署篇


安裝

下載與安裝

kafka下載地址:https://kafka.apache.org/downloads
需要說明的是,kafka的安裝依賴於zk,zk的部署可直接參考《Zookeeper介紹與基本部署》。當然,kafka默認也內置了zk的啟動腳本,在kafka安裝路徑的bin目錄下,名稱為zookeeper-server-start.sh,如果不想獨立安裝zk,可直接使用該腳本。

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
tar xf kafka_2.12-2.2.0.tgz -C /usr/local/
cd /usr/local
ln -s kafka_2.12-2.2.0 kafka

配置

kafka主配置文件為/usr/local/kafka/config/server.properties,配置示例如下:

broker.id=0
listeners=PLAINTEXT://192.168.0.29:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.0.29:2181,192.168.0.195:2181,192.168.0.27:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable=true
delete.topics.enable=true

配置說明:

  • broker.id:每個broker在集群中的唯一標識,正整數。當該服務器的ip地址發生變更,但broker.id未變,則不會影響consumers的消費情況
  • listeners:kafka的監聽地址與端口,在實際測試中如果寫0.0.0.0會報錯。
  • num.network.threads:kafka用於處理網絡請求的線程數
  • num.io.threads:kafka用於處理磁盤io的線程數
  • socket.send.buffer.bytes:發送數據的緩沖區
  • socket.receive.buffer.bytes:接收數據的緩沖區
  • socket.request.max.bytes:允許接收的最大數據包的大小(防止數據包過大導致OOM)
  • log.dirs:kakfa用於保存數據的目錄,所有的消息都會存儲在該目錄當中。可以通過逗號來指定多個路徑,kafka會根據最少被使用的原則選擇目錄分配新的partition。需要說明的是,kafka在分配partition的時候選擇的原則不是按照磁盤空間大小來定的,而是根據分配的partition的個數多少而定
  • num.partitions:設置新創建的topic的默認分區數
  • number.recovery.threads.per.data.dir:用於恢復每個數據目錄時啟動的線程數
  • log.retention.hours:配置kafka中消息保存的時間,還支持log.retention.minutes和log.retention.ms。如果多個同時設置會選擇時間最短的配置,默認為7天。
  • log.retention.check.interval.ms:用於檢測數據過期的周期
  • log.segment.bytes:配置partition中每個segment數據文件的大小。默認為1GB。超出該大小后,會自動創建一個新的segment文件。
  • zookeeper.connect:指定連接的zk的地址,zk中存儲了broker的元數據信息。可以通過逗號來設置多個值。格式為:hostname:port/path。hostname為zk的主機名或ip,port為zk監聽的端口。/path表示kafka的元數據存儲到zk上的目錄,如果不設置,默認為根目錄
  • zookeeper.connection.timeout:kafka連接zk的超時時間
  • group.initial.rebalance.delay.ms:在實際環境當中,當將多個consumer加入到一個空的consumer group中時,每加入一個consumer就會觸發一次對partition消費的重平衡,如果加入100個,就得重平衡100次,這個過程就會變得非常耗時。通過設置該參數,可以延遲重平衡的時間,比如有100個consumer會在10s內全部加入到一個consumer group中,就可以將該值設置為10s,10s之后,只需要做一次重平衡即可。默認為0則代表不開啟該特性。
  • auto.create.topics.enable:當有producer向一個不存在的topic中寫入消息時,是否自動創建該topic
  • delete.topics.enable:kafka提供了刪除topic的功能,但默認並不會直接將topic數據物理刪除。如果要從物理上刪除(刪除topic后,數據文件也一並刪除),則需要將此項設置為true

需要說明的是,多個kafka節點依賴zk實現集群,所以各節點並不需要作特殊配置,只需要broker.id不同,並接入到同一個zk集群即可。

啟停操作

#啟動
/usr/local/kafka/bin/kafka-server-start.sh  -daemon /usr/local/kafka/config/server.properties 

#檢查java進程

# jps
1394 QuorumPeerMain
13586 Logstash
27591 Kafka
27693 Jps

#停止
/usr/local/kafka/bin/kafka-server-start.sh

驗證

可以通過zookeeper查看kafka的元數據信息:

#通過zk客戶端連接zookeeper

../zookeeper/bin/zkCli.sh 

#查看根下多了很多目錄
[zk: localhost:2181(CONNECTED) 1] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

#查看/brokers/ids,可以看到有三個broker已經加入
[zk: localhost:2181(CONNECTED) 8] ls /brokers/ids
[0, 1, 2]

#查看/brokers/topics,目前為空,說明還沒有創建任何的topic
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics
[]

基本操作

創建topic

上面完成了kafka的部署,通過驗證部署我們發現當前沒有topic,所以創建一個topic如下:

# ./bin/kafka-topics.sh --create --zookeeper localhost:2181  --replication-factor 2 --partitions 3 --topic myfirsttopic 
Created topic myfirsttopic.

參數說明:

  • --create:創建一個topic
  • --zookeeper: 指定zookeeper集群的主機列表,多個server可使用逗號分隔,這里因為kafka和zk是在同一個server,所以直接連接了本機的2181端口
  • --replication-factor:指定創建這個topic的副本數
  • --partitions:指定該topic的分區數
  • --topic:指定topic的名稱

列出現有的topic

上面通過操作zk就可以看到topic相關信息,接下來我們直接通過kafka命令行來進行相關操作:

# ./bin/kafka-topics.sh --zookeeper localhost:2181 --list                                                               
myfirsttopic

查看topic的詳細信息

#查看myfirsttopic的詳細信息
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic
Topic:myfirsttopic      PartitionCount:3        ReplicationFactor:2     Configs:
        Topic: myfirsttopic     Partition: 0    Leader: 0       Replicas: 0,2   Isr: 0
        Topic: myfirsttopic     Partition: 1    Leader: 1       Replicas: 1,0   Isr: 1,0
        Topic: myfirsttopic     Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1

參數說明:

  • --describe:查看topic的詳細信息

輸出說明:

  • leader:當前負責讀寫的leader broker
  • replicas:當前分區所有的副本對應的broker列表
  • isr:處於活動狀態的broker

增加topic的partition數量

# ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 6 --topic myfirsttopic
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic myfirsttopic            
Topic:myfirsttopic      PartitionCount:6        ReplicationFactor:2     Configs:
        Topic: myfirsttopic     Partition: 0    Leader: 0       Replicas: 0,2   Isr: 0
        Topic: myfirsttopic     Partition: 1    Leader: 1       Replicas: 1,0   Isr: 1,0
        Topic: myfirsttopic     Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: myfirsttopic     Partition: 3    Leader: 0       Replicas: 0,2   Isr: 0,2
        Topic: myfirsttopic     Partition: 4    Leader: 1       Replicas: 1,0   Isr: 1,0
        Topic: myfirsttopic     Partition: 5    Leader: 2       Replicas: 2,1   Isr: 2,1

修改一個topic的副本數

#創建一個topic名為mysecondtopic,指定分區為2,副本為1
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 2 --topic mysecondtopic
Created topic mysecondtopic.

#查看新創建的topic詳細信息
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mysecondtopic
Topic:mysecondtopic     PartitionCount:2        ReplicationFactor:1     Configs:
        Topic: mysecondtopic    Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: mysecondtopic    Partition: 1    Leader: 1       Replicas: 1     Isr: 1

#將broker.id為0上的partition的副本由原來的[0]擴充為[0,2],將broker.id為1上的partition的副本由原來的[1]擴充為[1,2]。
#需要先創建一個json文件如下:
# cat partitions-to-move.json
{
    "partitions":
    [
        {
            "topic":"mysecondtopic",
            "partition": 0,
            "replicas": [0,2]
        },
        {
            "topic": "mysecondtopic",
            "partition": 1,
            "replicas": [1,2]
        }
    ],
    "version": 1
}

#執行副本修改
# ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./partitions-to-move.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"mysecondtopic","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"mysecondtopic","partition":0,"replicas":[0],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

#再次查看topic狀態,發現副本數由按照預期發生變更
# ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mysecondtopic
Topic:mysecondtopic     PartitionCount:2        ReplicationFactor:2     Configs:
        Topic: mysecondtopic    Partition: 0    Leader: 0       Replicas: 0,2   Isr: 0
        Topic: mysecondtopic    Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1

刪除一個topic

#執行刪除操作
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myfirsttopic
Topic myfirsttopic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

#查看topic,可以看到myfirsttopic已被刪除
# ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
mysecondtopic

通過producer生產消息

# ./bin/kafka-console-producer.sh  --broker-list 192.168.0.29:9092 --topic mysecondtopic                                                
>hello kafka!
>hello world!
>just a test!
>
>hi world!
>hahahaha!
>

通過consumer消費消息

# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.29:9092 --topic mysecondtopic --from-beginning       
hello kafka!
just a test!
hi world!
hello world!

hahahaha!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM