Kafka Server Setup and Testing


Kafka server setup

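  Kafka uses ZooKeeper to manage its own cluster metadata (such as broker registrations and topic configuration) and, for the older ZooKeeper-based consumer, consumer information. ZooKeeper is an integral part of Kafka, so a running ZooKeeper server is an indispensable part of any Kafka cluster environment. Building a ZooKeeper cluster is well documented online, so it is not described in detail here; you can also follow my own article on the three ZooKeeper installation modes to set one up. The Kafka setup described in this article works on both Ubuntu and macOS (tested personally).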

  Software environment for the cluster:

  • JDK: java version 1.8.0_121
  • zookeeper:zookeeper-3.4.9
  • kafka: kafka_2.10-0.10.2.0 (Kafka 0.10.2.0 built for Scala 2.10)

  First, download the package kafka_2.10-0.10.2.0.tgz from the official website:

http://kafka.apache.org/downloads

  Then extract the package into the directory where you want to install it:

tar -zxvf kafka_2.10-0.10.2.0.tgz
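The text says to unpack the archive into the directory where Kafka should live, while the tar command above extracts into the current directory. A minimal sketch that extracts into an explicit directory with tar's -C option; install_kafka and the destination path are hypothetical:

```shell
# Hypothetical helper: unpack a Kafka release tarball into a chosen directory.
# Usage: install_kafka <tarball> <install-dir>
install_kafka() {
  local tarball=$1 dest=$2
  if [ ! -f "$tarball" ]; then
    echo "tarball not found: $tarball" >&2
    return 1
  fi
  mkdir -p "$dest" || return 1
  # -C makes tar extract into $dest instead of the current directory
  tar -zxf "$tarball" -C "$dest"
}
```

For example, install_kafka kafka_2.10-0.10.2.0.tgz ~/DevelopEnvironment should produce ~/DevelopEnvironment/kafka_2.10-0.10.2.0, the layout used in the paths later in this article.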
  • Single-node mode

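  Single-node mode means deploying just one Kafka instance (broker) on one server. First we write the broker configuration file. The config directory of the extracted package already contains a ready-made broker configuration file, server.properties, and we modify part of it to get the setup we want. From the Kafka installation directory, open server.properties with vim: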

vim config/server.properties

  This article mainly changes the following parameters:

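  • broker.id: the unique identifier of each broker in the Kafka cluster, an integer;
  • delete.topic.enable: whether topics can actually be deleted. If it is set to true, deleting a topic takes effect immediately; otherwise (the default in this version) the topic is only marked for deletion. The line is commented out by default;
  • host.name: the IP address the broker binds to, i.e. this server's IP address;
  • port: the port on which the broker listens for connections from clients and from other brokers (the broker itself connects to ZooKeeper as a client, using the address in zookeeper.connect);
  • log.dirs: the directory where the broker stores its data, i.e. the partition log segments, not its application logs;
  • zookeeper.connect: the ZooKeeper address(es); the broker registers itself in ZooKeeper so that the rest of the cluster knows it exists;
  • zookeeper.connection.timeout.ms: how long the broker waits to connect to ZooKeeper; if it cannot connect within this time, the broker fails to start and is not added to the Kafka cluster.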
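Instead of editing each parameter by hand in vim, the same changes can be scripted. A minimal sketch, assuming a plain key=value file; set_prop is a hypothetical helper that replaces the first active key=... line or appends one. It uses awk and a temporary file rather than sed -i, because sed -i takes different arguments on GNU/Linux and macOS:

```shell
# Hypothetical helper: set KEY=VALUE in a .properties file.
# Usage: set_prop <file> <key> <value>
set_prop() {
  local file=$1 key=$2 value=$3 tmp status
  tmp=$(mktemp) || return 1
  # Replace the first uncommented "key=" line; append "key=value" if none exists.
  awk -v k="$key" -v v="$value" '
    !found && index($0, k "=") == 1 { print k "=" v; found = 1; next }
    { print }
    END { if (!found) print k "=" v }
  ' "$file" > "$tmp" && cat "$tmp" > "$file"   # cat keeps the file's permissions
  status=$?
  rm -f "$tmp"
  return $status
}
```

For example: set_prop config/server.properties broker.id 0, then the same for host.name 127.0.0.1, port 9092 and log.dirs. Commented-out lines such as #delete.topic.enable=true are left untouched; a new value for such a key is appended at the end of the file instead.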

  All other settings keep their default values for now. The modified configuration file (with most comments removed) is:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=true

# Hostname and port the broker will advertise to producers and consumers. If not set

host.name=127.0.0.1
port=9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/Users/wangchaohui/DevelopEnvironment/kafka_2.10-0.10.2.0/Data/log

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1



############################# Log Retention Policy #############################


# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
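To see only the settings that are actually in effect, as in the trimmed listing above, blank lines and comment lines can be filtered out. A one-function sketch (show_effective_config is a hypothetical name) that treats lines whose first non-blank character is # as comments:

```shell
# Print only the effective settings of a .properties file:
# drop blank lines and lines whose first non-blank character is '#'.
show_effective_config() {
  grep -Ev '^[[:space:]]*(#|$)' "$1"
}
```

For example, show_effective_config config/server.properties prints just the key=value lines of the broker configuration.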

  At this point the single-node Kafka environment is ready. Let's run it and try it out.

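  First start ZooKeeper. As zookeeper.connect in the configuration file shows, this article uses only one ZooKeeper server (localhost:2181). Go to ZooKeeper's bin directory and run its start command.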
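For the standalone zookeeper-3.4.9 used in this article, ZooKeeper's start script is bin/zkServer.sh. A minimal sketch, assuming ZOOKEEPER_HOME points at the unpacked ZooKeeper directory (the variable and function names are hypothetical); zkServer.sh status then reports whether the server is running and in which mode:

```shell
# Sketch: start a standalone ZooKeeper 3.4.x and print its status.
# ZOOKEEPER_HOME is an assumption: the unpacked zookeeper-3.4.9 directory.
start_zookeeper() {
  if [ -z "${ZOOKEEPER_HOME:-}" ]; then
    echo "set ZOOKEEPER_HOME to the ZooKeeper install directory" >&2
    return 1
  fi
  # zkServer.sh reads conf/zoo.cfg from the ZooKeeper directory by default.
  "$ZOOKEEPER_HOME/bin/zkServer.sh" start || return 1
  # Reports whether the server is running and its mode (standalone here).
  "$ZOOKEEPER_HOME/bin/zkServer.sh" status
}
```

If you would rather not install ZooKeeper separately, the Kafka package also ships bin/zookeeper-server-start.sh, which can be started with config/zookeeper.properties.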

  Then start the Kafka server. Its start command takes the broker configuration file as an argument:

cd ~/DevelopEnvironment/kafka_2.10-0.10.2.0/bin
./kafka-server-start.sh ../config/server.properties 

  When the terminal prints "[Kafka Server 0], started (kafka.server.KafkaServer)" and similar messages, the Kafka cluster has started successfully.
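Running the broker in the foreground ties up a terminal. kafka-server-start.sh also accepts -daemon as its first argument to run the broker in the background. A sketch, assuming bash (for its /dev/tcp redirection) and a hypothetical KAFKA_HOME variable pointing at kafka_2.10-0.10.2.0; start_broker and wait_for_port are hypothetical helper names:

```shell
# Succeeds once host:port accepts a TCP connection; gives up after <timeout> seconds.
# Usage: wait_for_port <host> <port> [timeout]
wait_for_port() {
  local host=$1 port=$2 timeout=${3:-30} i=0
  while [ "$i" -lt "$timeout" ]; do
    # In bash, opening /dev/tcp/HOST/PORT attempts a TCP connect.
    if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then
      return 0
    fi
    sleep 1
    i=$((i + 1))
  done
  return 1
}

# Usage: start_broker <config-file> <port>
start_broker() {
  local config=$1 port=$2
  if [ -z "${KAFKA_HOME:-}" ]; then
    echo "set KAFKA_HOME to the Kafka install directory" >&2
    return 1
  fi
  # -daemon detaches the broker; its console output goes to $KAFKA_HOME/logs.
  "$KAFKA_HOME/bin/kafka-server-start.sh" -daemon "$config" || return 1
  wait_for_port 127.0.0.1 "$port" 60
}
```

Usage: start_broker "$KAFKA_HOME/config/server.properties" 9092. An open port only shows that the socket server is listening; the "started" line quoted above, which in daemon mode is written under the logs directory, remains the definitive signal.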

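  Next, create a topic named "my-first-topic" with both partitions and replication factor set to 1 (see the Kafka basic concepts for what these two parameters mean; they are not covered again here). Run the following in the bin directory: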

./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic my-first-topic --partitions 1 --replication-factor 1

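  The terminal then prints Created topic "my-first-topic"., which means the topic was created. Now we can inspect the topic with the describe option:

./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic my-first-topic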

  Here is a brief explanation of the output:

  • Topic: the topic name;
  • PartitionCount: the number of partitions of the topic;
  • ReplicationFactor: the number of replicas of each partition (including the leader itself).

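  The next five fields are explained together. my-first-topic (Topic) has only one partition, partition 0 (Partition: 0), so its only replica lives on broker 0 (Replicas: 0). The leader of partition 0 is therefore also broker 0 (Leader: 0), and the set of in-sync replicas for partition 0 contains only broker 0 (Isr: 0).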
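The describe output consists of tab-separated "Key: value" fields, which makes it easy to process in scripts, for example to check which broker leads each partition. A sketch assuming the layout printed by Kafka 0.10.x (one header line with Topic:/PartitionCount:/ReplicationFactor:/Configs:, then one line per partition); partition_table is a hypothetical name:

```shell
# Turn `kafka-topics.sh --describe` output into one line per partition:
#   <partition> <leader> <replicas> <isr>
# Assumes the tab-separated "Key: value" layout of Kafka 0.10.x.
partition_table() {
  awk -F'\t' '
    /Partition: / {
      p = l = r = i = ""
      for (n = 1; n <= NF; n++) {
        split($n, kv, ": ")
        if (kv[1] == "Partition") p = kv[2]
        else if (kv[1] == "Leader") l = kv[2]
        else if (kv[1] == "Replicas") r = kv[2]
        else if (kv[1] == "Isr") i = kv[2]
      }
      print p, l, r, i
    }'
}
```

Usage: ./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic my-first-topic | partition_table prints partition, leader, replicas and ISR for each partition.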

  Next, use the console producer script that ships with Kafka to send messages to the topic we just created:

 ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-first-topic
This is test message : 1
This is test message : 2
This is test message : 3
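The console producer reads one message per line from standard input, so messages can also be piped in instead of typed, which helps when repeating the test. A sketch; produce_lines and KAFKA_HOME are hypothetical:

```shell
# Sketch: send each argument as one message through the console producer.
# KAFKA_HOME is an assumption (the kafka_2.10-0.10.2.0 directory).
# Usage: produce_lines <broker-list> <topic> <message>...
produce_lines() {
  local brokers=$1 topic=$2
  shift 2
  # One line on stdin = one message; the producer exits at end of input.
  printf '%s\n' "$@" |
    "$KAFKA_HOME/bin/kafka-console-producer.sh" --broker-list "$brokers" --topic "$topic"
}
```

Usage: produce_lines 127.0.0.1:9092 my-first-topic 'This is test message : 1' 'This is test message : 2' 'This is test message : 3'.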

  Then use the bundled console consumer script to fetch the three messages we just sent from my-first-topic on broker 0:

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-first-topic --from-beginning

  After running this command, the terminal prints:

This is test message : 1
This is test message : 2
This is test message : 3

  These are exactly the three messages we just sent, so everything worked ^_^
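By default the console consumer keeps waiting for new messages until you press Ctrl-C. Its --max-messages option makes it exit after a given number of messages, which is handy for scripted checks. A sketch; consume_n and KAFKA_HOME are hypothetical:

```shell
# Sketch: read <count> messages from the beginning of a topic, then exit.
# KAFKA_HOME is an assumption (the kafka_2.10-0.10.2.0 directory).
# Usage: consume_n <bootstrap-server> <topic> <count>
consume_n() {
  "$KAFKA_HOME/bin/kafka-console-consumer.sh" \
    --bootstrap-server "$1" --topic "$2" \
    --from-beginning --max-messages "$3"
}
```

Usage: consume_n 127.0.0.1:9092 my-first-topic 3. With a multi-partition topic such as my-second-topic below, Kafka guarantees ordering only within a partition, so messages read back are not guaranteed to appear in the order they were sent.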

  Command to stop the cluster (kafka-server-stop.sh stops every Kafka broker running on this machine):

./kafka-server-stop.sh 
  • Kafka pseudo-cluster mode

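  In real production environments, Kafka is rarely run on a single server. A Kafka cluster is built across multiple servers, which is what gives Kafka its scalability, high throughput and reliability. Since this article is for learning purposes and I only have one PC and no real cluster, we can build a Kafka pseudo-cluster to simulate a Kafka cluster.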

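  As mentioned earlier, the Kafka start command takes a broker configuration file as its argument, and each broker represents one server in the Kafka cluster. So to simulate a Kafka cluster of N servers, we need N broker configuration files. Because all brokers run on the same machine, they share the same host.name, but their ports must differ; otherwise the brokers started later will fail to start because the port is already in use.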
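Since server-1.properties and server-2.properties differ from server.properties only in broker.id, port and log.dirs, they can be generated instead of copied by hand. A sketch, assuming a single log directory and the port scheme used in this article (9092 plus the broker number); make_broker_config is a hypothetical name:

```shell
# Sketch: derive server-<n>.properties from a base config for a pseudo-cluster.
# Assumes one log directory and ports 9092 + n, as in this article.
# Usage: make_broker_config <base-config> <n>
make_broker_config() {
  local base=$1 n=$2 out
  out="$(dirname "$base")/server-$n.properties"
  sed -e 's/^broker\.id=.*/broker.id='"$n"'/' \
      -e 's/^port=.*/port='"$((9092 + n))"'/' \
      -e 's|^\(log\.dirs=.*\)$|\1-'"$n"'|' \
      "$base" > "$out"
  echo "$out"
}
```

For example, make_broker_config config/server.properties 1 and make_broker_config config/server.properties 2 would write config/server-1.properties and config/server-2.properties, with log.dirs ending in log-1 and log-2.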

  Below are the configuration files for the other two brokers, server-1.properties and server-2.properties.

  server-1.properties:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=true



# Hostname and port the broker will advertise to producers and consumers. If not set

host.name=127.0.0.1
port=9093

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/Users/wangchaohui/DevelopEnvironment/kafka_2.10-0.10.2.0/Data/log-1

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1



############################# Log Retention Policy #############################


# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

  server-2.properties:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2

# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=true



# Hostname and port the broker will advertise to producers and consumers. If not set

host.name=127.0.0.1
port=9094

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/Users/wangchaohui/DevelopEnvironment/kafka_2.10-0.10.2.0/Data/log-2

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1



############################# Log Retention Policy #############################


# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

  Note the three settings that differ between the three configuration files:

  1.  broker.id
  2.  port
  3.  log.dirs
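Besides the port, a shared broker.id or log.dirs also keeps the later broker from starting: the broker id is already registered in ZooKeeper, or the data directory is locked by the other broker process. A sketch that checks a set of configuration files before starting them; check_unique_settings is a hypothetical name:

```shell
# Sketch: fail if any broker.id, port or log.dirs value is shared between
# the given broker config files.
# Usage: check_unique_settings <config-file>...
check_unique_settings() {
  local key dupes status=0
  for key in broker.id port log.dirs; do
    # Collect "key=value" lines from all files and keep the repeated ones.
    dupes=$(grep -h "^$key=" "$@" | sort | uniq -d)
    if [ -n "$dupes" ]; then
      echo "duplicate $dupes" >&2
      status=1
    fi
  done
  return $status
}
```

Usage: check_unique_settings config/server.properties config/server-1.properties config/server-2.properties returns non-zero and names the duplicated setting if any value is shared.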

  Now start the three brokers one by one. Make sure the ZooKeeper cluster is up and running before starting any broker. The start commands are:

./kafka-server-start.sh ../config/server.properties
./kafka-server-start.sh ../config/server-1.properties
./kafka-server-start.sh ../config/server-2.properties

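  The startup output of the three brokers contains the endpoints 127.0.0.1,9092, 127.0.0.1,9093 and 127.0.0.1,9094, which shows that our configuration files took effect ^_^ and that all three brokers started successfully.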

  Next, create a new topic, my-second-topic, in the pseudo-cluster with 3 partitions and a replication factor of 2:

./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic my-second-topic --partitions 3 --replication-factor 2

  After the command runs, the terminal shows Created topic "my-second-topic"., meaning the topic was created.

  Again, use the describe command to view the topic:

./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic my-second-topic

  The fields in the output have the meanings explained above and are easy to read, so they are not described again here.

  Next, send new messages to my-second-topic:

./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-second-topic
my new test message 1
my new test message 2
my new test message 3

  Then fetch the three new messages on the consumer side:

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-second-topic --from-beginning

  After the command runs, the terminal prints the three messages we just sent:

my new test message 1
my new test message 2
my new test message 3

  This shows that our cluster can now send and receive messages.

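  A Kafka cluster protects data through replication: even if only one broker is alive, the system can keep working, provided that the topic you need has a replica on the surviving broker. Let's test how well the cluster withstands failures.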

  We already checked the state of my-second-topic with describe. Run the same command once more here so that the result can be compared with the state after the failure.

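  Note that --broker-list only tells the producer which broker to contact first; the messages themselves are written to the leaders of the topic's partitions. The consumer's auto-commit warning further below (offset=1 for each of partitions 0, 1 and 2) indicates that the three messages were spread across all three partitions. Now kill the process of broker 0: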

ps aux | grep server.properties

wangchaohui       3310   1.1  6.9  6279396 1161100 s000  S+    4:24下午   1:03.88 /usr/bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC

  Then kill the process by its PID:

kill -9 3310
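The ps aux | grep approach above also lists the grep command itself. A sketch using pgrep -f (available on both Linux and macOS), which matches against the full command line and never reports itself; broker_pid is a hypothetical helper name:

```shell
# Sketch: print the PID(s) of processes whose command line contains <pattern>,
# e.g. the broker started with a given config file.
# Usage: broker_pid <config-file-name>
broker_pid() {
  # Escape dots so "server.properties" does not act as a regex wildcard.
  pgrep -f "$(printf '%s' "$1" | sed 's/\./\\./g')"
}
```

For example, kill -9 "$(broker_pid server.properties)" simulates a crash of broker 0 as in this test, while a plain kill (SIGTERM) lets the broker shut down cleanly. The escaped pattern server\.properties does not match server-1.properties.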

  After that, the terminals of broker 1 and broker 2 print the following:

java.io.IOException: Connection to 0 was disconnected before the response was read
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:114)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)
    at scala.Option.foreach(Option.scala:236)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)
    at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:136)
    at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:142)
    at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
    at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:249)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2017-03-20 17:09:40,278] WARN [ReplicaFetcherThread-0-0], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@3e2b9697 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 127.0.0.1:9092 (id: 0 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
    at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2017-03-20 17:09:42,288] WARN [ReplicaFetcherThread-0-0], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@7ee0ff45 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 127.0.0.1:9092 (id: 0 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
    at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2017-03-20 17:09:44,011] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2017-03-20 17:09:44,012] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-03-20 17:09:44,012] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2017-03-20 17:09:44,292] WARN [ReplicaFetcherThread-0-0], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@3f48dc67 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 127.0.0.1:9092 (id: 0 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
    at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

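  The warnings show that connections to broker 0 fail, and the lines about creating /controller show that broker 1 has been elected as the new cluster controller. Now run describe again to check the state of my-second-topic.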

  Compared with the earlier state, partitions 1 and 2 are affected: their in-sync replicas are now only broker 2 and broker 1 respectively, and the leader of partition 2 has changed from 0 to 1.
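The comparison above is about replicas dropping out of the ISR. kafka-topics.sh can report this directly with --describe --under-replicated-partitions (and --unavailable-partitions for partitions without a leader). As a sketch of the same idea on describe output, the following lists partitions whose ISR is smaller than their replica set, assuming the Kafka 0.10.x tab-separated layout; under_replicated is a hypothetical name:

```shell
# Sketch: read `kafka-topics.sh --describe` output on stdin and report
# partitions whose ISR has fewer brokers than their replica list.
under_replicated() {
  awk -F'\t' '
    /Partition: / {
      for (n = 1; n <= NF; n++) {
        split($n, kv, ": ")
        v[kv[1]] = kv[2]
      }
      nr = split(v["Replicas"], reps, ",")
      ni = split(v["Isr"], isr, ",")
      if (ni < nr)
        print "partition " v["Partition"] ": leader " v["Leader"] ", isr " v["Isr"] " of " v["Replicas"]
    }'
}
```

Usage: ./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic my-second-topic | under_replicated.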

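  Now let's keep sending messages through the producer started earlier (whose --broker-list still points to broker 0 at 127.0.0.1:9092) and see whether the client is affected. Continuing from before, we send three more messages, 4, 5 and 6: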

./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-second-topic
my new test message 1
my new test message 2
my new test message 3
my new test message 4
my new test message 5
my new test message 6

  The consumer still receives the messages:

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-second-topic --from-beginning
my new test message 1
my new test message 2
my new test message 3
[2017-03-20 17:09:46,804] WARN Auto-commit of offsets {my-second-topic-2=OffsetAndMetadata{offset=1, metadata=''}, my-second-topic-1=OffsetAndMetadata{offset=1, metadata=''}, my-second-topic-0=OffsetAndMetadata{offset=1, metadata=''}} failed for group console-consumer-36818: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
my new test message 4
my new test message 5
my new test message 6

  Next, kill broker 1 as well and see whether messages can still be sent:

ps aux | grep server-1.properties
......
kill -9 3539

  Check the state of my-second-topic again with describe.

  This time the leader of partition 0 changed from 1 to 2. What I don't understand is why partition 2 still shows broker 1 as its leader, and even lists broker 1 as its only in-sync replica......

  Continue by sending messages 7, 8 and 9:

./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-second-topic
my new test message 1
my new test message 2
my new test message 3
my new test message 4
my new test message 5
my new test message 6
my new test message 7
my new test message 8
my new test message 9

  The consumer still receives the messages:

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-second-topic --from-beginning
my new test message 1
my new test message 2
my new test message 3
[2017-03-20 17:09:46,804] WARN Auto-commit of offsets {my-second-topic-2=OffsetAndMetadata{offset=1, metadata=''}, my-second-topic-1=OffsetAndMetadata{offset=1, metadata=''}, my-second-topic-0=OffsetAndMetadata{offset=1, metadata=''}} failed for group console-consumer-36818: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
my new test message 4
my new test message 5
my new test message 6
[2017-03-20 17:26:48,011] WARN Auto-commit of offsets {my-second-topic-2=OffsetAndMetadata{offset=2, metadata=''}, my-second-topic-1=OffsetAndMetadata{offset=2, metadata=''}, my-second-topic-0=OffsetAndMetadata{offset=2, metadata=''}} failed for group console-consumer-36818: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2017-03-20 17:26:53,016] WARN Auto-commit of offsets {my-second-topic-2=OffsetAndMetadata{offset=2, metadata=''}, my-second-topic-1=OffsetAndMetadata{offset=2, metadata=''}, my-second-topic-0=OffsetAndMetadata{offset=2, metadata=''}} failed for group console-consumer-36818: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
my new test message 7
my new test message 8
my new test message 9

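  This experiment demonstrates the reliability of a Kafka cluster: even with only one broker alive, the already-running producer and consumer kept sending and receiving messages without any parameter changes.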
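A caveat on this result: the producer and consumer in this test were already running when the brokers were killed, so they had already learned about the other brokers from the cluster metadata. A client started afresh with only 127.0.0.1:9092 would have no live broker to bootstrap from once broker 0 is down. Both --broker-list and --bootstrap-server accept a comma-separated list of host:port pairs, so listing every broker avoids this. A small sketch that builds such a list; bootstrap_list is a hypothetical helper name:

```shell
# Sketch: build a comma-separated host:port list for --broker-list or
# --bootstrap-server from one host and several ports.
# Usage: bootstrap_list <host> <port>...
bootstrap_list() {
  local host=$1 list="" port
  shift
  for port in "$@"; do
    list="${list:+$list,}$host:$port"
  done
  printf '%s\n' "$list"
}
```

Usage: ./kafka-console-producer.sh --broker-list "$(bootstrap_list 127.0.0.1 9092 9093 9094)" --topic my-second-topic.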
