Zookeeper是一個分布式服務協調組件,是Hadoop、Hbase、Kafka重要的依賴組件,為分布式應用提供一致性服務的組件。
Zookeeper是Hadoop、HBase、Kafka的重要依賴組件。
Zookeeper主要包含文件系統以及通知機制兩個部分。
2.模型
2.1 Zookeeper的文件系統
Zookeeper維護了一個類似文件系統的數據結構,有根目錄和若干個子目錄 (樹形結構 , 與Linux類似 )
每個目錄都稱為一個znode,每個znode都包含自身節點的數據,同時每個znode下可以包含多個子znode。
在創建znode時必須指定znode的數據,可以為null。
znode保存的數據存在版本號,當進行更新操作時版本號會+1。
刪除znode時,若該znode包含子znode,那么必須先刪除所有的子znode,否則無法刪除。
當使用JAVA進行更新和刪除操作時,需要傳遞數據的版本號,其內部進行CAS判斷,當且僅當傳遞的版本號與當前節點的版本號相同時,才進行操作。
ZNode類型
持久化節點:無論客戶端的連接是否斷開,節點以及節點中的數據都會被保留。 持久化節點並順序編號:在持久化節點的基礎上,對節點進行順序編號(編號最大為Integer.MAX_SIZE,創建/p持久化節點並順序編號時,Zookeeper將把節點命名為/p1,當再次創建/p節點時,自動命名為/p2) 臨時節點:當客戶端的連接斷開,節點以及節點中的數據將會被刪除。 臨時節點並順序編號:在臨時節點的基礎上,對節點進行順序編號。
2.2 Zookeeper的通知機制
客戶端可以監聽它所關注的節點,當節點的狀態發生變化時(比如數據的版本號發生改變、節點被刪除、節點添加子節點或者刪除子節點),Zookeeper將會通知客戶端。
Zookeeper並不是根據節點的值發生改變而觸發通知的,而是根據節點中數據的版本號,所以修改后的內容相同那么一樣會進行通知。
2.3 Zookeeper集群
當搭建了一個Zookeeper集群后,Zookeeper會根據選舉算法,從多個ZK Server中選取一個作為Leader,剩余的作為Follower,Leader會與各個Follower之間建立一個有效的長連接,以保證各個節點的通信正常。
當某個節點收到修改操作時,首先會把請求轉發給Leader,有Leader負責數據的讀寫操作然后再把修改同步給所有的Follower節點,一旦Leader節點掛了,那么會從剩余的Follower節點中選舉一個新的Leader節點(重新選取的時間很短,大概200ms)
2.4 關於Zookeeper集群的腦裂
Zookeeper集群中存在一個Leader,多個Follower,Leader與Follower之間將會通過心跳機制來確保其還存活着(Leader會定期向所有的Follower發送心跳)
如果集群中有部分的Follower由於網絡等問題無法與Leader進行通訊,那么它將收不到Leader的心跳,將會認為Leader已經死亡,但是它們之間是可以互相進行訪問的,此時它們將會組成一個新的集群,選舉出一個新的Leader,那么此時將會有兩個ZK集群,每個集群中都有一個Leader節點。
Zookeeper實際上是不會發生腦裂現象的,由於ZK內部有一個過半機制,也就是說集群至少要有超過一半的節點存活時才能夠對外提供服務。
假設Zookeeper集群中存在5個節點(一個Leader,四個Follower),那么集群中至少要有3個節點存活時才能夠對外提供服務,如果此時有2個Follower或者3個Follower無法與Leader進行通訊,由於存在過半機制,最終也只會保留一個ZK集群,另一個集群將會由於不滿足過半機制而無法建立。
Zookeeper提供的過半機制就是用來解決集群腦裂問題的。
為什么搭建集群時一定要遵循2n+1個節點?
如果存在5個節點的集群,掛了2個,集群仍然能夠運行。
如果存在6個節點的集群,掛了2個,集群仍然能夠運行。
如果存在5個節點的集群,不管有多少個Follower節點與Leader節點的網絡被隔離,集群都能夠正常運行。
如果存在6個節點的集群,如果有3個Follower節點與Leader節點的網絡被隔離,那么集群將不可用(極端情況)
3.Zookeeper的應用場景
3.1 使用Zookeeper作為配置中心
可以將一些重要的,可以動態配置的信息放在ZK當中,然后各個應用都監聽這些節點,當配置更新時,將會通過通知機制去通知各個應用,然后再把節點的值更新到本地的靜態變量當中。 1.自定義注解。 2.使用自定義的BeanPostProcesser,在Bean的初始化后調用指定的方法。 3.通過反射,獲取Bean的類信息,判斷類中的屬性和方法是否有被自定義的注解所標注。 4.如果有則獲取注解,然后根據注解中的Key屬性監聽節點,當節點的值發生改變時,設置Field或者調用Method,更新到本地的靜態變量當中。 5.如果節點不存在則創建節點(觸發通知)
3.2 使用Zookeeper作為注冊中心
1.Provider和Consumer在啟動時會連接ZK,建立一個有效的長連接。 2.Provicer會把服務注冊到ZK當中,並且在/dubbo對應API下的Provider節點下創建臨時節點。 3.Consumer會從ZK中獲取服務列表,然后緩存到本地,並且在/dubbo對應API下的Consumer節點下創建臨時節點。 4.當服務列表發生改變時,也就是新增或減少節點時,能夠通過通知機制去通知Consumer,更新服務列表。 5.當ZK掛了,Consumer仍然可以訪問Provider,走本地,只不過當服務列表發生改變時,無法再通過通知機制去通知Consumer,因此可能會訪問一個已經下線的節點。
3.3 使用Zookeeper作為本地緩存的新增和更新策略(廣播)
應用程序都是以集群的方式進行部署的,每個應用都有本地緩存,當信息新增和更新時需要更新本地緩存的值,此時可以利用ZK。
各個應用都監聽ZK上的一個節點,當信息新增或更新時,更新節點的值,值為記錄的id,那么各個應用都會收到通知,然后根據這個id從數據庫中進行查詢,更新本地緩存。
*如果應用掛了,當它重啟后無法獲取到在它掛了的這段時間內所有已經發生的通知(首次監聽節點會觸發一次節點數據發生改變的事件)
*如果ZK由於自身的原因沒有進行通知,那么對於被通知者來說是沒有感知的。
3.4 使用Zookeeper作為分布式鎖
分布式鎖有兩種類型,分別是保持獨占和控制時序。
保持獨占,即當有多個線程同時申請獲取鎖時,只有一個線程能夠獲取成功,其他線程直接返回(通過Redis中的一個Key或者Zookeeper中的一個節點進行控制)
控制時序,即所有申請獲取鎖的線程最終都能夠獲取到鎖然后執行指定的邏輯(利用Zookeeper的通知機制)
Zookeeper可以實現保持獨占和控制時序兩種類型的分布式鎖
保持獨占(通過Zookeeper中的一個臨時節點)
1.當線程要獲取鎖時,創建一個臨時節點,如果創建失敗,則表示鎖已經被其他線程所持有。
2.如果創建成功則表示獲取了鎖,然后進行業務處理,當處理完畢后釋放鎖,刪除該節點。
控制時序(利用通知機制)
首先創建一個locker持久化節點。 1.當線程要獲取鎖時,需要在locker持久化節點下創建順序編號的臨時節點。 2.然后獲取locker節點下的所有子節點,判斷剛創建的臨時節點的編號在locker的子節點中是否是最小的。 3.如果是最小的,則表示獲取鎖成功,那么進行業務處理,當處理完畢后刪除該節點。 4.如果不是最小的,則找到它的前一個節點,然后對它進行監聽,建立Watch。 5.當節點被刪除時將會通知正在監聽它的節點,此時其他線程就獲取到鎖。
*可以直接使用Curator提供的acquire()和release()方法來使用ZK的分布式鎖。
使用Redis作為分布式鎖與使用ZK作為分布式鎖有什么區別?
1.Redis只能實現保持獨占的分布式鎖,而Zookeeper可以實現保持獨占和控制時序兩種類型的分布式鎖。
2.從性能上來說,Redis的加鎖、解鎖的效率以及抗並發的能力都要比Zookeeper的高。
3.從實現上來說,Redis要做很多特殊的處理,比如如何保證獲取鎖的同步性、當應用宕機時如何保證鎖能夠被釋放、如何保證釋放的鎖是自己的,ZK沒有這些問題(當應用宕機時臨時節點將會被刪除)
4.搭建Zookeeper
1.安裝
由於Zookeeper是由java語言編寫的,因此在安裝Zookeeper之前需要安裝好JDK,並且配置環境變量JAVA_HOME
從Zookeeper官網下載zk並進行解壓
bin目錄
zkEnv.sh:用於配置zk服務啟動時的環境變量 (包括加載配置文件的路徑等)
zkServer.sh:用於啟動zk服務,默認監聽2181端口。
zkCli.sh:用於啟動zk客戶端。
zookeeper.out:用於存放zk運行時的日志。
conf目錄
log4j.properties文件:日志配置文件,默認日志信息都將打印到bin目錄下的zookeeper.out文件 (當使用Zookeeper遇到異常時應該查看此文件下的內容)
zoo_sample.conf文件:zk server的配置文件,zk服務啟動時默認會加載conf目錄下的zoo.cfg配置文件
2.修改配置文件
Zookeeper啟動時會默認加載conf目錄下的zoo.cfg配置文件,因此將conf目錄下的zoo_sample.conf配置文件更名為zoo.cfg。
#initLimit、syncLimit的單位,值是毫秒 tickTime=2000 #集群搭建前所允許的初始化時間 initLimit=10 #Leader發送心跳給Follower,Follower向Leader回復心跳這一過程所允許的最大時長 (rtt,往返時間),一旦超過了這個時間,Leader則認為該Follower宕機。 syncLimit=5 #快照日志的存放目錄 dataDir=/usr/Zookeeper/Zookeeper-3.4.6/zkdata #事務日志的存放目錄 dataLogDir=/usr/Zookeeper/Zookeeper-3.4.6/zklog #zookeeper服務的監聽端口,默認為2181 clientPort=2181
當其中一台zk節點啟動后,剩余的zk節點必須在initLimit規定的時間內全都啟動,否則zk在進行集群的搭建時會認為未啟動的zk節點已經失效。
如果不配置dataLogDir,那么事務日志將寫入到dataDir目錄下 (會嚴重影響zk的性能)
3.啟動Zookeeper
使用zkServer.sh命令啟動Zookeeper服務。
zkServer.sh start
使用jps命令查詢zk進程是否啟動成功,當出現QuorumPeerMain表示zk啟動成功。
4.Zookeeper的基本命令
#連接zk server,默認本機,使用-server選項指定遠程zk服務 zkcli.sh -server 192.168.1.80:2181 #創建ZNode,默認為持久化節點,使用-e選項表示臨時節點,使用-s選項表示順序編號 create -e -s path data #查看ZNode下的子ZNode ls path #獲取ZNode保存的數據 get path #設置ZNode保存的數據 set path data #刪除ZNode(不能遞歸刪除) delete path #退出客戶端 quit
5.搭建Zookeeper集群
1.修改配置文件
在conf文件中添加集群的配置,使用server.num配置集群信息(num必須為整數)
#基礎配置 tickTime=2000 initLimit=10 syncLimit=5 dataDir=/usr/Zookeeper/Zookeeper-3.4.6/zkdata dataLogDir=/usr/Zookeeper/Zookeeper-3.4.6/zklog clientPort=2181 #集群配置 server.1=192.168.1.119:2888:3888 server.2=192.168.1.122:2888:3888 server.3=192.168.1.125:2888:3888
server.本機節點標識 = host:leader和follower之間的通訊端口:leader的選舉端口
server.其他節點標識 = host:leader和follower之間的通訊端口:leader的選舉端口
在同一個集群中,節點的標識不能夠重復。
Leader和Follower之間的通訊端口默認為2888,Leader的選舉端口默認為3888。
2.創建myid文件
在快照日志目錄下創建myid文件,文件中的值是本機zk節點的唯一標識(必須為整數)
#將1輸入到myid文件中
echo "1" > myid
每個節點都需要這么配置。
3.啟動各個Zookeeper節點
zkServer.sh start
必須要在 initLimit * tickTime 的時間內啟動集群中的所有節點,否則集群有可能搭建失敗。
4.查詢集群中各個節點的狀態
zkServer.sh status
5.注意事項
1.搭建Zookeeper集群時需要遵循2n+1個節點,因為Zookeeper內部有過半機制,當集群中超過一半的節點存活時,那么Zookeeper就能夠對外提供服務。
2.搭建Zookeeper集群時需要關閉防火牆或者開放對應的端口,否則集群中節點之間無法進行通訊。
3.Zookeeper集群在高負荷的工作時會產生大量的事務日志,如果日志長期不進行清理容易將分區中的空間占滿導致服務無法運行,因此需要定期清理zk產生的事務日志(可以借助Linux的crontab命令定期刪除ZK產生的事務日志)
6.Zookeeper可視化工具
ZK UI是一個Zookeeper的可視化平台
1.下載源碼(Maven項目),直接導入idea,然后使用mvn install進行打包,最終得到一個jar包。
https://github.com/DeemOpen/zkui
2.修改config.cfg配置文件
#zuui端口 serverPort=9090 #zk服務地址,多個使用逗號分隔 zkServer=localhost:2181
3.啟動jar包
java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar
4.訪問ZK UI,http://localhost:9090,用戶名/密碼默認為 admin/manager
7.JAVA中操作Zookeeper
可以使用apache提供的zookeeper客戶端或者curator framework來操作ZK。
1.導入Maven依賴
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.13</version> <type>pom</type> </dependency>
2.建立連接
ZooKeeper(StringconnectString, intsessionTimeout, Watcherwatcher) throwsIOException
connectString:ZK服務地址,多個使用逗號分隔。
sessionTimeout:連接Zookeeper的超時時間。
watcher:事件回調接口。
*ZooKeeper實例是通過異步的方式來建立連接的,當連接建立后會調用Watcher的process方法,因此程序為了保證同步建立連接,可以使用CountDownLatch進行控制。
3.調用Zookeeper提供的API
//創建ZNode,指定節點的路徑、數據、ZNode類型 publicStringcreate(finalStringpath, bytedata[], List<ACL>acl,CreateModecreateMode) //獲取ZNode下的子ZNode publicList<String>getChildren(finalStringpath, booleanwatch) //判斷ZNode是否存在 publicStatexists(Stringpath, booleanwatch) //獲取ZNode保存的數據 publicbyte[] getData(Stringpath, booleanwatch, Statstat) //設置ZNode的數據 publicStatsetData(finalStringpath, bytedata[], intversion) //刪除ZNode publicvoiddelete(finalStringpath, intversion)
節點類型
CreateMode.PERSISTENT:持久化節點
CreateMode.PERSISTENT_SEQUENTIAL:持久化節點並順序編號
CreateMode.EPHEMERAL:臨時節點
CreateMode.EPHEMERAL_SEQUENTIAL:臨時節點並順序編號
*當進行更新和刪除操作時,需要傳遞數據的版本號,其內部會進行CAS判斷,當且僅當傳遞的版本號與當前節點的版本號相同時,才進行操作。
*Apache Zookeeper API中有很多方法都支持Watcher參數,Watcher用於監聽節點的狀態,當節點的狀態發生改變時將會調用Watcher的process方法進行處理。
完整示例
/** * @Auther: ZHUANGHAOTANG * @Date: 2018/11/12 14:55 * @Description: */ publicclassZKUtils{ /** * 日志輸出 */ privatestaticLoggerlogger=LoggerFactory.getLogger(ZKUtils.class); /** * ZK服務列表 */ privatestaticfinalStringURLS="192.168.1.80:2181,192.168.1.81:2181,192.168.1.83:2181"; /** * 連接Zookeeper的超時時長(單位:毫秒) */ privatestaticfinalintSESSION_TIMEOUT=3000; /** * Zookeeper連接對象 */ privatestaticZooKeeperzk=null; static{ try{ CountDownLatchcountDownLatch=newCountDownLatch(1); zk=newZooKeeper(URLS, SESSION_TIMEOUT, newWatcher() { @Override publicvoidprocess(WatchedEventevent) { if(Event.KeeperState.SyncConnected==event.getState()) { countDownLatch.countDown();//倒數器-1 } } }); countDownLatch.await(); } catch(Exceptione) { logger.info("Zookeeper獲取連接失敗,{}", e); } } /** * 創建節點 * * @param path * @param data * @param createMode * @throws Exception */ publicstaticvoidcreatePath(Stringpath, Stringdata, CreateModecreateMode) throwsException{ if(StringUtils.isBlank(path)) { thrownewException("path is null"); } if(!exists(path)) { zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode); } } /** * 獲取子節點 * * @param path * @return * @throws Exception */ publicstaticList<String>getSubNode(Stringpath) throwsException{ if(StringUtils.isBlank(path)) { thrownewException("path is null"); } returnzk.getChildren(path, false); } /** * 判斷節點是否存在 * * @param path * @return * @throws Exception */ publicstaticbooleanexists(Stringpath) throwsException{ if(StringUtils.isBlank(path)) { thrownewException("path is null"); } if(zk.exists(path, false) !=null) { returntrue; } returnfalse; } /** * 獲取節點中的數據 * * @param path * @return * @throws Exception */ publicstaticStringgetData(Stringpath) throwsException{ if(StringUtils.isBlank(path)) { thrownewException("path is null"); } returnnewString(zk.getData(path, false, null)); } /** * 更新節點中的數據 * * @param path * @param data * @throws Exception */ publicstaticvoidsetData(Stringpath, Stringdata) throwsException{ if(StringUtils.isBlank(path)) { thrownewException("path is null"); } zk.setData(path, data.getBytes(), -1); } /** * 刪除節點 * * @param path * @throws Exception */ publicstaticvoiddeletePath(Stringpath) throwsException{ if(StringUtils.isBlank(path)) { thrownewException("path is null"); } zk.delete(path, -1); } }