Setting Up a Kafka Server
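Kafka uses ZooKeeper to manage its own cluster metadata and the information about producers and consumers. ZooKeeper is an integral part of Kafka, so setting up a ZooKeeper server is an indispensable step in building a Kafka cluster environment. There is plenty of material online about building a ZooKeeper cluster, so it is not described in detail here; you can also follow my own article on the three ZooKeeper installation modes to build one. The Kafka setup described in this article works on both Ubuntu and macOS (tested on both).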
Software environment used for the cluster:
- JDK: java version 1.8.0_121
- ZooKeeper: zookeeper-3.4.9
- Kafka: kafka_2.10-0.10.2.0
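First, download the package needed for this setup, kafka_2.10-0.10.2.0.tgz, from the official website:
http://kafka.apache.org/downloads
Then extract it into the directory where you want to install Kafka:
```
tar -zxvf kafka_2.10-0.10.2.0.tgz
```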
- Single-node mode
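Single-node mode means deploying just one Kafka instance (broker) on one server. First we write the broker configuration file. The config directory of the extracted package already contains a ready-made broker configuration file, server.properties, and we modify part of it to get the setup we want. From the Kafka installation directory, open server.properties with vim: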
```
vim config/server.properties
```
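The main parameters changed in this article are:
- broker.id: the unique identifier of each broker in the Kafka cluster, an integer;
- delete.topic.enable: whether topics can actually be deleted. If set to true, deleting a topic takes effect immediately; otherwise (the default, false, in this version) the topic is only marked for deletion. This line is commented out in the default file;
- host.name: the IP address of the broker, i.e. the IP address of this server;
- port: the port on which the broker listens for connections from clients and from other brokers (the broker itself connects to ZooKeeper as a client, using the address in zookeeper.connect);
- log.dirs: the directory (or comma-separated directories) where the broker stores its log data, i.e. the messages of its partitions;
- zookeeper.connect: the ZooKeeper connection string; the broker registers its information in ZooKeeper so that the rest of the cluster knows it exists;
- zookeeper.connection.timeout.ms: the maximum time to wait when connecting to ZooKeeper; if no connection is established within this time, the broker is not added to the Kafka cluster.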
All other parameters keep their default values for now. The modified configuration file is as follows (abridged):
```properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=true
# Hostname and port the broker will advertise to producers and consumers. If not set
host.name=127.0.0.1
port=9092
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/Users/wangchaohui/DevelopEnvironment/kafka_2.10-0.10.2.0/Data/log
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Retention Policy #############################

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
```
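Before starting the broker, you may want to double-check the edited file. Below is a minimal Python sketch that does this. It is my own hypothetical helper and not part of Kafka. It reads the simple `key=value` lines of a properties file and checks that the parameters listed above are set. It assumes the file has no line continuations or escape sequences, which the full Java properties format would allow.

```python
# Hypothetical helper (not part of Kafka): parse the simple key=value lines
# of a broker config and report required parameters that are missing or malformed.

REQUIRED_KEYS = ("broker.id", "port", "log.dirs", "zookeeper.connect")


def parse_properties(text: str) -> dict:
    """Parse key=value lines, skipping blank lines and '#'/'!' comments.

    Simplification: no support for line continuations, escapes or the
    ':' separator that java.util.Properties also accepts.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_broker_config(props: dict) -> list:
    """Return a list of human-readable problems (empty if none were found)."""
    problems = [f"missing {key}" for key in REQUIRED_KEYS if not props.get(key)]
    broker_id = props.get("broker.id", "")
    if broker_id and not broker_id.isdigit():
        problems.append("broker.id must be a non-negative integer")
    return problems


if __name__ == "__main__":
    # Illustrative sample; in practice read config/server.properties instead.
    sample = """
    broker.id=0
    #delete.topic.enable=true
    host.name=127.0.0.1
    port=9092
    log.dirs=/tmp/kafka-data/log
    zookeeper.connect=localhost:2181
    """
    props = parse_properties(sample)
    print(props["port"])               # 9092
    print(check_broker_config(props))  # []
```

Commented-out lines such as `#delete.topic.enable=true` are skipped, so they do not count as set.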
The single-node Kafka environment is now set up. Let's run it and try it out.
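First start the ZooKeeper cluster. As the configuration file shows (zookeeper.connect=localhost:2181), this article uses only a single ZooKeeper server. Go to ZooKeeper's bin directory and run the start command (for ZooKeeper 3.4.x this is normally `./zkServer.sh start`):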
Then start the Kafka server. The start command takes the broker configuration file as its argument:
```
cd ~/DevelopEnvironment/kafka_2.10-0.10.2.0/bin
./kafka-server-start.sh ../config/server.properties
```
When the terminal prints "[Kafka Server 0], started (kafka.server.KafkaServer)" and similar messages, Kafka has started successfully.
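Besides watching the log, you can check from another terminal that the broker is accepting connections on its port. Here is a minimal Python sketch that does this. It assumes the host and port from the configuration above (127.0.0.1:9092). Note that an open port only shows that some process is listening there, not that it is a healthy Kafka broker.

```python
import socket


def broker_port_open(host: str = "127.0.0.1", port: int = 9092, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be established.

    This only proves that something listens on the port; it does not
    check that the listener is a working Kafka broker.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, unreachable, ...
        return False


if __name__ == "__main__":
    print("127.0.0.1:9092 open:", broker_port_open())
```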
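Next, create a topic named "my-first-topic" with 1 partition and a replication factor of 1. For what these two parameters mean, see the basic Kafka concepts; they are not explained in detail here. Run the following command in the bin directory: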
```
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic my-first-topic --partitions 1 --replication-factor 1
```
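The terminal prints `Created topic "my-first-topic".`, which means the topic was created successfully. We can now inspect it with the describe option (`./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic my-first-topic`):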
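Here is a rough explanation of the printed output:
- Topic: the topic name;
- PartitionCount: the number of partitions of the topic;
- ReplicationFactor: the number of copies of each partition (including the leader itself).
The remaining five fields are explained together. my-first-topic (Topic) has only one partition, Partition 0, so its only replica is on broker 0 (Replicas: 0). As a result, the leader of partition 0 is also on broker 0 (Leader: 0), and the only broker currently holding an in-sync copy of partition 0 is broker 0 (Isr: 0).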
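If you want to check these fields from a script instead of reading them by eye, a small parser can turn the describe output into data. This is a hypothetical Python helper, and the sample text is illustrative, not captured output. It assumes the tab-separated `Key: value` layout that, as far as I know, `kafka-topics.sh --describe` prints in 0.10.x. The helper also flags partitions whose Isr list is shorter than their Replicas list. This is what happens in the failure test later in this article.

```python
# Hypothetical helper: turn `kafka-topics.sh --describe` output into dicts.
# Assumes tab-separated "Key: value" fields; the sample is illustrative.

SAMPLE = (
    "Topic:my-first-topic\tPartitionCount:1\tReplicationFactor:1\tConfigs:\n"
    "\tTopic: my-first-topic\tPartition: 0\tLeader: 0\tReplicas: 0\tIsr: 0\n"
)


def _broker_list(value: str) -> list:
    """'0,1' -> [0, 1]; an empty string gives []."""
    return [int(b) for b in value.split(",") if b.strip()]


def parse_describe(output: str) -> list:
    """Return one dict per partition line; header lines are skipped."""
    partitions = []
    for line in output.splitlines():
        fields = {}
        for part in line.split("\t"):
            key, sep, value = part.partition(":")
            if sep:
                fields[key.strip()] = value.strip()
        if "Partition" not in fields:
            continue  # topic header line or blank line
        leader = fields.get("Leader", "")
        partitions.append({
            "partition": int(fields["Partition"]),
            # anything that is not an integer (e.g. "none") becomes None
            "leader": int(leader) if leader.lstrip("-").isdigit() else None,
            "replicas": _broker_list(fields.get("Replicas", "")),
            "isr": _broker_list(fields.get("Isr", "")),
        })
    return partitions


def under_replicated(partitions: list) -> list:
    """Partition ids whose ISR is smaller than their replica list."""
    return [p["partition"] for p in partitions if len(p["isr"]) < len(p["replicas"])]


if __name__ == "__main__":
    parts = parse_describe(SAMPLE)
    print(parts)  # [{'partition': 0, 'leader': 0, 'replicas': [0], 'isr': [0]}]
    print(under_replicated(parts))  # []
```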
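Now send messages to the topic we just created with Kafka's bundled console producer script. After the command starts, each line you type is sent as one message: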
```
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-first-topic
This is test message : 1
This is test message : 2
This is test message : 3
```
Then use Kafka's bundled console consumer script to fetch the three messages just sent to my-first-topic on broker 0:
```
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-first-topic --from-beginning
```
After running the command, the terminal prints:
```
This is test message : 1
This is test message : 2
This is test message : 3
```
These are exactly the three messages we just sent, so everything above worked ^_^
To stop Kafka:
```
./kafka-server-stop.sh
```
- Pseudo-cluster mode
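In real production use, Kafka is usually not run on a single server. Instead, a Kafka cluster is built from multiple servers, and this is what brings out Kafka's scalability, high throughput and fault tolerance. This article is only for learning, and I have just one PC and no cluster, so we build a Kafka pseudo-cluster to simulate a real cluster.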
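As mentioned above, the Kafka start command takes a broker configuration file as its argument, and each broker represents one server in the Kafka cluster. So to simulate a Kafka cluster with N servers, we need to write N broker configuration files. Because all brokers run on the same machine, their host.name is the same. Their port, however, must differ; otherwise any broker started later fails to start because its port is already in use.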
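Writing N nearly identical files by hand is error-prone. The Python sketch below derives them from one base file and changes only the per-broker settings. The function names and the `/tmp/kafka-data` log root are hypothetical placeholders; this article itself uses a Data directory inside the Kafka installation. The naming follows this article: server.properties, server-1.properties, and so on.

```python
# Hypothetical helper: derive pseudo-cluster config files from one base
# file by overriding only broker.id, port and log.dirs.


def derive_broker_config(base: str, overrides: dict) -> str:
    """Replace uncommented key=value lines whose key is in `overrides`;
    append overrides that the base file does not contain."""
    out, seen = [], set()
    for line in base.splitlines():
        key, sep, _ = line.partition("=")
        key = key.strip()
        if sep and key in overrides:
            out.append(f"{key}={overrides[key]}")
            seen.add(key)
        else:
            out.append(line)  # comments and other settings stay unchanged
    out.extend(f"{k}={v}" for k, v in overrides.items() if k not in seen)
    return "\n".join(out) + "\n"


def pseudo_cluster(base: str, n: int, base_port: int = 9092,
                   log_root: str = "/tmp/kafka-data") -> dict:
    """Return {file name: content} for n brokers on one host."""
    files = {}
    for i in range(n):
        suffix = "" if i == 0 else f"-{i}"
        files[f"server{suffix}.properties"] = derive_broker_config(base, {
            "broker.id": i,
            "port": base_port + i,
            "log.dirs": f"{log_root}/log{suffix}",
        })
    return files


if __name__ == "__main__":
    base = ("broker.id=0\nhost.name=127.0.0.1\nport=9092\n"
            "log.dirs=/tmp/kafka-data/log\nzookeeper.connect=localhost:2181\n")
    for name, content in pseudo_cluster(base, 3).items():
        print("==>", name)
        print(content)
```

To use the result, you would write each returned content to `config/<file name>`. The function returns strings instead of writing files, so it can be inspected first.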
Below are the configuration files of the other two brokers, server-1.properties and server-2.properties.
server-1.properties:
```properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=true
# Hostname and port the broker will advertise to producers and consumers. If not set
host.name=127.0.0.1
port=9093
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/Users/wangchaohui/DevelopEnvironment/kafka_2.10-0.10.2.0/Data/log-1
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Retention Policy #############################

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
```
server-2.properties:
```properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=true
# Hostname and port the broker will advertise to producers and consumers. If not set
host.name=127.0.0.1
port=9094
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/Users/wangchaohui/DevelopEnvironment/kafka_2.10-0.10.2.0/Data/log-2
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Retention Policy #############################

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
```
Note the settings in which the three configuration files differ:
- broker.id
- port
- log.dirs
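A quick way to catch copy-and-paste mistakes is to check that none of these three values is shared by two files. The Python sketch below does this. It is a hypothetical helper that takes already-parsed properties as plain dicts. Since log.dirs may list several comma-separated directories, each directory is compared on its own. host.name is deliberately not checked, because on one machine it is expected to be the same.

```python
# Hypothetical check: broker.id, port and log.dirs must not be shared
# between brokers running on the same host.
from collections import defaultdict

MUST_DIFFER = ("broker.id", "port", "log.dirs")


def find_conflicts(configs: dict) -> list:
    """configs maps a file name to its parsed properties (a dict)."""
    problems = []
    for key in MUST_DIFFER:
        owners = defaultdict(list)
        for name, props in configs.items():
            if key not in props:
                continue
            values = props[key].split(",") if key == "log.dirs" else [props[key]]
            # dict.fromkeys removes duplicates within one file, keeping order
            for value in dict.fromkeys(v.strip() for v in values):
                owners[value].append(name)
        for value, names in owners.items():
            if len(names) > 1:
                problems.append(f"{key}={value} shared by {', '.join(sorted(names))}")
    return problems


if __name__ == "__main__":
    configs = {
        "server.properties": {"broker.id": "0", "port": "9092", "log.dirs": "/data/log"},
        "server-1.properties": {"broker.id": "1", "port": "9093", "log.dirs": "/data/log-1"},
        "server-2.properties": {"broker.id": "2", "port": "9093", "log.dirs": "/data/log-2"},
    }
    print(find_conflicts(configs))  # ['port=9093 shared by server-1.properties, server-2.properties']
```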
Now start the three brokers. Make sure the ZooKeeper cluster is running before you start any broker. The start commands are:
```
# run each command in its own terminal; each broker stays in the foreground
./kafka-server-start.sh ../config/server.properties
./kafka-server-start.sh ../config/server-1.properties
./kafka-server-start.sh ../config/server-2.properties
```
In the startup output of the three brokers we can see the following:
The entries 127.0.0.1,9092, 127.0.0.1,9093 and 127.0.0.1,9094 show that our configuration files took effect ^_^, and all three brokers started successfully.
Now create a new topic, my-second-topic, in the pseudo-cluster, with 3 partitions and a replication factor of 2:
```
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic my-second-topic --partitions 3 --replication-factor 2
```
After the command runs, the terminal shows `Created topic "my-second-topic".`, which means the topic was created successfully.
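How are 3 partitions with 2 replicas each spread over 3 brokers? The Python sketch below is modeled on the rack-unaware round-robin assignment in Kafka's AdminUtils.assignReplicasToBrokers. It reflects my reading of the 0.10.x code, so treat the details as an assumption. Kafka picks the start index and the replica shift at random, which is why the layout reported by describe can differ between runs. Here both are parameters, so the result is reproducible. The first replica in each list is the partition's preferred leader.

```python
# Sketch of rack-unaware round-robin replica assignment (modeled on Kafka's
# AdminUtils; start_index and replica_shift are random in Kafka itself).


def assign_replicas(brokers: list, n_partitions: int, replication_factor: int,
                    start_index: int = 0, replica_shift: int = 0) -> dict:
    n = len(brokers)
    if replication_factor > n:
        raise ValueError("replication factor larger than number of brokers")
    assignment = {}
    shift = replica_shift
    for p in range(n_partitions):
        if p > 0 and p % n == 0:
            shift += 1  # after each full round, spread followers differently
        first = (p + start_index) % n  # preferred leader, round-robin
        replicas = [brokers[first]]
        for j in range(replication_factor - 1):
            # offset in 1..n-1, so a follower never lands on the leader's broker
            offset = 1 + (shift + j) % (n - 1)
            replicas.append(brokers[(first + offset) % n])
        assignment[p] = replicas
    return assignment


if __name__ == "__main__":
    for start in range(3):
        print(start, assign_replicas([0, 1, 2], 3, 2, start_index=start))
```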
Likewise, inspect the new topic with the describe command:
```
./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic my-second-topic
```
The terminal shows:
The meaning of each field was explained above and is easy to read, so it is not repeated here.
Next, send new messages to my-second-topic:
```
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-second-topic
my new test message 1
my new test message 2
my new test message 3
```
Then fetch the three new messages on the consumer side:
```
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-second-topic --from-beginning
```
After running the command, the terminal prints the three messages we just sent:
```
my new test message 1
my new test message 2
my new test message 3
```
This shows that our cluster can already send and receive messages.
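A Kafka cluster protects data through replication. Even if only one broker survives, the Kafka cluster can keep working, provided that every partition of the topic you need has a replica on a surviving broker. Let's test how well the cluster copes with failures.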
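The following toy model in Python illustrates the idea. It is my own simplification, not Kafka's controller code. When a broker dies, it drops out of every ISR. If it was a partition's leader, the new leader is assumed to be the first replica, in assigned order, that is alive and still in the ISR. With unclean leader election enabled, any live replica may take over if no ISR member is alive. If no replica is alive at all, the partition has no leader. The model ignores followers that leave the ISR because they fall behind (governed by replica.lag.time.max.ms in the broker config). The partition layout in the example is an assumption chosen for illustration; the real one is whatever describe reports.

```python
# Toy model of leader failover (not Kafka code).
from dataclasses import dataclass
from typing import Dict, List, Optional, Set


@dataclass
class PartitionState:
    replicas: List[int]     # assigned replicas, preferred leader first
    isr: List[int]          # in-sync replicas
    leader: Optional[int]   # None means no leader, i.e. partition offline


def elect_leader(replicas: List[int], isr: List[int], alive: Set[int],
                 unclean: bool = False) -> Optional[int]:
    live = [b for b in replicas if b in alive]
    live_in_isr = [b for b in live if b in isr]
    if live_in_isr:
        return live_in_isr[0]
    if unclean and live:
        return live[0]  # may lose messages that only the old ISR had
    return None


def kill_broker(state: Dict[int, PartitionState], alive: Set[int], broker: int,
                unclean: bool = False) -> None:
    """Mark `broker` dead and update every partition in place."""
    alive.discard(broker)
    for part in state.values():
        part.isr = [b for b in part.isr if b != broker]
        if part.leader == broker:
            part.leader = elect_leader(part.replicas, part.isr, alive, unclean)
            if part.leader is not None and part.leader not in part.isr:
                part.isr = [part.leader]  # unclean election restarts the ISR


if __name__ == "__main__":
    # Assumed layout for a 3-partition, replication-factor-2 topic.
    layout = {0: [1, 2], 1: [2, 0], 2: [0, 1]}
    state = {p: PartitionState(list(r), list(r), r[0]) for p, r in layout.items()}
    alive = {0, 1, 2}
    for dead in (0, 1):
        kill_broker(state, alive, dead)
        print(f"after killing broker {dead}:")
        for p, s in state.items():
            print(f"  partition {p}: leader={s.leader} isr={s.isr}")
```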
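We already checked the state of my-second-topic with describe above. We run it once more here so we can compare it with what follows (the command is not repeated). The result is: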
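We sent the messages through 127.0.0.1:9092 (broker 0), but the producer only uses that address to bootstrap, i.e. to fetch the cluster metadata. Each message is then sent to the leader of the partition it is assigned to. The consumer warnings further below (offset=1 for each of my-second-topic-0, -1 and -2) show that the three messages were in fact spread over all three partitions. Next, kill the process of broker 0: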
```
ps aux | grep server.properties
wangchaohui 3310 1.1 6.9 6279396 1161100 s000 S+ 4:24下午 1:03.88 /usr/bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC
```
Then kill the process using its PID:
```
kill -9 3310
```
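`kill -9` sends SIGKILL, so the broker gets no chance to shut down cleanly. That is what we want here, because we are simulating a crash. Picking the PID out of `ps aux` by eye is easy to get wrong: the `grep` process itself also shows up in the list. On most systems `pgrep -f server.properties` does the job directly. The Python sketch below is a hypothetical helper that does the same filtering on `ps aux` text. The sample command lines are shortened and illustrative.

```python
# Hypothetical helper: find PIDs of processes whose command line contains
# `pattern` in `ps aux` output, ignoring the grep process itself.


def find_pids(ps_output: str, pattern: str) -> list:
    pids = []
    for line in ps_output.splitlines():
        # USER PID %CPU %MEM VSZ RSS TT STAT START TIME COMMAND -> 11 columns
        cols = line.split(None, 10)
        if len(cols) < 11 or not cols[1].isdigit():
            continue  # header or malformed line
        command = cols[10]
        if pattern in command and not command.startswith("grep"):
            pids.append(int(cols[1]))
    return pids


if __name__ == "__main__":
    sample = (
        "USER PID %CPU %MEM VSZ RSS TT STAT STARTED TIME COMMAND\n"
        "alice 3310 1.1 6.9 6279396 1161100 s000 S+ 4:24PM 1:03.88 /usr/bin/java kafka.Kafka ../config/server.properties\n"
        "alice 3539 1.0 6.0 6279396 1161100 s001 S+ 4:25PM 1:00.12 /usr/bin/java kafka.Kafka ../config/server-1.properties\n"
        "alice 4000 0.0 0.0 2434840 800 s003 S+ 5:09PM 0:00.00 grep server.properties\n"
    )
    print(find_pids(sample, "server.properties"))    # [3310]
    print(find_pids(sample, "server-1.properties"))  # [3539]
```

Note that "server.properties" is not a substring of "server-1.properties", so the first pattern matches only broker 0.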
After this, the terminals of broker 1 and broker 2 print the following:
```
java.io.IOException: Connection to 0 was disconnected before the response was read
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:114)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)
    at scala.Option.foreach(Option.scala:236)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)
    at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:136)
    at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:142)
    at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
    at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:249)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2017-03-20 17:09:40,278] WARN [ReplicaFetcherThread-0-0], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@3e2b9697 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 127.0.0.1:9092 (id: 0 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
    at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2017-03-20 17:09:42,288] WARN [ReplicaFetcherThread-0-0], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@7ee0ff45 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 127.0.0.1:9092 (id: 0 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
    at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2017-03-20 17:09:44,011] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2017-03-20 17:09:44,012] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-03-20 17:09:44,012] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2017-03-20 17:09:44,292] WARN [ReplicaFetcherThread-0-0], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@3f48dc67 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 127.0.0.1:9092 (id: 0 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
    at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
```
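The warnings show that the connection to broker 0 failed. The INFO lines also show that broker 1 was elected as the new controller ("1 successfully elected as leader" after creating /controller). Now run the topic describe command again. The state of my-second-topic is now: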
Compared with the previous state, partitions 1 and 2 were affected. Their only in-sync brokers are now 2 and 1 respectively, and the leader of partition 2 changed from 0 to 1.
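Next, let's see whether this affects sending. The console producer from before is still running. It was started with --broker-list 127.0.0.1:9092, i.e. broker 0, but it had already fetched the metadata of the whole cluster. In it, we type three more messages, 4, 5 and 6: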
```
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-second-topic
my new test message 1
my new test message 2
my new test message 3
my new test message 4
my new test message 5
my new test message 6
```
The consumer still receives the messages successfully:
```
wangchahuideMBP:bin wangchaohui$ ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-second-topic --from-beginning
my new test message 1
my new test message 2
my new test message 3
[2017-03-20 17:09:46,804] WARN Auto-commit of offsets {my-second-topic-2=OffsetAndMetadata{offset=1, metadata=''}, my-second-topic-1=OffsetAndMetadata{offset=1, metadata=''}, my-second-topic-0=OffsetAndMetadata{offset=1, metadata=''}} failed for group console-consumer-36818: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
my new test message 4
my new test message 5
my new test message 6
```
那么我們接着kill掉broker 1,看看還能不能發送信息:
```
ps aux | grep server-1.properties
......
kill -9 3539
```
The state of my-second-topic is now:
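This time the leader of partition 0 changed from 1 to 2. What I do not understand is why the leader of partition 2 is still broker 1, and why its in-sync broker is still 1. The earlier results suggest an explanation. Partition 2's replicas were on brokers 0 and 1, since its ISR shrank to broker 1 alone when broker 0 was killed. So once broker 1 was also killed, no replica of partition 2 was alive, and no new leader could be elected. The Leader: 1 / Isr: 1 shown is presumably just the last recorded state, which means partition 2 was actually unavailable.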
Then keep sending messages 7, 8 and 9:
```
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-second-topic
my new test message 1
my new test message 2
my new test message 3
my new test message 4
my new test message 5
my new test message 6
my new test message 7
my new test message 8
my new test message 9
```
The consumer still receives the messages:
```
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-second-topic --from-beginning
my new test message 1
my new test message 2
my new test message 3
[2017-03-20 17:09:46,804] WARN Auto-commit of offsets {my-second-topic-2=OffsetAndMetadata{offset=1, metadata=''}, my-second-topic-1=OffsetAndMetadata{offset=1, metadata=''}, my-second-topic-0=OffsetAndMetadata{offset=1, metadata=''}} failed for group console-consumer-36818: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
my new test message 4
my new test message 5
my new test message 6
[2017-03-20 17:26:48,011] WARN Auto-commit of offsets {my-second-topic-2=OffsetAndMetadata{offset=2, metadata=''}, my-second-topic-1=OffsetAndMetadata{offset=2, metadata=''}, my-second-topic-0=OffsetAndMetadata{offset=2, metadata=''}} failed for group console-consumer-36818: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2017-03-20 17:26:53,016] WARN Auto-commit of offsets {my-second-topic-2=OffsetAndMetadata{offset=2, metadata=''}, my-second-topic-1=OffsetAndMetadata{offset=2, metadata=''}, my-second-topic-0=OffsetAndMetadata{offset=2, metadata=''}} failed for group console-consumer-36818: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
my new test message 7
my new test message 8
my new test message 9
```
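This experiment demonstrates the reliability of the Kafka cluster. With only one broker still alive, the clients kept sending and receiving messages without any parameter changes, at least for the partitions that still had a live replica.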
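If partition 2 really had no live replica after broker 1 was killed, why did messages 7, 8 and 9 still arrive? As far as I know, the console producer in this version uses the Java producer and sends lines without a key. The producer's DefaultPartitioner in 0.10.x then picks a partition for each keyless record from an incrementing counter. It takes the counter modulo the number of *available* partitions, i.e. those that currently have a leader, and falls back to all partitions only when none is available. The Python sketch below illustrates that rule under this assumption. It is not Kafka's code: Kafka seeds its counter randomly and uses the partition order from its metadata.

```python
# Sketch (not Kafka code) of round-robin partition choice for keyless records.
import itertools


class KeylessRoundRobin:
    def __init__(self, start: int = 0):
        # Kafka seeds its counter randomly; a fixed start keeps this reproducible.
        self._counter = itertools.count(start)

    def partition(self, num_partitions: int, available: list) -> int:
        n = next(self._counter)
        if available:
            return available[n % len(available)]  # only partitions with a leader
        return n % num_partitions  # nothing available: pick among all partitions


if __name__ == "__main__":
    p = KeylessRoundRobin()
    # Messages 7, 8, 9 with partition 2 assumed to be unavailable.
    print([p.partition(3, [0, 1]) for _ in range(3)])  # [0, 1, 0]
```

Under this assumption, messages 7 to 9 would simply have gone to partitions 0 and 1, and the consumer would not notice that partition 2 was offline.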