線程、線程池以及CompletableFuture組合式異步編程


一、創建線程的三種常見方式

1、繼承Thread

  1. 創建線程類,繼承Thread
  2. new Thread().start()的方式啟動線程
public static void main(String[] args) {
    System.out.println("線程" + Thread.currentThread().getName() + "開始執行");
    new Thread(new Thread01(), "thread01").start();
    System.out.println("線程" + Thread.currentThread().getName() + "執行完畢");
}
static class Thread01 extends Thread {
    @Override
    public void run() {
        System.out.println("線程" + Thread.currentThread().getName() + "開始執行");
        int i = 10 / 5;
        System.out.println("計算結果為" + i);
    }
}

2、實現Runnable接口

  1. 創建線程類,實現Runnable接口
  2. new Thread(線程類).start()的方式啟動線程
public static void main(String[] args) {

    System.out.println("線程" + Thread.currentThread().getName() + "開始執行");
    new Thread(new Thread02(), "thread02").start();
    System.out.println("線程" + Thread.currentThread().getName() + "執行完畢");
}
static class Thread02 implements Runnable {
    @Override
    public void run() {
        System.out.println("線程" + Thread.currentThread().getName() + "開始執行");
        int i = 10 / 5;
        System.out.println("計算結果為" + i);
    }
}

3、實現Callable<T>接口

  1. 創建線程類,實現Callable接口,可以有返回值
  2. 創建FutureTask<T> futureTask = new FutureTask<>(線程類);
  3. new Thread(futureTask).start()的方式啟動線程
  4. futureTask.get()獲取返回值
public static void main(String[] args) throws Exception {
    System.out.println("線程" + Thread.currentThread().getName() + "開始執行");
    FutureTask<String> stringFutureTask = new FutureTask<>(new Thread03());
    new Thread(stringFutureTask).start();
    System.out.println(stringFutureTask.get());
    System.out.println("線程" + Thread.currentThread().getName() + "執行完畢");
}
static class Thread03 implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("線程" + Thread.currentThread().getName() + "開始執行");
        int i = 10 / 5;
        System.out.println("計算結果為" + 10);
        return "返回到主程序了" + i;
    }
}

二、使用線程池執行線程

1、Executors自帶的線程池

固定大小線程池newFixedThreadPool

// @param nThreads 線程數量,核心線程數和最大線程數均為該值
// @param threadFactory 創建線程的工廠
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

單線程池,按序執行newSingleThreadExecutor

// @param threadFactory 創建線程的工廠
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

可緩存的線程池,newCachedThreadPool

// @nThreads 線程數量,核心線程數和最大線程數均為該值
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

2、線程池的實現原理

所有Executors框架提供的線程池底層均為java.util.concurrent.ThreadPoolExecutor

/**
 * 通過給定的參數創建一個線程池.
 *
 * @param corePoolSize 一直存活的線程數,即使空閑,除非設置了allowCoreThreadTimeOut。
 * @param maximumPoolSize 最大存活線程數
 * @param keepAliveTime 當前存活線程數大於核心線程數時,空閑線程等待新任務最大時間
 * @param unit 參數keepAliveTime的時間單位
 * @param workQueue 任務執行前保存任務的隊列。只保存由execute方法提交的Runnable任務。
 * @param threadFactory 新建線程的線程工廠
 * @param handler 拒絕策略
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

3、線程池的執行

  1. 創建線程池對象
  2. 使用線程池對象execute進行
static ExecutorService executorService = Executors.newFixedThreadPool(4);

public static void main(String[] args) {
    System.out.println("線程" + Thread.currentThread().getName() + "開始執行");
    executorService.execute(new Thread01());  	// 沒有返回值的異步執行
    executorService.execute(new Thread02());
    executorService.submit(new Thread01());		// 有返回值的異步執行
    executorService.submit(new Thread02());
    System.out.println("線程" + Thread.currentThread().getName() + "執行完畢");
}

三、CompletableFuture組合式異步編程

1、創建異步對象

(1)runAsync supplyAsync方法

CompletableFuture 提供了四個靜態方法來創建一個異步操作。

  • runAsync方法不支持返回值。
  • supplyAsync可以支持返回值。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

(2) 計算結果完成時的回調方法

CompletableFuture的計算結果完成,或者拋出異常的時候,可以執行特定的Action。主要是下面的方法:

//可以處理異常,無返回值
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

//可以處理異常,有返回值
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

可以看到Action的類型是BiConsumer<? super T,? super Throwable>它可以處理正常的計算結果,或者異常情況。

(3) handle 方法,任務完成后執行,可處理異常

handle 是執行任務完成時對結果的處理。 handle 方法和 thenApply 方法處理方式基本一樣。不同的是 handle 是在任務完成后再執行,還可以處理異常的任務。thenApply 只可以執行正常的任務,任務出現異常則不執行 thenApply 方法。

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

(4) 線程串行化

等待之前任務完成后執行

  • thenApply:能接受上一步結果,有返回值
public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor);
  • thenAccept:能接受上一步結果,但是無返回值
public <U> CompletionStage<U> thenAccept(Consumer<? super T> action);
public <U> CompletionStage<U> thenAcceptAsync(Consumer<? super T> action);
public <U> CompletionStage<U> thenAcceptAsync(Consumer<? super T> action, Executor executor);
  • thenRun:不能獲取上一步的執行結果
public <U> CompletionStage<U> thenRun(Runnable action);
public <U> CompletionStage<U> thenRunAsync(Runnable action);
public <U> CompletionStage<U> thenRunAsync(Runnable action, Executor executor);

(5) 合並任務,都要完成

thenCombine:組合兩個future,獲取兩個的結果,返回當前任務的返回值

public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn, Executor executor);

thenAcceptBoth:組合兩個future,獲取結果,然后處理任務,沒有返回值

public <U> CompletableFuture<Void> thenAcceptBoth(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action);

public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action);

public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action, Executor executor);

runAfterBoth:組合兩個future,不需要獲取結果,只需要兩個future執行完成后就執行該任務

public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                            Runnable action);

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action);

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action,
                                                 Executor executor);

(6) 合並任務,僅完成一個

applyToEither:組合兩個future完成其中一個執行,獲取它的結果,返回當前任務的返回值

public <U> CompletableFuture<U> applyToEither(
    CompletionStage<? extends T> other, Function<? super T, U> fn);

public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn);

public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn,
    Executor executor);

acceptEither:組合兩個future完成其中一個執行,獲取一個的結果,然后處理任務,沒有返回值

public CompletableFuture<Void> acceptEither(
    CompletionStage<? extends T> other, Consumer<? super T> action);

public CompletableFuture<Void> acceptEitherAsync(
    CompletionStage<? extends T> other, Consumer<? super T> action);

public CompletableFuture<Void> acceptEitherAsync(
    CompletionStage<? extends T> other, Consumer<? super T> action,
    Executor executor);

runAfterEither:組合兩個future完成其中一個執行,不需要獲取結果,只需要一個future執行完成后就執行該任務

public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                            Runnable action);

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                 Runnable action);

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                 Runnable action,
                                                 Executor executor);

(7) 多任務組合

allOf:等待所有任務完成

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

anyOf:只要有一個完成

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM