談到阻塞,相信大家都不會陌生了。阻塞的應用場景真的多得不要不要的,比如 生產-消費模式,限流統計等等。什么 ArrayBlockingQueue, LinkedBlockingQueue, DelayQueue... 都是阻塞隊列的實現啊,多簡單!
阻塞,一般有兩個特性很亮眼:1. 不耗cpu的等待;2. 線程安全;
額,要這么說也ok的。畢竟,我們遇到的問題,到這里就夠解決了。但是有沒有想過,這容器的阻塞又是如何實現的呢?
好吧,翻開源碼,也很簡單了:(比如ArrayBlockingQueue的take、put....)
// ArrayBlockingQueue /** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) // 阻塞的點 notFull.await(); enqueue(e); } finally { lock.unlock(); } } /** * Inserts the specified element at the tail of this queue, waiting * up to the specified wait time for space to become available if * the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; // 阻塞的點 nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 阻塞的點 notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
看來,最終都是依賴了AbstractQueuedSynchronizer類(著名的AQS)的await方法,看起來像那么回事。那么這個同步器的阻塞又是如何實現的呢?
java的代碼總是好跟蹤的:
// AbstractQueuedSynchronizer.await()
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 此處進行真正的阻塞 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
如上,可以看到,真正的阻塞工作又轉交給了另一個工具類: LockSupport的 park 方法了,這回跟鎖扯上了關系,看起來已經越來越接近事實了:
// LockSupport.park()
/** * Disables the current thread for thread scheduling purposes unless the * permit is available. * * <p>If the permit is available then it is consumed and the call returns * immediately; otherwise * the current thread becomes disabled for thread scheduling * purposes and lies dormant until one of three things happens: * * <ul> * <li>Some other thread invokes {@link #unpark unpark} with the * current thread as the target; or * * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * * <li>The call spuriously (that is, for no reason) returns. * </ul> * * <p>This method does <em>not</em> report which of these caused the * method to return. Callers should re-check the conditions which caused * the thread to park in the first place. Callers may also determine, * for example, the interrupt status of the thread upon return. * * @param blocker the synchronization object responsible for this * thread parking * @since 1.6 */ public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); }
看得出來,這里的實現就比較簡潔了,先獲取當前線程,設置阻塞對象,阻塞,然后解除阻塞。
好吧,到底什么是真正的阻塞,我們還是不得而知!
UNSAFE.park(false, 0L); 是個什么東西? 看起來就是這一句起到了最關鍵的作用呢!但由於這里已經是 native代碼,我們已經無法再簡單的查看源碼了!那咋整呢?
那不行就看C/C++的源碼唄,看一下parker的定義(park.hpp):
class Parker : public os::PlatformParker { private: volatile int _counter ; Parker * FreeNext ; JavaThread * AssociatedWith ; // Current association public: Parker() : PlatformParker() { _counter = 0 ; FreeNext = NULL ; AssociatedWith = NULL ; } protected: ~Parker() { ShouldNotReachHere(); } public: // For simplicity of interface with Java, all forms of park (indefinite, // relative, and absolute) are multiplexed into one call. c中暴露出兩個方法給java調用 void park(bool isAbsolute, jlong time); void unpark(); // Lifecycle operators static Parker * Allocate (JavaThread * t) ; static void Release (Parker * e) ; private: static Parker * volatile FreeList ; static volatile int ListLock ; };
那 park() 方法到底是如何實現的呢? 其實是繼承的 os::PlatformParker 的功能,也就是平台相關的私有實現,以 linux 平台實現為例(os_linux.hpp):
// linux中的parker定義 class PlatformParker : public CHeapObj<mtInternal> { protected: enum { REL_INDEX = 0, ABS_INDEX = 1 }; int _cur_index; // which cond is in use: -1, 0, 1 pthread_mutex_t _mutex [1] ; pthread_cond_t _cond [2] ; // one for relative times and one for abs. public: // TODO-FIXME: make dtor private ~PlatformParker() { guarantee (0, "invariant") ; } public: PlatformParker() { int status; status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr()); assert_status(status == 0, status, "cond_init rel"); status = pthread_cond_init (&_cond[ABS_INDEX], NULL); assert_status(status == 0, status, "cond_init abs"); status = pthread_mutex_init (_mutex, NULL); assert_status(status == 0, status, "mutex_init"); _cur_index = -1; // mark as unused } };
看到 park.cpp 中沒有重寫 park() 和 unpark() 方法,也就是說阻塞實現完全交由特定平台代碼處理了(os_linux.cpp):
// park方法的實現,依賴於 _counter, _mutex[1], _cond[2] void Parker::park(bool isAbsolute, jlong time) { // Ideally we'd do something useful while spinning, such // as calling unpackTime(). // Optional fast-path check: // Return immediately if a permit is available. // We depend on Atomic::xchg() having full barrier semantics // since we are doing a lock-free update to _counter. if (Atomic::xchg(0, &_counter) > 0) return; Thread* thread = Thread::current(); assert(thread->is_Java_thread(), "Must be JavaThread"); JavaThread *jt = (JavaThread *)thread; // Optional optimization -- avoid state transitions if there's an interrupt pending. // Check interrupt before trying to wait if (Thread::is_interrupted(thread, false)) { return; } // Next, demultiplex/decode time arguments timespec absTime; if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all return; } if (time > 0) { unpackTime(&absTime, isAbsolute, time); } // Enter safepoint region // Beware of deadlocks such as 6317397. // The per-thread Parker:: mutex is a classic leaf-lock. // In particular a thread must never block on the Threads_lock while // holding the Parker:: mutex. If safepoints are pending both the // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock. ThreadBlockInVM tbivm(jt); // Don't wait if cannot get lock since interference arises from // unblocking. Also. check interrupt before trying wait if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { return; } int status ; if (_counter > 0) { // no wait needed _counter = 0; status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; // Paranoia to ensure our locked and lock-free paths interact // correctly with each other and Java-level accesses. OrderAccess::fence(); return; } #ifdef ASSERT // Don't catch signals while blocked; let the running threads have the signals. // (This allows a debugger to break into the running thread.) sigset_t oldsigs; sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals(); pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs); #endif OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); jt->set_suspend_equivalent(); // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self() assert(_cur_index == -1, "invariant"); if (time == 0) { _cur_index = REL_INDEX; // arbitrary choice when not timed status = pthread_cond_wait (&_cond[_cur_index], _mutex) ; } else { _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX; status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ; if (status != 0 && WorkAroundNPTLTimedWaitHang) { pthread_cond_destroy (&_cond[_cur_index]) ; pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr()); } } _cur_index = -1; assert_status(status == 0 || status == EINTR || status == ETIME || status == ETIMEDOUT, status, "cond_timedwait"); #ifdef ASSERT pthread_sigmask(SIG_SETMASK, &oldsigs, NULL); #endif _counter = 0 ; status = pthread_mutex_unlock(_mutex) ; assert_status(status == 0, status, "invariant") ; // Paranoia to ensure our locked and lock-free paths interact // correctly with each other and Java-level accesses. OrderAccess::fence(); // If externally suspended while waiting, re-suspend if (jt->handle_special_suspend_equivalent_condition()) { jt->java_suspend_self(); } } // unpark 實現,相對簡單些 void Parker::unpark() { int s, status ; status = pthread_mutex_lock(_mutex); assert (status == 0, "invariant") ; s = _counter; _counter = 1; if (s < 1) { // thread might be parked if (_cur_index != -1) { // thread is definitely parked if (WorkAroundNPTLTimedWaitHang) { status = pthread_cond_signal (&_cond[_cur_index]); assert (status == 0, "invariant"); status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant"); } else { // must capture correct index before unlocking int index = _cur_index; status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant"); status = pthread_cond_signal (&_cond[index]); assert (status == 0, "invariant"); } } else { pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } } else { pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } }
從上面代碼可以看出,阻塞主要借助於三個變量,_cond, _mutex, _counter, 調用linux系統的 pthread_cond_wait, pthread_mutex_lock, pthread_mutex_unlock (一組POSIX標准的阻塞接口)等平台相關的方法進行阻塞了!
而 park.cpp中,則只有 Allocate、Release 等的一些常規操作!
// 6399321 As a temporary measure we copied & modified the ParkEvent:: // allocate() and release() code for use by Parkers. The Parker:: forms // will eventually be removed as we consolide and shift over to ParkEvents // for both builtin synchronization and JSR166 operations. volatile int Parker::ListLock = 0 ; Parker * volatile Parker::FreeList = NULL ; Parker * Parker::Allocate (JavaThread * t) { guarantee (t != NULL, "invariant") ; Parker * p ; // Start by trying to recycle an existing but unassociated // Parker from the global free list. // 8028280: using concurrent free list without memory management can leak // pretty badly it turns out. Thread::SpinAcquire(&ListLock, "ParkerFreeListAllocate"); { p = FreeList; if (p != NULL) { FreeList = p->FreeNext; } } Thread::SpinRelease(&ListLock); if (p != NULL) { guarantee (p->AssociatedWith == NULL, "invariant") ; } else { // Do this the hard way -- materialize a new Parker.. p = new Parker() ; } p->AssociatedWith = t ; // Associate p with t p->FreeNext = NULL ; return p ; } void Parker::Release (Parker * p) { if (p == NULL) return ; guarantee (p->AssociatedWith != NULL, "invariant") ; guarantee (p->FreeNext == NULL , "invariant") ; p->AssociatedWith = NULL ; Thread::SpinAcquire(&ListLock, "ParkerFreeListRelease"); { p->FreeNext = FreeList; FreeList = p; } Thread::SpinRelease(&ListLock); }
綜上源碼,在進行阻塞的時候,底層並沒有(並不一定)要用while死循環來阻塞,更多的是借助於操作系統的實現來進行阻塞的。當然,這也更符合大家的猜想!
從上的代碼我們也發現一點,底層在做許多事的時候,都不忘考慮線程中斷,也就是說,即使在阻塞狀態也是可以接收中斷信號的,這為上層語言打開了方便之門。
如果要細說阻塞,其實還遠沒完,不過再往操作系統層面如何實現,就得再下點功夫,去翻翻資料了,把底線壓在操作系統層面,大多數情況下也夠用了!
