Java 隊列同步器 AQS



本文部分摘自《Java 並發編程的藝術》


概述

隊列同步器 AbstractQueuedSynchronize(以下簡稱同步器),是用來構建鎖(Lock)或者其他同步組件(JUC 並發包)的基礎框架,它使用了一個 int 成員變量表示同步狀態,通過內置的 FIFO 隊列來完成資源獲取線程的排隊工作

同步器的主要使用方式是繼承,子類通過繼承同步器並實現它的抽象方法來管理同步狀態,子類推薦被定義為自定義同步組件的靜態內部類。同步器自身沒有實現任何同步接口,它僅僅是定義了若干同步狀態的獲取和釋放方法來供自定義組件使用

一言以蔽之,同步器是實現鎖(也可以是任意同步組件)的一種方式,它屏蔽了更加底層的一些機制,使開發者更易於理解和使用


隊列同步器的接口

同步器的設計是基於模板方法模式的,使用者需要繼承隊列同步器並重寫指定的方法,隨后將同步器組合在自定義同步組件的實現中,並調用同步器提供的模板方法,而這些模板方法將會調用使用者重寫的方法

1. 訪問或修改同步狀態

重寫同步器指定的方法時,需要使用同步器提供的如下三個方法來訪問或修改同步狀態:

  • getState()

    獲取當前同步狀態

  • setState(int newState)

    設置當前同步狀態

  • compareAndSetState(int expect, int update)

    使用 CAS 設置當前狀態,該方法能保證狀態設置的原子性

2. 同步器可重寫的方法

方法名稱 描述
protected boolean tryAcquire(int arg) 獨占式獲取同步狀態,實現該方法需要查詢當前狀態,並判斷同步狀態是否符合預期,然后再進行 CAS 設置同步狀態
protected boolean tryRelease(int arg) 獨占式地釋放同步狀態,等待獲取同步狀態的線程將有機會獲取同步狀態
protected int tryAcquireShared(int arg) 共享式獲取同步狀態,返回大於等於 0 的值,表示獲取成功,否則獲取失敗
protected boolean tryReleaseShared(int arg) 共享式釋放同步狀態
protected boolean isHeldExclusively() 當前同步器是否在獨占模式下被線程占有,一般該方法表示是否被當前線程所獨占

3. 同步器提供的模板方法

方法名稱 描述
void acquire(int arg) 獨占式獲取同步狀態,如果當前線程獲取同步狀態成功,則由該方法返回,否則,將會進入同步隊列等待,該方法將會調用重寫的 tryAcquire(int arg) 方法
void acquireInterruptibly(int arg) 與 acquire(int arg) 相同,但該方法響應中斷,當前線程未獲取到同步狀態而進入同步隊列中,如果當前線程被中斷,則該方法會拋出 InterruptedException 並返回
boolean tryAcquireNanos(int arg, long nanos) 在 acquireInterruptibly(int arg) 的基礎上增加了超時限制
void acquireShared(int arg) 共享式的獲取同步狀態,與獨占式獲取的主要區別是在同一時刻可以有多個線程獲取到同步狀態
void acquireSharedInterruptibly(int arg) 與 acquireShared(int arg) 相同,該方法響應中斷
boolean tryAcquireSharedNanos(int arg, long nanos) 在 acquireSharedInterruptibly 的基礎上增加了超時限制
boolean release(int arg) 獨占式的釋放同步狀態,該方法會在釋放同步狀態之后,將同步隊列中第一個節點包含的線程喚醒
boolean releaseShared(int arg) 共享式的釋放同步狀態
Collection<Thread> getQueuedThreads() 獲取等待在同步隊列上的線程集合

4. 示例

下面通過一個獨占鎖的示例來深入了解一下同步器的工作原理。顧名思義,獨占鎖就是在同一時刻只能有一個線程獲取到鎖,其他獲取鎖的線程只能處於同步隊列中等待,只有獲取鎖的線程釋放了鎖,后繼的線程才能獲取鎖

public class Mutex implements Lock {

    /**
     * 自定義同步器
     */
    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean isHeldExclusively() {
            // 是否處於占用狀態
            return getState() == 1;
        }

        @Override
        public boolean tryAcquire(int acquires) {
            // 當狀態為 0 時獲取鎖
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            // 釋放鎖,將狀態設置為 0
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        /**
         * 返回一個 Condition, 每個 condition 都包含一個 condition 隊列
         */
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

Mutex 中定義了一個靜態內部類,該內部類繼承了同步器並實現了獨占式獲取和釋放同步狀態。用戶使用 Mutex 時並不會直接和內部同步器實現打交道,而是調用 Mutex 提供的方法,大大降低了實現一個可靠自定義組件的門檻


隊列同步器的實現

1. 同步隊列

同步器依賴內部的同步雙向隊列來完成同步狀態的管理,當前線程獲取同步狀態失敗后,同步器會將當前線程及其等待狀態等信息構造成一個節點,並加入同步隊列,同時阻塞當前線程。當同步狀態釋放后,會把首節點中的線程喚醒,使其再次嘗試獲取同步狀態

節點是構成同步隊列的基礎,同步器擁有首節點(head)和尾結點(tail),沒有成功獲取同步狀態的線程將會成為節點並加入該隊列的尾部

同步隊列的基本結構如下:

同步器將節點加入到同步隊列的過程如圖所示:

首節點是獲取同步狀態成功的節點,首節點線程在釋放同步狀態時,會喚醒后繼節點,而后繼節點將會在獲取同步狀態成功時將自己設置為首節點,過程如下:

設置首節點是通過獲取同步狀態成功的線程來完成的,由於只有一個線程能夠成功獲取同步狀態,因此設置頭節點的方法並不需要使用 CAS 來保證,只需要將首節點設置成原首節點的后繼節點並斷開原首節點的 next 引用即可

2. 獨占式同步狀態獲取與釋放

通過調用同步器的 acquire(int arg) 方法可以獲取同步狀態,該方法對中斷不敏感,線程獲取同步狀態失敗則進入同步隊列中,后續對線程進行中斷操作,線程不會從同步隊列中移出

獨占式同步狀態獲取流程,也就是 acquire(int arg) 方法調用流程如圖所示:

如果當前線程獲取同步狀態失敗,就會生成一個節點(獨占式 Node.EXCLUSIVE,同一時刻只能有一個線程成功獲取同步狀態),並加入到隊列尾部。一個隊列里有很多節點,而只有前驅節點是頭節點的節點才能嘗試獲取同步狀態,原因有兩個:

  • 頭節點是成功獲取到同步狀態的節點,而頭節點的線程釋放了同步狀態之后,將會喚醒其后繼節點,后繼節點的線程被喚醒后需要檢查自己的前驅節點是否是頭節點
  • 維護同步隊列的 FIFO 原則

因此,如果隊列中的非頭節點線程的前驅節點出隊或者被中斷而從等待狀態返回,那么其隨后會檢查自己的前驅是否為頭節點,如果是則嘗試獲取同步狀態

當前線程獲取同步狀態並執行了相應邏輯之后,就需要釋放同步狀態,使得后繼節點能夠繼續獲取同步狀態。通過調用同步器的 release(int arg) 方法可以釋放同步狀態,該方法執行時,會喚醒頭節點的后繼節點線程

3. 共享式同步狀態獲取與釋放

共享式獲取與獨占式獲取最主要的區別在於同一時刻能否有多個線程同時獲取到同步狀態。以文件的讀寫為例,若一個程序在對文件進行讀操作,那么這一時刻對於該文件的寫操作均被阻塞,而讀操作能夠同時進行。寫操作要求對資源的獨占式訪問,而讀操作可以是共享式訪問,兩種不同的訪問模式在同一時刻對文件或資源的訪問情況,如下圖所示:

通過調用同步器的 acquireShared(int arg) 方法可以共享式地獲取同步狀態,其代碼核心邏輯和 acquire() 差不多,也是判斷當前節點的前驅是否為頭節點,如果是就嘗試獲取同步狀態。頭節點在釋放同步狀態之后,也會喚醒后續處於等待狀態的節點

問題的關鍵在於如何做到多個線程訪問同步狀態,因為按照上面所講的過程,和獨占式幾乎沒有任何區別。獨占式與共享式在實現上的差別其實僅僅在於:每次頭節點釋放同步狀態之后,獨占式只是把其后繼節點設置為頭節點,而共享式還多了一個傳播的過程(筆者能力有限,這一塊沒搞明白,就不瞎寫了。。)

與獨占式一樣,共享式獲取也需要釋放同步狀態,通過調用 releaseShared(int arg) 方法可以釋放同步狀態,並喚醒后續處於等待狀態的節點

4. 獨占式超時獲取同步狀態

通過調用同步器的 doAcquireNanos(int arg, long nanosTimeout) 方法可以超時獲取同步狀態,即在指定的時間段內獲取同步狀態

在介紹這個方法之前,先介紹一下響應中斷的同步狀態獲取過程。Java5 以后,同步器提供了 acquireInterruptibly(int arg) 方法,這個方法在等待獲取同步狀態時,如果當前線程被中斷,會立刻返回,並拋出 InterruptedException

超時獲取同步狀態可以視為響應中斷獲取同步狀態的增強版。獨占式超時和非獨占式獲取在流程上非常相似,其主要區別在於未獲取到同步狀態時的處理邏輯。acquire(int arg) 在未獲取到同步狀態時,會使當前線程一致處於等待狀態,而 doAcquireNanos(int arg, long nanosTimeout) 會使當前線程等待 nanosTimeout 納秒,如果當前線程在 nanosTimeout 納秒內沒有獲取同步狀態,將會從等待邏輯中自動返回


自定義同步組件

設計一個同步工具:同一時刻,只能允許至多兩個線程同時訪問,超過兩個線程的訪問將被阻塞。顯然這是共享式訪問,主要設計思路如下:

  • 重寫 tryAcquireShared(int args) 方法和 tryReleaseShared(int args) 方法
  • 定義初始狀態 status 為 2,當一個線程進行獲取,status 減 1,該線程釋放,status 加 1,為 0 時再有其他線程進行獲取,則阻塞

示例代碼如下:

public class TwinsLock implements Lock {

    private final Sync sync = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer {

        Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must large than zero");
            }
            setState(count);
        }

        @Override
        public int tryAcquireShared(int reduceCount) {
            while (true) {
                int current = getState();
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int reduceCount) {
            while (true) {
                int current = getState();
                int newCount = current + reduceCount;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquireShared(1) > 0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

再編寫一個測試來驗證 TwinsLock 是否按預期工作

public class TwinsLockTest {


    public static void main(String[] args) {

        final Lock lock = new TwinsLock();

        class Worker extends Thread {

            @Override
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                        SleepUtils.second(1);
                        System.out.println(Thread.currentThread().getName());
                        SleepUtils.second(1);
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }

        for (int i = 0; i < 10; i++) {
            Worker worker = new Worker();
            worker.setDaemon(true);
            worker.start();
        }

        for (int i = 0; i < 10; i++) {
            SleepUtils.second(1);
            System.out.println();
        }
    }
}

運行該測試用例,發現線程名稱成對輸出,說明同一時刻只有兩個線程能夠獲取到鎖



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM