因為在看ArrayBlockIngQueue 發現問題。其中put,take,offer(e,time,unit), poll(time,unit)是阻塞的方法,offer(e),poll(),是非阻塞方法,
其中offer(e),offer(e,timeout,unit)兩個方法中的獲取鎖的方法不一樣。那為什么要這么做呢?
可以比較下面兩份代碼,一個用lock,一個用lockInterruptibly。
1 public boolean offer(E e) { 2 checkNotNull(e); 3 final ReentrantLock lock = this.lock; 4 // 這里用這個lock方法。 5 lock.lock(); 6 try { 7 if (count == items.length) 8 return false; 9 else { 10 insert(e); 11 return true; 12 } 13 } finally { 14 lock.unlock(); 15 } 16 } 17 18 public boolean offer(E e, long timeout, TimeUnit unit) 19 throws InterruptedException { 20 21 checkNotNull(e); 22 long nanos = unit.toNanos(timeout); 23 final ReentrantLock lock = this.lock; 24 //這里用這個方法 25 lock.lockInterruptibly(); 26 try { 27 while (count == items.length) { 28 if (nanos <= 0) 29 return false; 30 nanos = notFull.awaitNanos(nanos); 31 } 32 insert(e); 33 return true; 34 } finally { 35 lock.unlock(); 36 } 37 }
下面來介紹一下 兩個方法都有什么不同。
lockInterruptibly 優先考慮響應中斷,再去獲取鎖。
/** * Acquires the lock unless the current thread is * {@linkplain Thread#interrupt interrupted}. * 獲取鎖,除非當前線程被打斷了。 意思就是是如果不被打斷,就能獲取到鎖。 * * <p>Acquires the lock if it is not held by another thread and returns * immediately, setting the lock hold count to one. * 如果鎖沒有被別的線程持有,則獲得這個鎖,立刻返回,設置當前鎖持有的個數是1. * * <p>If the current thread already holds this lock then the hold count * is incremented by one and the method returns immediately. 如果當前線程已經持有鎖了,那么持有鎖的個數增加1,並立刻返回。 * * <p>If the lock is held by another thread then the * current thread becomes disabled for thread scheduling * purposes and lies dormant until one of two things happens: * 如果鎖被別的線程持有,那么當前線程將出於線程調度的目的變得不可用,直到發生下面兩種情況之一: * <ul> * * <li>The lock is acquired by the current thread; or *鎖由當前線程獲得。 * <li>Some other thread {@linkplain Thread#interrupt interrupts} the * current thread. *或者其他線程打斷當前線程 * </ul> * * <p>If the lock is acquired by the current thread then the lock hold * count is set to one. *如果鎖被當前線程獲得,那么鎖持有計數被設置為1. * <p>If the current thread: * * <ul> * * <li>has its interrupted status set on entry to this method; or * * <li>is {@linkplain Thread#interrupt interrupted} while acquiring * the lock, * * </ul> * * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * 如果當前線程在進入這個方法時有中斷狀態或者在獲取鎖時,被打斷,則拋出InterruptedException異常,並且清除interrupted status * <p>In this implementation, as this method is an explicit * interruption point, preference is given to responding to the * interrupt over normal or reentrant acquisition of the lock. * 在這個實現中,由於這個方法明顯是一個中斷點,優先考慮響應中斷,而不是正常的或者可重入鎖獲取。 * @throws InterruptedException if the current thread is interrupted */ public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } /** * Acquires in exclusive mode, aborting if interrupted. * Implemented by first checking interrupt status, then invoking * at least once {@link #tryAcquire}, returning on * success. Otherwise the thread is queued, possibly repeatedly * blocking and unblocking, invoking {@link #tryAcquire} * until success or the thread is interrupted. This method can be * used to implement method {@link Lock#lockInterruptibly}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } /** * Acquires in exclusive interruptible mode. * @param arg the acquire argument */ private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { // 自旋 獲取鎖。 for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
lock() 調用了抽象類Sync(實現了AQS)的lock()方法,這是一個抽象方法,有兩個實現,一個公平鎖,一個非公平鎖,最終都調用到acquire(int arg)方法,內部處理了中斷。
優先考慮獲取鎖,待獲取鎖成功后,才響應中斷。
1 /** 2 * 獲得鎖。 3 * 4 * 如果鎖不被其他線程持有,則獲取鎖,並立即返回,將鎖持有計數設置為1。 5 * 6 * 如果當前線程已經持有鎖,那么持有計數將增加1,方法立即返回。 7 * 8 * 如果鎖由另一個線程持有,則當前線程將出於線程調度目的而禁用, 9 * 並處於休眠狀態,直到獲得鎖,此時鎖持有計數被設置為1。 10 */ 11 public void lock() { 12 sync.lock(); 13 } 14 /** 15 * 執行鎖。子類化的主要原因是允許非公平版本的快速路徑。 16 */ 17 abstract void lock(); 18 /** 19 * 以獨占模式獲取,忽略中斷。通過至少調用一次{@link #tryAcquire}來實現,成功后返回。 20 * 否則,線程將排隊,可能會反復阻塞和解除阻塞,調用{@link #tryAcquire}直到成功。 21 * 此方法可用於實現方法{@link Lock# Lock}。 22 * 23 * @param arg the acquire argument. This value is conveyed to 24 * {@link #tryAcquire} but is otherwise uninterpreted and 25 * can represent anything you like. 26 */ 27 public final void acquire(int arg) { 28 //tryAcquire(arg) 試圖以獨占模式獲取。這個方法應該查詢對象的狀態是否允許以獨占模式獲取它, 29 //如果允許,也應該查詢是否允許以獨占模式獲取它。 30 31 //acquireQueued 獲取隊列中已存在線程的獨占不可中斷模式。用於條件等待方法以及獲取。 32 if (!tryAcquire(arg) && 33 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 34 //中斷當前線程。 35 selfInterrupt(); 36 }
比較兩個方法發現:
lock優先考慮獲取鎖,待獲取鎖成功后,才響應中斷。這個方法不拋出中斷異常。
lockInterruptibly 優先考慮響應中斷,再去獲取鎖。這個方法會拋出中斷異常。
所以我們返回來看:
offer(e), 這個方法的實現用是調用lock, 實現的功能是入隊之后,要么返回true,要么返回false,還有如果入隊的元素是null,那么會拋空指針異常,但是這個方法不會拋出被中斷的異常(InterruptedException)。
這個方法內部也設置了中斷狀態,但是不會拋出中斷異常。非阻塞方法。入隊不成功,就返回。
offer(e,timeout,unit) 這個方法調用lockInterruptibly, 實現的功能是在指定的時間內如果入隊成功,則返回true,反之,返回false,如果在等待入隊的過程中被其他線程打斷,會拋出異常。
這個方法會拋出中斷異常。而且是阻塞方法。
2019-09-25
看 java並發編程實戰,讀到5.4 阻塞方法與中斷方法,
看到這句話。
BlockingQueue的put和take等方法會拋出受檢查異常InterruptedException,這與類庫中其他一些方法的做法相同。例如Thread.sleep.
當某方法拋出InterruptedException時,表示該方法是一個阻塞方法,如果這個方法被中斷,那么它將努力提前結束阻塞狀態。
參考資料