Java並發編程總結3——AQS、ReentrantLock、ReentrantReadWriteLock


本文內容主要總結自《Java並發編程的藝術》第5章——Java中的鎖。

 

一、AQS

AbstractQueuedSynchronizer(簡稱AQS),隊列同步器,是用來構建鎖或者其他同步組建的基礎框架。該類主要包括:

1、模式,分為共享和獨占。

2、volatile int state,用來表示鎖的狀態。

3、FIFO雙向隊列,用來維護等待獲取鎖的線程。

AQS部分代碼及說明如下:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    static final class Node {
        /** 共享模式,表示可以多個線程獲取鎖,比如讀寫鎖中的讀鎖 */
        static final Node SHARED = new Node();
        /** 獨占模式,表示同一時刻只能一個線程獲取鎖,比如讀寫鎖中的寫鎖 */
        static final Node EXCLUSIVE = null;

        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
    }

    /** AQS類內部維護一個FIFO的雙向隊列,負責同步狀態的管理,當前線程獲取同步狀態失敗時,同步器會將當前線程以及等待狀態等 構造成一個節點Node並加入同步隊列;當同步狀態釋放時,會把首節點中線程喚醒,使其再次嘗試同步狀態 */
    private transient volatile Node head;
    private transient volatile Node tail;

    /** 狀態,主要用來確定lock是否已經被占用;在ReentrantLock中,state=0表示鎖空閑,>0表示鎖已被占用;可以自定義,改寫tryAcquire(int acquires)等方法即可 */
    private volatile int state;
}

這里主要說明下雙向隊列,通過查看源碼分析,隊列是這個樣子的:

head -> node1 -> node2 -> node3(tail)

注意:head初始時是一個空節點(所謂的空節點意思是節點中沒有具體的線程信息),之后表示的是獲取了鎖的節點。因此實際上head->next(即node1)才是同步隊列中第一個可用節點。

AQS的設計基於模版方法模式,使用者通過繼承AQS類並重寫指定的方法,可以實現不同功能的鎖。可重寫的方法主要包括:

 

 

二、通過ReentrantLock學習AQS的使用

1、公平鎖的獲取

/**
 * Sync object for fair locks
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /** * 首先嘗試獲取鎖,如果tryAcquire(arg)返回true,獲取鎖成功; * 如果失敗,則調用acquireQueued(addWaiter(Node.EXCLUSIVE), arg),將當前線程封裝成Node節點加入到同步隊列隊尾,之后阻塞當前線程 */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /** * 獲取state的值,如果等於0表示鎖空閑,可以嘗試獲取; * 查看當前線程是否是FIFO隊列中的第一個可用節點,如果是第一個,則嘗試通過CAS方式獲取鎖, 這保證了等待時間最長的必定先獲取鎖 */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
  
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                /** * 如果發現當前節點的前一個節點為head,那么嘗試獲取鎖,成功之后刪除head節點並將自己設置為head,退出循環; * 如果當前節點為阻塞狀態,需要unpark()喚醒,release()方法會執行喚醒操作 */
                if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC
                    failed = false; return interrupted; } /** * 為了避免無意義的自旋,同步隊列中的線程會通過park(this)方法用於阻塞當前線程 */
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}

 

2、公平鎖的釋放

更新狀態值state,之后喚醒同步隊列中的第一個等待節點,unparkSuccessor(Node node)。

 

三、公平鎖和非公平鎖

ReentrantLock默認的鎖為非公平鎖,其主要原因在於:與公平鎖相比,可以避免大量的線程切換,極大的提高性能。

先看一個非公平鎖的例子: 

public class AQS2 {
    private ReentrantLock lock = new ReentrantLock(false);
    private Thread[] threads = new Thread[3];

    public AQS2() {
        for (int i = 0; i < 3 ; i++) {
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < 2; i++) {
                        try {
                            lock.lock();
                            Thread.sleep(100);
                            System.out.println(Thread.currentThread().getName());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } finally {
                            lock.unlock();
                        }
                    }
                }
            });
        }
    }

    public void startThreads() {
        for (Thread thread : threads) {
            thread.start();
        }
    }

    public static void main(String[] args) {
        AQS2 aqs2 = new AQS2();
        aqs2.startThreads();
    }
}

運行結果為:

 

這段代碼(每個線程2次獲取鎖/釋放鎖)的運行結果我一開始沒有想清楚,之前我是這么想的:

Thread0先獲取鎖,之后sleep 100ms,那么等待獲取鎖的同步隊列為:

head -> thread1 -> thread2 -> thread0 -> thread1 -> thread2。

從運行結果可知,第二次獲取鎖的還是thread0,但是鎖的釋放release(int args)卻總是從同步隊列的第一個可用節點開始,那就把thread1從隊列中移除了,邏輯明顯不對了。

后來重新看了代碼,比較了非公平鎖和公平鎖之間的不同時,才終於明白。

非公平鎖獲取鎖最大的不一樣的地方在於:線程可以無視sync同步隊列插隊!一旦插隊成功,獲得了鎖,那么該線程當然也就不用在排隊了。所以以上程序的同步隊列應該為:

head -> thread1 -> thread2。

非公平鎖源代碼主要的不同點有2點:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

       //不同點1
    final void lock() {
        if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {        //不同點2
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

        thread0第一次釋放鎖之后,會立刻通過lock.lock()操作繼續嘗試獲取鎖。非公平鎖的lock()方法會直接嘗試獲取鎖,無視同步隊列,因此很大概率會再次獲得鎖;如果失敗了,那么執行nonfairTryAcquire(int acquires)方法,該方法和tryAcquire(int acquires)最大的不同在於,缺少了hasQueuedPredecessors()的判斷,即不需要判斷當前線程是否是同步隊列的第一個可用節點,甚至也不需要判斷當前線程是否在同步隊列中,直接嘗試獲取鎖即可。

 

四、ReentrantReadWriteLnock

        理解了AQS的原理后,讀寫鎖也就不難理解了。讀寫鎖分為2個鎖,讀鎖和寫鎖。讀鎖在同一時刻允許多個線程訪問,通過改寫int tryAcquireShared(int arg)以及boolean tryReleaseShared(int arg)方法即可;寫鎖為獨占鎖,通過改寫boolean tryAcquire(int arg)以及boolean tryRelease(int arg)方法即可。

        由於AQS中只提供了一個int state來表示鎖的狀態,那么如何表示讀和寫2個鎖呢?解決辦法是前16位表示讀鎖,后16位表示寫鎖。由於鎖的狀態只有16位,因此無論是對於讀鎖或者是寫鎖,其state最大值均為65535,即所有獲得了鎖的線程的拿到鎖的總次數(由於是重進入鎖,因此每個線程可以拿到n個鎖)不超過65536。由於讀寫鎖主要的應用場景為多讀少寫,所以如果感覺讀鎖的65535不夠用,可以自己改寫讀寫鎖即可,比如分配int state的前24位為讀鎖,后8位為寫鎖。

        讀寫鎖還提供了一些新的方法,比如final int getReadHoldCount(),返回當前線程獲取讀鎖的次數。由於讀狀態保存的是所有獲取讀鎖的線程讀鎖次數的總和,因此每個線程自己的讀鎖次數需要單獨保存,引入了ThreadLocal,由線程自身維護。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM