使用CompletableFuture進行異步任務編排


1.JDK5引入了Future進行異步任務的處理,Future 的接口主要方法有以下幾個:

(1)boolean cancel (boolean mayInterruptIfRunning) 取消任務的執行。參數指定是否立即中斷任務執行,或者等等任務結束

(2)boolean isCancelled () 任務是否已經取消,任務正常完成前將其取消,則返回 true

(3)boolean isDone () 任務是否已經完成。需要注意的是如果任務正常終止、異常或取消,都將返回true

(4)V get () throws InterruptedException, ExecutionException 等待任務執行結束,然后獲得V類型的結果。InterruptedException 線程被中斷異常, ExecutionException任務執行異常,如果任務被取消,還會拋出CancellationException

(5) get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 同上面的get功能一樣,多了設置超時時間。參數timeout指定超時時間,uint指定時間的單位,在枚舉類TimeUnit中有相關的定義。如果計 算超時,將拋出TimeoutException

一般情況下Future 配合Callable 使用,獲取異步任務執行的結果,一般使用get()方法設置超時時間,但是在任務執行結束前的這段時間內線程是阻塞的,也就不說異步的了。同時為了獲取一般只能采取輪詢isDone()方法,這樣就顯得使用方法很單一,無法適應復雜情況下的異步任務編排。

2.JDK8 引入了CompletableFuture 來進行異步任務的編排,克服了Future的一些缺點,並且且進行了很多擴展。下面對CompletableFuture進行一個小的總結。
3.CompletableFuture 的方法主要有以下幾個特點:
(1)以Async結尾的方法都是異步執行的
(2)以run開頭的方法一般無返回值,而已supply開頭的方法是有返回值的,如 runAsync 和supplyAsync
  (3)   以 then 開頭的方法都會在上一個任務執行結束之后執行下一個任務。如 thenApply 和 thenAccept
(4)以Accept結尾的方法均為消耗上個任務執行的結果,無返回值。
(5)以run開頭的方法忽略上個任務的執行結果,在上個任務執行結束后執行下個方法。
(6)以Executor 結尾的方法可以自定義線程池,如果沒有指定線程池,則會默認在ForkJoinPool.commonPool() 線程池中執行。
4.具體實例
(1) 任務執行類的方法
  • RunAsync 執行異步任務,無返回值
package completablefuture;

import java.util.concurrent.CompletableFuture;

/**
 * @Author lizhilong
 * @create 2019/11/18 18:07
 * @desc
 * runAsyn 無返回結果,執行get()方法時,任務被觸發。
 */
public class RunAsync {
    public  static  void  main(String [] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
            System.out.println("Hello");
        });
        System.out.println("--------------");
        try {
            future.get();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
  • SupplyAsync 執行異步任務,有返回值
package completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * @Author lizhilong
 * @create 2019/11/18 18:17
 * @desc supplyAsync 方法有返回值,在get()方法后被觸發
 */
public class SupplyAsync {
    public  static void main(String[] args){
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                return "Hello";
            }
        });

        System.out.println("-------------");

        try {
            String s = future1.get();
            System.out.println(s);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
  • thenApply 在上個方法執行結束后將返回值作為入參執行下個方法
package completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * @Author lizhilong
 * @create 2019/11/18 18:41
 * @desc
 * 以Async結尾的方法都會異步執行
 * thenApply/thenApplyAsync 會在上個方法執行完之后然后繼續執行
 */
public class ThenApply {
    public static void main(String[] args) {

        CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(3000);
                }catch (Exception e){
                    e.printStackTrace();
                }
                return "Hello";
            }
        }).thenApplyAsync(s1 -> {
          return  s1+"=="+"World";
        });

        try {
            String s = future.get();
            System.out.println(s);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

運行結果

Hello==World
  • ThenAccept 消耗上個任務執行的結果,無返回值。
package completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * @Author lizhilong
 * @create 2019/11/18 19:20
 * @desc
 * thenAccept 對上個任務產生的結果進行消耗,與ThenApply 不同的是無返回結果
 * 所以第二個thenAccept 返回 null
 */
public class ThenAccept {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                return "hello";
            }
        }).thenAccept(s1 -> System.out.println(s1+" world"))
                .thenAccept(s2-> System.out.println("---"+s2));

        try {
            future.get();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

執行結果:

hello world
---null
  • ThenRun 不關心上一步的執行結果,在上一步任務執行結束后執行下一步任務
package completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * @Author lizhilong
 * @create 2019/11/19 11:44
 * @desc thenRun 不關心上一步執行的結果,上一步執行結束后執行下一步
 * thenRunAsync 異步執行
 */
public class ThenRun {

    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(3000);
                }catch (Exception e){
                    e.printStackTrace();
                }
                return "Hello";
            }
        }).thenRunAsync(new Runnable() {
            @Override
            public void run() {
                System.out.println("World");
            }
        });

        try {
            future.get();
        }catch (Exception e ){
            e.printStackTrace();
        }

    }
}

運行結果:

World
  • ThenApplyWithExecutor 在自定義的線程池執行異步任務
package completablefuture;

import java.util.concurrent.*;

/**
 * @Author lizhilong
 * @create 2019/11/18 18:59
 * @desc
 * 自定義線程池的方式來處理有先后順序的任務
 */
public class ThenApplyWithExecutor {

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();

        CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
            return "Hello";
        },service).thenApplyAsync(s1->{
            return s1 + "   World";
        },service).thenApplyAsync(s2 ->{
            return s2 +"    China";
        },service);

        try {
            String s = future.get();
            System.out.println(s);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            service.shutdownNow();
        }
    }
}

運行結果:

Hello   World    China
  • runAfterBoth/runAfterBothAsync 在前面任務執行結束后執行新的任務
package completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * @Author lizhilong
 * @create 2019/11/19 15:33
 * @desc runAfterBothAsync 忽略前面任務的執行結果,在前面任務執行結束之后在執行后面的runable任務
 */
public class RunAfterBothAsync {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("st1 end");
                return "Hello";
            }
        }).runAfterBothAsync(CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(7000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("st2 end");
                return "World";
            }
        }), new Runnable() {
            @Override
            public void run() {
                System.out.println("I LOVE CHINA");
            }
        });

        try {
            future.get();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

運行結果:

st1 end
st2 end
I LOVE CHINA

(2)任務結果消費/組合

  • thenCombine/thenCombineAsync 任務的運行結果進行組合后輸出
package completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * @Author lizhilong
 * @create 2019/11/19 13:40
 * @desc thenCombineAsync 將任務的執行結果進行合並后輸出
 * 最后的合並必操作須等兩個任務都執行結束后才可以進行
 */
public class CompletionStage {

    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("st1 end");
                return "Hello";
            }
        }).thenCombineAsync(
                CompletableFuture.supplyAsync(new Supplier<String>() {
                                                  @Override
                                                  public String get() {
                                                      try {
                                                          Thread.sleep(7000);
                                                      } catch (Exception e) {
                                                          e.printStackTrace();
                                                      }
                                                      System.out.println("st2 end");
                                                      return "World";
                                                  }
                                              }
                ), (r1, r2) -> r1 + " " + r2);

        try {
            String s = future.get();
            System.out.println(s);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

運行結果:

st1 end
st2 end
Hello World
  • ThenAcceptBothAsync 對異步任務的執行結果進行消耗,無返回值
package completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * @Author lizhilong
 * @create 2019/11/19 14:59
 * @desc thenAcceptBothAsync 消費任務的執行結果,無返回值
 * 消費動作的執行發生在任務均完成的情況下
 */
public class ThenAcceptBothAsync {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("st1 end");
                return "Hello";
            }
        }).thenAcceptBothAsync(CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(7000);
                }catch (Exception e){
                    e.printStackTrace();
                }
                System.out.println("st2 end");
                return "World";
            }
        }) ,(r1, r2) -> System.out.println(r1 +" "+r2));

        try {
            future.get();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

運行結果:

st1 end
st2 end
Hello World

(3)根據任務執行完成的先后順訊進行后續操作

  • applyToEither/applyToEitherAsync 獲取最先執行完成的任務的結果
package completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * @Author lizhilong
 * @create 2019/11/19 15:43
 * @desc ApplyToEitherAsync 獲取多個任務執行最快的任務結果
 */
public class ApplyToEitherAsync {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("st1 end");
                return "CHINA";
            }
        }).applyToEitherAsync(CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(7000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("st2 end");
                return "AUS";
            }
        }), (r) -> {
            return r;
        }).applyToEitherAsync(CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(2000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("st3 end");
                return "UK";
            }
        }), r1->{
            return  r1;
        });

        try {
            String s = future.get();
            System.out.println(s);
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

運行結果:

st3 end
UK
  • acceptEither/acceptEitherAsync 消耗最先執行完的任務的返回結果
package completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * @Author lizhilong
 * @create 2019/11/19 16:13
 * @desc AcceptEitherAsync 消費最先完成的任務返回的結果
 */
public class AcceptEitherAsync {

    public static void main(String[] args) throws  Exception{
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return "Hello";
            }
        }).acceptEitherAsync( CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(5000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return "World";
            }
        }), System.out::println).get();
    }
}

運行結果:

Hello
  • RunAfterEither/RunAfterEitherAsync 在前面的任務有任何一個完成后運行
package completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * @Author lizhilong
 * @create 2019/11/19 18:28
 * @desc RunAfterEither 是在前面任務有一個完成以后再去執行的,即最先完成的任務后運行
 *
 */
public class RunAfterEither {
    public static void main(String[] args) throws  Exception{
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("st1 end");
                return "Hello";
            }
        }).runAfterEitherAsync(CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(5000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("st2 end");
                return "World";
            }
        }), new Runnable() {
            @Override
            public void run() {
                System.out.println("I LOVE CHINA");
            }
        }).get();
    }
}

運行結果:

st1 end
I LOVE CHINA

(4)任務完成時

  • complete 任務完成后執行后續操作
package completablefuture;

import java.util.concurrent.CompletableFuture;

/**
 * @Author lizhilong
 * @create 2019/11/18 18:21
 * @desc 任務完成以后 執行后續操作
 */
public class Complete {

    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
           return  "Hello";
        });

        future.complete("world");
        System.out.println("----------------");
        try {
            String s = future.get();
            System.out.println(s);

        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

運行結果:

----------------
world
  • whenCompleteAsync/whenComplete 在前面任務執行完成后執行后續操作,可以獲取前面任務的執行結果和異常
package completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * @Author lizhilong
 * @create 2019/11/19 18:46
 * @desc WhenComplete 任務完成后執行相應操作,可以獲取上步任務執行的結果或者異常
 */
public class WhenComplete {

    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                if (1==2) {
                    throw new RuntimeException("測試異常");
                }
                return "Hello";
            }
        }).whenCompleteAsync((s, e) -> {
            System.out.println(s);
            System.out.println(e.getMessage());
        });

        try {
            String s = future.get();
            System.out.println(s);
        }catch (Exception e){
            System.out.println(e.getMessage());
        }
    }
}

運行結果:

Hello
java.lang.NullPointerException

(5)任務執行異常

  • completeExceptionally 任務完成后拋異常
package completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * @Author lizhilong
 * @create 2019/11/18 18:30
 * @desc 任務完成以后拋異常
 */
public class CompleteExceptionally {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                return "Hello";
            }
        });

        future.completeExceptionally( new Exception());
        try {
            String s = future.get();
            System.out.println(s);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

運行結果:

java.util.concurrent.ExecutionException: java.lang.Exception
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at completablefuture.CompleteExceptionally.main(CompleteExceptionally.java:22)
Caused by: java.lang.Exception
    at completablefuture.CompleteExceptionally.main(CompleteExceptionally.java:20)
  • exceptionally 執行任務過程中產生異常
package completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * @Author lizhilong
 * @create 2019/11/19 18:35
 * @desc 任務產生異常時進行相應操作
 */
public class Exceptionally {

    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                int x = 10 / 0;
                return "Hello";
            }
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return "World";
        });

        try {
            String s = future.get();
            System.out.println(s);
        }catch (Exception e){

        }
    }
}

運行結果:

java.lang.ArithmeticException: / by zero
World
  • handleAsync/handle 在使用 exceptionally 可以獲取異常時的異常,但是無法獲取正常執行時的結果
異常:
 public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                if(true){
                    throw  new RuntimeException("測試異常");
                }
                return "Hello";
            }
        }).handleAsync((r, e) -> {
            if (e != null) {
                return e.getMessage();
            }
            return "World";
        });

        try {
            System.out.println(future.get());
        }catch (Exception e){
            e.printStackTrace();
        }
    }

執行結果:

java.lang.RuntimeException: 測試異常

正常:

package completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * @Author lizhilong
 * @create 2019/11/19 18:58
 * @desc
 */
public class HandleNormal {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                return "Hello";
            }
        }).handleAsync((r, e) -> {
            if (e != null) {
                return e.getMessage();
            }
            return "World";
        });

        try {
            System.out.println(future.get());
        }catch (Exception e){
            e.printStackTrace();
        }
    }

}

執行結果:

World

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM