Kafka在Linux上安裝部署及樣例測試


Kafka是一個分布式的、可分區的、可復制的消息系統。它提供了普通消息系統的功能,但具有自己獨特的設計。這個獨特的設計是什么樣的呢

 

介紹

Kafka是一個分布式的、可分區的、可復制的消息系統。它提供了普通消息系統的功能,但具有自己獨特的設計。這個獨特的設計是什么樣的呢?

首先讓我們看幾個基本的消息系統術語:

•Kafka將消息以topic為單位進行歸納。
•將向Kafka topic發布消息的程序成為producers.
•將預訂topics並消費消息的程序成為consumer.
•Kafka以集群的方式運行,可以由一個或多個服務組成,每個服務叫做一個broker.
producers通過網絡將消息發送到Kafka集群,集群向消費者提供消息,如下圖所示:

 

客戶端和服務端通過TCP協議通信。Kafka提供了Java客戶端,並且對多種語言都提供了支持。

說明:

操作系統:CentOS 6.x 64位

Kafka版本:kafka_2.11-2.1.0

實現目的:

單機安裝配置kafka

具體操作:

一、關閉防火牆

關閉: service iptables stop
禁用: chkconfig iptables off

二、安裝JDK

kafka運行需要JDK支持

1、下載JDK

http://download.oracle.com/otn-pub/java/jdk/7u79-b15/jdk-7u79-linux-x64.rpm

注意:直接復制到下載工具進行下載,版本請使用JDK7

下載完成之后,上傳到/usr/local/src目錄下

2、安裝JDK

cd /usr/local/src

chmod +x jdk-7u79-linux-x64.rpm # 添加執行權限

rpm -ivh jdk-7u79-linux-x64.rpm #安裝

安裝完成之后,可以cd /usr/java/ 到安裝目錄查看

3、添加JDK到系統環境變量

vi /etc/profile #編輯,在最后添加以下代碼

JAVA_HOME=/usr/java/jdk1.7.0_79

PATH=$PATH:$JAVA_HOME/bin:/usr/bin:/usr/sbin:/bin:/sbin:/usr/X11R6/bin

CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar

export JAVA_HOME

export PATH

export CLASSPATH

:wq! #保存退出

source /etc/profile #使配置文件立即生效

java -version #查看JDK版本信息

到此,JDK安裝完成。

三、安裝kafka

1、下載kafka

cd /usr/local/src

wget http://59.80.44.99/archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz

注意,kafka_2.12-2.1.0.tgz版本是已經編譯好的版本,解壓就能使用。

tar -xzvf kafka_2.12-2.1.0.tgz #解壓

mv kafka_2.12-2.1.0 /usr/local/kafka #移動到安裝目錄

2、配置kafka

mkdir /usr/local/kafka/log/kafka #創建kafka日志目錄

cd /usr/local/kafka/config #進入配置目錄

vi server.properties #編輯修改相應的參數 (參考配置說明地址:http://blog.csdn.net/lizhitao/article/details/25667831)

broker.id=0

port=9092 #端口號

host.name=192.168.5.56 #服務器IP地址,修改為自己的服務器IP

log.dirs=/usr/local/kafka/log/kafka #日志存放路徑,上面創建的目錄

log.cleaner.enable=false     #是否開啟日志清理

zookeeper.connect=192.168.5.56:2181 #zookeeper地址和端口

:wq! #保存退出

3、配置zookeeper

mkdir /usr/local/kafka/zookeeper #創建zookeeper目錄

mkdir /usr/local/kafka/log/zookeeper #創建zookeeper日志目錄

cd /usr/local/kafka/config #進入配置目錄

vi zookeeper.properties #編輯修改相應的參數

dataDir=/usr/local/kafka/zookeeper #zookeeper數據目錄

dataLogDir=/usr/local/kafka/log/zookeeper #zookeeper日志目錄

clientPort=2181

maxClientCnxns=100

tickTime=2000

initLimit=10

syncLimit=5

:wq! #保存退出

四、創建啟動、關閉kafka腳本

cd /usr/local/kafka

#創建啟動腳本

vi kafkastart.sh #編輯,添加以下代碼

#!/bin/sh

#啟動zookeeper

/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &

sleep 3 #等3秒后執行

#啟動kafka

/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

:wq! #保存退出

#創建關閉腳本

vi kafkastop.sh #編輯,添加以下代碼

#!/bin/sh

#關閉zookeeper

/usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/config/zookeeper.properties &

sleep 3 #等3秒后執行

#關閉kafka

/usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties &

:wq! #保存退出

#添加腳本執行權限

chmod +x kafkastart.sh

chmod +x kafkastop.sh

五、設置腳本開機自動執行

vi /etc/rc.d/rc.local #編輯,在最后添加一行

sh /usr/local/kafka/kafkastart.sh & #設置開機自動在后台運行腳本

:wq! #保存退出

sh /usr/local/kafka/kafkastart.sh #啟動kafka

sh /usr/local/kafka/kafkastop.sh #關閉kafka

至此,Linux下Kafka單機安裝配置完成。

 

六、kafka樣例測試

首先要按照上述 啟動kafka

1、單節點 - 單代理配置

在此配置中,您有一個ZooKeeper和代理id實例。 以下是配置它的步驟 -

創建Kafka主題 - Kafka提供了一個名為 kafka-topics.sh 的命令行實用程序,用於在服務器上創建主題。 打開新終端並鍵入以下示例。

語法

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name

示例

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka

我們剛剛創建了一個名為 Hello-Kafka 的主題,其中包含一個分區和一個副本因子。 上面創建的輸出將類似於以下輸出 -

輸出 - 創建主題 Hello-Kafka 

創建主題后,您可以在Kafka代理終端窗口中獲取通知,並在config / server.properties文件中的“/ tmp / kafka-logs /"中指定的創建主題的日志。

主題列表

要獲取Kafka服務器中的主題列表,可以使用以下命令 -

語法

bin/kafka-topics.sh --list --zookeeper localhost:2181

輸出

Hello-Kafka

由於我們已經創建了一個主題,它將僅列出 Hello-Kafka 。 假設,如果創建多個主題,您將在輸出中獲取主題名稱。

啟動生產者以發送消息

語法

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

從上面的語法,生產者命令行客戶端需要兩個主要參數 -

代理列表 - 我們要發送郵件的代理列表。 在這種情況下,我們只有一個代理。 Config / server.properties文件包含代理端口ID,因為我們知道我們的代理正在偵聽端口9092,因此您可以直接指定它。localhost要換成服務器ip。

主題名稱 - 以下是主題名稱的示例。

示例

bin/kafka-console-producer.sh --broker-list 10.1.2.3:9092 --topic Hello-Kafka

生產者將等待來自stdin的輸入並發布到Kafka集群。 默認情況下,每個新行都作為新消息發布,然后在 config / producer.properties 文件中指定默認生產者屬性。 現在,您可以在終端中鍵入幾行消息,如下所示。

輸出

$ bin/kafka-console-producer.sh --broker-list 10.1.1.3:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

啟動消費者以接收消息

與生產者類似,在 config / consumer.proper-ties 文件中指定了缺省使用者屬性。 打開一個新終端並鍵入以下消息消息語法。注意localhost需要改為服務器ip

語法

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-name --from-beginning

示例

bin/kafka-console-consumer.sh --bootstrap-server 10.1.1.3:9092 --topic Hello-Kafka --from-beginning

如果想實時顯示的話則去掉后面的   --from-beginning

輸出

Hello
My first message
My second message

最后,您可以從制作商的終端輸入消息,並看到他們出現在消費者的終端。 到目前為止,您對具有單個代理的單節點群集有非常好的了解。 現在讓我們繼續討論多個代理配置。

2、單節點多代理配置

在單服務器上模擬kafka集群模型可參考  http://www.bubuko.com/infodetail-1989010.html。

在進入多個代理集群設置之前,首先啟動ZooKeeper服務器。

創建多個Kafka Brokers - 我們在配置/ server.properties中已有一個Kafka代理實例。 現在我們需要多個代理實例,因此將現有的server.prop-erties文件復制到兩個新的配置文件中,並將其重命名為server-one.properties和server-two.properties。 然后編輯這兩個新文件並分配以下更改 -

config / server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config / server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

啟動多個代理 - 在三台服務器上進行所有更改后,打開三個新終端,逐個啟動每個代理。

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

現在我們有三個不同的經紀人在機器上運行。 自己嘗試,通過在ZooKeeper終端上鍵入 jps 檢查所有守護程序,然后您將看到響應。

創建主題

讓我們為此主題將復制因子值指定為三個,因為我們有三個不同的代理運行。 如果您有兩個代理,那么分配的副本值將是兩個。

語法

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic topic-name

示例

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic Multibrokerapplication

輸出

created topic "Multibrokerapplication"

 Describe 命令用於檢查哪個代理正在偵聽當前創建的主題,如下所示 -

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerappli-cation

輸出

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1

從上面的輸出,我們可以得出結論,第一行給出所有分區的摘要,顯示主題名稱,分區數量和我們已經選擇的復制因子。 在第二行中,每個節點將是分區的隨機選擇部分的領導者。

在我們的例子中,我們看到我們的第一個broker(with broker.id 0)是領導者。 然后Replicas:0,2,1意味着所有代理復制主題最后 Isr  in-sync 副本的集合。 那么,這是副本的子集,當前活着並被領導者趕上。

啟動生產者以發送消息

此過程保持與單代理設置中相同。

示例

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication

輸出

bin/kafka-console-producer.sh --broker-list 10.1.1.3:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

啟動消費者以接收消息

此過程保持與單代理設置中所示的相同。

語法

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Multibrokerapplica-tion --from-beginning

輸出

bin/kafka-console-consumer.sh --bootstrap-server 10.1.1.3:9092 --topic Multibrokerapplica-tion —from-beginning 
This
is single node-multi broker demo
This
is the second message

基本主題操作

在本章中,我們將討論各種基本主題操作。

修改主題

您已經了解如何在Kafka Cluster中創建主題。 現在讓我們使用以下命令修改已創建的主題

語法

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name --parti-tions count

示例

We have already created a topic “Hello-Kafka" with single partition count and one replica factor. 
Now using “alter" command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181 
--alter --topic Hello-kafka --parti-tions 2

輸出

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

刪除主題

要刪除主題,可以使用以下語法。

語法

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

示例

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

輸出

> Topic Hello-kafka marked for deletion

注意 - 如果 delete.topic.enable 未設置為true,則此操作不會產生任何影響

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM