提到Zookeeper,不得不先聊聊分布式協調技術
一、什么是分布式協調技術
分布式協調技術 主要用來解決分布式環境當中多個進程之間的同步控制,讓他們有序的去訪問某種臨界資源,防止造成"臟數據"的后果。
那么怎么對這些進程進行調度呢?
這時候我們就需要一個協調器,來讓他們有序的來訪問這個資源。這個協調器就是我們經常提到的那個鎖。通過這個鎖機制,我們就能保證了分布式系統中多個進程能夠有序的訪問該臨界資源。那么我們把這個分布式環境下的這個鎖叫作分布式鎖。但是因為其運行所在的環境存在網絡延遲等不可靠因素的,導致對數據的處理存在許多困難。目前處理分布式協調技術比較好的有Chubby(Google產品,收費)和Zookeeper(Apache產品,免費)。
二、什么是Zookeeper
ZooKeeper是一種為分布式應用所設計的高可用、高性能且一致的開源協調服務,它提供了一項基本服務:分布式鎖服務。由於ZooKeeper的開源特性,后來我們的開發者在分布式鎖的基礎上,摸索了出了其他的使用方法:配置維護、組服務、分布式消息隊列、分布式通知/協調等。
ZooKeeper性能上的特點決定了它能夠用在大型的、分布式的系統當中。從可靠性方面來說,它並不會因為一個節點的錯誤而崩潰。除此之外,它嚴格的序列訪問控制意味着復雜的控制原語可以應用在客戶端上。ZooKeeper在一致性、可用性、容錯性的保證,也是ZooKeeper的成功之處。
三、Zookeeper特性
1、全局數據一致:每個server保存一份相同的數據副本,client無論連接到哪個server,展示的數據都是一致性的,當客戶端操作一個節點的文件時,其他兩個節點會隨之更新,這樣保證了全局數據的一致性。
2、可靠性:如果消息被其中一台服務接受,那么將被所有的服務器接受。
3、順序性:包括全局有序和偏序兩種:全局有序是指如果在一台服務器上消息a在消息b前發布,則在所有Server上消息a都將在消息b前被發布;偏序是指如果一個消息b在消息a后被同一個發送者發布,a必將排在b前面。
4、數據更新原子性:一次數據更新要么成功,要么失敗。
四、Zookeeper部署
1、下載相對應的jar包
wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz
2、安裝到指定目錄
tail -zxvf zookeeper-3.4.12.tar.gz
3、重命名配置文件
[oracle@bogon java]$ cd zookeeper-3.4.12/ [oracle@bogon zookeeper-3.4.12]$ cd conf [oracle@bogon conf]$ ll 總用量 16 -rw-rw-r--. 1 oracle oracle 535 3月 27 12:32 configuration.xsl -rw-rw-r--. 1 oracle oracle 2161 3月 27 12:32 log4j.properties -rw-rw-r--. 1 oracle oracle 922 3月 27 12:32 zoo_sample.cfg [oracle@bogon conf]$cp zoo_sample.cfg zoo.cfg
zookeeper默認讀取zoo.cfg配置文件
其實到這單機版的zookeeper就算安裝完畢了,如果為了方便管理,可以為其配置環境變量,這里不做演示
4、啟動
[root@bogon bin]# ./zkServer.sh start #啟動服務端 [root@bogon bin]# ./zkServer.sh status #查看狀態 [root@bogon bin]# ./zkCli.sh #啟動客戶端
五、關於zookeeper數據模型
在zookeeper下,其文件存儲類似於樹一樣具有層次結構,每個文件節點被稱之為Znode
1、每個Znode具有原子性
2、每個Znode存儲數據大小具有限制
3、Znode通過路徑引用,但是其路徑必須是絕對的,因此他們必須由斜杠字符來開頭
4、節點類型包含臨時節點和永久節點。臨時節點會話結束就自動刪除,臨時節點不允許擁有子節點。永久節點的生命周期不依賴與會話。
5、Znode具有序列化特性。隨着其創建,其附帶一個序列號。此序列號對於此及節點的父節點是唯一的,這樣便記錄了每個子節點創建的先后順序。
六、關於Zookeeper相關命令
如果直接輸入zkCil.sh 他會自動匹配本機的zookeeper客戶端,Zookeeper本質就是一個小型的文件存儲系統。
啟動一般語法:./zkCli.sh -timeout 0 -r -server ip:port
如:./zkCli.sh -timeout 3000 -server h1:2181,表示連接到主機h1 超時時間為3秒
1、查詢
ls 語法:ls path [watch] 列出指定節點
[zk: localhost:2181(CONNECTED) 4] ls / #遍歷根目錄 [zookeeper, QQQs, test] [zk: localhost:2181(CONNECTED) 6] ls /test #遍歷根目錄下子節點test [test1]
stat 語法:stat path [watch]
列出指定節點的狀態信息,或者說是元數據
[zk: localhost:2181(CONNECTED) 8] stat / cZxid = 0x0 #節點被創建時的事務ID ctime = Thu Jan 01 08:00:00 CST 1970 #節點被創建的時間 mZxid = 0x0 #最近一次更新的時的事務ID mtime = Thu Jan 01 08:00:00 CST 1970 #最近一次更新的時間 pZxid = 0x1a5 #該節點的子節點列表最近一次被修改的事務ID cversion = 99 #子節點的版本號 dataVersion = 0 #數據版本 aclVersion = 0 #ACL版本號 ephemeralOwner = 0x0 #創建臨時節點的事務ID,如果是持久節點,則該節點為0x0 dataLength = 0 #當前節點的數據長度 numChildren = 3 #當前節點的子節點數目
get 語法:get path [watch]
列出指定節點的數據
[zk: localhost:2181(CONNECTED) 10] get /test
demo
ls2 語法:ls2 path [watch]
是ls的升級版,列出子節點的同時列出節點的狀態信息
[zk: localhost:2181(CONNECTED) 11] ls2 /test [test1] cZxid = 0x1a5 ctime = Thu Jun 07 08:26:10 CST 2018 mZxid = 0x1a5 mtime = Thu Jun 07 08:26:10 CST 2018 pZxid = 0x1a6 cversion = 1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 4 numChildren = 1
history 查看歷史
History命令可以查看先前執行過的所有的命令,它一般與redo配合使用
[zk: localhost:2181(CONNECTED) 5] history 0 - ls -l 1 - ls / 2 - delquota -n /testquota 3 - ls / 4 - listquota /testquota 5 - history
redo 重新執行先前命令,根據行數執行
[zk: localhost:2181(CONNECTED) 6] redo 1 #redo重新執行先前命令,根據行數執行 [testquota, zookeeper, hellozk]
2、創建
create 語法:create [-s] [-e] path data acl
其中:括號中時可選的,s表示創建永久節點,e表示創建臨時節點,acl表示訪問控制列表
[zk: localhost:2181(CONNECTED) 2] create /test demo Created /test [zk: localhost:2181(CONNECTED) 5] create /test/test1 demo1 Created /test/test1
3、修改
set 語法:set path data [version]
[zk: localhost:2181(CONNECTED) 12] ls / [zookeeper, QQQs, test] [zk: localhost:2181(CONNECTED) 13] set /test demo2 cZxid = 0x1a5 ctime = Thu Jun 07 08:26:10 CST 2018 mZxid = 0x1a7 mtime = Thu Jun 07 08:45:54 CST 2018 #修改時間與修改時間不一致 pZxid = 0x1a6 cversion = 1 dataVersion = 1 #數據版本號為1,表示被更改一次 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 5 numChildren = 1
4、刪除
delete 語法:delete path [version]
只能刪除不含子節點的節點
[zk: localhost:2181(CONNECTED) 12] ls / [zookeeper, QQQs, test] [zk: localhost:2181(CONNECTED) 14] delete /QQQs [zk: localhost:2181(CONNECTED) 15] ls / [zookeeper, test]
rmr 語法:rmr path
能遞歸刪除節點
[zk: localhost:2181(CONNECTED) 15] ls / [zookeeper, test] [zk: localhost:2181(CONNECTED) 16] rmr /test [zk: localhost:2181(CONNECTED) 17] ls / [zookeeper]
5、增加限制
setquota 語法:setquota -n | -b val path
其中:n表示子節點的最大個數;b表示數據值的最大長度;val表示子節點最大個數或者數據值的最大長度;path表示節點路徑
[zk: localhost:2181(CONNECTED) 30] create /testquota 123 #創建具有限制屬 性節點 Created /testquota [zk: localhost:2181(CONNECTED) 35] setquota -n 3 /testquota #設置最大子節點為3 數為3 Comment: the parts are option -n val 3 path /testquota [zk: localhost:2181(CONNECTED) 36] listquota /testquota absolute path is /zookeeper/quota/testquota/zookeeper_limits Output quota for /testquota count=3,bytes=-1 #count:子節點最大數 byte:數據長度(-1表示沒有限制) Output stat for /testquota count=1,bytes=3 #count:當前子節點數(包含自己,1表示當前他沒有子節點)
注意:如果創建的節點數超過了限制數,創建過程中不會報警告,但是日志中會顯示超額
2018-05-05 17:03:56,025 [myid:3] - WARN [CommitProcessor:3:DataTree@302] - Quota exceeded: /testquota count=5 limit=3
Delquota
刪除節點限制
[zk: localhost:2181(CONNECTED) 2] delquota -n /testquota #刪除限制 [zk: localhost:2181(CONNECTED) 4] listquota /testquota absolute path is /zookeeper/quota/testquota/zookeeper_limits Output quota for /testquota count=-1,bytes=-1 #此時顯示-1表示沒有限制條件 Output stat for /testquota count=5,bytes=7
七、關於Zookeeper集群
Zookeeper集群搭建指的是Zookeeper分布式模擬安裝。通常由2n+1台服務器組成。這是因為為了保證leader選舉(基於Paxos算法的實現)能夠得到多數的支持,所以Zookeeper集群的數量一般為奇數。
1、編輯相對應的配置文件
[root@bogon conf]# pwd /usr/java/zookeeper-3.4.12/conf [root@bogon conf]# vim zoo.cfg
2、配置相關數據
1) 指定數據存儲目錄
dataDir=/usr/data/zkdata #指定數據存儲目錄
2) 添加配置Zookeeper服務器的地址即編號
##(心跳端口、選舉端口) server.1=jiaxianseng.host:2888:3888 server.2=jiaxianseng.host1:2888:3888 server.3=jiaxianseng.host2:2888:3888
3、創建數據存儲目錄,在虛擬機中創建相對應的編號
[root@localhost conf]# mkdir -p /usr/data/zkdata [root@localhost conf]#cd /usr/data/zkdata [root@localhost zkdata]#touch myid [root@localhost zkdata]# echo 1 >myid #此表示選舉當前第一台主機為leader
4、分發到另外兩台機器
$scp -r zookeeper-3.4.12/ oracle@jiaxianseng.host1:/usr/java/
$scp -r zookeeper-3.4.12/ oracle@jiaxianseng.host2:/usr/java/
5、分別開啟zookeeper,此時第一台服務器當選為leader,其創建的節點另外兩台可以收到
擴展:如果在本機上玩偽集群,需要注意:
1) 在當前文件夾下需要進行分包配置,設定為三台zookeeper機器
[oracle@localhost zookeeper-3.4.12]$ ll 總用量 12 drwxr-xr-x 10 root root 4096 5月 5 14:40 zk1 drwxr-xr-x 10 root root 4096 5月 5 14:42 zk2 drwxr-xr-x 10 root root 4096 5月 5 12:46 zk3
2) 配置文件設置
因為是在本機上玩,三台端口號應該不同
clientPort=2181
八、Zookeeper監聽機制
Zookeeper提供了分布式數據發布/訂閱功能。它允許客戶端向服務端注冊一個Watcher監聽。當服務端一些時間觸發了這個Watcher,那么就會向指定客戶端發送一個時間通知實現分布式的通知功能,其中節點的增刪改都可以觸發事件。
值得注意的是:Zookeeper監聽機制嚴格按照先注冊再監聽順序,且觸發事件監聽是一次性的。觸發一次發送回調事件情況后,下次觸發不會進行回調,需要重新注冊監聽。
具體實現如下:
1) 玩監聽:前提是先得支持監聽機制;其次是要注冊監聽,使用help命令進行查看
[zk: localhost:2181(CONNECTED) 7] help #使用help命令查看哪行命令支 持監聽 ZooKeeper -server host:port cmd args connect host:port get path [watch] ls path [watch] stat path [watch]
2) 先注冊監聽
[zk: localhost:2181(CONNECTED) 8] create /watchtest 123 #先注冊監聽 Created /watchtest
3) 查看是否被監聽
[zk: localhost:2181(CONNECTED) 10] get /watchtest watch #此節點被監聽
4) 利用第二台機器改變節點數據
[zk: localhost:2181(CONNECTED) 0] set /watchtest 456789
5) 查看第一台機器返回的信息
WATCHER:: WatchedEvent state:SyncConnected type:NodeDataChanged path:/watchtest
此時第一台會收到節點被改變的提示,但是當節點再次被其他機器改變時
第一台機器不會收到任何信息,說明監聽只會被觸發一次。如果想再次收到監聽信息,只能重新注冊監聽
九、關於Zookeeper Java API
1) 創建maven工程,並導入約束
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.12</version> </dependency>
2) 編寫測試類,連接zookeeper,並創建節點
public class TestZKClient { public static void main(String[] args) throws Exception{ //main快捷:psvm //構造JAVA zookeeper客戶端 //參數:1.連接ip+端口(可配置多個,用分號隔開) 2.time連接超時時間:默認值為30000 ZooKeeper zk=new ZooKeeper("192.168.174.133:2181,192.168.174.133:2182",30000,new Watcher(){ //這里就是事件通知的回調方法 public void process(WatchedEvent event) { System.out.println("事件通知類型"+event.getState()); System.out.println("事件發生類型"+event.getType()); System.out.println("事件發生路徑"+event.getPath()); } } ); /** * 參數1:節點名 ;參數2:數據 ; 參數3:acl權限控制,這里采用默認值 ;參數4:創建節點類型:這里是持久序列化節點 */ zk.create("/myCirls","性感的".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); zk.close(); } }
十、關於Zookeper分布式鎖應用
分布式鎖,這個主要得益於Zookeeper保證了數據的強一致性。鎖服務可以分為兩類,一個保持獨占,另一個是控制時序。
應用:在分布式環境高並發場景下,生產有一定業務含義的唯一的訂單編號
1、編寫服務類:
/** * 訂單編號服務 * @author Administrator * */ public class OrderCodeGenerator { //自增長序列 private static int i=0; //按照“年-月-日-小時-分鍾-秒-自增長序列”的規則生成訂單編號 public String getOrderCode(){ Date date=new Date(); SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-"); return sdf.format(date)+ ++i; } public static void main(String[] args) { OrderCodeGenerator ong=new OrderCodeGenerator(); for(int i=0;i<10;i++){ System.out.println(ong.getOrderCode()); } } }
2、編寫接口
/** * 定義訂單服務接口 * @author Administrator * */ public interface OrderService { void createOrder(); }
3、定義實現類
public class OrderServiceImpl implements OrderService{ //定義為靜態變量,保證訂單號唯一 private static OrderCodeGenerator ocg=new OrderCodeGenerator(); //創建訂單接口 @Override public void createOrder() { // TODO Auto-generated method stub String orderCode=null; //獲取訂單號 orderCode=ocg.getOrderCode(); System.out.println(Thread.currentThread().getName()+"========"+orderCode); } }
4、定義線程模擬多線程創建訂單
public class DistrbutDemo { public static void main(String[] args) { //模擬多個並發創建訂單 //並發數 int currs=10; //方式一:定義一個循環屏障:用於祖塞當前線程的,設置的參數為參與線程數,能保證設置每個線程統一完成每段步驟, //如線程一完成后進入等待,接着線程二完成后進入,等待其余線程完成,才能進入下一步 final CyclicBarrier cb=new CyclicBarrier(currs); /*方式二:定義一個倒計數儲存器:用於等待n個條件到達,每個線程完成數值減一 CountDownLatch cdl=new CountDownLatch(currs); cdl.countDown(); cdl.await();*/ for(int i=0;i<currs;i++){ new Thread(new Runnable() { @Override public void run() { //模擬創建訂單 OrderService os=new OrderServiceImpl(); //穿插線程阻塞 try { cb.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } os.createOrder(); } }).start(); } } }
此時運行代碼,可能會出現沒有10這個訂單號(創建了相同的訂單,說明多線程編程下出現了不安全的情況,此時應該加鎖)
打印結果如下:
Thread-5========2018-06-06-10-30-25-1 Thread-4========2018-06-06-10-30-25-6 Thread-3========2018-06-06-10-30-25-2 Thread-6========2018-06-06-10-30-25-8 Thread-0========2018-06-06-10-30-25-5 Thread-7========2018-06-06-10-30-25-4 Thread-8========2018-06-06-10-30-25-3 Thread-9========2018-06-06-10-30-25-9 Thread-1========2018-06-06-10-30-25-2 Thread-2========2018-06-06-10-30-25-7
5、方式一,加同步鎖
public class OrderServiceImpl implements OrderService{ //定義為靜態變量,保證訂單號唯一 private static OrderCodeGenerator ocg=new OrderCodeGenerator(); //創建訂單接口 @Override public void createOrder() { // TODO Auto-generated method stub String orderCode=null; //加同步鎖:保證線程安全 synchronized (ocg) { //獲取訂單號 orderCode=ocg.getOrderCode(); } System.out.println(Thread.currentThread().getName()+"========"+orderCode); } }
注意:這里同步鎖內參數不能使用this,因為在DistrbutDemo類中每次調用線程都重新new了一次,鎖的不是同一個對象,其不帶加鎖的目的(獲取的不是同一個對象的鎖),而用ocg是因為它是靜態變量,獲取的是同一個對象的鎖
方式二:加鎖lock
public class OrderServiceImpl implements OrderService{ //定義為靜態變量,保證訂單號唯一 private static OrderCodeGenerator ocg=new OrderCodeGenerator(); //使用lock一定設置為靜態變量,保證每個線程競爭的是同一把鎖 private static Lock lock=new ReentrantLock(); //創建訂單接口 @Override public void createOrder() { // TODO Auto-generated method stub String orderCode=null; try{//防止出現異常后鎖不能釋放,所以加try/catch lock.lock(); orderCode=ocg.getOrderCode(); }catch(Exception e){ e.printStackTrace(); }finally{ //記得釋放鎖 lock.unlock(); } System.out.println(Thread.currentThread().getName()+"========"+orderCode); } }
此時運行代碼,不會出現重復的訂單,是個完整的創建了10個訂單
分析:以上代碼確實能夠保證線程安全,但是此只能在單機下玩,如果將服務放在多台機器上調用,這里需要引入分布式鎖。
6、分布式鎖實現的技術
基於數據庫實現分布式鎖:
性能較差,容易出現單點故障
鎖沒有失效時間,容易死鎖
非阻塞式的
不可重入
基於緩存實現分布式鎖
鎖沒有失效時間,容易死鎖
非阻塞式的
不可重入
基於Zookeeper實現分布式鎖
實現相對簡單
可靠性高
性能較差
具體實現如下:
7、引入zk客戶端依賴
<!--對zookeeper客戶端進行了封裝 -->
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
8、編寫序列化類
public class MyZkSerializer implements ZkSerializer{ String charset="UTF-8"; //反序列化 @Override public Object deserialize(byte[] bytes) throws ZkMarshallingError { // TODO Auto-generated method stub try { return new String(bytes,charset); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block throw new ZkMarshallingError(e); } } //序列化 @Override public byte[] serialize(Object obj) throws ZkMarshallingError { // TODO Auto-generated method stub try { return String.valueOf(obj).getBytes(charset); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block throw new ZkMarshallingError(e); } } }
9、編寫zk連接監聽程序
public class ZkWatcherDemo { public static void main(String[] args) { ZkClient client=new ZkClient("192.168.174.128:2181"); client.setZkSerializer(new MyZkSerializer()); client.subscribeDataChanges("/testWatch", new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { System.out.println("----監聽到節點被刪除.."); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { // TODO Auto-generated method stub System.out.println("----監聽到數據變為:"+data); } }); try { //為了方便查看,等待2分鍾 Thread.sleep(2 * 60 * 1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
此時如果更改testWatch值,控制台會打印監聽的值
----監聽到數據變為:55
10、Zookeeper分布式鎖實現:
1) 定義分布式鎖:
//定義分布式鎖 public class ZKDistributeLock implements Lock{ /** * 利用zookeeper的同父子節點不可重名的特點來實現分布式鎖 * 加鎖的規則:去創建指定名稱的節點,如果能創建成功,則獲得鎖(加鎖成功),如果節點已存在,就標識鎖被別人獲取了 * 你就得阻塞,等待 * 鎖釋放規則:刪除指定名稱的節點 */ private String LockPath; private ZkClient client; public ZKDistributeLock(String lockPath) { super(); LockPath = lockPath; client=new ZkClient("192.168.174.128:2181"); client.setZkSerializer(new MyZkSerializer()); } @Override public boolean tryLock() { try{ this.client.createPersistent(LockPath);//創建永久節點 }catch(ZkNodeExistsException e){ return false; } return true; } @Override public void lock() { if(!tryLock()){ //阻塞等待 waitForLock(); //再次嘗試加鎖 lock(); } } private void waitForLock() { //怎么讓自己阻塞 final CountDownLatch cdl=new CountDownLatch(1); //注冊watch,好通知自己什么時候被喚醒 IZkDataListener listener=new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { System.out.println("----監聽到節點被刪除.."); cdl.countDown(); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { // TODO Auto-generated method stub System.out.println("----監聽到數據變為:"+data); } }; //注冊該事件 client.subscribeDataChanges(LockPath, listener); //這里得判斷節點是否存在,否則會永久阻塞 if(this.client.exists(LockPath)){ try { cdl.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } try{ cdl.await(); }catch(InterruptedException e){ e.printStackTrace(); } client.unsubscribeDataChanges(LockPath, listener); } @Override public void unlock() { //刪除節點 //this.client.deleteRecursive(arg0)表示遞歸刪除 this.client.delete(this.LockPath); } @Override public void lockInterruptibly() throws InterruptedException { // TODO Auto-generated method stub } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { // TODO Auto-generated method stub return false; } @Override public Condition newCondition() { // TODO Auto-generated method stub return null; } }
2) 調用分布式鎖
public class OrderServiceImpl implements OrderService{ //定義為靜態變量,保證訂單號唯一 private static OrderCodeGenerator ocg=new OrderCodeGenerator(); //使用lock一定設置為靜態變量,保證每個線程競爭的是同一把鎖 private static Lock lock=new ZKDistributeLock("/testW"); //創建訂單接口 @Override public void createOrder() { // TODO Auto-generated method stub String orderCode=null; try{//防止出現異常后鎖不能釋放,所以加try/catch lock.lock(); orderCode=ocg.getOrderCode(); }catch(Exception e){ e.printStackTrace(); }finally{ //記得釋放鎖 lock.unlock(); } System.out.println(Thread.currentThread().getName()+"========"+orderCode); } }
打印結果如下:
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection). log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkEventThread). log4j:WARN Please initialize the log4j system properly. log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Thread-4========2018-06-07-10-52-29-1 ----監聽到節點被刪除.. ----監聽到數據變為:null Thread-7========2018-06-07-10-52-29-2 ----監聽到節點被刪除.. ----監聽到數據變為:null Thread-9========2018-06-07-10-52-29-3 ----監聽到節點被刪除.. ----監聽到數據變為:null Thread-1========2018-06-07-10-52-29-4 ----監聽到節點被刪除.. ----監聽到數據變為:null Thread-3========2018-06-07-10-52-29-5 ----監聽到節點被刪除.. ----監聽到數據變為:null Thread-8========2018-06-07-10-52-29-6 ----監聽到節點被刪除.. ----監聽到節點被刪除.. ----監聽到數據變為:null Thread-0========2018-06-07-10-52-29-7 ----監聽到節點被刪除.. ----監聽到數據變為:null Thread-2========2018-06-07-10-52-29-8 ----監聽到節點被刪除.. ----監聽到數據變為:null Thread-6========2018-06-07-10-52-29-9 ----監聽到節點被刪除.. ----監聽到數據變為:null Thread-5========2018-06-07-10-52-29-10 ----監聽到節點被刪除.. ----監聽到節點被刪除.. ----監聽到節點被刪除.. ----監聽到節點被刪除.. ----監聽到節點被刪除.. ----監聽到節點被刪除.. ----監聽到節點被刪除.. ----監聽到節點被刪除.. ----監聽到節點被刪除.. ----監聽到節點被刪除.. ----監聽到節點被刪除.. ----監聽到節點被刪除.. ----監聽到節點被刪除.. ----監聽到節點被刪除.. ----監聽到節點被刪除.. ----監聽到節點被刪除.. ----監聽到節點被刪除.. ----監聽到節點被刪除..
弊端問題:驚群效應
1、每次喚醒操作將所有線程喚醒了,其中一個搶到鎖后,其他沒搶到的又將進入阻塞等待狀態
2、客戶端無故接受了很多與自己無關的事件通知
3、利用持久節點創建的鎖存在死鎖的可能性!(當加鎖后突然業務重啟,釋放鎖未執行)
在集群規模較大的環境中帶來的危害:
1、巨大的服務器性能損耗 2、網絡沖擊 3、可能造成宕機
如打出很多----監聽到節點被刪除..這句話
解決:采用創建順序節點
改進創建的是臨時順序節點,每次都是最小的節點獲得鎖,其下一個節點(更小)注冊watcher
重寫分布式鎖(升級版)
//定義分布式鎖 public class ZKDistributeImproveLock implements Lock { /** * 利用zookeeper的同父子節點不可重名的特點來實現分布式鎖 * 加鎖的規則:去創建指定名稱的節點,如果能創建成功,則獲得鎖(加鎖成功),如果節點已存在,就標識鎖被別人獲取了 你就得阻塞,等待 * 鎖釋放規則:刪除指定名稱的節點 */ private String LockPath; private ZkClient client; // 需要將路徑設置為ThreadLocal類型,否則不能被線程並發去使用 private ThreadLocal<String> currentPath = new ThreadLocal<String>(); private ThreadLocal<String> beforePath = new ThreadLocal<String>(); public ZKDistributeImproveLock(String lockPath) { super(); LockPath = lockPath; client = new ZkClient("192.168.174.128:2181"); client.setZkSerializer(new MyZkSerializer()); if (!this.client.exists(LockPath)) { this.client.createPersistent(LockPath);// 先創建永久節點 } } @Override public boolean tryLock() { if (this.currentPath.get() == null) { // 注意:加鎖時創建的是臨時順序節點 currentPath.set(this.client.createEphemeralSequential(LockPath + "/", "aaa")); } // 獲得所有的子節點 List<String> children = this.client.getChildren(LockPath); // 進行下排序 Collections.sort(children); // 判斷當前節點是否是最小的 if (currentPath.get().equals(LockPath + "/" + children.get(0))) { return true; } else { // 獲取前一個節點 // 得到字節的索引號 int curIndex = children.indexOf(currentPath.get().substring( LockPath.length() + 1)); beforePath.set(LockPath + "/" + children.get(curIndex - 1)); } return false; } @Override public void lock() { if (!tryLock()) { // 阻塞等待 waitForLock(); // 再次嘗試加鎖 lock(); } } private void waitForLock() { // 怎么讓自己阻塞 final CountDownLatch cdl = new CountDownLatch(1); // 注冊watch,好通知自己什么時候被喚醒 IZkDataListener listener = new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { System.out.println("----監聽到節點被刪除.."); cdl.countDown(); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { // TODO Auto-generated method stub System.out.println("----監聽到數據變為:" + data); } }; // 注冊該事件 client.subscribeDataChanges(this.beforePath.get(), listener); // 這里得判斷節點是否存在,否則會永久阻塞 if (this.client.exists(this.beforePath.get())) { try { cdl.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } try { cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } client.unsubscribeDataChanges(this.beforePath.get(), listener); } @Override public void unlock() { // 刪除節點 // this.client.deleteRecursive(arg0)表示遞歸刪除 this.client.delete(this.currentPath.get()); } @Override public void lockInterruptibly() throws InterruptedException { // TODO Auto-generated method stub } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { // TODO Auto-generated method stub return false; } @Override public Condition newCondition() { // TODO Auto-generated method stub return null; } }
調用:
public class OrderServiceImpl implements OrderService{ //定義為靜態變量,保證訂單號唯一 private static OrderCodeGenerator ocg=new OrderCodeGenerator(); //使用lock一定設置為靜態變量,保證每個線程競爭的是同一把鎖 private static Lock lock=new ZKDistributeImproveLock("/QQQs"); //創建訂單接口 @Override public void createOrder() { // TODO Auto-generated method stub String orderCode=null; try{//防止出現異常后鎖不能釋放,所以加try/catch lock.lock(); orderCode=ocg.getOrderCode(); }catch(Exception e){ e.printStackTrace(); }finally{ //記得釋放鎖 lock.unlock(); } System.out.println(Thread.currentThread().getName()+"========"+orderCode); } }
打印結果如下:
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection). log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkEventThread). log4j:WARN Please initialize the log4j system properly. log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Thread-4========2018-06-07-10-53-36-1 ----監聽到節點被刪除.. Thread-9========2018-06-07-10-53-36-2 ----監聽到節點被刪除.. Thread-0========2018-06-07-10-53-36-3 ----監聽到節點被刪除.. Thread-1========2018-06-07-10-53-36-4 ----監聽到節點被刪除.. Thread-6========2018-06-07-10-53-36-5 ----監聽到節點被刪除.. Thread-7========2018-06-07-10-53-36-6 ----監聽到節點被刪除.. Thread-5========2018-06-07-10-53-36-7 ----監聽到節點被刪除.. Thread-2========2018-06-07-10-53-36-8 ----監聽到節點被刪除.. Thread-3========2018-06-07-10-53-36-9 ----監聽到節點被刪除.. Thread-8========2018-06-07-10-53-37-10