在我們使用多個線程來同時運行多個任務時,可以通過使用鎖(互斥)來同步兩個或多個任務的行為,從而使得一個任務不會干涉另一個任務使用的資源。
這個問題已經解決了,下一步是學習如何使任務彼此之間可以協作,以使得多個任務可以一起工作去解決某個問題。在這類問題中不可避免會碰到某些部分必須在其他部分被解決之前解決。在解決這類問題時,關鍵是多個任務之間如何“握手”,即通過何種方式來通知對方。在Java中有多種方式以及工具可以幫助我們實現這種“握手”。方式比較多,總結了一下,主要如下:
互斥+信號(synchronized、ReentrantLock)
利用並發工具包中的構件(CountLatchDown、Cyclicbarrier)
通過線程池的一個方法(awaitTermination方法)
1. 中斷和檢查中斷機制
利用中斷和加入線程(join)屬於Thread提供的原生機制,用來實現線程之間的通信。
關於中斷,Java中的線程是通過中斷狀態來標識,可以通過線程的實例方法interrupt()來設置(該方法內部會調用一個native方法interrupt0()進行設置)。中斷狀態只是一個狀態,並不會直接影響線程的運行,Java中是通過一些間接的方式來達到控制線程的效果,比如:
- 檢查在循環中斷狀態,判斷是否需要退出線程;
- 或者通過對中斷狀態進行監控,用一旦發現改變就拋出異常這種方式來通知程序;
我們通過兩個實例看一下:
public class ThreadDemo { public static void main(String[] args){ Thread t = new Thread(new ThreadCheckInterrupt()); t.start(); try { Thread.sleep(100); }catch (InterruptedException e) { } t.interrupt(); } static class ThreadCheckInterrupt implements Runnable{ @Override public void run() { System.out.println("thread start --" + System.currentTimeMillis()); while(!Thread.interrupted()) { } System.out.println("thread interrupt -- " + System.currentTimeMillis()); } } } /** * 輸出結果 **/ thread start --1556775343297 thread interrupt -- 1556775343407
上面的例子中,主線程啟動子線程t,子線程進入循環,判斷條件是!Thread.interrupted(),每次循環開始都檢查一下中斷狀態,如果中斷狀態被設置了,則退出循環,這就是檢查中斷狀態機制,這里例子中是在主線程中調用子線程的interrupt()方法來設置子線程中斷狀態的。
public class ThreadDemo { public static void main(String[] args){ Thread t = new Thread(new ThreadInterruptDemo()); t.start(); try { Thread.sleep(100); }catch (InterruptedException e) { } t.interrupt(); } static class ThreadInterruptDemo implements Runnable{ @Override public void run() { try { Thread.sleep(1000); }catch (InterruptedException e) { System.out.println("interrupt works"); } } } } /** * 輸出結果 **/ interrupt works
在這個例子中,主線程仍然開啟一個子線程,主線程休眠100ms,子線程休眠1000ms,確保主線程能夠在子線程休眠時設置其中斷狀態,以觸發中斷異常的拋出。這種通過中斷來觸發異常來通知程序的方式即是Java中的中斷機制。
在調用wait()、sleep()、join()等方法導致的線程阻塞期間,如果有其他線程調用了該線程的interrupt()來中斷該線程,則會導致前面這種阻塞狀態停止,拋出InterruptedException異常,並且其中斷狀態會復原。
2. 加入線程(Join)
join()方法也是Thread提供的一個實例方法,提供了一個類似線程之間等待的效果。一個線程可以調用另一個線程的join()方法,調用之后,該線程將進入等待狀態,直到另一個線程執行完畢,該線程才會繼續執行,示例如下:
public class ThreadDemo { public static void main(String[] args){ System.out.println("main thread start , time -- > " + System.currentTimeMillis()); Thread t = new Thread(new ThreadJoin()); t.start(); try { t.join(); } catch (InterruptedException e) {} System.out.println("main thread end , time -- > " + System.currentTimeMillis()); } static class ThreadJoin implements Runnable{ @Override public void run() { System.out.println("sub thread start"); try { Thread.sleep(1000); }catch (InterruptedException e) { } System.out.println("sub thread end"); } } } /** * 輸出結果 **/ main thread start , time -- > 1556801790138 sub thread start sub thread end main thread end , time -- > 1556801791138
主線程開始之后,啟動子線程t,並調用t的join()方法,可以看到,直到子線程執行結束,主線程才繼續執行,這就實現了線程之間的等待效果。
join()方法也有帶參數的重載版本,可以指定等待的時間,即使當超過指定時間等待的線程仍然沒有執行結束,join()方法將返回,不會繼續等待。
3. 互斥+信號(synchronized、ReentrantLock)
Java中線程之間的互斥主要是借助於synchronized、ReentrantLock來實現,互斥既可以保證任務之間的串行,也可以保證某一時刻只有一個任務可以響應某個信號。在互斥之上,我們為任務添加了一種途徑,可以將其自身掛起,直至某些外部條件發生變化,表示是時候讓這個任務向前繼續執行。這種握手可以通過Object的方法wait()和notify()來安全地實現,也可以通過Java SE5中並發類庫提供的具有await()和signal()方法的Condition對象。
通過synchronized實現的互斥和Object提供的wait()、notify()/notifyAll()方法互相配合,可以實現線程之間的等待通知機制。調用wait()、notify()等方法的對象必須為對象鎖,且必須在同步塊內執行調用。調用wait()方法的線程將進入等待狀態,且會釋放已經獲取的同步鎖;調用notify()/notifyAll()方法的線程將會通知處於等待狀態的線程,從等待狀態切換到准備競爭同步鎖,一旦搶到鎖則繼續從之前等待的地方(即wait()處)執行:
public class ThreadDemo{ static String content; static String LOCK = "lock"; public static void main(String[] args){ new Thread(){ @Override public void run(){ synchronized(LOCK){ content = "hello world"; LOCK.notifyAll(); } } }.start(); synchronized(LOCK){ while(content == null){ try{ LOCK.wait(); }catch(InterruptedException e){ e.printStackTrace(); } System.out.println(content.toUpperCase()); } } } } // 輸出 HELLO WORLD
而ReentrantLock提供的互斥是和其內部類Condition提供的await()、signal()相配合來實現線程之間的等待通知機制。與上面一樣,await()、signal()方法必須在獲取鎖的情況下才能夠調用,否則會拋出異常。Condition實例對象通過ReentrantLock對象獲取。
public class ReentrantLockCondition { public static final ReentrantLock lock = new ReentrantLock(); public static final Condition c = lock.newCondition(); public static void main(String[] args) { System.out.println("main thread start , time -- > " + System.currentTimeMillis()); Thread sub = new Thread(new ConditionTest()); try { lock.lock();
sub.start(); c.await(); } catch (InterruptedException e) { }finally { lock.unlock(); } System.out.println("main thread end , time -- > " + System.currentTimeMillis()); } static class ConditionTest implements Runnable{ @Override public void run() { System.out.println("sub thread start , time -- > " + System.currentTimeMillis()); try { lock.lock(); Thread.sleep(1000); c.signal(); }catch(InterruptedException e) { }finally { lock.unlock(); } System.out.println("sub thread end , time -- > " + System.currentTimeMillis()); } } } /** * 輸出結果 **/ main thread start , time -- > 1556883481141 sub thread start , time -- > 1556883481141 sub thread end , time -- > 1556883482141 main thread end , time -- > 1556883482141
如上,主線程獲取鎖,啟動子線程,並調用await()方法進入阻塞狀態,並釋放鎖,這時子線程會獲取鎖,休眠1s之后調用signal()方法通知主線程,子線程釋放鎖之后,主線程可以獲取鎖,獲取成功之后繼續執行。所以結果是先有主線程啟動的日志,然后是子線程啟動和結束的日志,最后則是主線程結束的日志。
4. 利用並發工具包中的構件(CountLatchDown、CyclicBarrier)
java.util.concurrent包在JDK1.5之后引入了大量用來解決並發問題的新類,比如CountDownLatch、CyclicBarrier等,各有自己的特點,我們仍然看一些例子:
4.1 CountLatchDown
它可以用來同步一個或多個任務,強制它們等待由其他任務執行的一組操作完成,用法入下:
public class CountDownLatchSimpleDemo { public static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) { System.out.println("main thread start , time -> " + System.currentTimeMillis()); Thread t1 = new Thread(new Count()); Thread t2 = new Thread(new Count()); t1.start(); t2.start(); try { c.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("main thread end , time -> " + System.currentTimeMillis()); } static class Count implements Runnable{ private static int counter = 0; private final int id = counter++; @Override public void run() { System.out.println("thread " + id + " start , time -> " + System.currentTimeMillis()); try { Thread.sleep(1000); }catch(InterruptedException e) { } System.out.println("thread " + id + " end , time -> " + System.currentTimeMillis()); c.countDown(); } } } /** * 輸出結果 **/ main thread start , time -> 1557065917966 thread 0 start , time -> 1557065917968 thread 1 start , time -> 1557065917968 thread 0 end , time -> 1557065918968 thread 1 end , time -> 1557065918968 main thread end , time -> 1557065918968
主線程啟動兩個子線程,然后調用await()方法之后進入等待狀態,兩個子線程啟動之后會調用CountDownLatch的countDown()方法,總共調用了兩次,就將CountDownLatch對象初始化時指定的計數2消去,這時候主線程就會從等待狀態中被喚醒,繼續執行。
4.2 CyclicBarrier
這個東西江湖人稱柵欄,可以協調多個線程一起運行,實現類似多個線程都達到某個狀態之后再繼續運行的效果。實現機制簡單介紹下:
- 首先將要參與的線程初始化;
- 調用await()方法的線程將會進入等待狀態;
- 直到指定數量的線程都調用了await()方法,之前處於等待狀態的線程才會從調用await()處繼續執行;
public class CyclicBarrierSimpleDemo { public static AtomicInteger id = new AtomicInteger(); public static AtomicInteger count = new AtomicInteger(); public static CyclicBarrier barrier = new CyclicBarrier(2, ()->{ if(count.get() > 2) { return; } count.incrementAndGet(); System.out.println("barrier trip " + count.get()); } ); public static void main(String[] args) { Thread t1 = new Thread(new SubTask()); Thread t2 = new Thread(new SubTask()); t1.start(); t2.start(); try { Thread.sleep(500); }catch(InterruptedException e) { } t1.interrupt(); t2.interrupt(); } static class SubTask implements Runnable{ int threadId = id.getAndIncrement(); @Override public void run() { while(!Thread.interrupted()) { try { Thread.sleep(100); System.out.println("thread " + threadId + " play"); barrier.await(); } catch (InterruptedException e) { System.out.println("thread " + threadId + " interrupt"); return; } catch (BrokenBarrierException e) { e.printStackTrace(); } } System.out.println("thread " + threadId + " end"); } } } /** * 運行結果 **/ thread 0 play thread 1 play barrier trip 1 thread 1 play thread 0 play barrier trip 2 thread 1 play thread 0 play barrier trip 3 thread 1 play thread 0 play thread 0 interrupt thread 1 interrupt
如上是一個簡單的demo,主線程會啟動兩個子線程,子線程會調用CyclicBarrier的await()方法,從上面輸出結果可以看出,子線程之間會互相等待,執行一次,越過一次柵欄。
5. 共享內存變量(volatile)
volatile關鍵字可以保證可見性和有序性,這里也是利用其可見性來實現線程之間的通信的,因為被volatile修飾的變量一旦發生變化,對其它線程是可見的。通過監控某個共享變量,當其狀態發生改變時,就可以認為是收到別的線程的信號了。
public class VolatileDemo { public static volatile boolean index = false; public static void main(String[] args) { Thread sub = new Thread(new SubThread()); sub.start(); System.out.println("main thread notify sub thread , time -- > " + System.currentTimeMillis()); try { Thread.sleep(200); }catch(InterruptedException e) { } index = true; } } class SubThread implements Runnable{ @Override public void run() { System.out.println("sub thread start , time -- > " + System.currentTimeMillis()); while(!VolatileDemo.index) { } System.out.println("sub thread end , time -- > " + System.currentTimeMillis()); } } /** * 運行結果 **/ main thread notify sub thread , time -- > 1557145592276 sub thread start , time -- > 1557145592276 sub thread end , time -- > 1557145592477
如上面的例子,線程一直監控共享變量index,當主線程將index改為true時,子線程可以馬上監控到,就可以退出while循環了。
6. 管道(PipedWriter、PipedReader)
通過IO阻塞的方式,我們也能實現線程之間的通信:
public class PipeNotifyDemo { public static void main(String[] args) throws IOException { PipedWriter writer = new PipedWriter(); PipedReader reader = new PipedReader(); writer.connect(reader); Thread t1 = new Thread(()->{ System.out.println("writer running"); try { for(int i = 0 ; i < 5 ; i++) { writer.write(i); Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); }finally { try { writer.close(); } catch (IOException e) { e.printStackTrace(); } } System.out.println("writer ending"); }); Thread t2 = new Thread(()->{ System.out.println("reader running"); int message = 0; try { while((message = reader.read()) != -1) { System.out.println("message = " + message + " , time -- > " + System.currentTimeMillis()); } }catch(Exception e) { }finally { try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } System.out.println("reader ending"); }); t1.start(); t2.start(); } } /** * 輸出結果 **/ reader running writer running message = 0 , time -- > 1557148872499 message = 1 , time -- > 1557148872499 message = 2 , time -- > 1557148873499 message = 3 , time -- > 1557148875499 message = 4 , time -- > 1557148875499 writer ending reader ending
線程2源源不斷收到線程1寫入的數據。
7. 通過線程池的一個方法(awaitTermination方法)
線程池中有一個方法awaitTermination()方法,可以用來判斷調用線程池的shutdown()方法之后,線程池中的線程是否執行完畢。因為shutdown()方法是不會阻塞的,調用之后線程池不會接受新任務,但是已有的任務會繼續執行,所以通過awaitTermination方法來判斷是否存在還在執行的任務,這也算是一種線程之前的通信吧。
public class ThreadPoolNotifyDemo { public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(()->{ System.out.println("thread1 running"); try { Thread.sleep(2000); }catch(InterruptedException e) { e.printStackTrace(); } }); exec.execute(()->{ System.out.println("thread2 running"); try { Thread.sleep(3000); }catch(InterruptedException e) { e.printStackTrace(); } }); exec.shutdown(); while(!exec.awaitTermination(1, TimeUnit.SECONDS)) { System.out.println("thread in thread pool is still running , time -- > " + System.currentTimeMillis()); } System.out.println("main thread over"); } } /** * 輸出結果 **/ thread1 running thread2 running thread in thread pool is still running , time -- > 1557150326140 thread in thread pool is still running , time -- > 1557150327141 main thread over
8. 總結
關於線程間通信的方法五花八門,本文只是羅列了一下,如果不正確,還請指正。話說這會兒還在上班的卧鋪車廂上寫博客呢。。。
