Spark Streaming on Kafka解析和安裝實戰


本課分2部分講解:

第一部分,講解Kafka的概念、架構和用例場景;

第二部分,講解Kafka的安裝和實戰。

由於時間關系,今天的課程只講到如何用官網的例子驗證Kafka的安裝是否成功。后續課程會接着講解如何集成Spark Streaming和Kafka。

一、Kafka的概念、架構和用例場景

http://kafka.apache.org/documentation.html#introdution

1Kafka的概念

Apache Kafka是分布式發布-訂閱消息系統。它最初由LinkedIn公司開發,之后成為Apache項目的一部分。Kafka是一種快速、可擴展的、設計內在就是分布式的,分區的和可復制的提交日志服務。

什么是消息組件:

以帥哥和美女聊天為例,帥哥如何和美女交流呢?這中間通常想到的是微信、QQ、電話、郵件等通信媒介,這些通信媒介就是消息組件,帥哥把聊天信息發送給消息組件、消息組件將消息推送給美女,這就是常說的生產者、消費者模型。而且在發送信息時可以將內容進行分類,即所謂的Topic主題。Kafka就是這樣的通信組件,將不同對象組件粘合起來的紐帶,且是解耦合方式傳遞數據。

Apache Kafka與傳統消息系統相比,有以下不同的特點:

  • 分布式系統,易於向外擴展;
  • 在線低延遲,同時為發布和訂閱提供高吞吐量;
  • 將消息存儲到磁盤,因此可以處理1天甚至1周前內容

2Kafka的架構

Kafka既然具備消息系統的基本功能,那么就必然會有組成消息系統的組件:

Topic,Producer和Consumer。Kafka還有其特殊的Kafka Cluster組件。

Topic主題:

代表一種數據的類別或類型,工作、娛樂、生活有不同的Topic,生產者需要說明把說明數據分別放在那些Topic中,里面就是一個個小對象,並將數據數據推到Kafka,消費者獲取數據是pull的過程。一組相同類型的消息數據流。這些消息在Kafka會被分區存放,並且有多個副本,以防數據丟失。每個分區的消息是順序寫入的,並且不可改寫。

-       Producer(生產者):把數據推到Kafka系統的任何對象。

 

- Kafka Cluster(Kafka集群):把推到Kafka系統的消息保存起來的一組服務器,也叫Broker。因為Kafka集群用到了Zookeeper作為底層支持框架,所以由一個選出的服務器作為Leader來處理所有消息的讀和寫的請求,其他服務器作為Follower接受Leader的廣播同步備份數據,以備災難恢復時用。

- Consumer(消費者):從Kafka系統訂閱消息的任何對象。

消費者可以有多個,並且某些消費者還可以組成Consumer Group。多個Consumer Group之間組成消息廣播的關系,所以各個Group可以拉相同的消息數據。在Consumer Group內部,各消費者之間對Consumer Group拉出來的消息數據是隊列先進先出的關系,某個消息數據只能給該Group的一個消費者使用。

數據傳輸基於kernel(內核)級別的(傳輸速度接近0拷貝-ZeroCopy)、沒有用戶空間的參與。Linux本身是軟件,軟件啟動時第一個啟動進程叫init,在init進程啟動后會進入用戶空間;例如:在分布式系統中,機器A上的應用程序需要讀取機器B上的Java服務數據,由於Java程序對應的JVM是用戶空間級別而且數據在磁盤上,A上應用程序讀取數據時會首先進入機器B上的內核空間再進入機器B的用戶空間,讀取用戶空間的數據后,數據再經過B機器上的內核空間分發到網絡中,機器A網卡接收到傳輸過來的數據后再將數據寫入A機器的內核空間,從而最終將數據傳輸給A的用戶空間進行處理。如下圖:

外部系統從Java程序中讀取數據,傳輸給內核空間並依賴網卡將數據寫入到網絡中,從而把數據傳輸出去。其實Java本身是內核的一層外衣,Java Socket編程,操作的各種數據都是在JVM的用戶空間中進行的。而Kafka操作數據是放在內核空間的,通常內核空間處理數據的速度比用戶空間快上萬倍,所以通過kafka可以實現高速讀、寫數據。

3Kafka的用例場景

類似微信,手機和郵箱等等這樣大家熟悉的消息組件,Kafka也可以:

-       支持文字/圖片

-       可以存儲內容

-       分門別類

從內容消費的角度,Kafka把郵箱中的郵件看成是Topic。

二、Kafka的安裝和實戰

http://kafka.apache.org/documentation.html#quickstart

1、安裝和配置Zookeeper

Kafka集群模式需要提前安裝好Zookeeper。

-       提示:Kafka單例模式不需要安裝額外的Zookeeper,可以使用內置的Zookeeper。

-       Kafka集群模式需要至少3台服務器。本課實戰用到的服務器Hostname:master,slave1,slave2。

-       本課中用到的Zookeeper版本是Zookeeper-3.4.6。

1)    下載Zookeeper

進入http://www.apache.org/dyn/closer.cgi/zookeeper/,你可以選擇其他鏡像網址去下載,用官網推薦的鏡像:http://mirror.bit.edu.cn/apache/zookeeper/。提示:可以直接下載群里的Zookeeper安裝文件。

下載zookeeper-3.4.6.tar.gz

1)    安裝Zookeeper

提示:下面的步驟發生在master服務器。

以ubuntu14.04舉例,把下載好的文件放到/root目錄,用下面的命令解壓:

cd /root

tar -zxvf zookeeper-3.4.6.tar.gz

解壓后在/root目錄會多出一個zookeeper-3.4.6的新目錄,用下面的命令把它剪切到指定目錄即安裝好Zookeeper了:

cd /root

mv zookeeper-3.4.6 /usr/local/spark

之后在/usr/local/spark目錄會多出一個zookeeper-3.4.6的新目錄。下面我們講如何配置安裝好的Zookeeper。

2)    配置Zookeeper

提示:下面的步驟發生在master服務器。

  1. 配置.bashrc

-       打開文件:vi /root/.bashrc

-       在PATH配置行前添加:

export ZOOKEEPER_HOME=/usr/local/spark/zookeeper-3.4.6

-       最后修改PATH:

export PATH=${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${SCALA_HOME}/bin:${SPARK_HOME}/bin:${SPARK_HOME}/sbin:${HIVE_HOME}/bin:${KAFKA_HOME}/bin:$PATH

-       使配置的環境變量立即生效:source /root/.bashrc

  1. 創建data目錄

-       cd $ZOOKEEPER_HOME

-       mkdir data

  1. 創建並打開zoo.cfg文件

-       cd $ZOOKEEPER_HOME/conf

-       cp zoo_sample.cfg zoo.cfg

-       vi zoo.cfg

  1. 配置zoo.cfg

# 配置Zookeeper的日志和服務器身份證號等數據存放的目錄。

# 千萬不要用默認的/tmp/zookeeper目錄,因為/tmp目錄的數據容易被意外刪除。

dataDir=../data

# Zookeeper與客戶端連接的端口

clientPort=2181

# 在文件最后新增3行配置每個服務器的2個重要端口:Leader端口和選舉端口

# server.A=B:C:D:其中 A 是一個數字,表示這個是第幾號服務器;

# B 是這個服務器的hostname或ip地址;

# C 表示的是這個服務器與集群中的 Leader 服務器交換信息的端口;

# D 表示的是萬一集群中的 Leader 服務器掛了,需要一個端口來重新進行選舉,

# 選出一個新的 Leader,而這個端口就是用來執行選舉時服務器相互通信的端口。

# 如果是偽集群的配置方式,由於 B 都是一樣,所以不同的 Zookeeper 實例通信

# 端口號不能一樣,所以要給它們分配不同的端口號。

server.1=master:2888:3888

server.2=slave1:2888:3888

server.3=slave2:2888:3888

  1. 創建並打開myid文件

-       cd $ZOOKEEPER_HOME/data

-       touch myid

-       vi myid

  1. 配置myid

按照zoo.cfg的配置,myid的內容就是1。

3)    同步master的安裝和配置到slave1和slave2

-       在master服務器上運行下面的命令

cd /root

scp ./.bashrc root@slave1:/root

scp ./.bashrc root@slave2:/root

cd /usr/local/spark

scp -r ./zookeeper-3.4.6 root@slave1:/usr/local/spark

scp -r ./zookeeper-3.4.6 root@slave2:/usr/local/spark

-       在slave1服務器上運行下面的命令

vi $ZOOKEEPER_HOME/data/myid

按照zoo.cfg的配置,myid的內容就是2。

-       在slave2服務器上運行下面的命令

vi $ZOOKEEPER_HOME/data/myid

按照zoo.cfg的配置,myid的內容就是3。

4)    啟動Zookeeper服務

-       在master服務器上運行下面的命令

zkServer.sh start

-       在slave1服務器上運行下面的命令

source /root/.bashrc

zkServer.sh start

-       在slave1服務器上運行下面的命令

source /root/.bashrc

zkServer.sh start

5)    驗證Zookeeper是否安裝和啟動成功

-       在master服務器上運行命令:jps和zkServer.sh status

root@master:/usr/local/spark/zookeeper-3.4.6/bin# jps

3844 QuorumPeerMain

4790 Jps

zkServer.sh status

root@master:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status

JMX enabled by default

Using config: /usr/local/spark/zookeeper-3.4.6/bin/../conf/zoo.cfg

Mode: follower

-       在slave1服務器上運行命令:jps和zkServer.sh status

source /root/.bashrc

root@slave1:/usr/local/spark/zookeeper-3.4.6/bin# jps

3462 QuorumPeerMain

4313 Jps

root@slave1:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status

JMX enabled by default

Using config: /usr/local/spark/zookeeper-3.4.6/bin/../conf/zoo.cfg

Mode: follower

-       在slave2服務器上運行命令:jps和zkServer.sh status

root@slave2:/usr/local/spark/zookeeper-3.4.6/bin# jps

4073 Jps

3277 QuorumPeerMain

root@slave2:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status

JMX enabled by default

Using config: /usr/local/spark/zookeeper-3.4.6/bin/../conf/zoo.cfg

Mode: leader

      至此,代表Zookeeper已經安裝和配置成功。

2、安裝和配置Kafka

本課中用到的Kafka版本是Kafka-2.10-0.9.0.1。

1)    下載Kafka 

進入http://kafka.apache.org/downloads.html,左鍵單擊kafka_2.10-0.9.0.1.tgz。提示:可以直接下載群里的Kafka安裝文件。

下載kafka_2.10-0.9.0.1.tgz

1)    安裝Kafka

提示:下面的步驟發生在master服務器。

以ubuntu14.04舉例,把下載好的文件放到/root目錄,用下面的命令解壓:

cd /root

tar -zxvf kafka_2.10-0.9.0.1.tgz

解壓后在/root目錄會多出一個kafka_2.10-0.9.0.1的新目錄,用下面的命令把它剪切到指定目錄即安裝好Kafka了:

cd /root

mv kafka_2.10-0.9.0.1 /usr/local

之后在/usr/local目錄會多出一個kafka_2.10-0.9.0.1的新目錄。下面我們講如何配置安裝好的Kafka。

2)    配置Kafka

提示:下面的步驟發生在master服務器。

  1. 配置.bashrc

-       打開文件:vi /root/.bashrc

-       在PATH配置行前添加:

export KAFKA_HOME=/usr/local/kafka_2.10-0.9.0.1

-       最后修改PATH:

export PATH=${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${SCALA_HOME}/bin:${SPARK_HOME}/bin:${SPARK_HOME}/sbin:${HIVE_HOME}/bin:${KAFKA_HOME}/bin:$PATH

-       使配置的環境變量立即生效:source /root/.bashrc

  1. 打開server.properties

-       cd $ZOOKEEPER_HOME/config

-       vi server.properties

  1. 配置server.properties

broker.id=0

port=9092

zookeeper.connect=master:2181,slave1:2181,slave2:2181

3)    同步master的安裝和配置到slave1和slave2

-       在master服務器上運行下面的命令

cd /root

scp ./.bashrc root@slave1:/root

scp ./.bashrc root@slave2:/root

cd /usr/local

scp -r ./kafka_2.10-0.9.0.1 root@slave1:/usr/local

scp -r ./kafka_2.10-0.9.0.1 root@slave2:/usr/local

-       在slave1服務器上運行下面的命令

vi $KAFKA_HOME/config/server.properties

修改broker.id=1。

-       在slave2服務器上運行下面的命令

vi $KAFKA_HOME/config/server.properties

修改broker.id=2。

4)    啟動Kafka服務

-       在master服務器上運行下面的命令

cd $KAFKA_HOME/bin

kafka-server-start.sh ../config/server.properties &

-       在slave1服務器上運行下面的命令

source /root/.bashrc

cd $KAFKA_HOME/bin

kafka-server-start.sh ../config/server.properties &

-       在slave2服務器上運行下面的命令

source /root/.bashrc

cd $KAFKA_HOME/bin

kafka-server-start.sh ../config/server.properties &

5)    驗證Kafka是否安裝和啟動成功

-       在任意服務器上運行命令創建Topic“HelloKafka”:

kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 1 --topic HelloKafka

-       在任意服務器上運行命令為創建的Topic“HelloKafka”生產一些消息:

kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic HelloKafka

輸入下面的消息內容:

This is Spark!

I’m Rocky!

-       在任意服務器上運行命令從指定的Topic“HelloKafka”上消費(拉取)消息:

kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --from-beginning --topic HelloKafka

過一會兒,你會看到打印的消息內容:

This is DT_Spark!

I’m Rocky!

Life is short, you need Spark!

-       在任意服務器上運行命令查看所有的Topic名字:

kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181

-       在任意服務器上運行命令查看指定Topic的概況:

kafka-topics.sh --describe --zookeepermaster:2181,slave1:2181,slave2:2181 --topic HelloKafka

至此,代表Kafka已經安裝和配置成功。

姜偉

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM