Java並發包源碼學習之AQS框架(三)LockSupport和interrupt


接着上一篇文章今天我們來介紹下LockSupport和Java中線程的中斷(interrupt)

其實除了LockSupport,Java之初就有Object對象的wait和notify方法可以實現線程的阻塞和喚醒。那么它們的區別 是什么呢?

主要的區別應該說是它們面向的對象不同。阻塞和喚醒是對於線程來說的,LockSupport的park/unpark更符合這個語義,以“線程”作為方法的參數, 語義更清晰,使用起來也更方便。而wait/notify的實現使得“線程”的阻塞/喚醒對線程本身來說是被動的,要准確的控制哪個線程、什么時候阻塞/喚醒很困難, 要不隨機喚醒一個線程(notify)要不喚醒所有的(notifyAll)。

park-vs-wait.png

wait/notify最典型的例子應該就是生產者/消費者了:

class BoundedBuffer1 {
    private int contents;

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    public synchronized void put(Object x) {
        while (count == items.length) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }

        items[putptr] = x;
        if (++putptr == items.length)
            putptr = 0;
        ++count;
        notifyAll();
    }

    public synchronized Object take() {
        while (count == 0) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        Object x = items[takeptr];
        if (++takeptr == items.length)
            takeptr = 0;
        --count;
        notifyAll();
        return x;
    }

    public static class Producer implements Runnable {

        private BoundedBuffer1 q;

        Producer(BoundedBuffer1 q) {
            this.q = q;
            new Thread(this, "Producer").start();
        }

        int i = 0;

        public void run() {
            int i = 0;
            while (true) {
                q.put(i++);
            }
        }
    }

    public static class Consumer implements Runnable {

        private BoundedBuffer1 q;

        Consumer(BoundedBuffer1 q) {
            this.q = q;
            new Thread(this, "Consumer").start();
        }

        public void run() {
            while (true) {
                System.out.println(q.take());
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final BoundedBuffer1 buffer = new BoundedBuffer1();
        new Thread(new Producer(buffer)).start();
        new Thread(new Consumer(buffer)).start();
    }
}
View Code

上面的例子中有一點需要知道,在調用對象的wait之前當前線程必須先獲得該對象的監視器(synchronized),被喚醒之后需要重新獲取到監視器才能繼續執行。

//wait會先釋放當前線程擁有的監視器
obj.wait();
//會re-acquire監視器

LockSupport並不需要獲取對象的監視器。LockSupport機制是每次unpark給線程1個“許可”——最多只能是1,而park則相反,如果當前 線程有許可,那么park方法會消耗1個並返回,否則會阻塞線程直到線程重新獲得許可,在線程啟動之前調用park/unpark方法沒有任何效果。

// 1次unpark給線程1個許可
LockSupport.unpark(Thread.currentThread());
// 如果線程非阻塞重復調用沒有任何效果
LockSupport.unpark(Thread.currentThread());
// 消耗1個許可
LockSupport.park(Thread.currentThread());
// 阻塞
LockSupport.park(Thread.currentThread());

因為它們本身的實現機制不一樣,所以它們之間沒有交集,也就是說LockSupport阻塞的線程,notify/notifyAll沒法喚醒。

實際上現在很少能看到直接用wait/notify的代碼了,即使生產者/消費者也基本都會用LockCondition來實現,我會在后面《Java並發包源碼學習之AQS框架(五)ConditionObject源碼分析》 文章中再回頭看這個例子。

總結下LockSupportpark/unparkObjectwait/notify

  • 面向的對象不同;
  • 跟Object的wait/notify不同LockSupport的park/unpark不需要獲取對象的監視器;
  • 實現的機制不同,因此兩者沒有交集。

雖然兩者用法不同,但是有一點,LockSupport的park和Object的wait一樣也能響應中斷。

public static void main(String[] args) throws InterruptedException {
    final Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
            LockSupport.park();
            System.out.println("thread " + Thread.currentThread().getId() + " awake!");
        }
    });

    t.start();
    Thread.sleep(3000);

    // 2. 中斷
    t.interrupt();
}
thread 9 awake!

在我之前的一篇博客“如何正確停止一個線程”有介紹過Thread.interrupt()

Thread.interrupt()方法不會中斷一個正在運行的線程。這一方法實際上完成的是,在線程受到阻塞時拋出一個中斷信號,這樣線程就得以退出阻塞的狀態。更確切的說,如果線程被Object.wait, Thread.join和Thread.sleep三種方法之一阻塞,那么,它將接收到一個中斷異常(InterruptedException),從而提早地終結被阻塞狀態。

LockSupport.park()也能響應中斷信號,但是跟Thread.sleep()不同的是它不會拋出InterruptedException, 那怎么知道線程是被unpark還是被中斷的呢,這就依賴線程的interrupted status,如果線程是被中斷退出阻塞的那么該值被設置為true, 通過Thread的interruptedisInterrupted方法都能獲取該值,兩個方法的區別是interrupted獲取后會Clear,也就是將interrupted status重新置為false。

AQS和Java線程池中都大量用到了中斷,主要的作用是喚醒線程、取消任務和清理(如ThreadPoolExecutor的shutdown方法),AQS中的acquire方法也有中斷和不可中斷兩種。 其中對於InterruptedException如何處理最重要的一個原則就是Don't swallow interrupts,一般兩種方法:

  • 繼續設置interrupted status
  • 拋出新的InterruptedException
try {
    ………
} catch (InterruptedException e) {
    // Restore the interrupted status
    Thread.currentThread().interrupt();
    // or thow a new
    //throw new InterruptedException();
}

AQS的acquire就用到了第一種方法。

關於InterruptedException處理的最佳實踐可以看IBM的這篇文章

最后按照慣例做下引申。上面BoundedBuffer1類的puttake方法中的wait為什么要放在一個while循環里呢? 你如果去看Object.wait()方法的Javadoc的話會發現官方也是建議下面這樣的用法:

synchronized (obj) {
    while (<condition does not hold>)
        ……
        obj.wait();
    ……
}

StackOverflow上有一個問題里一個叫xagyg的回答解釋的比較清楚,有興趣的可以看下。 簡單來說因為:

wait前會釋放監視器,被喚醒后又要重新獲取,這瞬間可能有其他線程剛好先獲取到了監視器,從而導致狀態發生了變化, 這時候用while循環來再判斷一下條件(比如隊列是否為空)來避免不必要或有問題的操作。 這種機制還可以用來處理偽喚醒(spurious wakeup),所謂偽喚醒就是no reason wakeup,對於LockSupport.park()來說就是除了unparkinterrupt之外的原因。

LockSupport也會有同樣的問題,所以看AQS的源碼會發現很多地方都有這種re-check的思路,我們下一篇文就來看下AbstractQueuedSynchronizer類的源碼。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM