ZooKeeper 的作用
Kafka使用ZooKeeper管理集群,ZooKeeper用於協調服務器或集群拓撲,ZooKeeper是配置信息的一致性文件系統。你可以選擇Kafka自帶的Zookeeper,也可以選擇單獨部署,一台Linux主機開放三個端口即可構建一個簡單的偽ZooKeeper集群。
zookeeper中記錄了broker的id 、消費者消費數據的offset,消費者與partition的對應關系(ConsumerA—> Partition-0, ConsumerB—> Partition-1)
ZooKeeper可以將拓撲更改發送到Kafka,如果集群中的某台服務器宕機或者某個topic被添加、刪除,集群中的每個節點都可以知道新服務器何時加入,ZooKeeper提供Kafka Cluster配置的同步視圖。
Zookeeper簡介
ZooKeeper 是一個開源的分布式框架,提供了協調分布式應用的基本服務。它向外部應用 暴露一組通用服務——分布式同步(Distributed Synchronization)、命名服務(Naming Service)、集群維護(group maintenance)等,簡化分布式應用協調及其管理的難度。 它是 google 的 chubby 一個開源的實現。 它本身可以搭建成一個集群,這個 zk 集群用來對應用程序集群進行管理,監視應用程序集群中各個節點的狀態,並根據應用程序集群中各個節點提交的反饋信息決定下一步的合理操作。 做分布式鎖很有效
Zookeeper下載和安裝
實現的是Zookeeper集群的創建,注意要修改zoo.dfg文件名稱
(base) root@top1server:~# wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
(base) root@top1server:~# tar -zvxf zookeeper-3.4.10.tar.gz
(base) root@top1server:~# cp -r zookeeper-3.4.10/ zookeeper1
(base) root@top1server:~# cp -r zookeeper-3.4.10/ zookeeper2
(base) root@top1server:~# cp -r zookeeper-3.4.10/ zookeeper3
(base) root@top1server:~/zookeeper/zookeeper1/conf# mv zoo_sample.cfg zoo.cfg
(base) root@top1server:~/zookeeper/zookeeper1/bin# ./zkServer.sh start
(base) root@top1server:~/zookeeper/zookeeper1/bin# ./zkServer.sh status
部署三個節點的Zookeeper偽分布集群
在同一台服務器上,部署一個 3 個 ZooKeeper 節點組成的集群,這樣的集群叫偽分布式集群,而如果集群中的 3 個節點分別部署在 3 個服務器上,那么這種集群就叫真正的分布式集群。
(base) root@top1server:~/zookeeper# cd zookeeper1
(base) root@top1server:~/zookeeper/zookeeper1# mkdir data
(base) root@top1server:~/zookeeper/zookeeper1# mkdir logs
(base) root@top1server:~/zookeeper/zookeeper1# touch data/myid
(base) root@top1server:~/zookeeper/zookeeper1# cd data/
(base) root@top1server:~/zookeeper/zookeeper1/data# cat myid
1
設置配置文件 zoo.cfg 的內容如下:
配置文件中的配置項的含義參見下面的介紹。
- 用同樣的方法,在 zookeeper2 和 zookeeper3 的相應位置創建 zoo.cfg,文件內容復制zookeeper1 的 zoo.cfg。
- 只不過需要改動 clientport、dataDir、dataLogDir 三個配置項,zookeeper2 的 clientport 改為 2182,zookeeper3 的 clientport 改為 2183,而 dataDir和 dataLogDir 都修改為相應的目錄,就好了。
啟動 zk 集群
接下來再分別啟動服務
(base) root@top1server:~/zookeeper/zookeeper1/bin# sh zkServer.sh start
(base) root@top1server:~/zookeeper/zookeeper2/bin# sh zkServer.sh start
(base) root@top1server:~/zookeeper/zookeeper3/bin# sh zkServer.sh start
這樣 zookeeper 集群的 3 個節點都啟動起來了。
一個leader多個server,也遵循2n+1
的選舉策略,想保證高可用至少有三台機器
客戶端接入集群
進入 zookeeper 集群中任意一個節點的 bin 目錄下,啟動一個客戶端,接入已經啟動好的zookeeper 集群。
這里的 server 可以填寫集群中的任何一個節點的 ip,端口號是對應 ip的節點的配置文件中 clientport 的值。
./zkCli.sh –server 127.0.0.1:2181
真實分布式集群需要注意的地方
真正的分布式集群和偽分布式集群不一樣的地方在於配置文件。
- clientport 端口各個節點一樣就行。
- server.1=127.0.0.1:8880:7770 中的 ip 要修改成對應的 server 的 ip,后邊的兩個端口號不需要不同,各個節點都一樣就可以了。
其他地方偽分布式和真正分布式都是一樣的。
ZooKeeper 配置文件中的配置項的含義
配置文件中配置項的含義:
- tickTime: zookeeper 中使用的基本時間單位,毫秒值,比如可以設為 1000,那么基本時間單位就是 1000ms,也就是 1s。
- initLimit: zookeeper 集群中的包含多台 server,其中一台為 leader,集群中其余的 server 為 follower,initLimit 參數配置初始化連接時,follower 和 leader 之間的最長 心 跳 時 間 。 如 果 該 參 數 設 置 為 5 , 就 說 明 時 間 限 制 為 5 倍 tickTime , 即5*1000=5000ms=5s。
- syncLimit: 該參數配置 leader 和 follower 之間發送消息,請求和應答的最大時間長度。如果該參數設置為 2,說明時間限制為 2 倍 tickTime,即 2000ms。
- dataDir: 數據目錄. 可以是任意目錄,一般是節點安裝目錄下 data 目錄。
- dataLogDir: log目錄, 同樣可以是任意目錄,一般是節點安裝目錄下的logs目錄。如果沒有設置該參數,將使用和 dataDir 相同的設置。
- clientPort: 監聽 client 連接的端口號。
- server.X=A:B:C 其中 X 是一個數字, 表示這是第幾號 server,它的值和 myid 文件中的值對應。A 是該 server 所在的 IP 地址。B 是配置該 server 和集群中的 leader 交換消息所使用的端口。C 配置選舉 leader 時所使用的端口。由於配置的是偽集群模式,所以各個 server 的 B, C 參數必須不同,如果是真正分布式集群,那么 B 和 C 在各個節點上可以相同,因為即使相同由於節點處於不同的服務器也不會導致端口沖突
Zookeeper 常用命令
- 啟動 ZK 服務: bin/zkServer.sh start
- 查看 ZK 服務狀態: bin/zkServer.sh status
- 停止 ZK 服務: bin/zkServer.sh stop
- 重啟 ZK 服務: bin/zkServer.sh restart
- 連接服務器: zkCli.sh -server 127.0.0.1:2181
創建節點
使用 create 命令,可以創建一個 Zookeeper 節點, 如
create [-s] [-e] path data acl
其中,-s 或-e 分別指定節點特性,順序或臨時節點,若不指定,則表示持久節點;acl 用來進行權限控制。
順序節點可以看到創建的節點后面添加了一串數字以示區別。
臨時節點在退出會自動刪除
使用 create /zk-permanent 123 命令創建 zk-permanent 永久節點
可以看到永久節點不同於順序節點,不會自動在后面添加一串數字。
讀取節點
與讀取相關的命令有 ls 命令和 get 命令,ls 命令可以列出 Zookeeper 指定節點下的所有子節點,只能查看指定節點下的第一級的所有子節點;
get 命令可以獲取 ZK 指定節點的數據內容和屬性信息。其用法分別如下
ls path [watch]
get path [watch]
ls2 path [watch]
ls2可以查看所有內容
get可以獲取系欸但的數據內容和屬性
更新節點
使用 set 命令,可以更新指定節點的數據內容,用法如下
set path data [version]
其中,data 就是要更新的新內容,version 表示數據版本,如將/zk-permanent 節點的數據更新為 456,可以使用如下命令:set /zk-permanent 456
dataVersion 會變為 1 了,表示進行了更新。
刪除節點
使用 delete 命令可以刪除 Zookeeper 上的指定節點,用法如下
delete path [version]
其 中 version 也 是 表 示 數 據 版 本 , 使 用 delete /zk-permanent 命 令 即 可 刪 除/zk-permanent 節點
值得注意的是,若刪除節點存在子節點,那么無法刪除該節點,必須先刪除子節點,再刪除父節點
zk的使用都是靠這些節點使用
分布式鎖的生成是用臨時順序節點來實現的
Kafka概述
Kafka消息隊列
(1)點對點模式(一對一,消費者主動拉取數據,消息收到后消息清除)
點對點模型通常是一個基於拉取或者輪詢的消息傳送模型,這種模型從隊列中請求信息,而不是將消息推送到客戶端。這個模型的特點是發送到隊列的消息被一個且只有一個接收者接收處理,即使有多個消息監聽者也是如此。
(2)發布/訂閱模式(一對多,數據生產后,推送給所有訂閱者)
發布訂閱模型則是一個基於推送的消息傳送模型。發布訂閱模型可以有多種不同的訂閱者,臨時訂閱者只在主動監聽主題時才接收消息,而持久訂閱者則監聽主題的所有消息,即使當前訂閱者不可用,處於離線狀態。
為什么需要消息隊列
1)解耦:
允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
2)冗余:
消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
3)擴展性:
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。
4)靈活性 & 峰值處理能力:
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。如果為以能處理這類峰值訪問為標准來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
5)可恢復性:
系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。
6)順序保證:
在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。(Kafka 保證一個 Partition 內的消息的有序性)
7)緩沖:
有助於控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。
8)異步通信:
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
什么是 Kafka
在流式計算中,Kafka 一般用來緩存數據,Storm 通過消費 Kafka 的數據進行計算。
1)Apache Kafka 是一個開源消息系統,由 Scala 寫成。是由 Apache 軟件基金會開發的一個開源消息系統項目。
2)Kafka 最初是由 LinkedIn 公司開發,並於 2011 年初開源。2012 年 10 月從 Apache Incubator畢業。該項目的目標是為處理實時數據提供一個統一、高通量、低等待的平台。
3)Kafka 是一個分布式消息隊列。Kafka 對消息保存時根據 Topic 進行歸類,發送消息者稱為Producer,消息接受者稱為 Consumer,此外 kafka 集群有多個 kafka 實例組成,每個實例(server)稱為broker。
4)無論是 kafka 集群,還是 consumer 都依賴於 zookeeper 集群保存一些 meta 信息,來保證系統可用性。
1)Producer :消息生產者,就是向 kafka broker 發消息的客戶端;
2)Consumer :消息消費者,向 kafka broker 取消息的客戶端;
3)Topic :可以理解為一個隊列;
4) Consumer Group (CG):這是 kafka 用來實現一個 topic 消息的廣播(發給所有的 consumer)和單播(發給任意一個 consumer)的手段。一個 topic 可以有多個 CG。topic 的消息會復制(不是真的復制,是概念上的)到所有的 CG,但每個 partion 只會把消息發給該 CG 中的一個 consumer。如果需要實現廣播,只要每個 consumer 有一個獨立的 CG 就可以了。要實現單播只要所有的consumer 在同一個 CG。用 CG 還可以將 consumer 進行自由的分組而不需要多次發送消息到不同的 topic;
5)Broker :一台 kafka 服務器就是一個 broker。一個集群由多個 broker 組成。一個 broker 可以容納多個 topic;
6)Partition:為了實現擴展性,一個非常大的 topic 可以分布到多個 broker(即服務器)上,一個 topic可以分為多個 partition,每個 partition 是一個有序的隊列。partition 中的每條消息都會被分配一個有序的 id(offset)。kafka 只保證按一個 partition 中的順序將消息發給 consumer,不保證一個 topic 的整體(多個 partition 間)的順序;
7)Offset:kafka 的存儲文件都是按照 offset.kafka 來命名,用 offset 做名字的好處是方便查找。例如你 想 找 位 於 2049 的 位 置 , 只 要 找 到 2048.kafka 的 文 件 即 可 。 當 然 the first offset 就 是
00000000000.kafka。
Kafka 單節點運行方式
下載代碼
下載 kafka_2.13-2.7.0 版本並且解壓。
https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz
wget https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz
tar -zvxf kafka_2.13-2.7.0.tgz
cd kafka_2.13-2.7.0/
首先為每個 broker 創建一個配置文件,直接復制多個server.properties
,並且修改他們的log日志和id
Kafka 集群部署方式
配置broker的id
設置log文件地址
關聯zookeeper
啟動服務
現在啟動 Kafka 服務器:
bin/kafka-server-start.sh config/server.properties
創建topic
創建一個名為“test”的 topic,它有一個分區和一個副本:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
運行 list(列表)命令來查看這個 topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181 test
除了手工創建 topic 外,你也可以配置你的 broker,當發布一個不存在的 topic 時自動創建 topic。