主題:
- 線程的未捕獲異常
- 線程工廠
- 線程暫停
- 線程池
線程的未捕獲異常
在線程異常的時候,多線程運行不能按照順序執行過程中捕獲異常的方式來處理異常,異常會被直接拋出到控制台(由於線程的本質,使得你不能捕獲從線程中逃逸的異常。一旦異常逃逸出任務的run方法,它就會向外傳播到控制台,除非你采用特殊的形式捕獲這種異常。)
如下例:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadException implements Runnable{ @Override public void run() { throw new RuntimeException(); } //現象:控制台打印出異常信息,並運行一段時間后才停止 public static void main(String[] args){ //就算把線程的執行語句放到try-catch塊中也無濟於事 try{ ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new ThreadException()); }catch(RuntimeException e){ System.out.println("Exception has been handled!"); } } }
使用UncaughtExceptionHandler可以捕獲異常,並且重啟新的線程來完成redo工作,這個例子里是重新初始化:init();
import io.github.viscent.mtia.util.Debug; import io.github.viscent.mtia.util.Tools; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; public class ThreadMonitorDemo { volatile boolean inited = false; static int threadIndex = 0; final static Logger LOGGER = Logger.getAnonymousLogger(); final BlockingQueue<String> channel = new ArrayBlockingQueue<String>(100); public static void main(String[] args) throws InterruptedException { ThreadMonitorDemo demo = new ThreadMonitorDemo(); demo.init(); for (int i = 0; i < 100; i++) { demo.service("test-" + i); } Thread.sleep(2000); System.exit(0); } public synchronized void init() { if (inited) { return; } Debug.info("init..."); WokrerThread t = new WokrerThread(); t.setName("Worker0-" + threadIndex++); // 為線程t關聯一個UncaughtExceptionHandler t.setUncaughtExceptionHandler(new ThreadMonitor()); t.start(); inited = true; } public void service(String message) throws InterruptedException { channel.put(message); } private class ThreadMonitor implements Thread.UncaughtExceptionHandler { @Override public void uncaughtException(Thread t, Throwable e) { Debug.info("Current thread is `t`:%s, it is still alive:%s", Thread.currentThread() == t, t.isAlive()); // 將線程異常終止的相關信息記錄到日志中 String threadInfo = t.getName(); LOGGER.log(Level.SEVERE, threadInfo + " terminated:", e); // 創建並啟動替代線程 LOGGER.info("About to restart " + threadInfo); // 重置線程啟動標記 inited = false; init(); } }// 類ThreadMonitor定義結束 private class WokrerThread extends Thread { @Override public void run() { Debug.info("Do something important..."); String msg; try { for (;;) { msg = channel.take(); process(msg); } } catch (InterruptedException e) { // 什么也不做 } } private void process(String message) { Debug.info(message); // 模擬隨機性異常 int i = (int) (Math.random() * 100); if (i < 2) { throw new RuntimeException("test"); } Tools.randomPause(100); } }// 類ThreadMonitorDemo定義結束 }
線程工廠
工廠設計模式是java中最常用的設計模式之一。它是一種創造性的模式,可用於開發一個或多個類需要的對象。有了這個工廠,我們可以集中創建對象。
創造邏輯的集中給我們帶來了一些優勢,例如:
很容易更改所創建對象的類或創建這些對象的方式。
很容易為有限的資源限制對象的創建。例如,我們只能有N個類型的對象。
很容易生成關於對象創建的統計數據。
線程工廠實際上是一個接口:
public interface ThreadFactory { Thread newThread(Runnable r); }
實際上就是使用工廠模式在創建線程的時候統一操作。
ThreadPoolExecutor具有ThreadFactory作為參數的構造函數。執行器創建新線程時使用此工廠。
使用ThreadFactory,可以自定義由executor創建的線程,以便它們具有正確的線程名、優先級、設置為daemon等。
import java.util.ArrayList; import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; public class CustomThreadFactory implements ThreadFactory { private int counter; private String name; private List<String> stats; public CustomThreadFactory(String name) { counter = 1; this.name = name; stats = new ArrayList<String>(); } @Override public Thread newThread(Runnable runnable) { Thread t = new Thread(runnable, name + "-Thread_" + counter); t.setDaemon(true);//守護線程 t.setPriority(Thread.MAX_PRIORITY );//統一設置優先級 counter++; stats.add(String.format("Created thread %d with name %s on %s \n", t.getId(), t.getName(), new Date())); return t; } public String getStats() { StringBuffer buffer = new StringBuffer(); Iterator<String> it = stats.iterator(); while (it.hasNext()) { buffer.append(it.next()); } return buffer.toString(); } public static void main(String[] args) { CustomThreadFactory factory = new CustomThreadFactory("CustomThreadFactory"); Task task = new Task(); Thread thread; System.out.printf("Starting the Threads\n\n"); for (int i = 1; i <= 10; i++) { thread = factory.newThread(task); thread.start(); } System.out.printf("All Threads are created now\n\n"); System.out.printf("Give me CustomThreadFactory stats:\n\n" + factory.getStats()); } } class Task implements Runnable { @Override public void run() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } }
線程暫停
有3種暫停的方式
- 1、自己進入睡眠暫停;
- 2、使用CyclicBarrier,在某個階段,多個線程統一暫停(await()),這里有介紹https://www.cnblogs.com/starcrm/p/12469364.html
- 3、使用Condition信號暫停;
第一種情況的例子:
try { Thread.sleep(2000); } catch (InterruptedException ex) { }
第三種情況的例子,通過主線程進行控制:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class PauseControl extends ReentrantLock { private static final long serialVersionUID = 176912639934052187L; // 線程暫掛標志 private volatile boolean suspended = false; private final Condition condSuspended = newCondition(); /** * 暫停線程 */ public void requestPause() { suspended = true; } /** * 恢復線程 */ public void proceed() { lock(); try { suspended = false; condSuspended.signalAll(); } finally { unlock(); } } /** * 當前線程僅在線程暫掛標記不為true的情況下才執行指定的目標動作。 * * @targetAction 目標動作 * @throws InterruptedException */ public void pauseIfNeccessary(Runnable targetAction) throws InterruptedException { lock(); try { while (suspended) { condSuspended.await(); } targetAction.run(); } finally { unlock(); } } } import java.util.Scanner; public class ThreadPauseDemo { final static PauseControl pc = new PauseControl(); public static void main(String[] args) { final Runnable action = new Runnable() { @Override public void run() { Debug.info("Master,I'm working..."); Tools.randomPause(300); } }; Thread slave = new Thread() { @Override public void run() { try { for (;;) { pc.pauseIfNeccessary(action); } } catch (InterruptedException e) { // 什么也不做 } } }; slave.setDaemon(true); slave.start(); askOnBehaveOfSlave(); } static void askOnBehaveOfSlave() { String answer; int minPause = 2000; try (Scanner sc = new Scanner(System.in)) { for (;;) { Tools.randomPause(8000, minPause); pc.requestPause(); Debug.info("Master,may I take a rest now?%n"); Debug.info("%n(1) OK,you may take a rest%n" + "(2) No, Keep working!%nPress any other key to quit:%n"); answer = sc.next(); if ("1".equals(answer)) { pc.requestPause(); Debug.info("Thank you,my master!"); minPause = 8000; } else if ("2".equals(answer)) { Debug.info("Yes,my master!"); pc.proceed(); minPause = 2000; } else { break; } }// for結束 }// try結束 Debug.info("Game over!"); } }
線程池
線程池是預初始化線程的集合。一般來說,集合的大小是固定的,但不是強制的。它有助於使用相同的線程執行N個任務。如果有比線程更多的任務,那么任務需要在隊列結構(FIFO–先進先出)中等待。
當任何線程完成其執行時,它都可以從隊列中獲取新任務並執行它。所有任務完成后,線程將保持活動狀態,並等待線程池中的更多任務。
線程池的意義在於:
1、減少在創建和銷毀線程上所花的時間以及系統資源的開銷,提升任務執行性能。
2、控制進程中線程數量的峰值,避免系統開銷過大。
通過線程池,可創建一定數量的線程,並由線程池管理。在需要執行任務時,直接使用其中的線程。任務執行完成后,線程保留,並可用於執行下一個任務。如果任務比線程多,則等待線程空閑。
直接構建ThreadPoolExecutor
我們可以使用以下構造函數實例化ThreadPoolExecutor,其中ThreadFactory和RejectedExecutionHandler是可選參數,默認值分別是Executors.defaultThreadFactory()和ThreadPoolExecutor.AbortPolicy
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
- int corePoolSize : 定義核心池大小。對於每個新請求,即使池中有空閑線程,也會根據核心池大小創建一個新線程
- int maximumPoolSize : 當發出請求並且核心池大小中的所有線程都忙時,將創建一個新線程,直到它達到最大池大小。在最大池大小之后,所有新請求都將進入隊列。
- long keepAliveTime: 這是等待空閑線程死亡的時間。只有當線程計數大於核心池大小且小於或等於最大核心池大小時,空閑線程才會在keepAliveTime之后死亡。
- TimeUnit unit: keepAliveTime參數的時間單位
- BlockingQueue<Runnable> workQueue : BlockingQueue是一個隊列,如果一個線程想要獲取一個元素,並且隊列是空的,那么線程將被阻塞,並等待元素在隊列中可用。以同樣的方式添加元素時,如果隊列中沒有空間,線程將被阻塞並等待獲得可用空間。
- ThreadFactory threadFactory : 這是可選參數。傳遞用戶定義的ThreadFactory
- RejectedExecutionHandler handler: 這是可選參數。在兩種情況下,ThreadPoolExecutor.execute()可以拒絕新任務。
- 1、執行器已關閉。
- 2、超過工作隊列容量和最大線程使用有限的界限,即它們是飽和時
- ThreadPoolExecutor.AbortPolicy:它中止任務並始終拋出RejectedExecutionException。
- ThreadPoolExecutor.CallerRunsPolicy:它自己(客戶端)執行被拒絕的任務。
- ThreadPoolExecutor.DiscardPolicy:刪除任務。
- ThreadPoolExecutor.DiscardOldestPolicy:當ThreadPoolExecutor.execute()由於工作隊列的有限邊界和最大限制而拒絕任務時,此策略只將任務放在工作隊列的頭。
默認策略是ThreadPoolExecutor.AbortPolicy。
例子
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolExecutorDemoOne { public static void main(final String[] args) throws Exception { final ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 3, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); executor.execute(new BookReader("Ramayan")); executor.execute(new BookReader("Mahabharat")); executor.execute(new BookReader("Veda")); System.out.println("Old Max Pool Size:"+ executor.getMaximumPoolSize()); executor.setMaximumPoolSize(4); System.out.println("New Max Pool Size:"+ executor.getMaximumPoolSize()); executor.shutdown(); } } class BookReader implements Runnable { private String bookName; public BookReader(String bookName) { this.bookName = bookName; } @Override public void run() { for(int i = 0; i<5; i++) { System.out.println("Reading book: "+ bookName); try { Thread.sleep(200); } catch (InterruptedException ex) { System.out.println("I'm interrupted"); } } } }
使用Executors工廠方法創建ThreadPoolExecutor實例
可以使用Executors類提供的靜態工廠方法來獲取ThreadPoolExecutor,而不是直接使用上述構造函數之一創建ThreadPoolExecutor的實例。
- newCachedThreadPool()–創建一個線程池,該線程池根據需要創建新線程,提供無容量的阻塞隊列 SynchronousQueue,因此任務提交之后,將會創建新的線程執行;線程空閑超過60s將會銷毀 。
- newCachedThreadPool(ThreadFactory ThreadFactory)–創建一個線程池,該線程池根據需要創建新線程,但在可用時將重用先前構造的線程,並在需要時使用提供的ThreadFactory創建新線程。
- newFixedThreadPool(int nThreads)–創建一個線程池,該線程池重用在共享的無邊界隊列上操作的固定數量的線程。構造一個固定線程數目的線程池,配置的corePoolSize與maximumPoolSize大小相同,同時使用了一個無界LinkedBlockingQueue存放阻塞任務,因此多余的任務將存在再阻塞隊列,不會由RejectedExecutionHandler處理
- newFixedThreadPool(int nThreads,ThreadFactory ThreadFactory)–創建一個線程池,該線程池重用在共享的無邊界隊列上操作的固定數量的線程,在需要時使用提供的ThreadFactory創建新線程。
- newSingleThreadExecutor()–創建一個執行器,該執行器使用一個工作線程並在無限隊列上操作,由一個線程串行執行任務。
- newSingleThreadExecutor(ThreadFactory ThreadFactory)–創建一個執行器,該執行器使用一個在無限隊列上操作的工作線程,並在需要時使用提供的ThreadFactory創建一個新線程。
Executors的例子
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExecutorExp { public static void main(String[] args) { ExecutorService executor = Executors.newSingleThreadExecutor(); for(int i = 0; i < 4; i++) { executor.execute(new VoidTask()); } executor.shutdown(); } } class VoidTask implements Runnable{ @Override public void run() { System.out.println("Executing task (thread name)- " + Thread.currentThread().getName()); // delay to keep the thread busy // so that pool is used try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
newFixedThreadPool.
此方法創建一個線程池,該線程池重用在共享的無界隊列上操作的固定數量的線程。使用此方法時,內部Executors類使用以下參數創建ThreadPoolExecutor實例:
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
實例
public class ExecutorExp { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(2); for(int i = 0; i < 4; i++) { executor.execute(new VoidTask()); } executor.shutdown(); } } class VoidTask implements Runnable{ @Override public void run() { System.out.println("Executing task (thread name)- " + Thread.currentThread().getName()); // delay to keep the thread busy // so that pool is used try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
newCachedThreadPool
該線程池根據需要創建新線程,但在以前構造的線程可用時將重用它們。使用此方法時,內部Executors類使用以下參數創建ThreadPoolExecutor實例
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
實例

public class ExecutorExp { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); for(int i = 0; i < 40; i++) { executor.execute(new VoidTask()); } executor.shutdown(); } } class VoidTask implements Runnable{ @Override public void run() { System.out.println("Executing task (thread name)- " + Thread.currentThread().getName()); // delay to keep the thread busy // so that pool is used try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } //為了驗證最大線程數Integer.MAX_VALUE,我們看一下結果,感覺這個 Executing task (thread name)- pool-1-thread-3 Executing task (thread name)- pool-1-thread-4 Executing task (thread name)- pool-1-thread-1 Executing task (thread name)- pool-1-thread-2 Executing task (thread name)- pool-1-thread-7 Executing task (thread name)- pool-1-thread-8 Executing task (thread name)- pool-1-thread-5 Executing task (thread name)- pool-1-thread-6 Executing task (thread name)- pool-1-thread-11 Executing task (thread name)- pool-1-thread-10 Executing task (thread name)- pool-1-thread-9 Executing task (thread name)- pool-1-thread-12 Executing task (thread name)- pool-1-thread-15 Executing task (thread name)- pool-1-thread-13 Executing task (thread name)- pool-1-thread-14 Executing task (thread name)- pool-1-thread-16 Executing task (thread name)- pool-1-thread-17 Executing task (thread name)- pool-1-thread-18 Executing task (thread name)- pool-1-thread-19 Executing task (thread name)- pool-1-thread-20 Executing task (thread name)- pool-1-thread-21 Executing task (thread name)- pool-1-thread-23 Executing task (thread name)- pool-1-thread-22 Executing task (thread name)- pool-1-thread-24 Executing task (thread name)- pool-1-thread-25 Executing task (thread name)- pool-1-thread-26 Executing task (thread name)- pool-1-thread-27 Executing task (thread name)- pool-1-thread-28 Executing task (thread name)- pool-1-thread-29 Executing task (thread name)- pool-1-thread-30 Executing task (thread name)- pool-1-thread-31 Executing task (thread name)- pool-1-thread-32 Executing task (thread name)- pool-1-thread-33 Executing task (thread name)- pool-1-thread-34 Executing task (thread name)- pool-1-thread-35 Executing task (thread name)- pool-1-thread-36 Executing task (thread name)- pool-1-thread-37 Executing task (thread name)- pool-1-thread-39 Executing task (thread name)- pool-1-thread-38 Executing task (thread name)- pool-1-thread-40
構造有定時功能的線程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), threadFactory); }
實例
import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ScheduledThreadPoolExecutorExample { public static void main(String[] args) { ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2); DelayTask task = new DelayTask("Repeat Task"); System.out.println("Created : " + task.getName()); executor.scheduleWithFixedDelay(task, 2, 2, TimeUnit.SECONDS); } } class DelayTask implements Runnable { private String name; public DelayTask(String name) { this.name = name; } public String getName() { return name; } public void run() { System.out.println("Executing : " + name + ", Current Seconds : " + new Date().getSeconds()); } }
Callable接口
Callable接口與Runnable接口相似,都用於定義線程的可執行任務。Callable在JDK 1.5引入,與Runnable相比,有三個優勢:
1、可以在任務中拋出異常
2、可以終止任務
3、可以獲取任務的返回值
這時候就會出現一個問題:新舊兩種定義線程任務的方式,怎么前后兼容呢?
這時候就出現了FutureTask這個類
public class FutureTask<V> implements RunnableFuture<V> { // 封裝實現Callable接口的任務 public FutureTask(Callable<V> callable) { //...} // 封裝實現Runnable接口的任務,result是返回值,在任務完成后賦值(futureTask.get()會返回這個值)。 public FutureTask(Runnable runnable, V result) {//...} } public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
使用FutureTask包裝Runnable
import java.util.concurrent.Callable; public class MyRunnable implements Runnable { @Override public void run() { for(int i=0;i<3;i++){ System.out.println("num="+(i+1)); } } } import java.util.concurrent.FutureTask; public class Test { public static void main(String[] agrs){ String str = "default value"; FutureTask<String> task = new FutureTask<String>(new MyRunnable(),str); new Thread(task).start(); System.out.println(getFutureValue(task)); System.out.println("after task.get()!"); } public static String getFutureValue(FutureTask<String> task){ String str = "default value1"; try{ str = task.get(); }catch (Exception e){ System.out.println("task is canceled!"); } return str; } }
線程池管理線程任務
通過ExecutorService,可以使用4個方法執行線程任務。其中3個執行Runnable任務:
其中3個執行Runnable任務:
- void execute(Runnable command);
- < T > Future< T > submit(Runnable task, T result);
- Future< ? > submit(Runnable task);
1個執行Callable任務:
- < T > Future< T > submit(Callable< T > task); //執行Callable任務
只有通過submit方法執行的線程任務,才能獲取到Future,才能通過Future管理線程。
示例代碼如下:
import java.util.concurrent.Callable; public class MyCallable implements Callable<String> { @Override public String call() throws Exception{ int num=0; for(int i=0;i<3;i++){ num++; Thread.sleep(100); System.out.println("num="+num); } return String.valueOf(num); } } import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class Test { public static void main(String[] agrs){ ExecutorService service = Executors.newFixedThreadPool(2); Future<String> future = service.submit(new MyCallable()); // 使用submit()執行Callable任務 System.out.println("執行結果:"+getFutureValue(future)); System.out.println("after task.get()!"); service.shutdown(); } public static String getFutureValue(Future<String> task){ String str = "default value"; try{ str = task.get(); }catch (Exception e){ System.out.println("task is canceled!"); } return str; } }
Executor和ExecutorService的區別
見圖
Executors 是工廠helper類,包含幾個用於為您創建預配置線程池實例的方法。這些類是一個很好的線程池起點。如果不需要應用任何自定義微調,請使用它。
Executor和ExecutorService接口用於在Java中處理不同的線程池實現。通常,您應該保持代碼與線程池的實際實現分離,並在整個應用程序中使用這些接口。
Executor接口有一個execute方法來提交Runnable實例以供執行。