Java多線程(三)——FutureTask/CompletableFuture


iwehdio的博客園:https://www.cnblogs.com/iwehdio/

學習自:

1、FutureTask

  • 無論是Runnable還是Callable,它們其實和線程沒半毛錢關系,它們是任務類,只有Thread是線程類。

  • JDK那么多類,有且僅有Thread類能通過start0()方法向操作系統申請線程資源(本地方法)。

    image-20210114210703219

  • 並且,在JVM的設定中Java的線程和操作系統的線程是一一對應的:

    image-20210114210741531

  • 而Runnable和Callable如果沒有線程或線程池去執行它們,就什么也不是,只是一坨普通的代碼。

    public class AsyncAndWaitTest {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 方式1:重寫Thread#run()
            Thread thread = new Thread() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "========>正在執行");
                }
            };
            thread.start();
    
            // 方式2:構造方法傳入Runnable實例
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "========>正在執行");
            }).start();
    
            // 方式3:線程池 + Callable
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            Future<String> submit = executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName() + "========>正在執行");
                Thread.sleep(3 * 1000L);
                return "success";
            });
            String result = submit.get();
            System.out.println("result=======>" + result);
            // 關閉線程池
            executorService.shutdown();
        }
    }
    
  • FutureTask = 任務 + 結果。

    • 第四種方法:通過Thread的構造器傳入Runnable實例(FutureTask,內部包裝了Runnable/Callable)。
    • 基本使用:
    public class AsyncAndWaitTest {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            // FutureTask實現了Runnable,可以看做是一個任務
            FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    System.out.println(Thread.currentThread().getName() + "========>正在執行");
                    try {
                        Thread.sleep(3 * 1000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "success";
                }
            });
            
            System.out.println(Thread.currentThread().getName() + "========>啟動任務");
    
            // 傳入futureTask,啟動線程執行任務
            new Thread(futureTask).start();
    
            // 但它同時又實現了Future,可以獲取異步結果(會阻塞3秒)
            String result = futureTask.get();
            System.out.println("任務執行結束,result====>" + result);
        }
    
    }
    
  • FutureTask這個名字!它既是一個任務,又能存儲任務執行的結果。反映在程序上就是既能傳入Thread執行,又能通過futureTask.get()獲取任務執行結果。

image-20210114211605350

  • FutureTask有以下2個特征:

    • 能包裝Runnable和Callable(構造器傳入),但本身卻又實現了Runnable接口,即本質是Runnable。
    • 既然是Runnable,所以FutureTask能作為任務被Thread執行,但詭異的是FutureTask#get()可以獲取結果。
  • FutureTask如何包裝Runnable/Callable:

    • 使用:

      image-20210114211841491

    • 通過FutureTask構造器傳入Runnable/Callable的,所以我們去看看FutureTask的構造器:

      image-20210114211941786

    • FutureTask內部維護Callable類型的成員變量,對於Callable任務,直接賦值即可:

      image-20210114212020753

    • 而對於Runnable任務,需要先調用Executors#callable()把Runnable先包裝成Callable:

      image-20210114212109758

    • Executors#callable()用到了適配器模式:

      image-20210114212158174

    • 而RunnableAdapter實現了Callable接口,所以包裝后的RunnableAdapter可以賦值給FutureTask.callable。

      image-20210114212212182

    • 也就是說:

      • Runnable --> Executors.callable() --> RunnableAdapter implements Callable --> FutureTask.callable
      • Callable --> FutureTask.callable
  • Runnable和Callable的返回值問題:

    • Callable#call()是有返回值的,而Runnable#run()沒有。它們都包裝成FutureTask后,一個有返回值,一個沒返回值,怎么處理呢。

    • 設計成有返回值的,畢竟Callable.call()明明有返回值,你總不能硬生生丟掉吧。至於Runnable.run()確實沒返回值,但也好辦,搞個假的返回即可。

      image-20210114212450677

    • 等到Thread執行FutureTask時,會先取出FutureTask.callable,然后調用callable.call():

      • 如果是真的Callable,調用Callable.call()會返回真實的result
      • 如果是Runnable包裝的RunnableAdapter,會返回事先傳入的result
      • 這也是上面的程序中,為什么Runnable要多傳一個參數的原因
  • FutureTask是如何被Thread執行的:

    • thread執行自己的run方法。這里的target是FutureTask,所以target.run()就是FutureTask#run()。

      image-20210114212942670

    • 結果最終存哪呢?

      image-20210114213058867

    • 也是FutureTask的一個成員變量:

      image-20210114213115890

    • 進一步印證了說 FutureTask = 任務 + 結果。

  • 為什么get()是阻塞的?

    • 在FutureTask中定義了很多任務狀態:

      image-20210114213239662

      • 剛創建
      • 即將完成
      • 完成
      • 拋異常
      • 任務取消
      • 任務即將被打斷
      • 任務被打斷
    • 這些狀態的設置意義在哪?

      • 一個任務,有時可能非常耗時。而當用戶使用futureTask.get()時,必然是希望獲取最終結果的。如果FutureTask不幫我們阻塞,就有可能獲取空結果。此時為了獲取最終結果,用戶不得不在外部自己寫阻塞程序。
      • 所以,get()內部會判斷當前任務的狀態,只有當任務完成才返回。
    • 線程從阻塞到獲取結果,中間必然經歷類似喚醒的操作,怎么做到的?

      • 秘密就在awaitDone():核心的就是 for循環 + LockSupport。
      • LockSupport是一個線程阻塞工具類,所有的方法都是靜態方法,可以讓線程在任意位置阻塞,當然也有喚醒的方法。
      • LockSupport主要有兩類方法:parkunpark。即讓線程停下和啟動。

      image-20210114213403621

      • 類似於:

        public class ParkTest {
        
            @Test
            public void testPark() throws InterruptedException {
                // 存儲線程
                List<Thread> threadList = new ArrayList<>();
        
                // 創建5個線程
                for (int i = 0; i < 5; i++) {
                    Thread thread = new Thread(() -> {
                        System.out.println("我是" + Thread.currentThread().getName() + ", 我開始工作了~");
                        LockSupport.park(this);
                        System.out.println("我是" + Thread.currentThread().getName() + ", 我又活過來了~");
                    });
                    thread.start();
                    threadList.add(thread);
                }
        
                Thread.sleep(3 * 1000L);
                System.out.println("====== 所有線程都阻塞了,3秒后全部恢復了 ======");
        
                // unPark()所有線程
                for (Thread thread : threadList) {
                    LockSupport.unpark(thread);
                }
        
                // 等所有線程執行完畢
                Thread.sleep(3 * 1000L);
            }
        
        }
        
    • 也就是說,調用get()后,如果當前沒有結果,就會被park(),等有了結果再unpark()並往下走:

      image-20210114213807726

    • 取出outcome返回:

      image-20210114213822669

  • FutureTask如何異步返回結果:

    image-20210114214552476

    • 往線程池submit了一個Callable,結果馬上返回了result(FutureTask):

      image-20210114214746646

    • 觀察:

      • 返回的FutureTask里包含剛才丟進去的Callable
      • result.outcome目前還是null
    • 實際上,返回的futureTask並不是真正的結果,它內部持有outcome引用,它才指向真正的結果。而在任務完成之前,outcome引用指向的是null。

      image-20210114214843951

  • 何時調用futureTask.get()?

    • 用戶調用get()必然是想到得到最終結果的,所以為了保證一定能得到結果,JDK把FutureTask#get()設計成阻塞的。

    • 建議不要立即調用get(),否則程序完全沒有發揮異步優勢,由異步阻塞變成同步阻塞。

      image-20210114215039393

    • 開啟多線程,當然應該發揮多線程的優勢:

      image-20210114215102247

      image-20210114215154400

  • isDone() + get():

    • 但是實際開發時,異步線程具體會耗時多久有時很難預估,受網絡、數據庫等各方面影響。所以很難做到在合適的地方get()然后一擊即中。
    • FutureTask提供了isDone()方法:

    image-20210114215310620

    • 當然,這種做法也不是很優雅。JDK1.8提供了CompletableFuture解決這個問題。

2、CompletableFuture

  • FutureTask#get()本身是阻塞的,假設當前有三個下載任務在執行:

    • task1(預計耗時5秒)
    • task2(預計耗時1秒)
    • task3(預計耗時1秒)
  • 如果阻塞獲取時不湊巧把task1.get()排在最前面,那么會造成一定的資源浪費,因為task2和task3早就已經准備好了,可以先拿出來處理,以獲得最佳的用戶體驗。

    image-20210115192952604

  • 雖然可以結合輪詢+isDone()的方式改進,但仍存在以下問題:

    • 輪詢間隔多少合適?
    • 為了避免while(true)阻塞主線程邏輯,可能需要開啟單獨的線程輪詢,浪費一個線程。
    • 仍然無法處理復雜的任務依賴關系。
  • CompletableFuture的簡單使用:

    @Test
    public void testCallBack() throws InterruptedException, ExecutionException {
        // 提交一個任務,返回CompletableFuture
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                System.out.println("=============>異步線程開始...");
                System.out.println("=============>異步線程為:" + Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("=============>異步線程結束...");
                return "supplierResult";
            }
        });
        
    	// 阻塞獲取結果
        System.out.println("異步結果是:" + completableFuture.get());
        System.out.println("main結束");
    }
    
    • 整個過程看起來和同步沒啥區別,因為我們在main線程中使用了CompletableFuture#get(),直接阻塞了。

    image-20210115193510981

    • CompletableFuture和FutureTask的異同點:

      • 相同:都實現了Future接口,所以都可以使用諸如Future#get()、Future#isDone()、Future#cancel()等方法

      • 不同:

        • FutureTask實現了Runnable,所以它可以作為任務被執行,且內部維護outcome,可以存儲結果
        • CompletableFuture沒有實現Runnable,無法作為任務被執行,所以你無法把它直接丟給線程池執行,相反地,你可以把Supplier#get()這樣的函數式接口實現類丟給它執行
        • CompletableFuture實現了CompletionStage,支持異步回調
    • FutureTask和CompletableFuture最大的區別在於,FutureTask需要我們主動阻塞獲取,而CompletableFuture支持異步回調。

    • CompletableFuture好像承擔的其實是線程池的角色,而Supplier#get()則對應Runnable#run()、Callable#call()。

  • CompletionStage的基本使用:

    @Test
    public void testCallBack() throws InterruptedException, ExecutionException {
        // 提交一個任務,返回CompletableFuture(注意,並不是把CompletableFuture提交到線程池,它沒有實現Runnable)
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                System.out.println("=============>異步線程開始...");
                System.out.println("=============>異步線程為:" + Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("=============>異步線程結束...");
                return "supplierResult";
            }
        });
    
        // 異步回調:上面的Supplier#get()返回結果后,異步線程會回調BiConsumer#accept()
        completableFuture.whenComplete(new BiConsumer<String, Throwable>() {
            @Override
            public void accept(String s, Throwable throwable) {
                System.out.println("=============>異步任務結束回調...");
                System.out.println("=============>回調線程為:" + Thread.currentThread().getName());
            }
        });
    
        // CompletableFuture的異步線程是守護線程,一旦main結束就沒了,為了看到打印結果,需要讓main休眠一會兒
        System.out.println("main結束");
        TimeUnit.SECONDS.sleep(15);
    }
    
    • 結果:

      =============>異步線程開始...
      =============>異步線程為:ForkJoinPool.commonPool-worker-9
      main結束
      =============>異步線程結束...
      =============>異步任務結束回調...
      =============>回調線程為:ForkJoinPool.commonPool-worker-9
      
  • 主線程調用了CompletableFuture#whenComplete():

    • 這個方法定義在CompletionStage接口中:

      public interface CompletionStage<T> {
          public CompletionStage<T> whenComplete
              (BiConsumer<? super T, ? super Throwable> action);
          
          // 省略其他方法...
      }
      
    • 而CompletableFuture實現了whenComplete():

      public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
          // 省略其他方法...
          
          public CompletableFuture<T> whenComplete(
              BiConsumer<? super T, ? super Throwable> action) {
              return uniWhenCompleteStage(null, action);
          }
          
          private CompletableFuture<T> uniWhenCompleteStage(Executor e, BiConsumer<? super T, ? super Throwable> f) {
              if (f == null) throw new NullPointerException();
              CompletableFuture<T> d = new CompletableFuture<T>();
              if (e != null || !d.uniWhenComplete(this, f, null)) {
                  UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
                  push(c);
                  c.tryFire(SYNC);
              }
              return d;
          }
          
          // 省略其他方法...
      }
      
    • CompletionStage是什么呢?

      • 是一個“很簡單”的接口。完全獨立,沒有繼承任何其他接口,所有方法都是它自己定義的。
      public interface CompletionStage<T> {
          // 定義了超級多類似whenComplete()的方法
      }
      
      • 是個不簡單的接口。因為CompletableFuture實現Future的同時,還實現了它。Future方法就6、7個,而CompletionStage的方法超級多,所以如果你打開CompletableFuture的源碼,目之所及幾乎都是它對CompletionStage的實現。
      public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
          // 一些字段
          // 實現Future的方法
          
          // 實現CompletionStage的方法
          // 一些私有方法,配合CompletionStage
          // 一些內部類,配合CompletionStage
      }
      
      • 異步回調其實和CompletionStage有着很大的關系。
    • 總而言之,CompletionStage是一個接口,定義了一些方法,CompletableFuture實現了這些方法並設計出了異步回調的機制

  • 異步線程會回調BiConsumer#accept(),而CompletableFuture#whenComplete()是主線程調用的。即CompletionStage中定義的諸如whenComplete()等方法雖然和異步回調有關系,但並不是最終被回調的方法,最終被回調的其實是whenComplete(BiConsumer)傳進去的BiConsumer#accept()。

    image-20210115195319241

  • 異步線程哪來的,Supplier如何被執行?

    • 跟隨主線程進入CompletableFuture#supplyAsync():

      image-20210115195959491

    • 注釋:返回一個新的CompletableFuture,該future是由運行在{@link ForkJoinPool#commonPool()}中的任務異步完成的,其值是通過調用給定的Supplier獲得的。

      • 即異步線程來自ForkJoinPool線程池。
      • 通過CompletableFuture#supplyAsync(supplier)傳入Supplier,返回CompletableFuture對象,它包含一個未來的value,且這個value會在稍后由異步線程執行Supplier#get()產生。
    • CompletableFuture#supplyAsync(supplier)內部調用了asyncSupplyStage(asyncPool, supplier),此時傳入了一個線程池asyncPool,它是CompletableFuture的成員變量:

      image-20210115200409657

      image-20210115200335107

    • useCommonPool為true時會使用ForkJoinPool,而useCommonPool取決於運行當前程序的硬件是否支持多核CPU。

    • 主線程傳進來的Supplier壓根沒有實現Runnable/Callable接口,怎么被異步線程執行呢?

      image-20210115200651552

    • 和ExecutorService#submit()一樣的套路:包裝成Task再執行。只不過這次被包裝成了AsyncSupply,而不是FutureTask:

      image-20210115200746525

    • AsyncSupply和當初的FutureTask頗為相似,都實現了Future和Runnable,具備 任務+結果 雙重屬性:

      image-20210115200809769

    • 最終就是把Supplier包裝好,傳入線程池的execute()中運行。等線程池分配出線程,最終會執行AsyncSupply#run()。

    • AsyncSupply#run()在方法內調用f.get(),也就是Supplier#get(),阻塞獲取結果並通過d.completeValue(v)把值設置到CompletableFuture中,而CompletableFuture d已經在上一步asyncSupplyStage()中被返回。最終效果和線程池+FutureTask是一樣的,先返回Future實例,再通過引用把值放進去。

    image-20210115201646593

    • 從這個層面上來看,CompletableFuture相當於一個自帶線程池的Future,而CompletableFuture#supplyAsync(Supplier)倒像是ExecutorService#submit(Runnable/Callable),內部也會包裝任務,最終丟給Executor#execute(Task)。
    • 只不過ExecutorService是把Runnable#run()/Callable#call()包裝成FutureTask,而CompletableFuture則把亂七八糟的Supplier#get()等函數式接口的方法包裝成ForkJoinTask。
  • 回調機制的原理:

    • CompletableFuture的回調機制,其實本質上是對多個CompletableFuture內部函數的順序執行,只不過發起者是異步線程而不是主線程

    • CompletableFuture#thenApply(),與CompletableFuture#whenComplete()本質是一樣的(也是CompletableFuture對CompletionStage的實現):

      @RunWith(SpringRunner.class)
      @SpringBootTest
      public class CompletableFutureTest {
      
          @Test
          public void testCallBack() throws InterruptedException {
              // 任務一:把第一個任務推進去,順便開啟異步線程
              CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(new Supplier<String>() {
                  @Override
                  public String get() {
                      System.out.println("=============>異步線程開始...");
                      try {
                          TimeUnit.SECONDS.sleep(10);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      System.out.println("=============>completableFuture1任務結束...");
                      System.out.println("=============>執行completableFuture1的線程為:" + Thread.currentThread().getName());
                      return "supplierResult";
                  }
              });
              System.out.println("completableFuture1:" + completableFuture1);
      
              // 任務二:把第二個任務推進去,等待異步回調
              CompletableFuture<String> completableFuture2 = completableFuture1.thenApply(new Function<String, String>() {
                  @Override
                  public String apply(String s) {
                      try {
                          TimeUnit.SECONDS.sleep(10);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      System.out.println("=============>completableFuture2任務結束 result=" + s);
                      System.out.println("=============>執行completableFuture2的線程為:" + Thread.currentThread().getName());
                      return s;
                  }
              });
              System.out.println("completableFuture2:" + completableFuture2);
      
              // 任務三:把第三個任務推進去,等待異步回調
              CompletableFuture<String> completableFuture3 = completableFuture2.thenApply(new Function<String, String>() {
                  @Override
                  public String apply(String s) {
                      try {
                          TimeUnit.SECONDS.sleep(10);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      System.out.println("=============>completableFuture3任務結束 result=" + s);
                      System.out.println("=============>執行completableFuture3的線程為:" + Thread.currentThread().getName());
                      return s;
                  }
              });
              System.out.println("completableFuture3:" + completableFuture3);
      
              System.out.println("主線程結束");
              TimeUnit.SECONDS.sleep(40);
          }
      }
      
    • 分析主線程的主干:

      • CompletableFuture#supplyAsync(Supplier):包裝Supplier為AsyncSupply,調用executor.execute(),等待異步線程回調Supplier#get()
      • CompletableFuture#thenApply(Function)
      • CompletableFuture#thenApply(Function)
    • 主線程在執行“任務一”的CompletableFuture#supplyAsync(Supplier)時,將Supplier包裝成AsyncSupply任務,並開啟了異步線程,此后異步線程會阻塞在Supplier#get():

      image-20210115212049077

      image-20210115212056600

    • Supplier#get()是異步線程開啟后執行的第一站!

    • 與此同時,主線程繼續執行后面的“任務二”、“任務三”,並且都會到達uniApply(),且都返回false,因為a.result==null。

    • 當主線程從任務二進來,調用thenApply()。最終會到達uniApply(),通過控制台的日志,我們發現a其實就是completableFuture1。因為uniApply()的上一步傳入的this:

      image-20210115212144996

    • 也就是說:

      • 主線程 ---> completableFuture1.thenApply(Function#apply) ---> !d.uniApply(this, f#apply, null)
      • a.result就是completableFuture1.result,而completableFuture1的值來自Supplier#get(),此時確實還是null(異步線程阻塞設定的秒數秒后才會)。
    • 所以此時d.uniApply(this, f, null) 為false,那么!d.uniApply(this, f, null) 為true,就會進入if語句:

      image-20210115212301773

    • 主要做了3件事:

      • 傳入Executor e、新建的CompletableFuture d、當前completableFuture1、Function f,構建UniApply
      • push(uniApply)
      • uniApply.tryFire(SYNC)
    • 任務一做了兩件事:

      • 開啟異步線程
      • 等待回調
    • 由於要開啟線程,自己也要作為任務被執行,所以Supplier#get()被包裝成AsyncSupply,是一個Task。而后續的幾個任務其實只做了一件事:等待回調。只要能通過實例執行方法即可,和任務一有所不同,所以只是被包裝成UniApply對象。

    • push(uniApply)姑且認為是把任務二的Function#apply()包裝后塞到任務棧中。

    • 但uniApply.tryFire(SYNC)是干嘛的呢?里面又調了一次uniApply():

      image-20210115212533531

    • SYNC=0,所以最終判斷!d.uniApply(this, f, this) ==true,tryFire(SYNC)返回null,后面的d.postFire(a, mode)此時並不會執行,等后面異步線程復蘇后,帶着任務一的結果再次調用時,效果就截然不同了。

    • 總結一下,“任務二”、“任務三”操作都是一樣的,都做了3件事:

      • 主線程調用CompletableFuture#thenApply(Function f)傳入f,構建UniApply對象,包裝Function#apply()
      • 把構建好的UniApply對象push到棧中
      • 返回CompletableFuture d

      image-20210115212633923

      image-20210115212734838

    • 等過了100秒,supplyAsync(Supplier)中的Supplier#get()返回結果后,異步線程繼續往下走:

      • postComplete()也會走uniApply(),但這次已經有了異步結果result,所以流程不會被截斷,最終會調用Function#apply(s),而這個s是上一個函數的執行結果
      • 也就是說,新的CompletableFuture對象調用Function#apply()處理了上一個CompletableFuture產生的結果。

      image-20210115212939769

  • CompletableFuture與FutureTask線程數對比:

    • CompletableFuture和FutureTask耗費的線程數是一致的,但對於FutureTask來說,無論是輪詢還是阻塞get,都會導致主線程無法繼續其他任務,又或者主線程可以繼續其他任務,但要時不時check FutureTask是否已經完成任務,比較糟心。而CompletableFuture則會根據我們編排的順序逐個回調,是按照既定路線執行的。

      image-20210115213205392

    • 其實無論是哪種方式,異步線程其實都需要阻塞等待結果,期間不能處理其他任務。但對於FutureTask而言,在異步線程注定無法復用的前提下,如果想要獲取最終結果,需要主線程主動查詢或者額外開啟一個線程查詢,並且可能造成阻塞,而CompletableFuture的異步任務執行、任務結果獲取都是異步線程獨立完成。

    • 即,1個異步線程阻塞執行任務 + 回調異步結果 > 1個異步線程阻塞執行任務 + 1個線程阻塞查詢任務。



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM