Kafka是一個分布式的、可分區的、可復制的消息系統。它提供了普通消息系統的功能,但具有自己獨特的設計。這個獨特的設計是什么樣的呢
介紹
Kafka是一個分布式的、可分區的、可復制的消息系統。它提供了普通消息系統的功能,但具有自己獨特的設計。這個獨特的設計是什么樣的呢?
首先讓我們看幾個基本的消息系統術語:
•Kafka將消息以topic為單位進行歸納。
•將向Kafka topic發布消息的程序成為producers.
•將預訂topics並消費消息的程序成為consumer.
•Kafka以集群的方式運行,可以由一個或多個服務組成,每個服務叫做一個broker.
producers通過網絡將消息發送到Kafka集群,集群向消費者提供消息,如下圖所示:
客戶端和服務端通過TCP協議通信。Kafka提供了Java客戶端,並且對多種語言都提供了支持。
說明:
操作系統:CentOS 6.x 64位
Kafka版本:kafka_2.11-2.1.0
實現目的:
單機安裝配置kafka
具體操作:
一、關閉防火牆
關閉: service iptables stop
禁用: chkconfig iptables off
二、安裝JDK
kafka運行需要JDK支持
1、下載JDK
http://download.oracle.com/otn-pub/java/jdk/7u79-b15/jdk-7u79-linux-x64.rpm
注意:直接復制到下載工具進行下載,版本請使用JDK7
下載完成之后,上傳到/usr/local/src目錄下
2、安裝JDK
cd /usr/local/src
chmod +x jdk-7u79-linux-x64.rpm # 添加執行權限
rpm -ivh jdk-7u79-linux-x64.rpm #安裝
安裝完成之后,可以cd /usr/java/ 到安裝目錄查看
3、添加JDK到系統環境變量
vi /etc/profile #編輯,在最后添加以下代碼
JAVA_HOME=/usr/java/jdk1.7.0_79
PATH=$PATH:$JAVA_HOME/bin:/usr/bin:/usr/sbin:/bin:/sbin:/usr/X11R6/bin
CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
export JAVA_HOME
export PATH
export CLASSPATH
:wq! #保存退出
source /etc/profile #使配置文件立即生效
java -version #查看JDK版本信息
到此,JDK安裝完成。
三、安裝kafka
1、下載kafka
cd /usr/local/src
wget http://59.80.44.99/archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
注意,kafka_2.12-2.1.0.tgz版本是已經編譯好的版本,解壓就能使用。
tar -xzvf kafka_2.12-2.1.0.tgz #解壓
mv kafka_2.12-2.1.0 /usr/local/kafka #移動到安裝目錄
2、配置kafka
mkdir /usr/local/kafka/log/kafka #創建kafka日志目錄
cd /usr/local/kafka/config #進入配置目錄
vi server.properties #編輯修改相應的參數 (參考配置說明地址:http://blog.csdn.net/lizhitao/article/details/25667831)
broker.id=0
port=9092 #端口號
host.name=192.168.5.56 #服務器IP地址,修改為自己的服務器IP
log.dirs=/usr/local/kafka/log/kafka #日志存放路徑,上面創建的目錄
log.cleaner.enable=false #是否開啟日志清理
zookeeper.connect=192.168.5.56:2181 #zookeeper地址和端口
:wq! #保存退出
3、配置zookeeper
mkdir /usr/local/kafka/zookeeper #創建zookeeper目錄
mkdir /usr/local/kafka/log/zookeeper #創建zookeeper日志目錄
cd /usr/local/kafka/config #進入配置目錄
vi zookeeper.properties #編輯修改相應的參數
dataDir=/usr/local/kafka/zookeeper #zookeeper數據目錄
dataLogDir=/usr/local/kafka/log/zookeeper #zookeeper日志目錄
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10
syncLimit=5
:wq! #保存退出
四、創建啟動、關閉kafka腳本
cd /usr/local/kafka
#創建啟動腳本
vi kafkastart.sh #編輯,添加以下代碼
#!/bin/sh
#啟動zookeeper
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
sleep 3 #等3秒后執行
#啟動kafka
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
:wq! #保存退出
#創建關閉腳本
vi kafkastop.sh #編輯,添加以下代碼
#!/bin/sh
#關閉zookeeper
/usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/config/zookeeper.properties &
sleep 3 #等3秒后執行
#關閉kafka
/usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties &
:wq! #保存退出
#添加腳本執行權限
chmod +x kafkastart.sh
chmod +x kafkastop.sh
五、設置腳本開機自動執行
vi /etc/rc.d/rc.local #編輯,在最后添加一行
sh /usr/local/kafka/kafkastart.sh & #設置開機自動在后台運行腳本
:wq! #保存退出
sh /usr/local/kafka/kafkastart.sh #啟動kafka
sh /usr/local/kafka/kafkastop.sh #關閉kafka
至此,Linux下Kafka單機安裝配置完成。
六、kafka樣例測試
首先要按照上述 啟動kafka
1、單節點 - 單代理配置
在此配置中,您有一個ZooKeeper和代理id實例。 以下是配置它的步驟 -
創建Kafka主題 - Kafka提供了一個名為 kafka-topics.sh
的命令行實用程序,用於在服務器上創建主題。 打開新終端並鍵入以下示例。
語法
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name
示例
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka
我們剛剛創建了一個名為 Hello-Kafka
的主題,其中包含一個分區和一個副本因子。 上面創建的輸出將類似於以下輸出 -
輸出 - 創建主題 Hello-Kafka
創建主題后,您可以在Kafka代理終端窗口中獲取通知,並在config / server.properties文件中的“/ tmp / kafka-logs /"中指定的創建主題的日志。
主題列表
要獲取Kafka服務器中的主題列表,可以使用以下命令 -
語法
bin/kafka-topics.sh --list --zookeeper localhost:2181
輸出
Hello-Kafka
由於我們已經創建了一個主題,它將僅列出 Hello-Kafka
。 假設,如果創建多個主題,您將在輸出中獲取主題名稱。
啟動生產者以發送消息
語法
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
從上面的語法,生產者命令行客戶端需要兩個主要參數 -
代理列表 - 我們要發送郵件的代理列表。 在這種情況下,我們只有一個代理。 Config / server.properties文件包含代理端口ID,因為我們知道我們的代理正在偵聽端口9092,因此您可以直接指定它。localhost要換成服務器ip。
主題名稱 - 以下是主題名稱的示例。
示例
bin/kafka-console-producer.sh --broker-list 10.1.2.3:9092 --topic Hello-Kafka
生產者將等待來自stdin的輸入並發布到Kafka集群。 默認情況下,每個新行都作為新消息發布,然后在 config / producer.properties
文件中指定默認生產者屬性。 現在,您可以在終端中鍵入幾行消息,如下所示。
輸出
$ bin/kafka-console-producer.sh --broker-list 10.1.1.3:9092 --topic Hello-Kafka[2016-01-16 13:50:45,931] WARN property topic is not valid (kafka.utils.Verifia-bleProperties) Hello My first message My second message
啟動消費者以接收消息
與生產者類似,在 config / consumer.proper-ties
文件中指定了缺省使用者屬性。 打開一個新終端並鍵入以下消息消息語法。注意localhost需要改為服務器ip
語法
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-name --from-beginning
示例
bin/kafka-console-consumer.sh --bootstrap-server 10.1.1.3:9092 --topic Hello-Kafka --from-beginning
如果想實時顯示的話則去掉后面的 --from-beginning
輸出
Hello
My first message
My second message
最后,您可以從制作商的終端輸入消息,並看到他們出現在消費者的終端。 到目前為止,您對具有單個代理的單節點群集有非常好的了解。 現在讓我們繼續討論多個代理配置。
2、單節點多代理配置
在單服務器上模擬kafka集群模型可參考 http://www.bubuko.com/infodetail-1989010.html。
在進入多個代理集群設置之前,首先啟動ZooKeeper服務器。
創建多個Kafka Brokers - 我們在配置/ server.properties中已有一個Kafka代理實例。 現在我們需要多個代理實例,因此將現有的server.prop-erties文件復制到兩個新的配置文件中,並將其重命名為server-one.properties和server-two.properties。 然后編輯這兩個新文件並分配以下更改 -
config / server-one.properties
# The id of the broker. This must be set to a unique integer for each broker. broker.id=1 # The port the socket server listens on port=9093 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs-1
config / server-two.properties
# The id of the broker. This must be set to a unique integer for each broker. broker.id=2 # The port the socket server listens on port=9094 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs-2
啟動多個代理 - 在三台服務器上進行所有更改后,打開三個新終端,逐個啟動每個代理。
Broker1 bin/kafka-server-start.sh config/server.properties Broker2 bin/kafka-server-start.sh config/server-one.properties Broker3 bin/kafka-server-start.sh config/server-two.properties
現在我們有三個不同的經紀人在機器上運行。 自己嘗試,通過在ZooKeeper終端上鍵入 jps 檢查所有守護程序,然后您將看到響應。
創建主題
讓我們為此主題將復制因子值指定為三個,因為我們有三個不同的代理運行。 如果您有兩個代理,那么分配的副本值將是兩個。
語法
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic topic-name
示例
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic Multibrokerapplication
輸出
created topic "Multibrokerapplication"
Describe
命令用於檢查哪個代理正在偵聽當前創建的主題,如下所示 -
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerappli-cation
輸出
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerappli-cation Topic:Multibrokerapplication PartitionCount:1 ReplicationFactor:3 Configs: Topic:Multibrokerapplication Partition:0 Leader:0 Replicas:0,2,1 Isr:0,2,1
從上面的輸出,我們可以得出結論,第一行給出所有分區的摘要,顯示主題名稱,分區數量和我們已經選擇的復制因子。 在第二行中,每個節點將是分區的隨機選擇部分的領導者。
在我們的例子中,我們看到我們的第一個broker(with broker.id 0)是領導者。 然后Replicas:0,2,1意味着所有代理復制主題最后 Isr
是 in-sync
副本的集合。 那么,這是副本的子集,當前活着並被領導者趕上。
啟動生產者以發送消息
此過程保持與單代理設置中相同。
示例
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
輸出
bin/kafka-console-producer.sh --broker-list 10.1.1.3:9092 --topic Multibrokerapplication [2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties) This is single node-multi broker demo This is the second message
啟動消費者以接收消息
此過程保持與單代理設置中所示的相同。
語法
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Multibrokerapplica-tion --from-beginning
輸出
bin/kafka-console-consumer.sh --bootstrap-server 10.1.1.3:9092 --topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message
基本主題操作
在本章中,我們將討論各種基本主題操作。
修改主題
您已經了解如何在Kafka Cluster中創建主題。 現在讓我們使用以下命令修改已創建的主題
語法
bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name --parti-tions count
示例
We have already created a topic “Hello-Kafka" with single partition count and one replica factor. Now using “alter" command we have changed the partition count. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic Hello-kafka --parti-tions 2
輸出
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded!
刪除主題
要刪除主題,可以使用以下語法。
語法
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
示例
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka
輸出
> Topic Hello-kafka marked for deletion
注意 - 如果 delete.topic.enable 未設置為true,則此操作不會產生任何影響