Ticket Lock, CLH Lock, MCS Lock


如果不用OS提供的mutex,我們該如何實現互斥鎖?(不考慮重入的情況)

 

1. naive lock

最簡單的想法是,搞一個volatile類型的共享變量flag,值可以是flase(無鎖)或者true(有鎖),競爭線程監聽flag,一旦發現flag為false,那么嘗試cas更新flag為true,更新成功則說明占有了這個鎖,更新失敗說明臨界區已經被其他線程占領,繼續監聽flag並嘗試更新。占有鎖的線程退出的時候,將flag修改為false,表示釋放鎖。

    volatile boolean flag = false;

    void lock() {
        while (!cas(flag, false, true)) {//返回true:占鎖成功,返回false:占鎖失敗,繼續循環嘗試

        }
    }

    void unlock() {
        flag = false;
    }

這樣做有個問題是無法保證公平性,可能有的倒霉蛋空轉了一輩子也無法cas成功,無法做到按競爭線程先來后到的次序占有鎖。

 

2. Ticket Lock

為了提供公平,有人發明了Ticket Lock

線程想要競爭某個鎖,需要先領一張ticket,然后監聽flag,發現flag被更新為手上的ticket的值了,才能去占領鎖

就像是在醫院看病一樣,醫生就是臨界區,病人就是線程,病人掛了號領一張單子,單子上寫了一個獨一無二的號碼,病人等的時候就看屏幕,屏幕上顯示到自己的號碼了,才能進去找醫生。

    AtomicInteger ticket = new AtomicInteger(0);
    volatile int flag = 0;

    void lock() {
        int my_ticket = ticket.getAndIncrement();//發號必須是一個原子操作,不能多個線程拿到同一個ticket
        while (my_ticket != flag) {

        }
    }

    void unlock() {
        flag++;
    }

現在公平性的問題沒有了,但是所有的線程都在監聽flag變量,而且由於為了保證flag變量變化的可見性,它必須是volatile的。也就是說如果某個線程修改了flag變量,都會引起其他所有監聽線程所在的core的對應於flag變量的cache line被設為invalid,那么這些線程下一次查詢flag變量的時候,就必須從主存里取最新的flag數據了,由於主存帶寬有限,這個開銷較為昂貴(與監聽線程數成正比)。

 

3. CLH Lock

為了減少緩存一致性帶來的開銷,CLH Lock被發明了。

ps,CLH實際上是指三個人:Craig, Landin, and Hagersten

CLH鎖的核心思想是,1. 競爭線程排隊 2. 監聽變量拆分

CLH鎖維護了一個鏈表waitingList的head與tail,其節點定義如下:

    static class Node {
        volatile boolean flag;//true:當前線程正在試圖占有鎖或者已經占有鎖,false:當前線程已經釋放鎖,下一個線程可以占有鎖了
        Node prev;//監聽前一個節點的flag字段
    }

初始時需要定義一個dummy節點(dummpy.flag == true, dummy.prev == null),head == tail == dummy

當有線程想要獲取鎖時,先創建一個鏈表節點node,然后將node掛載在waitingList的尾部(嘗試cas(tail, oldTail, node),如果成功將node.prev更新為oldTail,失敗則重試)

然后這個線程就監聽node.prev.flag,什么時候node.prev.flag == false了,說明node的前一個節點對應的線程已經釋放了鎖,本線程此時可以安全的占有鎖了

釋放鎖的時候,將對應的node.flag修改為false即可。

實現代碼如下(相當粗糙,意會即可):

public class CLHLock {
    volatile Node head, tail;//waitingList

    public CLHLock() {
        head = tail = Node.DUMMY;
    }

    public Node lock() {
        //lock-free的將node添加到waitingList的尾部
        Node node = new Node(true, null);
        Node oldTail = tail;
        while (!cas(tail, oldTail, node)) {
            oldTail = tail;
        }
        node.setPrev(oldTail);

        while (node.getPrev().isLocked()) {//監聽前驅節點的locked變量
        }

        return node;
    }

    public void unlock(Node node) {
        node.setLocked(false);
    }

    static class Node {
        public Node(boolean locked, Node prev) {
            this.locked = locked;
            this.prev = prev;
        }

        volatile boolean locked;//true:當前線程正在試圖占有鎖或者已經占有鎖,false:當前線程已經釋放鎖,下一個線程可以占有鎖了
        Node prev;//監聽前一個節點的locked字段

        public boolean isLocked() {
            return locked;
        }

        public void setLocked(boolean locked) {
            this.locked = locked;
        }

        public Node getPrev() {
            return prev;
        }

        public void setPrev(Node prev) {
            this.prev = prev;
        }

        public static final Node DUMMY = new Node(false, null);
    }
}

這樣做可以極大的減少緩存一致性協議所帶來的開銷。

CLH鎖的變種被應用於Java J.U.C包下的AbstractQueuedSynchronizer

 

4. MCS鎖

CLH鎖並不是完美的,因為每個線程都是在前驅節點的locked字段上自旋,而在NUMA體系中,有可能多個線程工作在多個不同的socket上的core里。如果前驅節點的內存跟監聽線程的core距離過遠,會有性能問題。

於是MCS鎖誕生了

ps,MCS也是人名簡寫:John M. Mellor-Crummey and Michael L. Scott

MCS與CLH最大的不同在於:CLH是在前驅節點的locked域上自旋,MCS是在自己節點上的locked域上自旋。

具體的實現是,前驅節點在釋放鎖之后,會主動將后繼節點的locked域更新。

也就是把多次對遠端內存的監聽 + 一次對本地內存的更新,簡化成了多次對本地內存的監聽 + 一次對遠端內存的更新。

具體的實現如下

public class MCSLock {
    volatile Node head, tail;//waitingList

    public MCSLock() {
        head = tail = null;
    }

    public Node lock() {
        //lock-free的將node添加到waitingList的尾部
        Node node = new Node(true, null);
        Node oldTail = tail;
        while (!cas(tail, oldTail, node)) {
            oldTail = tail;
        }

        if (null == oldTail) {//如果等待列表為空,那么獲取鎖成功,直接返回
            return node;
        }

        oldTail.setNext(node);
        while (node.isLocked()) {//監聽當前節點的locked變量
        }

        return node;
    }

    public void unlock(Node node) {
        if (node.getNext() == null) {
            if (cas(tail, node, null)) {//即使當前節點的后繼為null,也要用cas看一下隊列是否真的為空
                return;
            }
            while (node.getNext() != null) {//cas失敗,說明有后繼節點,只是還沒更新前驅節點的next域,等前驅節點看到后繼節點后,即可安全更新后繼節點的locked域

            }
        }
        node.getNext().setLocked(false);
    }

    static class Node {
        public Node(boolean locked, Node next) {
            this.locked = locked;
            this.next = next;
        }

        volatile boolean locked;//true:當前線程正在試圖占有鎖或者已經占有鎖,false:當前線程已經釋放鎖,下一個線程可以占有鎖了
        Node next;//后繼節點

        public boolean isLocked() {
            return locked;
        }

        public void setLocked(boolean locked) {
            this.locked = locked;
        }

        public Node getNext() {
            return next;
        }

        public void setNext(Node next) {
            this.next = next;
        }
    }

 

參考資料

CLH鎖 、MCS鎖

基於隊列的鎖:mcs lock簡介

Spin Lock Performance


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM