本課分2部分講解:
第一部分,講解Kafka的概念、架構和用例場景;
第二部分,講解Kafka的安裝和實戰。
由於時間關系,今天的課程只講到如何用官網的例子驗證Kafka的安裝是否成功。后續課程會接着講解如何集成Spark Streaming和Kafka。
一、Kafka的概念、架構和用例場景
http://kafka.apache.org/documentation.html#introdution
1、Kafka的概念
Apache Kafka是分布式發布-訂閱消息系統。它最初由LinkedIn公司開發,之后成為Apache項目的一部分。Kafka是一種快速、可擴展的、設計內在就是分布式的,分區的和可復制的提交日志服務。
什么是消息組件:
以帥哥和美女聊天為例,帥哥如何和美女交流呢?這中間通常想到的是微信、QQ、電話、郵件等通信媒介,這些通信媒介就是消息組件,帥哥把聊天信息發送給消息組件、消息組件將消息推送給美女,這就是常說的生產者、消費者模型。而且在發送信息時可以將內容進行分類,即所謂的Topic主題。Kafka就是這樣的通信組件,將不同對象組件粘合起來的紐帶,且是解耦合方式傳遞數據。
Apache Kafka與傳統消息系統相比,有以下不同的特點:
- 分布式系統,易於向外擴展;
- 在線低延遲,同時為發布和訂閱提供高吞吐量;
- 將消息存儲到磁盤,因此可以處理1天甚至1周前內容
2、Kafka的架構

Kafka既然具備消息系統的基本功能,那么就必然會有組成消息系統的組件:
Topic,Producer和Consumer。Kafka還有其特殊的Kafka Cluster組件。
Topic主題:
代表一種數據的類別或類型,工作、娛樂、生活有不同的Topic,生產者需要說明把說明數據分別放在那些Topic中,里面就是一個個小對象,並將數據數據推到Kafka,消費者獲取數據是pull的過程。一組相同類型的消息數據流。這些消息在Kafka會被分區存放,並且有多個副本,以防數據丟失。每個分區的消息是順序寫入的,並且不可改寫。

- Producer(生產者):把數據推到Kafka系統的任何對象。
- Kafka Cluster(Kafka集群):把推到Kafka系統的消息保存起來的一組服務器,也叫Broker。因為Kafka集群用到了Zookeeper作為底層支持框架,所以由一個選出的服務器作為Leader來處理所有消息的讀和寫的請求,其他服務器作為Follower接受Leader的廣播同步備份數據,以備災難恢復時用。
- Consumer(消費者):從Kafka系統訂閱消息的任何對象。
消費者可以有多個,並且某些消費者還可以組成Consumer Group。多個Consumer Group之間組成消息廣播的關系,所以各個Group可以拉相同的消息數據。在Consumer Group內部,各消費者之間對Consumer Group拉出來的消息數據是隊列先進先出的關系,某個消息數據只能給該Group的一個消費者使用。

數據傳輸基於kernel(內核)級別的(傳輸速度接近0拷貝-ZeroCopy)、沒有用戶空間的參與。Linux本身是軟件,軟件啟動時第一個啟動進程叫init,在init進程啟動后會進入用戶空間;例如:在分布式系統中,機器A上的應用程序需要讀取機器B上的Java服務數據,由於Java程序對應的JVM是用戶空間級別而且數據在磁盤上,A上應用程序讀取數據時會首先進入機器B上的內核空間再進入機器B的用戶空間,讀取用戶空間的數據后,數據再經過B機器上的內核空間分發到網絡中,機器A網卡接收到傳輸過來的數據后再將數據寫入A機器的內核空間,從而最終將數據傳輸給A的用戶空間進行處理。如下圖:

外部系統從Java程序中讀取數據,傳輸給內核空間並依賴網卡將數據寫入到網絡中,從而把數據傳輸出去。其實Java本身是內核的一層外衣,Java Socket編程,操作的各種數據都是在JVM的用戶空間中進行的。而Kafka操作數據是放在內核空間的,通常內核空間處理數據的速度比用戶空間快上萬倍,所以通過kafka可以實現高速讀、寫數據。
3、Kafka的用例場景
類似微信,手機和郵箱等等這樣大家熟悉的消息組件,Kafka也可以:
- 支持文字/圖片
- 可以存儲內容
- 分門別類
從內容消費的角度,Kafka把郵箱中的郵件看成是Topic。
二、Kafka的安裝和實戰
http://kafka.apache.org/documentation.html#quickstart
1、安裝和配置Zookeeper
Kafka集群模式需要提前安裝好Zookeeper。
- 提示:Kafka單例模式不需要安裝額外的Zookeeper,可以使用內置的Zookeeper。
- Kafka集群模式需要至少3台服務器。本課實戰用到的服務器Hostname:master,slave1,slave2。
- 本課中用到的Zookeeper版本是Zookeeper-3.4.6。
1) 下載Zookeeper
進入http://www.apache.org/dyn/closer.cgi/zookeeper/,你可以選擇其他鏡像網址去下載,用官網推薦的鏡像:http://mirror.bit.edu.cn/apache/zookeeper/。提示:可以直接下載群里的Zookeeper安裝文件。


下載zookeeper-3.4.6.tar.gz

1) 安裝Zookeeper
提示:下面的步驟發生在master服務器。
以ubuntu14.04舉例,把下載好的文件放到/root目錄,用下面的命令解壓:
cd /root
tar -zxvf zookeeper-3.4.6.tar.gz
解壓后在/root目錄會多出一個zookeeper-3.4.6的新目錄,用下面的命令把它剪切到指定目錄即安裝好Zookeeper了:
cd /root
mv zookeeper-3.4.6 /usr/local/spark
之后在/usr/local/spark目錄會多出一個zookeeper-3.4.6的新目錄。下面我們講如何配置安裝好的Zookeeper。
2) 配置Zookeeper
提示:下面的步驟發生在master服務器。
- 配置.bashrc
- 打開文件:vi /root/.bashrc
- 在PATH配置行前添加:
export ZOOKEEPER_HOME=/usr/local/spark/zookeeper-3.4.6
- 最后修改PATH:
export PATH=${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${SCALA_HOME}/bin:${SPARK_HOME}/bin:${SPARK_HOME}/sbin:${HIVE_HOME}/bin:${KAFKA_HOME}/bin:$PATH
- 使配置的環境變量立即生效:source /root/.bashrc
- 創建data目錄
- cd $ZOOKEEPER_HOME
- mkdir data
- 創建並打開zoo.cfg文件
- cd $ZOOKEEPER_HOME/conf
- cp zoo_sample.cfg zoo.cfg
- vi zoo.cfg
- 配置zoo.cfg
# 配置Zookeeper的日志和服務器身份證號等數據存放的目錄。
# 千萬不要用默認的/tmp/zookeeper目錄,因為/tmp目錄的數據容易被意外刪除。
dataDir=../data
# Zookeeper與客戶端連接的端口
clientPort=2181
# 在文件最后新增3行配置每個服務器的2個重要端口:Leader端口和選舉端口
# server.A=B:C:D:其中 A 是一個數字,表示這個是第幾號服務器;
# B 是這個服務器的hostname或ip地址;
# C 表示的是這個服務器與集群中的 Leader 服務器交換信息的端口;
# D 表示的是萬一集群中的 Leader 服務器掛了,需要一個端口來重新進行選舉,
# 選出一個新的 Leader,而這個端口就是用來執行選舉時服務器相互通信的端口。
# 如果是偽集群的配置方式,由於 B 都是一樣,所以不同的 Zookeeper 實例通信
# 端口號不能一樣,所以要給它們分配不同的端口號。
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888
- 創建並打開myid文件
- cd $ZOOKEEPER_HOME/data
- touch myid
- vi myid
- 配置myid
按照zoo.cfg的配置,myid的內容就是1。
3) 同步master的安裝和配置到slave1和slave2
- 在master服務器上運行下面的命令
cd /root
scp ./.bashrc root@slave1:/root
scp ./.bashrc root@slave2:/root
cd /usr/local/spark
scp -r ./zookeeper-3.4.6 root@slave1:/usr/local/spark
scp -r ./zookeeper-3.4.6 root@slave2:/usr/local/spark
- 在slave1服務器上運行下面的命令
vi $ZOOKEEPER_HOME/data/myid
按照zoo.cfg的配置,myid的內容就是2。
- 在slave2服務器上運行下面的命令
vi $ZOOKEEPER_HOME/data/myid
按照zoo.cfg的配置,myid的內容就是3。
4) 啟動Zookeeper服務
- 在master服務器上運行下面的命令
zkServer.sh start
- 在slave1服務器上運行下面的命令
source /root/.bashrc
zkServer.sh start
- 在slave1服務器上運行下面的命令
source /root/.bashrc
zkServer.sh start
5) 驗證Zookeeper是否安裝和啟動成功
- 在master服務器上運行命令:jps和zkServer.sh status
root@master:/usr/local/spark/zookeeper-3.4.6/bin# jps
3844 QuorumPeerMain
4790 Jps
zkServer.sh status
root@master:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status
JMX enabled by default
Using config: /usr/local/spark/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower
- 在slave1服務器上運行命令:jps和zkServer.sh status
source /root/.bashrc
root@slave1:/usr/local/spark/zookeeper-3.4.6/bin# jps
3462 QuorumPeerMain
4313 Jps
root@slave1:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status
JMX enabled by default
Using config: /usr/local/spark/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower
- 在slave2服務器上運行命令:jps和zkServer.sh status
root@slave2:/usr/local/spark/zookeeper-3.4.6/bin# jps
4073 Jps
3277 QuorumPeerMain
root@slave2:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status
JMX enabled by default
Using config: /usr/local/spark/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: leader
至此,代表Zookeeper已經安裝和配置成功。
2、安裝和配置Kafka
本課中用到的Kafka版本是Kafka-2.10-0.9.0.1。
1) 下載Kafka
進入http://kafka.apache.org/downloads.html,左鍵單擊kafka_2.10-0.9.0.1.tgz。提示:可以直接下載群里的Kafka安裝文件。

下載kafka_2.10-0.9.0.1.tgz

1) 安裝Kafka
提示:下面的步驟發生在master服務器。
以ubuntu14.04舉例,把下載好的文件放到/root目錄,用下面的命令解壓:
cd /root
tar -zxvf kafka_2.10-0.9.0.1.tgz
解壓后在/root目錄會多出一個kafka_2.10-0.9.0.1的新目錄,用下面的命令把它剪切到指定目錄即安裝好Kafka了:
cd /root
mv kafka_2.10-0.9.0.1 /usr/local
之后在/usr/local目錄會多出一個kafka_2.10-0.9.0.1的新目錄。下面我們講如何配置安裝好的Kafka。
2) 配置Kafka
提示:下面的步驟發生在master服務器。
- 配置.bashrc
- 打開文件:vi /root/.bashrc
- 在PATH配置行前添加:
export KAFKA_HOME=/usr/local/kafka_2.10-0.9.0.1
- 最后修改PATH:
export PATH=${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${SCALA_HOME}/bin:${SPARK_HOME}/bin:${SPARK_HOME}/sbin:${HIVE_HOME}/bin:${KAFKA_HOME}/bin:$PATH
- 使配置的環境變量立即生效:source /root/.bashrc
- 打開server.properties
- cd $ZOOKEEPER_HOME/config
- vi server.properties
- 配置server.properties
broker.id=0
port=9092
zookeeper.connect=master:2181,slave1:2181,slave2:2181
3) 同步master的安裝和配置到slave1和slave2
- 在master服務器上運行下面的命令
cd /root
scp ./.bashrc root@slave1:/root
scp ./.bashrc root@slave2:/root
cd /usr/local
scp -r ./kafka_2.10-0.9.0.1 root@slave1:/usr/local
scp -r ./kafka_2.10-0.9.0.1 root@slave2:/usr/local
- 在slave1服務器上運行下面的命令
vi $KAFKA_HOME/config/server.properties
修改broker.id=1。
- 在slave2服務器上運行下面的命令
vi $KAFKA_HOME/config/server.properties
修改broker.id=2。
4) 啟動Kafka服務
- 在master服務器上運行下面的命令
cd $KAFKA_HOME/bin
kafka-server-start.sh ../config/server.properties &
- 在slave1服務器上運行下面的命令
source /root/.bashrc
cd $KAFKA_HOME/bin
kafka-server-start.sh ../config/server.properties &
- 在slave2服務器上運行下面的命令
source /root/.bashrc
cd $KAFKA_HOME/bin
kafka-server-start.sh ../config/server.properties &
5) 驗證Kafka是否安裝和啟動成功
- 在任意服務器上運行命令創建Topic“HelloKafka”:
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 1 --topic HelloKafka
- 在任意服務器上運行命令為創建的Topic“HelloKafka”生產一些消息:
kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic HelloKafka
輸入下面的消息內容:
This is Spark!
I’m Rocky!
- 在任意服務器上運行命令從指定的Topic“HelloKafka”上消費(拉取)消息:
kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --from-beginning --topic HelloKafka
過一會兒,你會看到打印的消息內容:
This is DT_Spark!
I’m Rocky!
Life is short, you need Spark!
- 在任意服務器上運行命令查看所有的Topic名字:
kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
- 在任意服務器上運行命令查看指定Topic的概況:
kafka-topics.sh --describe --zookeepermaster:2181,slave1:2181,slave2:2181 --topic HelloKafka
至此,代表Kafka已經安裝和配置成功。
姜偉
