一、創建線程的三種常見方式
1、繼承Thread
類
- 創建線程類,繼承Thread
new Thread().start()
的方式啟動線程
public static void main(String[] args) {
System.out.println("線程" + Thread.currentThread().getName() + "開始執行");
new Thread(new Thread01(), "thread01").start();
System.out.println("線程" + Thread.currentThread().getName() + "執行完畢");
}
static class Thread01 extends Thread {
@Override
public void run() {
System.out.println("線程" + Thread.currentThread().getName() + "開始執行");
int i = 10 / 5;
System.out.println("計算結果為" + i);
}
}
2、實現Runnable
接口
- 創建線程類,實現Runnable接口
new Thread(線程類).start()
的方式啟動線程
public static void main(String[] args) {
System.out.println("線程" + Thread.currentThread().getName() + "開始執行");
new Thread(new Thread02(), "thread02").start();
System.out.println("線程" + Thread.currentThread().getName() + "執行完畢");
}
static class Thread02 implements Runnable {
@Override
public void run() {
System.out.println("線程" + Thread.currentThread().getName() + "開始執行");
int i = 10 / 5;
System.out.println("計算結果為" + i);
}
}
3、實現Callable<T>
接口
- 創建線程類,實現Callable接口,可以有返回值
- 創建
FutureTask<T> futureTask = new FutureTask<>(線程類);
new Thread(futureTask).start()
的方式啟動線程futureTask.get()
獲取返回值
public static void main(String[] args) throws Exception {
System.out.println("線程" + Thread.currentThread().getName() + "開始執行");
FutureTask<String> stringFutureTask = new FutureTask<>(new Thread03());
new Thread(stringFutureTask).start();
System.out.println(stringFutureTask.get());
System.out.println("線程" + Thread.currentThread().getName() + "執行完畢");
}
static class Thread03 implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("線程" + Thread.currentThread().getName() + "開始執行");
int i = 10 / 5;
System.out.println("計算結果為" + 10);
return "返回到主程序了" + i;
}
}
二、使用線程池執行線程
1、Executors自帶的線程池
固定大小線程池newFixedThreadPool
// @param nThreads 線程數量,核心線程數和最大線程數均為該值
// @param threadFactory 創建線程的工廠
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
單線程池,按序執行newSingleThreadExecutor
// @param threadFactory 創建線程的工廠
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
可緩存的線程池,newCachedThreadPool
// @nThreads 線程數量,核心線程數和最大線程數均為該值
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
2、線程池的實現原理
所有Executors框架提供的線程池底層均為java.util.concurrent.ThreadPoolExecutor
/**
* 通過給定的參數創建一個線程池.
*
* @param corePoolSize 一直存活的線程數,即使空閑,除非設置了allowCoreThreadTimeOut。
* @param maximumPoolSize 最大存活線程數
* @param keepAliveTime 當前存活線程數大於核心線程數時,空閑線程等待新任務最大時間
* @param unit 參數keepAliveTime的時間單位
* @param workQueue 任務執行前保存任務的隊列。只保存由execute方法提交的Runnable任務。
* @param threadFactory 新建線程的線程工廠
* @param handler 拒絕策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
3、線程池的執行
- 創建線程池對象
- 使用線程池對象
execute
進行
static ExecutorService executorService = Executors.newFixedThreadPool(4);
public static void main(String[] args) {
System.out.println("線程" + Thread.currentThread().getName() + "開始執行");
executorService.execute(new Thread01()); // 沒有返回值的異步執行
executorService.execute(new Thread02());
executorService.submit(new Thread01()); // 有返回值的異步執行
executorService.submit(new Thread02());
System.out.println("線程" + Thread.currentThread().getName() + "執行完畢");
}
三、CompletableFuture
組合式異步編程
1、創建異步對象
(1)runAsync
和 supplyAsync
方法
CompletableFuture
提供了四個靜態方法來創建一個異步操作。
runAsync
方法不支持返回值。supplyAsync
可以支持返回值。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
(2) 計算結果完成時的回調方法
當CompletableFuture
的計算結果完成,或者拋出異常的時候,可以執行特定的Action。主要是下面的方法:
//可以處理異常,無返回值
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
//可以處理異常,有返回值
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
可以看到Action的類型是BiConsumer<? super T,? super Throwable>
它可以處理正常的計算結果,或者異常情況。
(3) handle 方法,任務完成后執行,可處理異常
handle 是執行任務完成時對結果的處理。 handle 方法和 thenApply 方法處理方式基本一樣。不同的是 handle 是在任務完成后再執行,還可以處理異常的任務。thenApply 只可以執行正常的任務,任務出現異常則不執行 thenApply 方法。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
(4) 線程串行化
等待之前任務完成后執行
- thenApply:能接受上一步結果,有返回值
public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor);
- thenAccept:能接受上一步結果,但是無返回值
public <U> CompletionStage<U> thenAccept(Consumer<? super T> action);
public <U> CompletionStage<U> thenAcceptAsync(Consumer<? super T> action);
public <U> CompletionStage<U> thenAcceptAsync(Consumer<? super T> action, Executor executor);
- thenRun:不能獲取上一步的執行結果
public <U> CompletionStage<U> thenRun(Runnable action);
public <U> CompletionStage<U> thenRunAsync(Runnable action);
public <U> CompletionStage<U> thenRunAsync(Runnable action, Executor executor);
(5) 合並任務,都要完成
thenCombine:組合兩個future,獲取兩個的結果,返回當前任務的返回值
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor);
thenAcceptBoth:組合兩個future,獲取結果,然后處理任務,沒有返回值
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor);
runAfterBoth:組合兩個future,不需要獲取結果,只需要兩個future執行完成后就執行該任務
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor);
(6) 合並任務,僅完成一個
applyToEither:組合兩個future完成其中一個執行,獲取它的結果,返回當前任務的返回值
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor);
acceptEither:組合兩個future完成其中一個執行,獲取一個的結果,然后處理任務,沒有返回值
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor);
runAfterEither:組合兩個future完成其中一個執行,不需要獲取結果,只需要一個future執行完成后就執行該任務
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action,
Executor executor);
(7) 多任務組合
allOf:等待所有任務完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
anyOf:只要有一個完成
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);