雜記
ZooKeeper的用途:distributed coordination;maintaining configuration information, naming, providing distributed synchronization, and providing group services.
Zookeeper的節點都是存放在內存中的,所以讀寫速度很快。更新日志被記錄到了磁盤中,以便用於恢復數據。在更新內在中節點數之前,會先序列化到磁盤中。
為避免單點失效,zookeeper的數據是在多個server上留有備份的。不管客戶端連接到的是哪個server,它看到的數據都是一致的。如果client和一個server的TCP連接失效,它會嘗試連接另一個server。眾多server中有一個是leader。
所有的server 都必須知道彼此的存在。
zookeeper在讀寫比例為10:1時性能最佳。
每個znode上data的讀寫都是原子操作。
讀是局部性的,即client只需要從與它相連的server上讀取數據即可;而client有寫請求的話,與之相連的server會通知leader,然后leader會把寫操作分發給所有server。所以定要比讀慢很多。
在建立zookeeper連接時,給定的地址字符串可以是這樣的:"192.168.1.1:3000,192.168.1.2:3000,192.168.1.3:3000/app/a",以后的所有操作就都是在/app/a下進行的。實際上只連接到一台ZooKeeper機器就可了,沒必要指定每台zk機器的IP和端口,即用“192.168.1.2:3000/app/a”也是可以的。
當client與一個server斷連接時(可能是因為server失效了),它就收不到任何watches;當它與另一個server建立好連接后,它就會收到"session expired"通知。
ACL不是遞歸的,它只針對當前節點,對子節點沒有任何影響。
默認情況下日志文件和數據文件是放在同一個目錄下的,為縮短延遲提高響應性,你可以把日志文件單獨放在另一個目錄下。
為避免swaping,運行java時最好把可用物理內在調得大一些,比如對於4G的內在,可以把它調到3G。java有以下兩個運行參數:
-Xms<size>
設置虛擬機可用內存堆的初始大小,缺省單位為字節,該大小為1024的整數倍並且要大於1MB,可用k(K)或m(M)為單位來設置較大的內存數。初始堆大小為2MB。
例如:-Xms6400K,-Xms256M
-Xmx<size>
設置虛擬機內存堆的最大可用大小,缺省單位為字節。該值必須為1024整數倍,並且要大於2MB。可用k(K)或m(M)為單位來設置較大的內存數。缺省堆最大值為64MB。
例如:-Xmx81920K,-Xmx80M
CreateMode
PERSISTENT:創建后只要不刪就永久存在
EPHEMERAL:會話結束年結點自動被刪除,EPHEMERAL結點不允許有子節點
SEQUENTIAL:節點名末尾會自動追加一個10位數的單調遞增的序號,同一個節點的所有子節點序號是單調遞增的
PERSISTENT_SEQUENTIAL:結合PERSISTENT和SEQUENTIAL
EPHEMERAL_SEQUENTIAL:結合EPHEMERAL和SEQUENTIAL
package basic; import java.io.IOException; import java.util.List; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; public class Demo { private static final int TIMEOUT = 3000; public static void main(String[] args) throws IOException { ZooKeeper zkp = new ZooKeeper("localhost:2181", TIMEOUT, null); try { // 創建一個EPHEMERAL類型的節點,會話關閉后它會自動被刪除 zkp.create("/node1", "data1".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL); if (zkp.exists("/node1", false) != null) { System.out.println("node1 exists now."); } try { // 當節點名已存在時再去創建它會拋出KeeperException(即使本次的ACL、CreateMode和上次的不一樣) zkp.create("/node1", "data1".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } catch (KeeperException e) { System.out.println("KeeperException caught:" + e.getMessage()); } // 關閉會話 zkp.close(); zkp = new ZooKeeper("localhost:2181", TIMEOUT, null); //重新建立會話后node1已經不存在了 if (zkp.exists("/node1", false) == null) { System.out.println("node1 dosn't exists now."); } //創建SEQUENTIAL節點 zkp.create("/node-", "same data".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL); zkp.create("/node-", "same data".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL); zkp.create("/node-", "same data".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL); List<String> children = zkp.getChildren("/", null); System.out.println("Children of root node:"); for (String child : children) { System.out.println(child); } zkp.close(); } catch (Exception e) { System.out.println(e.getMessage()); } } }
第一次運行輸出:
node1 exists now.
KeeperException caught:KeeperErrorCode = NodeExists for /node1
node1 dosn't exists now.
Children of root node:
node-0000000003
zookeeper
node-0000000002
node-0000000001
第二次運行輸出:
node1 exists now.
KeeperException caught:KeeperErrorCode = NodeExists for /node1
node1 dosn't exists now.
Children of root node:
node-0000000003
zookeeper
node-0000000002
node-0000000001
node-0000000007
node-0000000005
node-0000000006
注意兩次會話中創建的PERSISTENT_SEQUENTIAL節點序號並不是連續的,比如上例中缺少了node-0000000004.
Watcher & Version
watcher分為兩大類:data watches和child watches。getData()和exists()上可以設置data watches,getChildren()上可以設置child watches。
setData()會觸發data watches;
create()會觸發data watches和child watches;
delete()會觸發data watches和child watches.
如果對一個不存在的節點調用了exists(),並設置了watcher,而在連接斷開的情況下create/delete了該znode,則watcher會丟失。
在server端用一個map來存放watcher,所以相同的watcher在map中只會出現一次,只要watcher被回調一次,它就會被刪除----map解釋了watcher的一次性。比如如果在getData()和exists()上設置的是同一個data watcher,調用setData()會觸發data watcher,但是getData()和exists()只有一個會收到通知。
1 import java.io.IOException; 2 3 import org.apache.zookeeper.CreateMode; 4 import org.apache.zookeeper.KeeperException; 5 import org.apache.zookeeper.WatchedEvent; 6 import org.apache.zookeeper.Watcher; 7 import org.apache.zookeeper.ZooDefs.Ids; 8 import org.apache.zookeeper.ZooKeeper; 9 import org.apache.zookeeper.data.Stat; 10 11 public class SelfWatcher implements Watcher{ 12 13 ZooKeeper zk=null; 14 15 @Override 16 public void process(WatchedEvent event) { 17 System.out.println(event.toString()); 18 } 19 20 SelfWatcher(String address){ 21 try{ 22 zk=new ZooKeeper(address,3000,this); //在創建ZooKeeper時第三個參數負責設置該類的默認構造函數 23 zk.create("/root", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); 24 }catch(IOException e){ 25 e.printStackTrace(); 26 zk=null; 27 }catch (KeeperException e) { 28 e.printStackTrace(); 29 } catch (InterruptedException e) { 30 e.printStackTrace(); 31 } 32 } 33 34 void setWatcher(){ 35 try { 36 Stat s=zk.exists("/root", true); 37 if(s!=null){ 38 zk.getData("/root", false, s); 39 } 40 } catch (KeeperException e) { 41 e.printStackTrace(); 42 } catch (InterruptedException e) { 43 e.printStackTrace(); 44 } 45 } 46 47 void trigeWatcher(){ 48 try { 49 Stat s=zk.exists("/root", false); //此處不設置watcher 50 zk.setData("/root", "a".getBytes(), s.getVersion()); //修改數據時需要提供version,version設為-1表示強制修改 51 }catch(Exception e){ 52 e.printStackTrace(); 53 } 54 } 55 56 void disconnect(){ 57 if(zk!=null) 58 try { 59 zk.close(); 60 } catch (InterruptedException e) { 61 e.printStackTrace(); 62 } 63 } 64 65 public static void main(String[] args){ 66 SelfWatcher inst=new SelfWatcher("127.0.0.1:2181"); 67 inst.setWatcher(); 68 inst.trigeWatcher(); 69 inst.disconnect(); 70 } 71 72 }
可以在創建Zookeeper時指定默認的watcher回調函數,這樣在getData()、exists()和getChildren()收到通知時都會調用這個函數--只要它們在參數中設置了true。所以如果把代碼22行的this改為null,則不會有任何watcher被注冊。
上面的代碼輸出:
WatchedEvent state:SyncConnected type:None path:null
WatchedEvent state:SyncConnected type:NodeDataChanged path:/root
之所會輸出第1 行是因為本身在建立ZooKeeper連接時就會觸發watcher。輸出每二行是因為在代碼的第36行設置了true。
WatchEvent有三種類型:NodeDataChanged、NodeDeleted和NodeChildrenChanged。
調用setData()時會觸發NodeDataChanged;
調用create()時會觸發NodeDataChanged和NodeChildrenChanged;
調用delete()時上述三種event都會觸發。
如果把代碼的第36--39行改為:
Stat s=zk.exists("/root", false);
if(s!=null){
zk.getData("/root", true, s);
}
或
Stat s=zk.exists("/root", true);
if(s!=null){
zk.getData("/root", true, s);
}
跟上面的輸出是一樣的。這也證明了watcher是一次性的。
設置watcher的另外一種方式是不使用默認的watcher,而是在getData()、exists()和getChildren()中指定各自的watcher。示例代碼如下:
1 public class SelfWatcher{ 2 3 ZooKeeper zk=null; 4 5 private Watcher getWatcher(final String msg){ 6 return new Watcher(){ 7 @Override 8 public void process(WatchedEvent event) { 9 System.out.println(msg+"\t"+event.toString()); 10 } 11 }; 12 } 13 14 SelfWatcher(String address){ 15 try{ 16 zk=new ZooKeeper(address,3000,null); //在創建ZooKeeper時第三個參數負責設置該類的默認構造函數 17 zk.create("/root", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); 18 }catch(IOException e){ 19 e.printStackTrace(); 20 zk=null; 21 }catch (KeeperException e) { 22 e.printStackTrace(); 23 } catch (InterruptedException e) { 24 e.printStackTrace(); 25 } 26 } 27 28 void setWatcher(){ 29 try { 30 Stat s=zk.exists("/root", getWatcher("EXISTS")); 31 if(s!=null){ 32 zk.getData("/root", getWatcher("GETDATA"), s); 33 } 34 } catch (KeeperException e) { 35 e.printStackTrace(); 36 } catch (InterruptedException e) { 37 e.printStackTrace(); 38 } 39 } 40 41 void trigeWatcher(){ 42 try { 43 Stat s=zk.exists("/root", false); //此處不設置watcher 44 zk.setData("/root", "a".getBytes(), s.getVersion()); 45 }catch(Exception e){ 46 e.printStackTrace(); 47 } 48 } 49 50 void disconnect(){ 51 if(zk!=null) 52 try { 53 zk.close(); 54 } catch (InterruptedException e) { 55 e.printStackTrace(); 56 } 57 } 58 59 public static void main(String[] args){ 60 SelfWatcher inst=new SelfWatcher("127.0.0.1:2181"); 61 inst.setWatcher(); 62 inst.trigeWatcher(); 63 inst.disconnect(); 64 } 65 66 }
輸出:
GETDATA WatchedEvent state:SyncConnected type:NodeDataChanged path:/root
EXISTS WatchedEvent state:SyncConnected type:NodeDataChanged path:/root
上例中由於exists和getData分別設置了兩個不同的Watcher實例,所以雖然watcher都是由同了一個NodeDataChanged觸發的,但exists()和getData()都會收到通知。由於16行創建Zookeeper時沒有設置watcher(參數為null),所以建立連接時沒有收到通知。
關於Version:為了方便進行cache validations 和coordinated updates,每個znode都有一個stat結構體,其中包含:version的更改記錄、ACL的更改記錄、時間戳。znode的數據每更改一次,version就會加1。客戶端每次檢索data的時候都會把data的version一並讀出出來。修改數據時需要提供version。
zk.delete("/root", -1); //觸發data watches和children watches。version設為-1時表示要強制刪除
zk.getChildren("/root", getWatcher("LISTCHILDREN")); //getChildren()上可以設置children watches
輸出:
LISTCHILDREN WatchedEvent state:SyncConnected type:NodeDeleted path:/root
zk.delete("/root", -1); //觸發data watches和children watches
Stat s=zk.exists("/root", getWatcher("EXISTS")); //exists()上可以設置data watches
if(s!=null){
zk.getChildren("/root", getWatcher("LISTCHILDREN"));
}
輸出:
EXISTS WatchedEvent state:SyncConnected type:NodeDeleted path:/root
LISTCHILDREN WatchedEvent state:SyncConnected type:NodeDeleted path:/root
zk.delete("/root", -1); //觸發data watches和children watches
Stat s=zk.exists("/root", getWatcher("EXISTS"));
if(s!=null){
zk.getData("/root", getWatcher("GETDATA"), s);
zk.getChildren("/root", getWatcher("LISTCHILDREN"));
}
輸出:
GETDATA WatchedEvent state:SyncConnected type:NodeDeleted path:/root
LISTCHILDREN WatchedEvent state:SyncConnected type:NodeDeleted path:/root
EXISTS WatchedEvent state:SyncConnected type:NodeDeleted path:/root
tat s=zk.exists("/root", false);
zk.setData("/root", "a".getBytes(), s.getVersion());
zk.delete("/root", -1);
Stat s=zk.exists("/root", getWatcher("EXISTS"));
if(s!=null){
zk.getData("/root", getWatcher("GETDATA"), s);
zk.getChildren("/root", getWatcher("LISTCHILDREN"));
}
輸出:
GETDATA WatchedEvent state:SyncConnected type:NodeDataChanged path:/root
EXISTS WatchedEvent state:SyncConnected type:NodeDataChanged path:/root
LISTCHILDREN WatchedEvent state:SyncConnected type:NodeDeleted path:/root
按說data watches觸發了兩次,但是exists()和getData()只會收到一次通知。
Barriers and Queues
1)所有的線程都到達barrier后才能進行后續的計算
或者
2)所有的線程都完成自己的計算后才能離開barrier
Double Barrier是指同時具有上述兩點。
Queue就不說了,一個產生--消費模型,先生產的先被消費。
Double Barrier的實現:
enter barrier:
1.建一個根節點"/root"
2.想進入barrier的線程在"/root"下建立一個子節點"/root/c_i"
3.循環監聽"/root"孩子節點數目的變化,當其達到size時就說明有size個線程都已經barrier點了
leave barrier:
1.想離開barrier的線程刪除其在"/root"下建立的子節點
2.循環監聽"/root"孩子節點數目的變化,當size減到0時它就可以離開barrier了
Queue的實現:
1.建立一個根節點"/root"
2.生產線程在"/root"下建立一個SEQUENTIAL子節點
3.消費線程檢查"/root"有沒有子節點,如果沒有就循環監聽"/root"子節點的變化,直到它有子節點。刪除序號最小的子節點。
原代碼:
package sync;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
public class SyncPrimitive implements Watcher {
static ZooKeeper zk = null;
static Integer mutex;
String root;
//同步原語
SyncPrimitive(String address) {
if (zk == null) {
try {
System.out.println("Starting ZK:");
//建立Zookeeper連接,並且指定watcher
zk = new ZooKeeper(address, 3000, this);
//初始化鎖對象
mutex = new Integer(-1);
System.out.println("Finished starting ZK:" + zk);
} catch (IOException e) {
System.out.println(e.toString());
zk = null;
}
}
}
@Override
synchronized public void process(WatchedEvent event) {
synchronized (mutex) {
//有事件發生時,調用notify,使其他wait()點得以繼續
mutex.notify();
}
}
static public class Barrier extends SyncPrimitive {
int size;
String name;
Barrier(String address, String root, int size) {
super(address);
this.root = root;
this.size = size;
if (zk != null) {
try {
//一個barrier建立一個根目錄
Stat s = zk.exists(root, false); //不注冊watcher
if (s == null) {
zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out
.println("keeper exception when instantiating queue:"
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception.");
}
}
try {
//獲取自己的主機名
name = new String(InetAddress.getLocalHost()
.getCanonicalHostName().toString());
} catch (UnknownHostException e) {
System.out.println(e.toString());
}
}
boolean enter() throws KeeperException, InterruptedException {
//在根目錄下創建一個子節點.create和delete都會觸發children wathes,這樣getChildren就會收到通知,process()就會被調用
zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
//一直等,直到根目錄下的子節點數目達到size時,函數退出
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() < size) {
mutex.wait(); //釋放mutex上的鎖
} else {
return true;
}
}
}
}
boolean leave() throws KeeperException, InterruptedException {
//刪除自己創建的節點
zk.delete(root + "/" + name, 0);
//一直等,直到根目錄下有子節點時,函數退出
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() > 0) {
mutex.wait();
} else {
return true;
}
}
}
}
}
static public class Queue extends SyncPrimitive {
Queue(String address, String name) {
super(address);
this.root = name;
if (zk != null) {
try {
//一個queue建立一個根目錄
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out
.println("keeper exception when instantiating queue:"
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception.");
}
}
}
//參數i是要創建節點的data
boolean produce(int i) throws KeeperException, InterruptedException {
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value;
b.putInt(i);
value = b.array();
//根目錄下創建一個子節點,因為是SEQUENTIAL的,所以先創建的節點具有較小的序號
zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
return true;
}
int consume() throws KeeperException, InterruptedException {
int retvalue = -1;
Stat stat = null;
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true); //並不能保證list[0]就是序號最小的
//如果根目錄下沒有子節點就一直等
if (list.size() == 0) {
System.out.println("Going to wait");
mutex.wait();
}
//找到序號最小的節點將其刪除
else {
Integer min = new Integer(list.get(0).substring(7));
for (String s : list) {
Integer tmp = new Integer(s.substring(7));
if (tmp < min)
min = tmp;
}
System.out.println("Temporary value:" + root
+ "/element" + min);
byte[] b = zk.getData(root + "/element" + min, false,
stat);
zk.delete(root + "/element" + min, 0);
ByteBuffer buffer = ByteBuffer.wrap(b);
retvalue = buffer.getInt();
return retvalue;
}
}
}
}
}
public static void main(String[] args) {
if (args[0].equals("qTest"))
queueTest(args);
else
barrierTest(args);
}
private static void barrierTest(String[] args) {
Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
try {
boolean flag = b.enter();
System.out.println("Enter barrier:" + args[2]);
if (!flag)
System.out.println("Error when entering the barrier");
} catch (KeeperException e) {
} catch (InterruptedException e) {
}
Random rand = new Random();
int r = rand.nextInt(100);
for (int i = 0; i < r; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
try {
b.leave();
} catch (KeeperException e) {
} catch (InterruptedException e) {
}
System.out.println("Left barrier");
}
private static void queueTest(String[] args) {
Queue q = new Queue(args[1], "/app1");
System.out.println("Input:" + args[1]);
int i;
Integer max = new Integer(args[2]);
if (args[3].equals("p")) {
System.out.println("Producer");
for (i = 0; i < max; i++)
try {
q.produce(10 + 1);
} catch (KeeperException e) {
} catch (InterruptedException e) {
}
} else {
System.out.println("Consumer");
for (i = 0; i < max; i++)
try {
int r = q.consume();
System.out.println("Item:" + r);
} catch (KeeperException e) {
i--;
} catch (InterruptedException e) {
}
}
}
}
Locks
獲得鎖:
1.創建根節點"/root"
2.在根節點下新建子節點"/root/c-xxxxxx",SEQUENTIAL模式
3.對根節點調用getChildren(),如果第2步創建的節點是所有子節點中序號最小的,則獲得鎖;否則進入第4步
4.在序號最小的子節點上調用exists(),當序號最小的子節點被刪除后返回第3步
釋放鎖:
刪除自己創建的子節點即可
原代碼:
package sync;
import java.io.IOException;
import java.net.InetAddress;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class Locks implements Watcher{
static ZooKeeper zk=null;
static Integer mutex=null;
String name=null;
String path=null;
@Override
synchronized public void process(WatchedEvent event) {
synchronized(mutex){
mutex.notify();
}
}
Locks(String address){
try{
zk=new ZooKeeper(address,2000,this);
zk.create("/lock", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
mutex=new Integer(-1);
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
}catch(IOException e){
zk=null;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private int minSeq(List<String> list){
int min=Integer.parseInt(list.get(0).substring(14));
for(int i=1;i<list.size();i++){
if(min<Integer.parseInt(list.get(i).substring(14)))
min=Integer.parseInt(list.get(i).substring(14));
}
return min;
}
boolean getLock() throws KeeperException, InterruptedException{
//create方法返回新建的節點的完整路徑
path=zk.create("/lock/"+name+"-", new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
int min;
while(true){
synchronized(mutex){
List<String> list=zk.getChildren("/lock", false);
min=minSeq(list);
//如果剛建的節點是根節點的所有子節點中序號最小的,則獲得了鎖,可以返回true
if(min==Integer.parseInt(path.substring(14))){
return true;
}else{
mutex.wait(); //等待事件(新建節點或刪除節點)發生
while(true){
Stat s=zk.exists("/lock/"+name+"-"+min, true); //查看序號最小的子節點還在不在
if(s!=null) //如果還在,則繼續等待事件發生
mutex.wait();
else //如果不在,則跳外層循環中,查看新的最小序號的子節點是誰
break;
}
}
}
}
}
boolean releaseLock() throws KeeperException, InterruptedException{
if(path!=null){
zk.delete(path, -1);
path=null;
}
return true;
}
public static void main(String []args) throws KeeperException, InterruptedException{
Locks lock1=new Locks("localhost:2181");
if(lock1.getLock()){
System.out.println("T1 Get lock at "+System.currentTimeMillis());
for(int i=0;i<1000;++i)
Thread.sleep(5000);
lock1.releaseLock();
}
Locks lock2=new Locks("localhost:2181");
if(lock2.getLock()){
System.out.println("T2 Get lock at "+System.currentTimeMillis());
lock2.releaseLock();
}
}
}
讀鎖(共享鎖)和寫鎖(排斥鎖)並存的情況跟單獨只有排斥鎖的情況有幾點不同:
1.當一個線程想施加讀鎖時就新建一個節點"/root/read-xxxxxx",施加寫鎖時就新建一個節點"/root/write-xxxxxx";
2.欲施加讀鎖的線程查看"/root"下有沒有“write"開頭的節點,如果沒有則直接獲得讀鎖;如果有,但是"write"節點的序號比自己剛才創建的"read"節點的序號要大說明是先施加的讀鎖后施加的寫鎖,所以依然獲得讀鎖;else,在序號最小的"write"節點上調用exists,等待它被刪除。
