Java並發編程系列一:Future和CompletableFuture解析與使用


一、Future模式

Java 1.5開始,提供了Callable和Future,通過它們可以在任務執行完畢之后得到任務執行結果。

Future接口可以構建異步應用,是多線程開發中常見的設計模式。

當我們需要調用一個函數方法時。如果這個函數執行很慢,那么我們就要進行等待。但有時候,我們可能並不急着要結果。

因此,我們可以讓被調用者立即返回,讓他在后台慢慢處理這個請求。對於調用者來說,則可以先處理一些其他任務,在真正需要數據的場合再去嘗試獲取需要的數據。

 


1、Callable與Runnable

java.lang.Runnable是一個接口,在它里面只聲明了一個run()方法,run返回值是void,任務執行完畢后無法返回任何結果

public interface Runnable {
    public abstract void run(); }

 

Callable位於java.util.concurrent包下,它也是一個接口,在它里面也只聲明了一個方法叫做call(),這是一個泛型接口,call()函數返回的類型就是傳遞進來的V類型

public interface Callable<V> { V call() throws Exception; }

 

 

2、Future + Callable

Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。必要時可以通過get方法獲取執行結果,該方法會阻塞直到任務返回結果

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }

 

怎么使用Future和Callable呢?一般情況下是配合ExecutorService來使用的,在ExecutorService接口中聲明了若干個submit方法的重載版本

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

 

Future+Callable,使用示例如下(采用第一個方法):

 
         
import java.util.Random;
import java.util.concurrent.*;


/**
* @program: callable
* @description: Test
* @author: Mr.Wang
* @create: 2018-08-12 11:37
**/
public class MyTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> result = executor.submit(new Callable<Integer>() {
public Integer call() throws Exception {
return new Random().nextInt();
}
});
executor.shutdown();

try {
System.out.println("result:" + result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}

}
 
        

 結果:

result:297483790

 

 其它方式:

import java.util.Random;
import java.util.concurrent.*;

/**
* @program: callable
* @description: testfuture
* @author: Mr.Wang
* @create: 2018-08-12 12:11
**/
public class Testfuture {
public static void main(String[] args){
//第一種方式
FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return new Random().nextInt();
}
});
new Thread(task).start();
//第二種方方式
// ExecutorService executor = Executors.newSingleThreadExecutor();
// FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>() {
// @Override
// public Integer call() throws Exception {
// return new Random().nextInt();
// }
// });
// executor.submit(task);

try {
System.out.println("result: "+task.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}

}

 

result:-358490809

3、Future 接口的局限性

 了解了Future的使用,這里就要談談Future的局限性。Future很難直接表述多個Future 結果之間的依賴性,開發中,我們經常需要達成以下目的:

  • 將兩個異步計算合並為一個(這兩個異步計算之間相互獨立,同時第二個又依賴於第一個的結果)
  • 等待 Future 集合中的所有任務都完成。
  • 僅等待 Future 集合中最快結束的任務完成,並返回它的結果。


 

二、CompletableFuture

首先,CompletableFuture類實現了CompletionStage和Future接口,因此你可以像Future那樣使用它。

莫急,下面通過例子來一步一步解釋CompletableFuture的使用。

創建CompletableFuture對象

說明:Async結尾的方法都是可以異步執行的,如果指定了線程池,會在指定的線程池中執行,如果沒有指定,默認會在ForkJoinPool.commonPool()中執行。下面很多方法都是類似的,不再做特別說明。

四個靜態方法用來為一段異步執行的代碼創建CompletableFuture對象,方法的參數類型都是函數式接口,所以可以使用lambda表達式實現異步任務

runAsync方法:它以Runnabel函數式接口類型為參數,所以CompletableFuture的計算結果為空。

supplyAsync方法以Supplier<U>函數式接口類型為參數,CompletableFuture的計算結果類型為U。

 

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

 

1、變換結果

 

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);

這些方法的輸入是上一個階段計算后的結果,返回值是經過轉化后結果

例子:

import java.util.concurrent.CompletableFuture;

/**
 * @program: callable
 * @description: test
 * @author: Mr.Wang
 * @create: 2018-08-12 12:36
 **/
public class TestCompleteFuture {
    public static void main(String[] args){
        String result = CompletableFuture.supplyAsync(()->{return "Hello ";}).thenApplyAsync(v -> v + "world").join();
        System.out.println(result);
    }
}

結果:

Hello world

 

2、消費結果

 

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

這些方法只是針對結果進行消費,入參是Consumer,沒有返回值

 

例子:

import java.util.concurrent.CompletableFuture;

/**
 * @program: callable
 * @description: test
 * @author: Mr.Wang
 * @create: 2018-08-12 12:36
 **/
public class TestCompleteFuture {
    public static void main(String[] args){
        CompletableFuture.supplyAsync(()->{return "Hello ";}).thenAccept(v -> { System.out.println("consumer: " + v);});
    }
}

結果:

consumer: Hello 

 

3、結合兩個CompletionStage的結果,進行轉化后返回

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

需要上一階段的返回值,並且other代表的CompletionStage也要返回值之后,把這兩個返回值,進行轉換后返回指定類型的值。

說明:同樣,也存在對兩個CompletionStage結果進行消耗的一組方法,例如thenAcceptBoth,這里不再進行示例。

例子:

import java.util.concurrent.CompletableFuture;


/**
 * @program: callable
 * @description: test
 * @author: Mr.Wang
 * @create: 2018-08-12 12:36
 **/
public class TestCompleteFuture {
    public static void main(String[] args){

        String result = CompletableFuture.supplyAsync(()->{
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello";
        }).thenCombine(CompletableFuture.supplyAsync(()->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "world";
        }),(s1,s2)->{return s1 + " " + s2;}).join();
        System.out.println(result);
    }
}

結果:

Hello world

 

4、兩個CompletionStage,誰計算的快,就用那個CompletionStage的結果進行下一步的處理

 

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

兩種渠道完成同一個事情,就可以調用這個方法,找一個最快的結果進行處理,最終有返回值。

例子:

import java.util.concurrent.CompletableFuture;


/**
 * @program: callable
 * @description: test
 * @author: Mr.Wang
 * @create: 2018-08-12 12:36
 **/
public class TestCompleteFuture {
    public static void main(String[] args){

       String result = CompletableFuture.supplyAsync(()->{
           try {
               Thread.sleep(100);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           return "Hi Boy";
       }).applyToEither(CompletableFuture.supplyAsync(()->{
           try {
               Thread.sleep(300);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           return "Hi Girl";
       }),(s)->{return s;}).join();
       System.out.println(result);
    }
}

結果:

Hi Boy

5、運行時出現了異常,可以通過exceptionally進行補償

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

例子:

import java.util.concurrent.CompletableFuture;


/**
 * @program: callable
 * @description: test
 * @author: Mr.Wang
 * @create: 2018-08-12 12:36
 **/
public class TestCompleteFuture {
    public static void main(String[] args){

       String result = CompletableFuture.supplyAsync(()->{
           try {
               Thread.sleep(100);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           if(true) {
               throw new RuntimeException("exception test!");
           }

           return "Hi Boy";
       }).exceptionally(e->{
           System.out.println(e.getMessage());
           return "Hello world!";
       }).join();
       System.out.println(result);
    }
}

結果:

java.lang.RuntimeException: exception test!
Hello world!

 

三、結束

OK,了解了以上使用,基本上就對CompletableFuture比較清楚了。

后面會找個時間說說CompletableFuture實現原理

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM