一、Kafka的下載與解壓
http://kafka.apache.org/downloads.html下載kafka_2.11-1.1.1.tgz.gz並解壓到/home/jun下
[jun@master ~]$ cd kafka_2.11-1.1.1/ [jun@master kafka_2.11-1.1.1]$ ls -l total 48 drwxr-xr-x. 3 jun jun 4096 Jul 7 12:15 bin drwxr-xr-x. 2 jun jun 4096 Jul 7 12:15 config drwxr-xr-x. 2 jun jun 4096 Jul 23 16:32 libs -rw-r--r--. 1 jun jun 28824 Jul 7 12:12 LICENSE -rw-r--r--. 1 jun jun 336 Jul 7 12:12 NOTICE drwxr-xr-x. 2 jun jun 44 Jul 7 12:15 site-docs
二、配置Kafka集群
1.在Master節點上的配置
配置server.properties
[jun@master kafka_2.11-1.1.1]$ gedit /home/jun/kafka_2.11-1.1.1/config/server.properties
(1)在Server Basics部分增加下面的配置:將master作為broker,其id采用了默認的0
############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. broker.id=0 host.name=master
listeners=PLAINTEXT://master:9092
(2)在Zookeeper部分增加下面的配置,Zookeeper作為協調器,它連接的節點包括了集群的所有計算機。
############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=master:2181,slave0:2181,slave1:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000
將Kafka文件復制Slave節點
[jun@master kafka_2.11-1.1.1]$ scp -r /home/jun/kafka_2.11-1.1.1/ slave0:~/ [jun@master kafka_2.11-1.1.1]$ scp -r /home/jun/kafka_2.11-1.1.1/ slave1:~/
2.在Slave節點上的配置
同理,將slave0的broker.id設置為1,host.name=slave0,listeners=PLAINTEXT://slave0:9092;將slave1的broker.id設置為2,host.name=slave1,listeners=PLAINTEXT://slave1:9092即可。
三、Kafka的應用
1.啟動Kafka自帶的Zookeeper服務
首先查看當前系統進程列表
[jun@master ~]$ jps 3070 Jps [jun@slave0 ~]$ jps 3141 Jps [jun@slave1 ~]$ jps 3114 Jps
啟動Kafka自帶的Zookeeper服務
[jun@master kafka_2.11-1.1.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties
[jun@slave0 kafka_2.11-1.1.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties
[jun@slave1 kafka_2.11-1.1.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties
另開一個終端,查看此時的系統進程,此時QuorumPeerMain就是Zookeeper服務器進程
[jun@master kafka_2.11-1.1.1]$ jps 3588 Jps 3147 QuorumPeerMain [jun@slave0 kafka_2.11-1.1.1]$ jps 3891 Jps 3547 QuorumPeerMain [jun@slave1 kafka_2.11-1.1.1]$ jps 3201 QuorumPeerMain 3538 Jps
2.啟動Kafka服務
[jun@master kafka_2.11-1.1.1]$ bin/kafka-server-start.sh config/server.properties [jun@slave0 kafka_2.11-1.1.1]$ bin/kafka-server-start.sh config/server.properties [jun@slave1 kafka_2.11-1.1.1]$ bin/kafka-server-start.sh config/server.properties
另開一個終端,查看此時的系統進程,此時的Kafka就是Kafka的服務器進程
[jun@master kafka_2.11-1.1.1]$ jps 4064 Jps 3687 Kafka 3147 QuorumPeerMain [jun@slave0 kafka_2.11-1.1.1]$ jps 4021 Kafka 3547 QuorumPeerMain 4398 Jps [jun@slave1 kafka_2.11-1.1.1]$ jps 3201 QuorumPeerMain 3682 Kafka 4059 Jps
3.創建主題
· 主題(Topic)是消息中間件的基本概念,相當於文件系統的目錄,其實就是用於保存消息內容的計算實體,通過主題名加以標識,就如同目錄通過目錄名標識一樣。
(1)在master節點上創建一個名稱為“test”的主題
[jun@master kafka_2.11-1.1.1]$ bin/kafka-topics.sh --create -zookeeper master:2181 --replication-factor 1 --partitions 1 --topic test Created topic "test".
(2)查看已經創建的消息主題,同樣,在其他Slave結點上執行該命令也可以查看到創建的主題
[jun@master kafka_2.11-1.1.1]$ bin/kafka-topics.sh --list --zookeeper master:2181 test
4.發送消息
消息中間件是一個用於接收消息並轉發消息的服務,為了檢驗Kafka是否能夠正常工作,需要創建一個消息生產者(producer)、利用它產生消息
[jun@master kafka_2.11-1.1.1]$ bin/kafka-console-producer.sh --broker-list master:9092 --topic test >say hi [2018-07-23 20:14:28,119] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {myfinaltest=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) >hi >you >cnm >
5.接收消息
在一個新終端上執行如下的命令,可以看到接收到了producer發送的消息
[jun@slave0 kafka_2.11-1.1.1]$ bin/kafka-console-consumer.sh --bootstrap-server slave0:9092 --topic test--from-beginning say hi hi you cnm [jun@slave1 kafka_2.11-1.1.1]$ bin/kafka-console-consumer.sh --bootstrap-server slave0:9092 --topic test--from-beginning say hi hi you cnm