並發編程之:異步調用獲取返回值


大家好,我是小黑,一個在互聯網苟且偷生的農民工。

Runnable

在創建線程時,可以通過new Thread(Runnable)方式,將任務代碼封裝在Runnablerun()方法中,將Runnable作為任務提交給Thread,或者使用線程池的execute(Runnable)方法處理。

public class RunnableDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(new MyRunnable());
    }
}

class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("runnable正在執行");
    }
}

Runnable的問題

如果你之前有看過或者寫過Runnable相關的代碼,肯定會看到有說Runnable不能獲取任務執行結果的說法,這就是Runnable存在的問題,那么可不可以改造一下來滿足使用Runnable並獲取到任務的執行結果呢?答案是可以的,但是會比較麻煩。

首先我們不能修改run()方法讓它有返回值,這違背了接口實現的原則;我們可以通過如下三步完成:

  1. 我們可以在自定義的Runnable中定義變量,存儲計算結果;
  2. 對外提供方法,讓外部可以通過方法獲取到結果;
  3. 在任務執行結束之前如果外部要獲取結果,則進行阻塞;

如果你有看過我之前的文章,相信要做到功能並不復雜,具體實現可以看我下面的代碼。

public class RunnableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyRunnable<String> myRunnable = new MyRunnable<>();
        new Thread(myRunnable).start();
        System.out.println(LocalDateTime.now() + " myRunnable啟動~");
        MyRunnable.Result<String> result = myRunnable.getResult();
        System.out.println(LocalDateTime.now() + " " + result.getValue());
    }
}

class MyRunnable<T> implements Runnable {
    // 使用result作為返回值的存儲變量,使用volatile修飾防止指令重排
    private volatile Result<T> result;

    @Override
    public void run() {
        // 因為在這個過程中會對result進行賦值,保證在賦值時外部線程不能獲取,所以加鎖
        synchronized (this) {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(LocalDateTime.now() + " run方法正在執行");
                result = new Result("這是返回結果");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 賦值結束后喚醒等待線程
                this.notifyAll();
            }
        }
    }
	// 方法加鎖,只能有一個線程獲取
    public synchronized Result<T> getResult() throws InterruptedException {
		// 循環校驗是否已經給結果賦值
        while (result == null) {
            // 如果沒有賦值則等待
            this.wait();
        }
        return result;
    }
	// 使用內部類包裝結果而不直接使用T作為返回結果
    // 可以支持返回值等於null的情況
    static class Result<T> {
        T value;
        public Result(T value) {
            this.value = value;
        }
        public T getValue() {
            return value;
        }
    }
}

從運行結果我們可以看出,確實能夠在主線程中獲取到Runnable的返回結果。

以上代碼看似從功能上可以滿足了我們的要求,但是存在很多並發情況的問題,實際開發中極不建議使用。在我們實際的工作場景中這樣的情況非常多,我們不能每次都這樣自定義搞一套,並且很容易出錯,造成線程安全問題,那么在JDK中已經給我們提供了專門的API來滿足我們的要求,它就是Callable

Callable

我們通過Callable來完成我們上面說的1-1億的累加功能。

public class CallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Long max = 100_000_000L;
        Long avgCount = max % 3 == 0 ? max / 3 : max / 3 + 1;
        // 在FutureTask中存放結果
        List<FutureTask<Long>> tasks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Long begin = 1 + avgCount * i;
            Long end = 1 + avgCount * (i + 1);
            if (end > max) {
                end = max;
            }
            FutureTask<Long> task = new FutureTask<>(new MyCallable(begin, end));
            tasks.add(task);
            new Thread(task).start();
        }
        
        for (FutureTask<Long> task : tasks) {
            // 從task中獲取任務處理結果
            System.out.println(task.get());
        }
    }
}
class MyCallable implements Callable<Long> {
    private final Long min;
    private final Long max;
    public MyCallable(Long min, Long max) {
        this.min = min;
        this.max = max;
    }
    @Override
    public Long call() {
        System.out.println("min:" + min + ",max:" + max);
        Long sum = 0L;
        for (Long i = min; i < max; i++) {
            sum = sum + i;
        }
        // 可以返回計算結果
        return sum;
    }
}

運行結果:

可以在創建線程時將Callable對象封裝在FutureTask對象中,交給Thread對象執行。

FutureTask之所以可以作為Thread創建的參數,是因為FutureTaskRunnable接口的一個實現類。

既然FutureTask也是Runnable接口的實現類,那一定也有run()方法,我們來通過源碼看一下是怎么做到有返回值的。

首先在FutureTask中有如下這些信息。

public class FutureTask<V> implements RunnableFuture<V> {
    // 任務的狀態
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    // 具體任務對象
    private Callable<V> callable;
    // 任務返回結果或者異常時返回的異常對象
    private Object outcome; 
    // 當前正在運行的線程
    private volatile Thread runner;
	// 
    private volatile WaitNode waiters;
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
}
public void run() {
    // 任務狀態的校驗
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 執行callable的call方法獲取結果
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // 有異常則設置返回值為ex
                setException(ex);
            }
            // 執行過程沒有異常則將結果set
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

在這個方法中的核心邏輯就是執行callable的call()方法,將結果賦值,如果有異常則封裝異常。

然后我們看一下get方法如何獲取結果的。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        // 這里會阻塞等待
        s = awaitDone(false, 0L);
    // 返回結果
    return report(s);
}
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        // 狀態異常情況會拋出異常
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

在FutureTask中除了get()方法還提供有一些其他方法。

  • get(timeout,unit):獲取結果,但只等待指定的時間;

  • cancel(boolean mayInterruptIfRunning):取消當前任務;

  • isDone():判斷任務是否已完成。

CompletableFuture

在使用FutureTask來完成異步任務,通過get()方法獲取結果時,會讓獲取結果的線程進入阻塞等待,這種方式並不是最理想的狀態。

JDK8中引入了CompletableFuture,對Future進行了改進,可以在定義CompletableFuture傳入回調對象,任務在完成或者異常時,自動回調。

public class CompletableFutureDemo {
    public static void main(String[] args) throws InterruptedException {
        // 創建CompletableFuture時傳入Supplier對象
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new MySupplier());
        //執行成功時
        future.thenAccept(new MyConsumer());
        // 執行異常時
        future.exceptionally(new MyFunction());
        // 主任務可以繼續處理,不用等任務執行完畢
        System.out.println("主線程繼續執行");
        Thread.sleep(5000);
        System.out.println("主線程執行結束");
    }
}

class MySupplier implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {
            // 任務睡眠3s
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 3 + 2;
    }
}
// 任務執行完成時回調Consumer對象
class MyConsumer implements Consumer<Integer> {
    @Override
    public void accept(Integer integer) {
        System.out.println("執行結果" + integer);
    }
}
// 任務執行異常時回調Function對象
class MyFunction implements Function<Throwable, Integer> {
    @Override
    public Integer apply(Throwable type) {
        System.out.println("執行異常" + type);
        return 0;
    }
}

以上代碼可以通過lambda表達式進行簡化。

public class CompletableFutureDemo {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            try {
                // 任務睡眠3s
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 3 + 2;
        });
        //執行成功時
        future.thenAccept((x) -> {
            System.out.println("執行結果" + x);
        });
        future.exceptionally((type) -> {
            System.out.println("執行異常" + type);
            return 0;
        });
        System.out.println("主線程繼續執行");
        Thread.sleep(5000);
        System.out.println("主線程執行結束");
    }
}

通過示例我們發現CompletableFuture的優點:

  • 異步任務結束時,會自動回調某個對象的方法;
  • 異步任務出錯時,會自動回調某個對象的方法;
  • 主線程設置好回調后,不再關心異步任務的執行。

當然這些優點還不足以體現CompletableFuture的強大,還有更厲害的功能。

串行執行

多個CompletableFuture可以串行執行,如第一個任務先進行查詢,第二個任務再進行更新

public class CompletableFutureDemo {
    public static void main(String[] args) throws InterruptedException {
        // 第一個任務
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1234);
        // 第二個任務
        CompletableFuture<Integer> secondFuture = future.thenApplyAsync((num) -> {
            System.out.println("num:" + num);
            return num + 100;
        });
        secondFuture.thenAccept(System.out::println);
        System.out.println("主線程繼續執行");
        Thread.sleep(5000);
        System.out.println("主線程執行結束");
    }
}

並行任務

CompletableFuture除了可以串行,還支持並行處理。

public class CompletableFutureDemo {
    public static void main(String[] args) throws InterruptedException {
        // 第一個任務
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> 1234);
        // 第二個任務
        CompletableFuture<Integer> twoFuture = CompletableFuture.supplyAsync(() -> 5678);
		// 通過anyOf將兩個任務合並為一個並行任務
        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(oneFuture, twoFuture);

        anyFuture.thenAccept(System.out::println);
        System.out.println("主線程繼續執行");
        Thread.sleep(5000);
        System.out.println("主線程執行結束");
    }
}

通過anyOf()可以實現多個任務只有一個成功,CompletableFuture還有一個allOf()方法實現了多個任務必須都成功之后的合並任務。

小結

Runnable接口實現的異步線程默認不能返回任務運行的結果,當然可以通過改造實現返回,但是復雜度高,不適合進行改造;

Callable接口配合FutureTask可以滿足異步任務結果的返回,但是存在一個問題,主線程在獲取不到結果時會阻塞等待;

CompletableFuture進行了增強,只需要指定任務執行結束或異常時的回調對象,在結束后會自動執行,並且支持任務的串行,並行和多個任務都執行完畢后再執行等高級方法。


以上就是本期的全部內容,我們下期見,如果覺得有用點個關注唄。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM