Thread.interrupt()的理解 作者:zoterap 鏈接:https://www.jianshu.com/p/84df313ff1d2
目標
一個線程不應該由其他線程來強制中斷或停止,而是應該由線程自己自行停止。
Thread.interrupt 的作用其實也不是中斷線程,而是「通知線程應該中斷」
代碼
package com.zoterap.javabasic.current; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import static java.lang.String.format; import static java.util.concurrent.TimeUnit.SECONDS; public class ThreadInterruptDemo { public static String getCurrentTime() { return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss SSS")); } public static void main(String[] args) { Thread t = new Thread(() -> { for (int i = 0; i < 10; i++) { System.out.println(format("current i[%d], dateTime[%s]", i, getCurrentTime())); try { SECONDS.sleep(1); } catch (InterruptedException e) { System.out.println("EXCEPTION:" + e.getMessage()); Thread.currentThread().interrupt(); } } }); /** * 開啟線程 */ t.start(); /** * 主線程休眠5秒 */ try { SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } /** * 打印線程,嘗試中斷 */ t.interrupt(); } }
結果
current i[0], dateTime[2019-02-13 21:24:33 988] current i[1], dateTime[2019-02-13 21:24:35 011] current i[2], dateTime[2019-02-13 21:24:36 015] current i[3], dateTime[2019-02-13 21:24:37 020] current i[4], dateTime[2019-02-13 21:24:38 022] EXCEPTION:sleep interrupted current i[5], dateTime[2019-02-13 21:24:38 963] EXCEPTION:sleep interrupted current i[6], dateTime[2019-02-13 21:24:38 964] EXCEPTION:sleep interrupted current i[7], dateTime[2019-02-13 21:24:38 964] EXCEPTION:sleep interrupted current i[8], dateTime[2019-02-13 21:24:38 964] EXCEPTION:sleep interrupted current i[9], dateTime[2019-02-13 21:24:38 965] EXCEPTION:sleep interrupted
Thread.interrupt()到底做了啥?
在以前可以通過thread.stop()可以讓一個線程去停止另一個線程,但這種方法太暴力,突然間停止其他線程會導致被停止的線程無法完成一些清理工作,所以stop()已經被拋棄了。
Java線程的終止操作最初是直接暴露給用戶的,java.lang.Thread類提供了stop()方法,允許用戶暴力的終止一個線程並退出臨界區(釋放所有鎖,並在當前調用棧拋出ThreadDeath Exception)。 同樣的,Thread.suspend()和Thread.resume()方法允許用戶靈活的暫停和恢復線程。然而這些看似簡便的API在JDK1.2就被deprecate掉了,原因是stop()方法本質上是不安全的,它會強制釋放掉線程持有的鎖,這樣臨界區的數據中間狀態就會遺留出來,從而造成不可預知的后果。
當然Java線程不可能沒有辦法終止,在Java程序中,唯一的也是最好的辦法就是讓線程從run()方法返回。更具體來說,有以下幾種情況:
- 對於runnable的線程,利用一個變量做標記位,定期檢查
private volatile boolean flag = true; public void stop() { flag = false; } public void run() { while (flag) { //do something... } }
- 對於非runnable的線程,應該采取中斷的方式退出阻塞,並處理捕獲的中斷異常
- 對於大部分阻塞線程的方法,使用Thread.interrupt(),可以立刻退出等待,拋出InterruptedException. 這些方法包括Object.wait(), Thread.join(),Thread.sleep(),以及各種AQS衍生類:Lock.lockInterruptibly()等任何顯示聲明throws InterruptedException的方法。
private volatile Thread thread; public void stop() { thread.interrupt(); } public void run() { thread = Thread.currentThread(); while (flag) { try { Thread.sleep(interval); } catch (InterruptedException e){ //current thread was interrupted return; } } }
- 被阻塞的nio Channel也會響應interrupt(),拋出ClosedByInterruptException,相應nio通道需要實現java.nio.channels.InterruptibleChannel接口
private volatile Thread thread; public void stop() { thread.interrupt(); } public void run() { thread = Thread.currentThread(); BufferedReader in = new BufferedReader(new InputStreamReader(Channels.newInputStream( new FileInputStream(FileDescriptor.in).getChannel()))); while (flag) { try { String line = null; while ((line = in.readLine()) != null) { System.out.println("Read line:'"+line+"'"); } } catch (ClosedByInterruptException e) { //current channel was interrupted return; } catch (IOException e) { e.printStackTrace(); } } }
如果使用的是傳統IO(非Channel,如ServerSocket.accept),所在線程被interrupt時不會拋出ClosedByInterruptException。但可以使用流的close方法實現退出阻塞。
- 還有一些阻塞方法不會響應interrupt,如等待進入synchronized段、Lock.lock()。他們不能被動的退出阻塞狀態。
而Thread.interrupt的作用其實不是中斷線程,而是通知線程應該中斷了,給這個線程發一個信號,告訴它,它應該結束了,設置一個停止標志, 具體到底中斷還是運行,應該由被通知的線程自己處理。具體來說,對一個線程,調用interrupt()時:
- 如果一個線程處於了阻塞狀態(如線程調用了thread.sleep、thread.join、thread.wait、1.5中的condition.await、以及可中斷的通道上的 I/O 操作方法后可進入阻塞狀態),則在線程在檢查中斷標示時如果發現中斷標示為true,則會在這些阻塞方法(sleep、join、wait、1.5中的condition.await及可中斷的通道上的 I/O 操作方法)調用處拋出InterruptedException異常,
- 如果線程處於正常活動狀態,那么會將該線程中的中斷標志設置為true,僅此而已, 被設置中斷標志的線程將繼續正常執行,不受影響。
interrupt()並不能真正中斷線程,需要被調用的線程自己進行配合才行。一個線程如果有被中斷的需求,那么就可以這樣做: - 在正常運行任務時,經常檢查本線程的中斷標志位,如果被設置了中斷標志,就自行停止線程
- 在調用阻塞方法時正常處理InterruptException異常,
hread thread = new Thread(() -> { while (!Thread.interrupted()) { // do more work. } }); thread.start(); // 一段時間以后 thread.interrupt();
上面代碼中thread.interrupted()清除標志位是為了下次繼續監測標志位,不然會對被中斷線程的下次運行有影響。通過interrupt()和.interrupted()方法兩者的配合可以實現正常去停止一個線程,線程A通過調用線程B的interrupt方法通知線程B讓它結束線程,在線程B的run方法內部,通過循環檢查.interrupted()方法是否為真來接收線程A的信號,如果為真就可以拋出一個異常,在catch中完成一些清理工作,然后結束線程。Thread.interrupted()會清除標志位,並不是代表線程又恢復了,可以理解為僅僅是代表它已經響應完了這個中斷信號然后又重新置為可以再次接收信號的狀態。從始至終,理解一個關鍵點,interrupt()方法僅僅是改變一個標志位的值而已,和線程的狀態並沒有必然的聯系。
源碼分析
Thread.interrupt()方法設計的目的,是提示一個線程應該終止,但不強制該線程終止。程序員可以來決定如何響應這個終止提示。直接上源碼:
//Class java.lang.Thread public void interrupt() { if (this != Thread.currentThread()) checkAccess(); synchronized (blockerLock) { Interruptible b = blocker; //中斷觸發器 if (b != null) { interrupt0(); b.interrupt(this); //觸發回調接口 return; } } interrupt0(); }
校驗權限
如果不是當前線程自我中斷,會先做一次權限檢查。如果被中斷的線程屬於系統線程組(即JVM線程),checkAccess()方法會使用系統的System.getSecurityManager()來判斷權限。由於Java默認沒有開啟安全策略,此方法其實會跳過安全檢查。
觸發中斷回調接口
如果線程的中斷觸發器blocker不為null,則觸發中斷觸發回調接口Interruptible。那么這個觸發器blocker是什么時候被設置的呢?
上面提到,如果一個nio通道實現了InterruptibleChannel接口,就可以響應interrupt()中斷,其原理就在InterruptibleChannel接口的抽象實現類AbstractInterruptibleChannel的方法begin()中:
//Class java.nio.channels.spi.AbstractInterruptibleChannel protected final void begin() { if (interruptor == null) { interruptor = new Interruptible() { public void interrupt(Thread target) { synchronized (closeLock) { if (!open) return; open = false; interrupted = target; try {//關閉io通道 AbstractInterruptibleChannel.this.implCloseChannel(); } catch (IOException x) { } } }}; } blockedOn(interruptor);//將interruptor設置為當前線程的blocker Thread me = Thread.currentThread(); if (me.isInterrupted()) interruptor.interrupt(me); } protected final void end(boolean completed) throws AsynchronousCloseException { blockedOn(null); Thread interrupted = this.interrupted; if (interrupted != null && interrupted == Thread.currentThread()) { interrupted = null; throw new ClosedByInterruptException(); } if (!completed && !open) throw new AsynchronousCloseException(); } //Class java.nio.channels.Channels.ReadableByteChannelImpl public int read(ByteBuffer dst) throws IOException { ...... try { begin(); bytesRead = in.read(buf, 0, bytesToRead); finally { end(bytesRead > 0); } ...... }
以上述代碼為例,nio通道ReadableByteChannel每次執行阻塞方法read()前,都會執行begin(),把Interruptible回調接口注冊到當前線程上,以實現能夠響應其他線程的中斷。當線程收到中斷時,Thread.interrupt()觸發回調接口,在回調接口Interruptible中關閉io通道並返回,最后在finally塊中執行end(),end()方法會檢查中斷標記,拋出ClosedByInterruptException。
interrupt0()
無論是否設置了中斷觸發回調blocker,都會執行這個關鍵的native方法interrupt0():
private native void interrupt0();
以openJDK7的Hotspot虛擬機為例,先找到native方法映射
//jdk\src\share\native\java\lang\Thread.c static JNINativeMethod methods[] = { {"start0", "()V", (void *)&JVM_StartThread}, {"stop0", "(" OBJ ")V", (void *)&JVM_StopThread}, {"isAlive", "()Z", (void *)&JVM_IsThreadAlive}, {"suspend0", "()V", (void *)&JVM_SuspendThread}, {"resume0", "()V", (void *)&JVM_ResumeThread}, {"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority}, {"yield", "()V", (void *)&JVM_Yield}, {"sleep", "(J)V", (void *)&JVM_Sleep}, {"currentThread", "()" THD, (void *)&JVM_CurrentThread}, {"countStackFrames", "()I", (void *)&JVM_CountStackFrames}, {"interrupt0", "()V", (void *)&JVM_Interrupt}, {"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted}, {"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock}, {"getThreads", "()[" THD, (void *)&JVM_GetAllThreads}, {"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads}, };
可以找到interrupt0對應JVM_Interrupt這個函數,繼續找到實現代碼
//hotspot\src\share\vm\prims\jvm.cpp JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread)) JVMWrapper("JVM_Interrupt"); // Ensure that the C++ Thread and OSThread structures aren't freed before we operate oop java_thread = JNIHandles::resolve_non_null(jthread); MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock); // We need to re-resolve the java_thread, since a GC might have happened during the // acquire of the lock JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)); if (thr != NULL) { Thread::interrupt(thr); } JVM_END
可以看到這是一個JNI方法,JVM_ENTRY是JNI調用的宏。關鍵函數Thread::interrupt(thr),繼續跟蹤:
//hotspot\src\share\vm\runtime\thread.cpp void Thread::interrupt(Thread* thread) { trace("interrupt", thread); debug_only(check_for_dangling_thread_pointer(thread);) os::interrupt(thread); }
關鍵函數os::interrupt,os此時分為linux、solaris和windows,以linux為例繼續跟蹤:
//hotspot\src\os\linux\vm\os_linux.cpp void os::interrupt(Thread* thread) { assert(Thread::current() == thread || Threads_lock->owned_by_self(), "possibility of dangling Thread pointer"); OSThread* osthread = thread->osthread(); if (!osthread->interrupted()) { osthread->set_interrupted(true); // More than one thread can get here with the same value of osthread, // resulting in multiple notifications. We do, however, want the store // to interrupted() to be visible to other threads before we execute unpark(). OrderAccess::fence(); ParkEvent * const slp = thread->_SleepEvent ; if (slp != NULL) slp->unpark() ; } // For JSR166. Unpark even if interrupt status already was set if (thread->is_Java_thread()) ((JavaThread*)thread)->parker()->unpark(); ParkEvent * ev = thread->_ParkEvent ; if (ev != NULL) ev->unpark() ; }
每一個Java線程都與一個osthread一一對應,如果相應的os線程沒有被中斷,則會設置osthread的interrupt標志位為true(對應一個volatile int),並喚醒線程的SleepEvent。隨后喚醒線程的parker和ParkEvent。
簡而言之,interrupt操作會對三種事件進行unpark喚醒,分別是thread->_SleepEvent、thread->parker()和thread->_ParkEvent,這些變量的具體聲明如下:
//hotspot\src\share\vm\runtime\thread.cpp public: ParkEvent * _ParkEvent ; // for synchronized() ParkEvent * _SleepEvent ; // for Thread.sleep // JSR166 per-thread parker private: Parker* _parker; public: Parker* parker() { return _parker; }
喚醒ParkEvent
Thread類中包含了兩種作用不同的ParkEvent,_ParkEvent變量用於synchronized同步塊和Object.wait(),_SleepEvent變量用於Thread.sleep(),ParkEvent類的聲明如下:
class ParkEvent : public os::PlatformEvent { ... //略,直接看父類 } //hotspot\src\os\linux\vm\os_linux.cpp class PlatformEvent : public CHeapObj { private: pthread_mutex_t _mutex [1] ; pthread_cond_t _cond [1] ; ... public: PlatformEvent() { int status; status = pthread_cond_init (_cond, NULL); assert_status(status == 0, status, "cond_init"); status = pthread_mutex_init (_mutex, NULL); assert_status(status == 0, status, "mutex_init"); _Event = 0 ; _nParked = 0 ; _Assoc = NULL ; } void park () ; void unpark () ; int park (jlong millis) ; ... } ;
ParkEvent包含了一把mutex互斥鎖和一個cond條件變量,並在構造函數中進行了初始化,線程的阻塞和喚醒(park和unpark)就是通過他們實現的:
-
PlatformEvent::park() 方法會調用庫函數pthread_cond_wait(_cond, _mutex)實現線程等待。 synchronized塊的進入和Object.wait()的線程等待都是通過PlatformEvent::park()方法實現, (Thread.join()是使用的Object.wait()實現的)
-
PlatformEvent::park(jlong millis) 方法會調用庫函數pthread_cond_timedwait(_cond, _mutex, _abstime)實現計時條件等待,Thread.sleep(millis)就是通過PlatformEvent::park(jlong millis)實現
-
PlatformEvent::unpark() 方法會調用庫函數pthread_cond_signal (_cond)喚醒上述等待的條件變量。Thread.interrupt()就會觸發其子類SleepEvent和ParkEvent的unpark()方法。synchronized塊的退出也會觸發unpark()。其所在對象ObjectMonitor維護了ParkEvent數組作為喚醒隊列,synchronized同步塊退出時,會觸發ParkEvent::unpark()方法來喚醒等待進入同步塊的線程,或等待在Object.wait()的線程。
上述Thread類的兩個ParkEvent成員變量:_ParkEvent和_SleepEvent,都會在Thread.interrupt()時觸發unpark()動作。
對於_ParkEvent來說,它可以代表一個synchronized等待進入同步塊的時事件,也可以代表一個Object.wait()等待條件變量的事件。不同的是,如果是synchronized等待事件,被喚醒后會嘗試獲取鎖,如果失敗則會通過循環繼續park()等待,因此synchronized等待實際上是不會被interrupt()中斷的;如果是Object.wait()事件,則會通過標記為判斷出是否是被notify()喚醒的,如果不是則拋出InterruptedException實現中斷。
對於_SleepEvent相對簡單一些,它只代表線程sleep動作,可能是java.lang.Thread.sleep(),也可能是jvm內部調用的線程os::sleep()。如果是java.lang.Thread.sleep(),則會通過線程的is_interrupted標記位來判斷拋出InterruptedException。
喚醒Parker
除了喚醒SleepEvent和ParkEvent,Thread.interrupt()還會調用thread->parker()->unpark()來喚醒Thread的parker變量。Parker類與上面的ParkEvent類很相似,都持有一把mutex互斥鎖和一個cond條件變量。具體代碼見:
class Parker : public os::PlatformParker { public: // For simplicity of interface with Java, all forms of park (indefinite, // relative, and absolute) are multiplexed into one call. void park(bool isAbsolute, jlong time); void unpark(); ... //略,直接看父類 } //hotspot\src\os\linux\vm\os_linux.hpp class PlatformParker : public CHeapObj { protected: pthread_mutex_t _mutex [1] ; pthread_cond_t _cond [1] ; public: PlatformParker() { int status; status = pthread_cond_init (_cond, NULL); assert_status(status == 0, status, "cond_init"); status = pthread_mutex_init (_mutex, NULL); assert_status(status == 0, status, "mutex_init"); } }
與ParkEvent一樣,Parker使用着自己的鎖和park()/unpark()方法。
- Parker::park(bool isAbsolute, jlong time) 方法會調用庫函數pthread_cond_timedwait(_cond, _mutex, _abstime)實現計時條件等待,如果time=0則會直接使用pthread_cond_wait(_cond, _mutex)實現線程等待
- Parker::unpark() 方法會調用庫函數pthread_cond_signal (_cond)喚醒上述等待的條件變量
如源碼注釋,Thread的_parker變量更具有通用性。凡是在Java代碼里通過unsafe.park()/unpark()的調用都會對應到Thread的_parker變量去執行。而unsafe.park()/unpark()由java.util.concurrent.locks.LockSupport類調用,它支持了java.util.concurrent的各種鎖、條件變量等線程同步操作。例如:ReentrantLock, CountDownLatch, ReentrantReadWriteLock, Semaphore, ThreadPoolExecutor, ConditionObject, ArrayBlockingQueue等。
線程被Thread.interrupt()中斷時,並不意味着上述類的等待方法都會返回並拋出InterruptedException。盡管上述類最終等待在的unsafe.unpark()方法都會被喚醒,其是否繼續執行park()等待仍取決於具體實現。例如,Lock.lock()方法不會響應中斷,Lock.lockInterruptibly()方法則會響應中斷並拋出異常,二者實現的區別就在於park()等待被喚醒時是否繼續執行park()來等待鎖
如何處理InterruptedException
- 方式1 如果自己很清楚當前線程被中斷后的處理方式,則按自己的方式處理。 通常是做好善后工作,主動退出線程
- 方式2 直接在方法聲明中throws InterruptedException,丟給上層處理。這種方式也很常見,將中斷的處置權交給具體的業務來處理
- 方式3 重新設置中斷標記位,Thread.currentThread().interrupt(),交給后續方法處理
原因是底層拋出InterruptedException時會清除中斷標記位,捕獲到異常后如果不想處理,可以重新設置中斷標記位
try { ... Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
注意 請不要吞掉InterruptedException,可能會導致上層的調用方出現不可預料的結果
小結
終止一個Java線程最好的方式,就是讓run()方法主動退出。因為強制的讓一個線程被動的退出是很不安全的,內部的數據不一致會對程序造成不可預知的后果。
為了能夠通知一個線程需要被終止,Java提供了Thread.interrupt()方法,該方法會設置線程中斷的標記位,並喚醒可中斷的阻塞方法,包括Thread.sleep(),Object.wait(),nio通道的IO等待,以及LockSupport.park()。識別一個方法是否會被中斷,只需要看其聲明中是否會throws InterruptedException或ClosedByInterruptException。
每個Java線程都會對應一個osthread,它持有了三種條件變量,分別用於Thread.sleep(),Object.wait()和unsafe.park()。Thread.interrupt()會依次喚醒三個條件變量,以達到中斷的目的。線程的同步與喚醒最終都使用了pthread_cond_wait和pthread_cond_signal這些pthread庫函數。