簡介
Zookeeper 是一個分布式應用程序的分布式開源協調服務。是Apache Hadoop 的一個子項目,主要用來解決分布式應用中經常遇到的一些數據管理問題,例如:統一命名服務、狀態同步服務、集群管理、分布式應用配置項的管理等。
Zookeeper 工作原理
ZooKeeper 核心是原子廣播,該機制保證了各個Server之間的同步,實現這個機制的協議叫做 Zab協議 ,Zab協議有兩個模式,他們分別是 “恢復模式 & 廣播模式”。
恢復模式
Zab協議會讓ZK集群進入崩潰恢復模式的情況如下:
(1)當服務框架在啟動過程中
(2)當Leader服務器出現網絡中斷,崩潰退出與重啟等異常情況。
(3)當集群中已經不存在過半的服務器與Leader服務器保持正常通信。
在所有的follower服務器中選舉一台為Leader,當leader被選舉出來,集群中有多數服務與新的Leader完成狀態同步之后就會退出恢復模式,用來保證至少有一半的follower能和Leader保持數據一致,當多數的follower集群與leader數據保持一致的時候,就會進入消息廣播模式。
狀態同步保證了 Leader 和 Server具有相同的系統狀態,所謂的狀態同步其實就是數據的同步。
一旦leader 已經和多數的follower進行狀態同步之后,它就開始廣播消息,並且進入廣播模式,這時候當一個server加入Zookeeper 服務中,它會在恢復模式下啟動,發現leader,並和leader進行狀態同步,同步結束后,它也參與消息廣播,Zookeeper服務一直維持在 Broadcast狀態,直到leader崩潰了或者leader失去了大部分的followers支持。
廣播模式
消息廣播模式,Zab協議消息廣播過程使用的是原子廣播協議,類似於一個二階段提交,但是又有點不一樣,並不是所有的follower節點都需要返回ack才進行一致性事務完成,只需要多數以上即可。
針對每個客戶端的事務請求,leader服務器會為其生成對應的事務Proposal,並將其發送給集群中其余所有的機器,然后再分別收集各自的選票,最后進行事務提交。
- leader 接收到消息請求后,將消息賦予一個全局唯一的 64 位自增的 Id,我們通常稱之為zxid,通過 zxid 的大小比較即可實現有序的特性。
- leader 通過 隊列 保證發送的順序性,將帶有zxid的消息作為一個提案(proposal)分發給所有follower
- 當follower 接收到proposal,先將proposal寫到本地事務日志,寫事務成功后再向Leader 回一個ACK確認
- 當leader 接收到多數的ack確認后,leader 會向所有follower 發送 commit 命令,同意會在本地執行該消息。
- 當follower 收到消息 commit 命令后,就會執行該消息。
消息廣播模式流程示意圖如下:
首先客戶端會輪詢Zookeeper集群中的各個節點,當輪詢到一台是follower,如果是讀的請求,follower會返回請求結果,如果是增刪改操作,follower 會向leader生成事務請求,針對客戶端的事務請求,針對客戶端的事務請求,leader會為這個生成對應的事務Proposal,然后發送集群中所有follower服務器,然后分別在收集各自的選票,最后進行事務提交。
Zab協議的二階段提交,在提交過程中移除了中斷提交過程的操作,對於Zookeeper集群來說,超過半數反饋Ack確認就代表事務成功,這種方式無法完成所有節點事務一致性問題,所以Zab協議采用恢復模式來解決數據不一致的問題。
消息廣播協議是基於具有FIFO特性的TCP協議進行通訊,因此可以保證消息廣播過程中的接收和發送的順序性。
事務ID
為了保證事務的順序一致性,Zookeeper 采用了遞增的事務ID號(zxid)來標識事務,所有的操作(proposal)都會在被提出時加上zxid,zxid是一個64位的數字,他高32位是epoch用來標識leader關系是否發生變化,每當有新的leader 被選舉出來,都會有一個新的epoch,標識當前屬於哪個leader的領導。
對於Zookeeper 來說,每次的變化都會產生一個唯一的事務id,zxid(ZooKeeper Transaction Id)通過zxid ,可以確定更新操作的先后順序,如果說 zxid1 小於 zxid2,說明 zxid1比zxid先發生。
Zookeeper 模型
Zookeeper 是一個目錄樹結構,名稱是由斜杠 (/) 分隔的一系列路徑元素。ZooKeeper 命名空間中的每個節點都由路徑標識。
ZooKeeper 層級樹狀結構
根節點 / 包含兩個節點(/modele1 & /module2),其中節點 /module1 包含三個子節點(/module1/app1 & /module1/app2 & /module1/app3),在Zookeeper 中,節點以絕對路徑表示,不存在相對路徑,出了根節點以外,其他節點不能以 / 結尾。
特性
資源共享: 例如存儲空間,計算能力,數據,和服務等等
擴展性: 從軟件和硬件上增加系統的規模
並發性: 多個用戶同時訪問
性能: 確保當負載增加的時候,系統想要時間不會有影響
容錯性: 盡管一些組件暫時不可用了,整個系統仍然是可用的
API抽象: 系統的獨立組件對用戶隱藏,僅僅暴露服務
Zookeeper的角色
- 領導者(leader) :負責進行投票的發起和決議,更新系統狀態
- 學習者(learner) :包括跟隨者(follower)和觀察者(observer),follower用於接受客戶端請求並想客戶端返回結果,在選主過程中參與投票
- Observer :可以接受客戶端連接,將寫請求轉發給leader,但observer不參加投票過程,只同步leader的狀態,observer的目的是為了擴展系統,提高讀取速度
- 客戶端(client) :請求發起方
保證
順序一致性: 客戶端的更新將按發送順序應用。
原子性: 更新成功或失敗,沒有部分結果。
統一視圖: 無論服務器連接到哪個服務器,客戶端都將看到相同的服務視圖。即,即使客戶端故障轉移到具有相同會話的不同服務器,客戶端也永遠不會看到系統的舊視圖。
可靠性: 一旦應用更新了,它將從那時起一直存在,直到客戶端覆蓋更新。
及時性: 系統的客戶視圖保證在特定時間范圍內是最新的。
Znode 節點
Znode有兩種類型: 持久節點和臨時節點 ,Znode的類型在創建的之后就不能在進行修改了。
臨時節點
臨時節點在客戶端會話結束的時候,Zookeeper 會將臨時節點(znode)刪除,並且臨時節點不能有子節點。利用臨時節點的特性,我們可以使用臨時節點來進行集群管理以及發現服務的上下線等。
創建臨時節點命令:create -e /module1/app1 app1
創建一個臨時節點為 “/module1/app1” ,數據為 “app1”
持久節點
持久節點不依賴於客戶端會話,只有當客戶端明確要刪除持久節點(znode)的時候才會被刪除
創建臨時節點命令:create /module1 module1
創建一個臨時節點為 “/module1” ,數據為 “module1”
順序節點
ZooKeeper 中還提供了一種順序節點的類型,每次創建順序節點時候,ZooKeeper 都會在路徑后面自動添加10為的數據中,例如
0000000001 計數器會保證在同一父節點下唯一,創建節點的時候會添加順序,常見分布式鎖。
順序節點只是節點的一種特性,也就說不管是 持久節點還是 臨時節點 都可以設置為順序節點,所以Znode類型可以理解為 4種類型:
- 持久節點
- 臨時節點
- 持久順序節點
- 臨時順序節點
創建順序節點命令(加上 “-s”參數):create -s /module1/app app
我們會看到 Created /module1/app0000000001
意思是我們創建了一個持久順序節點“/module1/app0000000001” 如果再執行上面命令 會生成節點 “/module1/app0000000002”,同理 如果我們 create -s
后面添加 -e 參數,就表示我們創建了一個臨時節點。
節點數據
- 創建節點的時候,我們可以指定節點中存儲的數據,ZooKeeper可以保證讀寫都是原子操作,而且每次讀寫操作都是對數據的完整讀取或者完成寫入,不提供對數據的部分讀取或者寫入操作。
- ZooKeeper 雖然提供了節點存儲數據的功能,但是我們並不能把它當成一個數據庫,重點不要把Zookeeper 當成數據庫用,因為Zookeeper 規定了節點的數據大小不能超過1M,所以我們不能在節點上存儲過多的數據,盡可能保證小的數據量,因為數據過大,會導致ZK的性能下降。
- 如果確實需要存儲大量的數據,一般可以在分布式數據庫或者Redis保存這部分數據,然后在Znode中保留數據庫中的索引。
Zookeeper單機模式安裝
java 環境
配置JAVA環境,檢驗環境 java -version
下載安裝Zookeeper
下載地址:https://zookeeper.apache.org/releases.html
下載解壓Zookeeper
cd /usr/local/
get https://dlcdn.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz
cd zookeeper-3.7.0/
重命名配置文件 zoo_sample.cfg
cp conf/zoo_sample.cfg conf/zoo.cfg
啟動ZK
./bin/zkServer.sh start
連接ZK客戶端
./bin/zkCli.sh
當我們看到下圖的信息的時候,表示我們啟動成功
Zookeeper命令
基本命令
- create : 在樹中的某個位置創建一個節點
- delete : 刪除一個節點存在:測試節點是否存在於某個位置
- get data : 從節點讀取數據
- set data: 將數據寫入節點
- get children : 檢索節點的子節點列表
- sync : 等待數據被傳播
操作Zookeeper
- 查看Zookeeper中包含的key
ls /
- 創建一個新的Znode
創建成功以后我們可以使用ls /
查看我們創建的內容
create /zkMxn muxiaonong
ls /
[zkMxn, zookeeper]
get
命令獲取創建Znode的內容
get /zkMxn
- set 命令來對 zk 所關聯的字符串進行設置
set /zkMxn mxn666
- 刪除Znode
delete /zkMxn
Java Api操作 ZK
1. 導入Jar包
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
</dependency>
<!--junit單元測試-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>runtime</scope>
</dependency>
2. API操作Zookeeper
創建Zookeeper對象
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
this(connectString, sessionTimeout, watcher, false);
}
- connectString: 連接的地址,包括主機名和端口號,多個的話用逗號隔開
- sessionTimeout: 等待客戶端通信的最長時間,客戶端如果超過這個時間沒有和服務端進行通信,那么就認為該客戶端已經終止,一般設置值為 5-10秒,單位為毫秒
- watcher: 監聽器,用於接收會話事件的接口,需要自己定義,實現process()方法
連接Zookeeper
Zookeeper zkClient = "";
String connectStr = "192.168.2.1:2181";
zkClient = new ZooKeeper(connectStr, 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) { }
});
創建節點
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException {}
- path: 節點路徑
- data: 節點數據
- acl: 節點權限,例如:
ZooDefs.Ids.OPEN_ACL_UNSAFE
OPEN_ACL_UNSAFE:完全開發,采用world驗證模式,由於每個ZK連接都有world驗證模式,所以當我們節點設置了該參數時,對所有連接開放
CREATOR_ALL_ACL: 創建該Znode連接的擁有所有權限,這里采用的是auth驗證模式,用sessionID做驗證,如果設置了該參數,只有創建改Znode節點的連接才能對這個Znode進行任何操作
READ_ACL_UNSAFE:所有的客戶端都可讀,這里采用world驗證模式,和第一條同理,所有連接都可以讀取該znode
- createMode: 節點類型,例如:
CreateMode.PERSISTENT
PERSISTENT:持久節點
PERSISTENT_SEQUENTIAL:持久有序節點
EPHEMERAL:短暫節點
EPHEMERAL_SEQUENTIAL:短暫有序節點
完整APIDemo:
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.CountDownLatch;
/** @Author mxn
* @Description //TODO ZooKeeper Java API測試
* @Date 10:22 2021/9/29
* @Param
* @return
**/
@Slf4j
public class ZookeeperTest {
// IP 和端口
private final static String ipAddress = "192.168.2.123:2181";
public static void main(String[] args) {
ZookeeperTest test = new ZookeeperTest();
String key = "/zkMxn";
String value = "wo is muxiaonong";
//創建Znode
test.add(key,value);
// 獲取節點數據
// test.get(key);
//修改節點數據
// test.modify(key,"wo is zhuzhuxia");
//刪除節點
// test.delete(key);
}
/**
* @return
* @Author mxn
* @Description //TODO 獲取ZooKeeper連接
* @Date 10:22 2021/9/29
* @Param
**/
public static ZooKeeper getConntection() {
ZooKeeper zooKeeper = null;
try {
final CountDownLatch countDownLatch = new CountDownLatch(1);
//watch機制(回調),監聽是否連接成功
zooKeeper = new ZooKeeper(ipAddress, 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
//如果受收到了服務端的響應事件,連接成功
countDownLatch.countDown();
}
}
});
countDownLatch.await();
log.info("zookeeper狀態:{}",zooKeeper.getState());//CONNECTED
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return zooKeeper;
}
/** @Author lyy
* @Description //TODO 關閉ZooKeeper連接
* @Date 14:57 2021/9/29
* @Param
* @return
**/
public static void closeConnection(ZooKeeper zooKeeper) {
try {
// zooKeeper.close();
} catch (Exception e) {
e.printStackTrace();
}
}
/** @Author lyy
* @Description //TODO 添加節點
* @Date 13:36 2021/9/29
* @Param
* @return
**/
public void add(String key ,String value) {
ZooKeeper zooKeeper = ZookeeperTest.getConntection();
try {
//參數類型
//1.key
//2.value
//3.對應的ACL,當前節點的權限控制
//4.設置當前節點類型
zooKeeper.create(key, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
ZookeeperTest.closeConnection(zooKeeper);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/** @Author lyy
* @Description //TODO 獲取節點信息
* @Date 14:57 2021/9/29
* @Param
* @return
**/
public void get(String key) {
ZooKeeper zooKeeper = ZookeeperTest.getConntection();
Stat stat = new Stat();
String data = null;
try {
byte[] bytes = zooKeeper.getData(key, null, stat);
data = new String(bytes, "gbk");
log.info("當前節點信息:{}",data);
ZookeeperTest.closeConnection(zooKeeper);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
/** @Author lyy
* @Description //TODO 修改節點信息
* @Date 14:57 2021/9/29
* @Param
* @return
**/
public void modify(String key,String newValue) {
ZooKeeper zooKeeper = ZookeeperTest.getConntection();
Stat stat = new Stat();
//version樂觀鎖概念,此處需要獲取version信息,則需要先get拿到節點信息
try {
//獲取節點(修改需要version信息)
zooKeeper.getData(key, null, stat);
//再修改
zooKeeper.setData(key, newValue.getBytes(), stat.getVersion());
ZookeeperTest.closeConnection(zooKeeper);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/** @Author lyy
* @Description //TODO 刪除節點
* @Date 14:57 2021/9/29
* @Param
* @return
**/
public void delete(String key) {
ZooKeeper zooKeeper = ZookeeperTest.getConntection();
Stat stat = new Stat();
try {
//獲取節點(刪除需要version信息)
zooKeeper.getData(key, null, stat);
//刪除節點
zooKeeper.delete(key, stat.getVersion());
ZookeeperTest.closeConnection(zooKeeper);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
總結
到這里我們對Zookeeper 大概有個入門級的了解了,不過Zookeeper遠遠比我們這里講述的功能多,如何用Zookeeper實現集群管理、分布式鎖,隊列等等,小農會在后面的文章中進行講解,關注我,后續精彩內容第一時間推送。
我是牧小農,怕什么真理無窮,進一步有進一步的歡喜,大家加油!!!