Kafka入門學習(一)


用簡單的話來說,你可以把Kafka當作可順序寫入的一大卷磁帶, 可以隨時倒帶,快進到某個時間點重放。

 

====常用開源分布式消息系統

*集群:多台機器組成的系統叫集群。

*ActiveMQ還是支持JMS的一種消息中間件。

*阿里巴巴metaq,rocketmq都有kafka的影子。

*kafka的動態擴容目前是通過zookeeper來完成的。

 

====kafka定義及使用背景

是一個分布式消息系統,由Linkedln使用Scala編寫,用作Linkedln的活動流(Activity Stream)

和運營數據處理管道(Pipeline)的基礎,具有高水平擴展和高吞吐量

應用領域:已經被多家不同類型的公司作為多種類型的數據管道和消息系統使用,如:淘寶、支付寶、百度、twitter等。

目前越來越多的開元分布式處理系統都支持與Kafka集成,如

Apache flume(用於日志收集)

Apache Storm(用於實時數據處理)

Spark(用於內存數據處理)

elasticsearch(用於全文檢索)

 

====kafka相關概念

1)AMPQ協議(即Advanced Message Queuing Protocol)

詳細參考博客:http://blog.csdn.net/zhangxinrun/article/details/6411841

--消費者(Consumer):從消息隊列中請求消息的客戶端應用程序;

--生產者(Producer):從broker發布消息的客戶端應用程序;

--APQP服務器端(broker):用來接收生產者發送的消息並將這些消息路由給服務器中的隊列;

 

2)kafka支持的客戶端語言

kafaka客戶端支持當前大部分主流語言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ryby、Go、JavaScript。

可以使用以上任何一種語言和kafka服務器進行通信(即編寫自己的consumer和producer程序

 

3)kafka的架構

 

和傳統的分布式消息隊列一樣,是由生產者向kafka集群生產消息、消費者從kafka集群訂閱消息z這樣的架構所組成。

kafka集群中的消息是按照主題(或者說Topic)來進行組成的。

--主題(Topic):一個主題類似新聞中的體育、娛樂、教育等分類概念,在實際工程中通常一個業務一個主題。

--分區(Partition):一個Topic中的消息數據按照多個分區組織,分區是kafka消息隊列組織的最小單位,一個分區可以看做是一個FIFO(先進先出)隊列;kafka分區是提高kafka性能的關鍵手段。

 

 

這張圖在整體上對kafka集群進行了概要,途中kafka集群是由三台機器(Broker)組成,當然,實際情況可能更多。

相應的有3個分區,Partition-0~Partition-2,圖中能看到每個分區的數據備份了2份。備份的數量可以通過kafka的配置參數來進行配置。在圖中配置成了2.

kafka集群從前端應用程序(producer)生產消息,后端通過各種異構的消費者來訂閱消息。

kafka集群和各種異構的生產者、消費者都使用zookeeper集群來進行分布式協調管理和分布式狀態管理、分布式鎖服務的。

*備份(Replication):為了保證分布式可靠性,kafka0.8開始對每個分區的數據進行備份(不同Broker上),防止其中一個Broker宕機造成分區數據不可用。

 

====zookeeper集群搭建

參考博客:http://www.cnblogs.com/ggjucheng/p/3352591.html

  • 軟件環境:

1)Linux服務器一台、三台、舞台(2*n+1台)。

問:是否可以用偶數台來搭建?

答:不一定,但是沒有必要。根據zookeeper的工作原理,只要有超過半數以上存活,就可以對外提供服務。奇數方便判斷“半數存活”。

2)JDK(我這里選擇jdk-7u80-linux-x64.rpm)

3)zookeeper(我這里選擇zookeeper-3.4.6.tar.gz,kafka在該版本上進行了大量測試,並修復了大量Bug)

 

  • JDK安裝

(省略)

環境變量可以修改兩個文件

1)/etc/profile:對所有用戶都有效的。

2)~/.bashrc:代表的是當前用戶。

 

  • zookeeper安裝

1)解壓縮:tar -zxvf zookeeper-3.4.6.tar.gz

2)配置文件:

--zoo.cfg文件的配置

zoo_sample.cfg是zk官方為我們提供的樣本配置文件。

需要以它為副本復制一個zoo.cfg文件。zoo.cfg中需要配置以下內容:

•dataDir:存放數據

•dataLogDir:存放日志和快照

•server.1=<host>:<Master和Slave之間的通訊端口。默認為2888>:<Leader選舉的端口。默認3888>。

 

集群中的每台機器都需要感知整個集群是由哪幾台機器組成的,在配置文件中,可以按照這樣的格式,每行寫一個機器配置:server.id=host:port:port。

關於這個id,我們稱之為Server ID,標識host機器在集群中的機器序號,在每個ZK機器上,我們需要在數據目錄(數據目錄就是dataDir參數指定的那個目錄)下創建一個myid文件,

myid中就是這個Server ID數字。

 

配置之后如下:

*zkdata和zkdatalog是新建的文件夾。用來存放數據和Log。

*dataLogDir這個屬性如果不進行配置,將默認將zk事務日志和快照存放到dataDir下面,會嚴重影響性能。

*ip地址可以通過hostname -i來查看。

 

--myid的配置

可以通過echo命令來創建myid文件。命令:echo "1" > myid

 

 

3)啟動zookeeper

啟動方法:./zkServer.sh start

 

====kafka集群搭建

  • 軟件環境

Linux服務器一台或多台

已經搭建好的zookeeper集群

kafka_2.9.2-0.8.1.1

 

  • kafka安裝

1)解壓縮kafka壓縮包:tar -zxvf kafka_2.9.2-0.8.1.1

2)修改配置文件。kafka的配置文件很多,我們重點關注server.properties

具體配置內容請參考官方網站的配置:http://kafka.apache.org/documentation.html#brokerconfigs

以及中文博客:http://www.cnblogs.com/quchunhui/p/5356720.html

 

我這里配置了一下幾項:

###Socket Server Settings###

port=9092

host.name=192.168.93.128

 

###Log Basics###

log.dirs=/opt/kafka_2.9.2-0.8.1.1/kafkalog

 

###Log Retention Policy###

message.max.bytes=5048576

default.replication.factor=2             //kafka集群保存消息的副本數

replica.fetch.max.bytes=5048576    //取消息的最大字節數

 

###Zookeeper###

zookeeper.connect=192.168.93.128:2181

 

  • kafka啟動

后台啟動命令:./kafka-server-start.sh -daemon ../config/server.properties

使用jps命令查看進程是否存在,以驗證是否正確啟動。

 

  • 驗證是否搭建正確

(1)首先嘗試創建一個topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

(2)查看所有的topic

./kafka-topics.sh --list --zookeeper localhost:2181

(3)啟動一個consumer

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

(4)向consumer發送消息

./kafka-console-producer.sh --broker-list 192.168.93.129:9092 --topic test

(5)查看創建的topic

./kafka-topics.sh --describe --zookeeper 192.168.93.129:2181 --topic test


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM