Java並發編程系列-(4) 顯式鎖與AQS


目前已經更新完《Java並發編程》,《Docker教程》和《JVM性能優化》,歡迎關注【后端精進之路】,輕松閱讀全部文章。

Java並發編程:

Docker教程:

JVM性能優化:

4 顯示鎖和AQS

4.1 Lock接口

核心方法

Java在java.util.concurrent.locks包中提供了一系列的顯示鎖類,其中最基礎的就是Lock接口,該接口提供了幾個常見的鎖相關的操作。

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

下面分別進行介紹:

  • void lock();

獲取鎖。如果鎖不可用,出於線程調度目的,將禁用當前線程,並且在獲得鎖之前,該線程將一直處於休眠狀態。

  • void lockInterruptibly();

如果當前線程未被中斷,則獲取鎖。如果鎖可用,則獲取鎖,並立即返回。與lock()接口唯一的區別是可以被中斷。

  • boolean tryLock();

試圖獲取鎖,若鎖可用,則獲取鎖,並立即返回值true。若鎖不可用,則此方法將立即返回值false。

  • boolean tryLock(long time, TimeUnit unit) throws

與上個方法不同的就是給定了超時時間,若鎖在給定的等待時間內空閑,並且當前線程未被中斷,則獲取鎖。

  • Condition newCondition();

返回綁定到此 Lock 實例的新 Condition 實例。

使用模板

通常使用顯示鎖Lock時,會采用下面的操作流程:

lock.lock();
try {
    //...需要保證線程安全的代碼。
} finally {
    lock.unlock();
}

Lock的lock()方法保證了只有一個線程能夠執有此鎖。對於任何一個lock()方法,都需要一個unlock()方法與之對應,通常情況下為了保證unlock()方法總是能夠執行,unlock()方法被置於finally中。

Lock VS synchronized

Synchronized是Java的關鍵字,當它用來修飾一個方法或一個代碼塊時,能夠保證在同一時刻最多只有一個線程執行該代碼。因為當調用Synchronized修飾的代碼時,並不需要顯示的加鎖和解鎖的過程,代碼簡潔,一般稱之為隱式鎖。

Lock是一個接口,提供了無條件的、可輪詢的、定時的、可中斷的鎖獲取操作,所有的加鎖和解鎖操作方法都是顯示的,因而稱為顯示鎖。

4.2 ReentrantLock

可重入鎖ReentrantLock是對Lock接口的一種實現,支持當一個線程獲取鎖以后,可以再次得到該對象鎖。

ReentrantLock在初始化時,需要設定該鎖的公平性:

  • 如果在時間上,先對鎖進行獲取的請求,一定先被滿足,這個鎖就是公平的,不滿足,就是非公平的
  • 非公平的效率一般來講更高

ReentrantLock的特性如下:

1. 可重入

synchronized和ReentrantLock均有可重入性,即一個線程請求得到一個對象鎖后再次請求此對象鎖,可以再次得到該對象鎖。

在使用synchronized時,當一個線程已經進入到synchronized方法/塊中時,可以進入到本類的其他synchronized方法/塊中。

2. 可中斷

在lockInterruptibly()鎖定的同時,還可以響應中斷通知。一旦接收到中斷通知,就會拋出InterruptedException異常。

這點與synchronized不同,在synchronized加鎖的代碼中,無法獲取中斷通知。

3. 可設置超時

ReentrantLock.tryLock()方法用於嘗試鎖定。參數為等待時間。該方法返回boolean值。若鎖定成功,則返回true。鎖定失敗,則返回false。tryLock方法在超時不能獲得鎖時,就返回false,不會永久等待構成死鎖。

4. 公平鎖

ReentrantLock內部利用AQS的線程隊列,可以實現公平鎖,但是性能相比非公平鎖會差一點。

在構造方法中,ReentrantLock(boolean fair),fair默認為false,當設置為true時,及表示當前構造的鎖是公平鎖。

當需要可定時的、可輪詢的與可中斷的鎖獲取操作,公平隊列,或者非塊結構的鎖,建議使用ReentrantLock。否則,請使用synchronized。在Java 1.6之后,ReentrantLock和synchronized性能相差不大,所以一般情況下,使用synchronized就足夠了,只有當有特定需求時,可以使用可重入鎖。

4.3 Lock與Condition實現消息傳遞

利用Lock和Condition可以實現消息的等待和通知,這里我們利用ReentrantLock來進行舉例。

注意在使用condition時,需要首先lock.newCondition來獲取Condition對象,如果有多個條件,需要針對不同的條件來獲取condition。

發送信號,調用condition.signal()方法;等待,調用condition.await()方法。

注意與notify與wait的區別,后者Object的方法,一般用在一個對象上進行等待,等待的線程和某個特定的對象綁定。當需要notify所有線程時,為了保證我們的消息被所有線程接收到,通常使用notifyAll發送消息。但是使用condition對象,await和signal操作都是在condition對象是進行的,所以使用signal通知時,不會存在等待其他消息的線程阻止消息傳遞,所以通常使用signal而不是signalAll。

public class ExpressCond {
    public final static String CITY = "ShangHai";
    private int km;/*快遞運輸里程數*/
    private String site;/*快遞到達地點*/
    private Lock lock = new ReentrantLock();
    private Condition keCond = lock.newCondition();
    private Condition siteCond = lock.newCondition();

    public ExpressCond() {
    }

    public ExpressCond(int km, String site) {
        this.km = km;
        this.site = site;
    }

    /* 變化公里數,然后通知處於wait狀態並需要處理公里數的線程進行業務處理*/
    public void changeKm(){
        lock.lock();
        try {
        	this.km = 101;
        	keCond.signal();
        }finally {
        	lock.unlock();
        }
    }

    /* 變化地點,然后通知處於wait狀態並需要處理地點的線程進行業務處理*/
    public  void changeSite(){
    	lock.lock();
        try {
        	this.site = "BeiJing";
        	siteCond.signal();
        }finally {
        	lock.unlock();
        }    	
    }

    /*當快遞的里程數大於100時更新數據庫*/
    public void waitKm(){
    	lock.lock();
    	try {
        	while(this.km<=100) {
        		try {
        			keCond.await();
    				System.out.println("check km thread["+Thread.currentThread().getId()
    						+"] is be notifed.");
    			} catch (InterruptedException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
        	}    		
    	}finally {
    		lock.unlock();
    	}

        System.out.println("the Km is "+this.km+",I will change db");
    }

    /*當快遞到達目的地時通知用戶*/
    public void waitSite(){
    	lock.lock();
        try {
        	while(CITY.equals(this.site)) {
        		try {
        			siteCond.await();
    				System.out.println("check site thread["+Thread.currentThread().getId()
    						+"] is be notifed.");
    			} catch (InterruptedException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
        	}
        }finally {
        	lock.unlock();
        } 
        System.out.println("the site is "+this.site+",I will call user");
    }
}

下面是測試函數,將會喚醒一個等待km變化的線程。

public class TestCond {
    private static ExpressCond express = new ExpressCond(0,ExpressCond.CITY);

    /*檢查里程數變化的線程,不滿足條件,線程一直等待*/
    private static class CheckKm extends Thread{
        @Override
        public void run() {
        	express.waitKm();
        }
    }

    /*檢查地點變化的線程,不滿足條件,線程一直等待*/
    private static class CheckSite extends Thread{
        @Override
        public void run() {
        	express.waitSite();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for(int i=0;i<3;i++){
            new CheckSite().start();
        }
        for(int i=0;i<3;i++){
            new CheckKm().start();
        }

        Thread.sleep(1000);
        express.changeKm();//快遞里程變化
    }
}

4.4 ReadWriteLock 和 ReentrantReadWriteLock

ReadWriteLock接口提供了單獨的讀鎖和寫鎖,

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

ReentrantReadWriteLock類是ReadWriteLock接口的一個實現,它與ReentrantLock類一樣提供了公平競爭與不公平競爭兩種機制,默認也是使用非公平競爭機制。

ReentrantReadWriteLock的可以被多個讀者訪問和一個寫者訪問,提供了讀寫分離功能:

  • 讀-讀不互斥:讀讀之間不阻塞。
  • 讀-寫互斥:讀阻塞寫,寫也會阻塞讀。
  • 寫-寫互斥:寫寫阻塞。

ReentrantReadWriteLock在讀多寫少的場景下,具有很強的性能優勢。

WriteLock VS ReadLock

1.重入方面其內部的WriteLock可以獲取ReadLock,但是反過來ReadLock無法獲得WriteLock。

2.WriteLock可以降級為ReadLock,順序是:先獲得WriteLock再獲得ReadLock,然后釋放WriteLock,這時候線程將保持Readlock的持有。反過來ReadLock想要升級為WriteLock則不可能。

4.不管是ReadLock還是WriteLock都支持Interrupt,語義與ReentrantLock一致。

5.WriteLock支持Condition並且與ReentrantLock語義一致,而ReadLock則不能使用Condition,否則拋出UnsupportedOperationException異常。

ReentrantLock VS ReentrantReadWriteLock

  1. ReentrantLock是排他鎖,使用非公平競爭機制時,搶占的機會相對還是比較少的,只有當新請求恰逢鎖釋放時才有機會搶占,所以發生線程飢餓的現象幾乎很少。

  2. ReentrantReadWriteLock是共享鎖,或者說讀讀共享,並且經常使用於讀多寫少的場景,即請求讀操作的線程多而頻繁而請求寫操作的線程極少且間隔長,在這種場景下,使用非公平競爭機制極有可能造成寫線程飢餓。比如,R1線程此時持有讀鎖且在進行讀取操作,W1線程請求寫鎖所以需要排隊等候,在R1釋放鎖之前,如果R2,R3,...,Rn 不斷的到來請求讀鎖,因為讀讀共享,所以他們不用等待馬上可以獲得鎖,如此下去W1永遠無法獲得寫鎖,一直處於飢餓狀態。


參考鏈接:

4.5 LockSupport

LockSupport是一個方便的線程阻塞工具,它可以在線程的任何位置讓線程阻塞。與Thread.suspend()方法相比,它彌補了由於resume()方法導致線程無法繼續執行的情況。和Object.wait()方法相比,它不需要先獲得某個對象的鎖,也不會拋出InterruptedException異常。

LockSupport主要有兩個方法,

  • LockSupport.park()

park()方法會阻塞當前線程(線程進入Waiting狀態),除非它獲取了"許可證"。

  • LockSupport.unpark(Thread t)

unpark(Thread t)方法會給線程t頒發一個"許可證"。

LockSupport使用了類似信號量的機制,它為每一個線程准備了一個許可,如果許可可用,park()方法會立刻返回,並且消費這個許可(也就是將許可變為不可用);如果許可不可用,就會阻塞,而unpack()方法則使得一個許可變為可用(但是和信號量不同的是,許可不可累加,永遠只能擁有不超過一個許可)。

4.6 AQS

AQS:AbstractQueuedSynchronizer,即隊列同步器。它是構建鎖或者其他同步組件的基礎框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),是JUC並發包中的核心基礎組件。JUC並發包的作者(Doug Lea)期望它能夠成為實現大部分同步需求的基礎。

AQS解決了實現同步器時涉及到的大量細節問題,例如獲取同步狀態、FIFO同步隊列。基於AQS來構建同步器可以帶來很多好處。它不僅能夠極大地減少實現工作,而且也不必處理在多個位置上發生的競爭問題。

AQS通過內置的FIFO同步隊列來完成資源獲取線程的排隊工作,如果當前線程獲取同步狀態失敗(鎖)時,AQS則會將當前線程以及等待狀態等信息構造成一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。

AQS使用了模板方法設計模式,子類通過繼承同步器並實現它的抽象方法來管理同步狀態。

AQS模板方法

AQS使用一個int類型的成員變量state來表示同步狀態,當state>0時表示已經獲取了鎖,當state = 0時表示釋放了鎖。它提供了如下三個方法來對同步狀態state進行操作,當然AQS可以確保對state的操作是安全的。

  • getState():返回同步狀態的當前值;

  • setState(int newState):設置當前同步狀態;

  • compareAndSetState(int expect, int update):使用CAS設置當前狀態,該方法能夠保證狀態設置的原子性;

獨占式獲取:

tryAcquire(int arg):獨占式獲取同步狀態,獲取同步狀態成功后,其他線程需要等待該線程釋放同步狀態才能獲取同步狀態

tryAcquireNanos(int arg,long nanos):超時獲取同步狀態,如果當前線程在nanos時間內沒有獲取到同步狀態,那么將會返回false,已經獲取則返回true;

tryRelease(int arg):獨占式釋放同步狀態;

acquire(int arg):獨占式獲取同步狀態,如果當前線程獲取同步狀態成功,則由該方法返回,否則,將會進入同步隊列等待,該方法將會調用可重寫的tryAcquire(int arg)方法;

acquireInterruptibly(int arg):與acquire(int arg)相同,但是該方法響應中斷,當前線程為獲取到同步狀態而進入到同步隊列中,如果當前線程被中斷,則該方法會拋出InterruptedException異常並返回;

isHeldExclusively():當前同步器是否在獨占式模式下被線程占用,一般該方法表示是否被當前線程所獨占;

共享式獲取:

tryAcquireShared(int arg):共享式獲取同步狀態,返回值大於等於0則表示獲取成功,否則獲取失敗;

tryReleaseShared(int arg):共享式釋放同步狀態;

acquireShared(int arg):共享式獲取同步狀態,如果當前線程未獲取到同步狀態,將會進入同步隊列等待,與獨占式的主要區別是在同一時刻可以有多個線程獲取到同步狀態;

acquireSharedInterruptibly(int arg):共享式獲取同步狀態,響應中斷;

tryAcquireSharedNanos(int arg, long nanosTimeout):共享式獲取同步狀態,增加超時限制;

獨占式釋放鎖:

release(int arg):獨占式釋放同步狀態,該方法會在釋放同步狀態之后,將同步隊列中第一個節點包含的線程喚醒;

共享式釋放鎖:

releaseShared(int arg):共享式釋放同步狀態;

當在實現自己的lock類時,需要子類覆蓋如下方法,
獨占式獲取 tryAcquire
獨占式釋放 tryRelease
共享式獲取 tryAcquireShared
共享式釋放 tryReleaseShared
這個同步器是否處於獨占模式 isHeldExclusively

CLH同步隊列

CLH同步隊列是一個FIFO雙向隊列,AQS依賴它來完成同步狀態的管理,當前線程如果獲取同步狀態失敗時,AQS則會將當前線程以及等待狀態等信息打包成一個節點(Node),並將其加入到CLH同步隊列,同時會阻塞當前線程。當同步狀態釋放時,會把首節點喚醒(公平鎖),使其再次嘗試獲取同步狀態。

Picture1.png

在CLH同步隊列中,一個節點表示一個線程,它保存着線程的引用(thread)、狀態(waitStatus)、前驅節點(prev)、后繼節點(next),

  • CANCELLED,值為1 。場景:當該線程等待超時或者被中斷,需要從同步隊列中取消等待,則該線程被置1,即被取消(這里該線程在取消之前是等待狀態)。節點進入了取消狀態則不再變化;
  • SIGNAL,值為-1。場景:后繼的節點處於等待狀態,當前節點的線程如果釋放了同步狀態或者被取消(當前節點狀態置為-1),將會通知后繼節點,使后繼節點的線程得以運行;
  • CONDITION,值為-2。場景:節點處於等待隊列中,節點線程等待在Condition上,當其他線程對Condition調用了signal()方法后,該節點從等待隊列中轉移到同步隊列中,加入到對同步狀態的獲取中;
  • PROPAGATE,值為-3。場景:表示下一次的共享狀態會被無條件的傳播下去;
  • INITIAL,值為0,初始狀態。

其定義如下:

static final class Node {
    /** 共享 */
    static final Node SHARED = new Node();
    /** 獨占 */
    static final Node EXCLUSIVE = null;
    /**
     * 因為超時或者中斷,節點會被設置為取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變為其他狀態;
     */
    static final int CANCELLED =  1;
    /**
     * 后繼節點的線程處於等待狀態,而當前節點的線程如果釋放了同步狀態或者被取消,將會通知后繼節點,使后繼節點的線程得以運行
     */
    static final int SIGNAL    = -1;
    /**
     * 節點在等待隊列中,節點線程等待在Condition上,當其他線程對Condition調用了signal()后,該節點將會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中
     */
    static final int CONDITION = -2;
    /**
     * 表示下一次共享式同步狀態獲取將會無條件地傳播下去
     */
    static final int PROPAGATE = -3;
    /** 等待狀態 */
    volatile int waitStatus;
    /** 前驅節點 */
    volatile Node prev;
    /** 后繼節點 */
    volatile Node next;
    /** 獲取同步狀態的線程 */
    volatile Thread thread;
    Node nextWaiter;
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() {
    }
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

對於CLH同步隊列,一般有如下幾種操作:

1. 節點加入到同步隊列

隊列的主要變化是tail指向新節點、新節點的prev指向當前最后的節點,當前最后一個節點的next指向當前節點。

整個流程圖如下:

Picture1.png

具體實現可以查看addWaiter(Node node)方法:

    private Node addWaiter(Node mode) {
        //新建Node
        Node node = new Node(Thread.currentThread(), mode);
        //快速嘗試添加尾節點
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //CAS設置尾節點
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //多次嘗試
        enq(node);
        return node;
    }

addWaiter(Node node)先通過快速嘗試設置尾節點,如果失敗,則調用enq(Node node)方法設置尾節點

    private Node enq(final Node node) {
        //多次嘗試,直到成功為止
        for (;;) {
            Node t = tail;
            //tail不存在,設置為首節點
            if (t == null) {
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //設置為尾節點
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

兩個方法都是通過一個CAS方法compareAndSetTail(Node expect, Node update)來設置尾節點,該方法可以確保節點是線程安全添加的。在enq(Node node)方法中,AQS通過“死循環”的方式來保證節點可以正確添加,只有成功添加后,當前線程才會從該方法返回,否則會一直執行下去。

2. 首節點移出同步隊列

首節點的線程釋放同步狀態后,將會喚醒它的后繼節點(next),而后繼節點將會在獲取同步狀態成功時將自己設置為首節點,這個過程非常簡單,head執行該節點並斷開原首節點的next和當前節點的prev即可,注意在這個過程是不需要使用CAS來保證的,因為只有一個線程能夠成功獲取到同步狀態。

流程圖如下:

Picture1.png

獨占式同步狀態獲取與釋放

AQS提供了acquire(int arg)方法來進行獨占式同步狀態獲取,實現如下:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

其中相關函數的定義為:

  • tryAcquire:去嘗試獲取鎖,獲取成功則設置鎖狀態並返回true,否則返回false。該方法自定義同步組件自己實現,該方法必須要保證線程安全的獲取同步狀態。
  • addWaiter:如果tryAcquire返回FALSE(獲取同步狀態失敗),則調用該方法將當前線程加入到CLH同步隊列尾部。
  • acquireQueued:當前線程會根據公平性原則來進行阻塞等待(自旋),直到獲取鎖為止;並且返回當前線程在等待過程中有沒有中斷過。
  • selfInterrupt:產生一個中斷。

acquireQueued方法為一個自旋的過程,當前線程(Node)進入同步隊列后,就會進入一個自旋的過程,當條件滿足,獲取到同步狀態后,就可以從這個自旋過程中退出,否則會一直執行下去。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            //中斷標志
            boolean interrupted = false;
            /*
             * 自旋過程,其實就是一個死循環而已
             */
            for (;;) {
                //當前線程的前驅節點
                final Node p = node.predecessor();
                //當前線程的前驅節點是頭結點,且同步狀態成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //獲取失敗,線程等待
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

當前線程會一直嘗試獲取同步狀態,當然前提是只有其前驅節點為頭結點才能夠嘗試獲取同步狀態,主要是為了保持FIFO同步隊列原則。頭節點釋放同步狀態后,將會喚醒其后繼節點,后繼節點被喚醒后需要檢查自己是否為頭節點。

在上面的流程中,當獲取失敗時,需要判斷是否阻塞當前線程,

if (shouldParkAfterFailedAcquire(p, node) &&        parkAndCheckInterrupt())
    interrupted = true;

在獲取同步狀態失敗后,線程並不是立馬進行阻塞,需要檢查該線程的狀態,檢查狀態的方法為 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,該方法主要靠前驅節點判斷當前線程是否應該被阻塞,代碼如下:

   private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //前驅節點
        int ws = pred.waitStatus;
        //狀態為signal,表示當前線程處於等待狀態,直接放回true
        if (ws == Node.SIGNAL)
            return true;
        //前驅節點狀態 > 0 ,則為Cancelled,表明該節點已經超時或者被中斷了,需要從同步隊列中取消
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } 
        //前驅節點狀態為Condition、propagate
        else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

這段代碼主要檢查當前線程是否需要被阻塞,具體規則如下:

  • 如果當前線程的前驅節點狀態為SIGNAL,則表明當前線程需要被阻塞,直接返回true,當前線程阻塞

  • 如果當前線程的前驅節點狀態為CANCELLED(ws > 0),則表明該線程的前驅節點已經等待超時或者被中斷了,則需要從CLH隊列中將該前驅節點刪除掉,直到回溯到前驅節點狀態 <= 0 ,返回false

  • 如果前驅節點非SIGNAL,非CANCELLED,則通過CAS的方式將其前驅節點設置為SIGNAL,返回false

如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,則調用parkAndCheckInterrupt()方法阻塞當前線程:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

parkAndCheckInterrupt() 方法主要是把當前線程掛起,從而阻塞住線程的調用棧,同時返回當前線程的中斷狀態。其內部則是調用LockSupport工具類的park()方法來阻塞該方法。

當線程釋放同步狀態后,則需要喚醒該線程的后繼節點:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //喚醒后繼節點
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

調用unparkSuccessor(Node node)喚醒后繼節點:

    private void unparkSuccessor(Node node) {
        //當前節點狀態
        int ws = node.waitStatus;
        //當前狀態 < 0 則設置為 0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        //當前節點的后繼節點
        Node s = node.next;
        //后繼節點為null或者其狀態 > 0 (超時或者被中斷了)
        if (s == null || s.waitStatus > 0) {
            s = null;
            //從tail節點來找可用節點
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //喚醒后繼節點
        if (s != null)
            LockSupport.unpark(s.thread);
    }

可能會存在當前線程的后繼節點為null,超時、被中斷的情況,如果遇到這種情況了,則需要跳過該節點,但是為何是從tail尾節點開始,而不是從node.next開始呢?原因在於node.next仍然可能會存在null或者取消了,所以采用tail回溯辦法找第一個可用的線程。最后調用LockSupport的unpark(Thread thread)方法喚醒該線程。

以上就是整個獨占式獲取和釋放的過程,流程圖如下:

Picture1.png

獨占式獲取響應中斷

AQS提供了acquire(int arg)方法以供獨占式獲取同步狀態,但是該方法對中斷不響應,對線程進行中斷操作后,該線程會依然位於CLH同步隊列中等待着獲取同步狀態。為了響應中斷,AQS提供了acquireInterruptibly(int arg)方法,該方法在等待獲取同步狀態時,如果當前線程被中斷了,會立刻響應中斷拋出異常InterruptedException。

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

首先校驗該線程是否已經中斷了,如果是則拋出InterruptedException,否則執行tryAcquire(int arg)方法獲取同步狀態,如果獲取成功,則直接返回,否則執行doAcquireInterruptibly(int arg)。doAcquireInterruptibly(int arg)定義如下:

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

doAcquireInterruptibly(int arg)方法與acquire(int arg)方法僅有兩個差別。

1.方法聲明拋出InterruptedException異常。

2.在中斷方法處不再是使用interrupted標志,而是直接拋出InterruptedException異常。

獨占式超時獲取

AQS除了提供上面兩個方法外,還提供了一個增強版的方法:tryAcquireNanos(int arg,long nanos)。該方法為acquireInterruptibly方法的進一步增強,它除了響應中斷外,還有超時控制。即如果當前線程沒有在指定時間內獲取同步狀態,則會返回false,否則返回true。

共享式同步狀態獲取與釋放

AQS提供acquireShared(int arg)方法共享式獲取同步狀態:

     public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            //獲取失敗,自旋獲取同步狀態
            doAcquireShared(arg);
    }

從上面程序可以看出,方法首先是調用tryAcquireShared(int arg)方法嘗試獲取同步狀態,如果獲取失敗則調用doAcquireShared(int arg)自旋方式獲取同步狀態,共享式獲取同步狀態的標志是返回 >= 0 的值表示獲取成功。

    private void doAcquireShared(int arg) {
        /共享式節點
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //前驅節點
                final Node p = node.predecessor();
                //如果其前驅節點,獲取同步狀態
                if (p == head) {
                    //嘗試獲取同步
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

tryAcquireShared(int arg)方法嘗試獲取同步狀態,返回值為int,當其 >= 0 時,表示能夠獲取到同步狀態,這個時候就可以從自旋過程中退出。

默認AQS沒有提供tryAcquireShared的實現,需要子類自己實現該方法,

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

注意到獨占式獲取鎖不同的是,如果tryAcquireShared的返回值大於0,會進行setHeadAndPropagate的操作,下面是該方法的實現,可以看到當某個節點被設置為head之后,如果它的后繼節點是SHARED狀態的,那么將繼續通過doReleaseShared方法嘗試往后喚醒節點,實現了共享狀態的向后傳播。doReleaseShared后面會仔細分析。

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

獲取同步狀態后,完成相應的任務之后,需要調用release(int arg)方法釋放同步狀態,方法如下:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

在doReleaseShared中,如果頭節點的狀態為SIGNAL,則通過CAS將頭節點的狀態設置為0,並且喚醒后續阻塞的線程;接着再通過CAS設置節點的狀態為Node.PROPAGATE。

    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

關於doReleaseShared的幾點分析:

  1. 調用該方法的線程可能有很多:在共享鎖中,持有共享鎖的線程可以有多個,這些線程都可以調用releaseShared方法釋放鎖;而這些線程想要獲得共享鎖,則它們必然曾經成為過頭節點,或者就是現在的頭節點。因此,如果是在releaseShared方法中調用的doReleaseShared,可能此時調用方法的線程已經不是頭節點所代表的線程了,頭節點可能已經被易主好幾次了。
  2. 調用該方法的目的:無論是在acquireShared中調用,還是在releaseShared方法中調用,該方法的目的都是在當前共享鎖是可獲取的狀態時,喚醒head節點的下一個節點。這一點看上去和獨占鎖似乎一樣,但是它們的一個重要的差別是——在共享鎖中,當頭節點發生變化時,是會回到循環中再立即喚醒head節點的下一個節點的。也就是說,在當前節點完成喚醒后繼節點的任務之后將要退出時,如果發現被喚醒后繼節點已經成為了新的頭節點,則會立即觸發喚醒head節點的下一個節點的操作,如此周而復始。
  3. 只有在當前head沒有易主時,才會退出,否則繼續循環。因為當前可能有多個線程在隊列中,比如A -> B -> C -> D, 如果A喚醒B,則B成為新的頭節點,接着B會調用doReleaseShared去喚醒C,此時A線程中的head變成了C,因此也加入到了喚醒D的隊伍中,此時可能出現A、B、C同時喚醒D的情況,提高了系統效率。當隊列中的所有線程都喚醒之后,此時程序退出。

參考:

Condition實現

在之前的例子中,使用Condition和Lock實現了消息的等待和通知,這節介紹Condiition在AQS中的實現。

JDK的Object對象提供了wait/notify的機制,也能實現消息的等待與通知,Condition與之的差別主要體現在以下幾點:

  • 調用wait方法的線程首先必須是已經進入了同步代碼塊,即已經獲取了監視器鎖;與之類似,調用await方法的線程首先必須獲得lock鎖。
  • 調用wait方法的線程會釋放已經獲得的監視器鎖,進入當前監視器鎖的等待隊列(wait set)中;與之類似,調用await方法的線程會釋放已經獲得的lock鎖,進入到當前Condtion對應的條件隊列中。
  • 調用監視器鎖的notify方法會喚醒等待在該監視器鎖上的線程,這些線程將開始參與鎖競爭,並在獲得鎖后,從wait方法處恢復執行;與之類似,調用Condtion的signal方法會喚醒對應的條件隊列中的線程,這些線程將開始參與鎖競爭,並在獲得鎖后,從await方法處開始恢復執行。

在AQS的Condition實現中,和獨占鎖的爭奪類似的是,每創建一個Condtion對象就會對應一個Condtion隊列,每一個調用了Condtion對象的await方法的線程都會被包裝成Node扔進一個條件隊列中,就像這樣:

Picture2.png

同樣的,在Condition中也會用到之前介紹的同步隊列,當等待隊列中的節點獲得信號通知時,會將等待隊列的節點移到同步隊列。

以下是await時節點的變化,

await.png

以下是signal信號發出時節點的變化,

signal.png

Condition的整個await/signal流程如下:

1、Condition提供了await()方法將當前線程阻塞,並提供signal()方法支持另外一個線程將已經阻塞的線程喚醒。
2、Condition需要結合Lock使用
3、線程調用await()方法前必須獲取鎖,調用await()方法時,將線程構造成節點加入等待隊列,同時釋放鎖,並掛起當前線程
4、其他線程調用signal()方法前也必須獲取鎖,當執行signal()方法時將等待隊列的節點移入到同步隊列,當線程退出臨界區釋放鎖的時候,喚醒同步隊列的首個節點

5507455-37635d0723174712.png

下面結合源代碼進行分析:

await實現

調用await阻塞當前線程

      public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //將當前線程封裝成Node加入到等待隊列尾部
            Node node = addConditionWaiter();
            //釋放鎖
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //判斷當前節點是否已經在同步隊列中,如果是則退出循環,如果不是就阻塞當前線程
            //其他線程如果發出了signal信號之后,會把等待隊列的線程移入同步隊列,此時就會退出循環,進入下面的重新獲取鎖的acquireQueued
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //其他發出signal信號的線程釋放鎖之后,該線程被喚醒並重新競爭鎖
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

        //線程加入等待隊列尾部
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {//清除cancell態的節點
                unlinkCancelledWaiters();
                t = lastWaiter;//t指向最后一個狀態正確的節點
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)//列表為空,初始化為第一個節點
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

signal/signalAll實現

將等待隊列的節點移入同步隊列(signalAll只是循環執行signal而已)

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;//得到firstWaiter
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
      //將節點從等待隊列移入同步隊列
      final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;//cas節點狀態錯誤,說明已經cancell了,直接返回false

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);//加入同步隊列
        int ws = p.waitStatus;
        //設置前置節點狀態為signal,可重入鎖那篇文章分析過,為了喚醒線程而設置
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);//特殊情況下喚醒線程並重新同步,一般情況下這里不會執行
        return true;
    }

參考:

ReentrantReadWriteLock實現

ReentrantReadWriteLock在內部也是利用了AQS進行鎖的競爭與釋放,同時也實現了ReadWriteLock接口。

為了同時保存讀鎖和寫鎖的狀態,在內部用一個int保存讀和寫的狀態。讀狀態從高16位讀出,寫狀態從低16位讀出,在保證讀寫鎖互斥的前提下,直接利用了AQS現有的數據結構。

        static final int SHARED_SHIFT   = 16;
        //實際是65536
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        //最大值 65535
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        // 同樣是65535
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** 獲取讀的狀態  */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** 獲取寫鎖的獲取狀態 */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

寫鎖為獨占式的,因此讀鎖的獲取和釋放和AQS原生的實現一致。
讀鎖是共享式的,獲取讀鎖的狀態,並且加1.

 final boolean tryReadLock() {
            Thread current = Thread.currentThread();
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false; //寫鎖被其他線程獲取了,直接返回false
                int r = sharedCount(c); //獲取讀鎖的狀態
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) { //嘗試獲取讀鎖
                    if (r == 0) { //說明第一個獲取到了讀鎖
                        firstReader = current; //標記下當前線程是第一個獲取的
                        firstReaderHoldCount = 1; //重入次數
                    } else if (firstReader == current) {
                        firstReaderHoldCount++; //次數+1
                    } else {
                        //cachedHoldCounter 為緩存最后一個獲取鎖的線程
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get(); //緩存最后一個獲取鎖的線程
                        else if (rh.count == 0)// 當前線程獲取到了鎖,但是重入次數為0,那么把當前線程存入進去
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return true;
                }
            }
        }

讀鎖的釋放:

 protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)//如果是首次獲取讀鎖,那么第一次獲取讀鎖釋放后就為空了
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) { //表示全部釋放完畢
                    readHolds.remove();  //釋放完畢,那么久把保存的記錄次數remove掉
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                 // nextc 是 state 高 16 位減 1 后的值
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc)) //CAS設置狀態
                    
                    return nextc == 0; //這個判斷如果高 16 位減 1 后的值==0,那么就是讀狀態和寫狀態都釋放了
            }
        }

鎖降級

鎖降級算是獲取讀鎖的特例,如在A線程已經獲取寫鎖的情況下,再調取讀鎖加鎖函數則可以直接獲取讀鎖,但此時其他線程仍然無法獲取讀鎖或寫鎖,在A線程釋放寫鎖后,如果有節點等待則會喚醒后續節點,后續節點可見的狀態為目前有A線程獲取了讀鎖。

AQS實戰-實現三元共享鎖

下面的例子里,利用AQS實現了三元共享鎖,也就是當前鎖只能被三個線程獲取。

public class TripleLock implements Lock  {

    //為3表示允許兩個線程同時獲得鎖
    private final Sync sync = new Sync(3);

    private static final class Sync extends AbstractQueuedSynchronizer {

        Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must large than zero.");
            }
            setState(count);
        }

        public int tryAcquireShared(int reduceCount) {
            for (;;) {
                int current = getState();
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        public boolean tryReleaseShared(int returnCount) {
            for (;;) {
                int current = getState();
                int newCount = current + returnCount;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquireShared(1) >= 0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

測試程序中,主線程每隔一秒鍾打印換行,工作線程直接打印當前的線程名,從結果可以看到,每一個時刻只有三個工作線程在同時運行。

public class testTripleLock {
    public void test() {
        final Lock lock = new TripleLock();
        
        class Worker extends Thread {
            public void run() {
                    lock.lock();
                    try {
                        System.out.println(Thread.currentThread().getName());
                        SleepTools.second(2);
                    } finally {
                        lock.unlock();
                    }
                    SleepTools.second(2);
            }
        }
        // 啟動10個子線程
        for (int i = 0; i < 10; i++) {
            Worker w = new Worker();
            w.setDaemon(true);
            w.start();
        }
        // 主線程每隔1秒換行
        for (int i = 0; i < 10; i++) {
        	SleepTools.second(1);
            System.out.println();
        }
    }

    public static void main(String[] args) {
        testTripleLock testMyLock = new testTripleLock();
        testMyLock.test();
    }
}

本文由『后端精進之路』原創,首發於博客 http://teckee.github.io/ , 轉載請注明出處

搜索『后端精進之路』關注公眾號,立刻獲取最新文章和價值2000元的BATJ精品面試課程

后端精進之路.png


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM