如果不用OS提供的mutex,我們該如何實現互斥鎖?(不考慮重入的情況)
1. naive lock
最簡單的想法是,搞一個volatile類型的共享變量flag,值可以是flase(無鎖)或者true(有鎖),競爭線程監聽flag,一旦發現flag為false,那么嘗試cas更新flag為true,更新成功則說明占有了這個鎖,更新失敗說明臨界區已經被其他線程占領,繼續監聽flag並嘗試更新。占有鎖的線程退出的時候,將flag修改為false,表示釋放鎖。
volatile boolean flag = false; void lock() { while (!cas(flag, false, true)) {//返回true:占鎖成功,返回false:占鎖失敗,繼續循環嘗試 } } void unlock() { flag = false; }
這樣做有個問題是無法保證公平性,可能有的倒霉蛋空轉了一輩子也無法cas成功,無法做到按競爭線程先來后到的次序占有鎖。
2. Ticket Lock
為了提供公平,有人發明了Ticket Lock
線程想要競爭某個鎖,需要先領一張ticket,然后監聽flag,發現flag被更新為手上的ticket的值了,才能去占領鎖
就像是在醫院看病一樣,醫生就是臨界區,病人就是線程,病人掛了號領一張單子,單子上寫了一個獨一無二的號碼,病人等的時候就看屏幕,屏幕上顯示到自己的號碼了,才能進去找醫生。
AtomicInteger ticket = new AtomicInteger(0); volatile int flag = 0; void lock() { int my_ticket = ticket.getAndIncrement();//發號必須是一個原子操作,不能多個線程拿到同一個ticket while (my_ticket != flag) { } } void unlock() { flag++; }
現在公平性的問題沒有了,但是所有的線程都在監聽flag變量,而且由於為了保證flag變量變化的可見性,它必須是volatile的。也就是說如果某個線程修改了flag變量,都會引起其他所有監聽線程所在的core的對應於flag變量的cache line被設為invalid,那么這些線程下一次查詢flag變量的時候,就必須從主存里取最新的flag數據了,由於主存帶寬有限,這個開銷較為昂貴(與監聽線程數成正比)。
3. CLH Lock
為了減少緩存一致性帶來的開銷,CLH Lock被發明了。
ps,CLH實際上是指三個人:Craig, Landin, and Hagersten
CLH鎖的核心思想是,1. 競爭線程排隊 2. 監聽變量拆分
CLH鎖維護了一個鏈表waitingList的head與tail,其節點定義如下:
static class Node { volatile boolean flag;//true:當前線程正在試圖占有鎖或者已經占有鎖,false:當前線程已經釋放鎖,下一個線程可以占有鎖了 Node prev;//監聽前一個節點的flag字段 }
初始時需要定義一個dummy節點(dummpy.flag == true, dummy.prev == null),head == tail == dummy
當有線程想要獲取鎖時,先創建一個鏈表節點node,然后將node掛載在waitingList的尾部(嘗試cas(tail, oldTail, node),如果成功將node.prev更新為oldTail,失敗則重試)
然后這個線程就監聽node.prev.flag,什么時候node.prev.flag == false了,說明node的前一個節點對應的線程已經釋放了鎖,本線程此時可以安全的占有鎖了
釋放鎖的時候,將對應的node.flag修改為false即可。
實現代碼如下(相當粗糙,意會即可):
public class CLHLock { volatile Node head, tail;//waitingList public CLHLock() { head = tail = Node.DUMMY; } public Node lock() { //lock-free的將node添加到waitingList的尾部 Node node = new Node(true, null); Node oldTail = tail; while (!cas(tail, oldTail, node)) { oldTail = tail; } node.setPrev(oldTail); while (node.getPrev().isLocked()) {//監聽前驅節點的locked變量 } return node; } public void unlock(Node node) { node.setLocked(false); } static class Node { public Node(boolean locked, Node prev) { this.locked = locked; this.prev = prev; } volatile boolean locked;//true:當前線程正在試圖占有鎖或者已經占有鎖,false:當前線程已經釋放鎖,下一個線程可以占有鎖了 Node prev;//監聽前一個節點的locked字段 public boolean isLocked() { return locked; } public void setLocked(boolean locked) { this.locked = locked; } public Node getPrev() { return prev; } public void setPrev(Node prev) { this.prev = prev; } public static final Node DUMMY = new Node(false, null); } }
這樣做可以極大的減少緩存一致性協議所帶來的開銷。
CLH鎖的變種被應用於Java J.U.C包下的AbstractQueuedSynchronizer
4. MCS鎖
CLH鎖並不是完美的,因為每個線程都是在前驅節點的locked字段上自旋,而在NUMA體系中,有可能多個線程工作在多個不同的socket上的core里。如果前驅節點的內存跟監聽線程的core距離過遠,會有性能問題。
於是MCS鎖誕生了
ps,MCS也是人名簡寫:John M. Mellor-Crummey and Michael L. Scott
MCS與CLH最大的不同在於:CLH是在前驅節點的locked域上自旋,MCS是在自己節點上的locked域上自旋。
具體的實現是,前驅節點在釋放鎖之后,會主動將后繼節點的locked域更新。
也就是把多次對遠端內存的監聽 + 一次對本地內存的更新,簡化成了多次對本地內存的監聽 + 一次對遠端內存的更新。
具體的實現如下
public class MCSLock { volatile Node head, tail;//waitingList public MCSLock() { head = tail = null; } public Node lock() { //lock-free的將node添加到waitingList的尾部 Node node = new Node(true, null); Node oldTail = tail; while (!cas(tail, oldTail, node)) { oldTail = tail; } if (null == oldTail) {//如果等待列表為空,那么獲取鎖成功,直接返回 return node; } oldTail.setNext(node); while (node.isLocked()) {//監聽當前節點的locked變量 } return node; } public void unlock(Node node) { if (node.getNext() == null) { if (cas(tail, node, null)) {//即使當前節點的后繼為null,也要用cas看一下隊列是否真的為空 return; } while (node.getNext() != null) {//cas失敗,說明有后繼節點,只是還沒更新前驅節點的next域,等前驅節點看到后繼節點后,即可安全更新后繼節點的locked域 } } node.getNext().setLocked(false); } static class Node { public Node(boolean locked, Node next) { this.locked = locked; this.next = next; } volatile boolean locked;//true:當前線程正在試圖占有鎖或者已經占有鎖,false:當前線程已經釋放鎖,下一個線程可以占有鎖了 Node next;//后繼節點 public boolean isLocked() { return locked; } public void setLocked(boolean locked) { this.locked = locked; } public Node getNext() { return next; } public void setNext(Node next) { this.next = next; } }
參考資料