線程、線程池、CompletableFuture異步編排


java的線程是通過java.lang.Thread類來實現的。

在Java當中,線程通常都有五種狀態,創建、就緒、運行、阻塞和死亡。
  第一是創建狀態。在生成線程對象,並沒有調用該對象的start方法,這是線程處於創建狀態。
  第二是就緒狀態。當調用了線程對象的start方法之后,該線程就進入了就緒狀態,但是此時線程調度程序還沒有把該線程設置為當前線程,此時處於就緒狀態。在線程運行之后,從等待或者睡眠中回來之后,也會處於就緒狀態。
  第三是運行狀態。線程調度程序將處於就緒狀態的線程設置為當前線程,此時線程就進入了運行狀態,開始運行run函數當中的代碼。
  第四是阻塞狀態。線程正在運行的時候,被暫停,通常是為了等待某個時間的發生(比如說某項資源就緒)之后再繼續運行。sleep,suspend,wait等方法都可以導致線程阻塞。
  第五是死亡狀態。如果一個線程的run方法執行結束或者調用stop方法后,該線程就會死亡。對於已經死亡的線程,無法再使用start方法令其進入就緒。

線程的初始化

1)繼承Thread

       

  run()與start()方法的區別

    a.start()方法來啟動線程,真正實現了多線程運行。這時無需等待run方法體代碼執行完畢,可以直接繼續執行下面的代碼;通過調用Thread類的start()方法來啟動一個線程, 這時此線程是處於就緒狀態, 並沒有運行。 然后通過此Thread類調用方法run()來完成其運行操作的, 這里方法run()稱為線程                 體,它包含了要執行的這個線程的內容, Run方法運行結束, 此線程終止。然后CPU再調度其它線程。

            b.run()方法當作普通方法的方式調用。程序還是要順序執行,要等待run方法體執行完畢后,才可繼續執行下面的代碼; 程序中只有主線程——這一個線程, 其程序執行路徑還是只有一條, 這樣就沒有達到寫線程的目的。

2)實現Runnable接口

  

3)實現Callable接口 + FutureTask

  

  可以拿到返回結果,可以處理異常

  多線程運行FutureTask,只會運行一次,其他線程會引用第一次計算的結果

4)線程池

  方式一:啟動線程池(Executors可根據相關需求創建線程方法)

//執行長期任務性能好,創建一個線程池,一池有N個固定的線程,有固定線程數的線程
ExecutorService service = Executors.newCachedThreadPool(10);

//一個任務一個任務的執行,一池一線程
ExecutorService service = Executors.newSingleThreadExecutor()

//執行很多短期異步任務,線程池根據需要創建新線程,但在先前構建的線程可用時將重用它們。可擴容,遇強則強
ExecutorService service = Executors.newCachedThreadPool()

  方式二:啟動線程池

int corePoolSize = 10;//核心線程數;線程池啟動后默認創建的線程數量【除非設置了allowCoreThreadTimeOut(回收核心線程)】,等待調用去執行異步任務
int maximumPoolSize = 200;//最大線程數量;控制資源
long keepAliveTime = 10;//存活時間;在當前的線程數 > 核心線程數時,釋放空閑線程的時間
TimeUnit unit = TimeUnit.SECONDS;//時間單位
BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>();//阻塞隊列,如果任務有很多就會放在這里,有空閑線程就會取出該隊列里的任務(隊列方式有多種,看相關接口介紹,設置隊列的數量根據系統測試后能承受的最大數量,否則會導致內存不夠)
ThreadFactory threadFactory = Executors.defaultThreadFactory();//線程創建工廠(可以根據自己相關需求,重寫這個創建工廠)
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();//如果隊列滿了,按照指定的拒絕策略執行任務(根據相關需求,制定一些相關拒絕策略)
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);

       工作流程:

             a.線程池創建,啟動好核心的線程數量,准備接受任務

             b.新的任務來了,用先用核心空閑線程去執行,如果核心線程都在運行狀態 ,就將任務放置到阻塞隊列中,核心線程空閑后去阻塞隊列獲取任務執行

             c.阻塞線程滿了,就創建新的線程執行(創建新的線程最大只能開到maximumPoolSize指定的數量)

     d.如最大線程都滿了,且 就執行handler(拒絕策略)

      e.線程把所有任務都執行完,有空閑線程,就會在指定存活的時間,釋放空閑線程

       submit()與execute()區別

  submit()是可以取得返回值 

以上四種方法的區別 

        a.Thread與Runnable不能得到返回值,使用Callable+FutrueTask可以取得

        b.Thread、Runnable、Callable+FutrueTask不能控制資源,線程池可以

wait與sleep的區別

  功能都是當前線程暫停

  wait會放開手去睡,放開手里的鎖

  sleep握緊手去睡,醒了手里還有鎖

CountDownLatchDemo(減少計數)

  CountDownLatch主要有兩個方法,當一個或多個線程調用await方法時,這些線程會阻塞。

  其它線程調用countDown方法會將計數器減1(調用countDown方法的線程不會阻塞)

  當計數器的值變為0時,因await方法阻塞的線程會被喚醒,繼續執行。

public class CountDownLatchDemo{   
    public static void main(String[] args) throws InterruptedException   {                         
            CountDownLatch countDownLatch = new CountDownLatch(6); 
             //6個上自習的同學,各自離開教室的時間不一致
             for (int i = 1; i <=6; i++)  {
                  new Thread(() -> {              
                       System.out.println(Thread.currentThread().getName()+"\t 號同學離開教室");              
                       countDownLatch.countDown();          
                  }, String.valueOf(i)).start();       
              }       
              countDownLatch.await();       
              System.out.println(Thread.currentThread().getName()+"\t****** 班長關門走人,main線程是班長");             
     }  
}     ` 

CyclicBarrier(循環柵欄)

* 讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,

 * 直到最后一個線程到達屏障時,屏障才會開門,所有 

* 被屏障攔截的線程才會繼續干活。 

* 線程進入屏障通過CyclicBarrier的await()方法。

public class CyclicBarrierDemo{  
     private static final int NUMBER = 7;    
     public static void main(String[] args)  {     
          //CyclicBarrier(int parties, Runnable barrierAction)           
          CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, ()->{
               System.out.println("*****集齊7顆龍珠就可以召喚神龍");
          }) ;    
      
          for (int i = 1; i <= 7; i++) {       
               new Thread(() -> {          
                  try { 
                      System.out.println(Thread.currentThread().getName()+"\t 星龍珠被收集 ");            
                      cyclicBarrier.await();          
                 } catch (InterruptedException | BrokenBarrierException e) { 
                      // TODO Auto-generated catch block            
                     e.printStackTrace();          
                 }              
             }, String.valueOf(i)).start();     
         }        
     }
}    

Semaphore(信號燈)

 * acquire(獲取) 當一個線程調用acquire操作時,它要么通過成功獲取信號量(信號量減1), 

 * 要么一直等下去,直到有線程釋放信號量,或超時。

 * release(釋放)實際上會將信號量的值加1,然后喚醒等待的線程。 

 * 信號量主要用於兩個目的,一個是用於多個共享資源的互斥使用,另一個用於並發線程數的控制。

public class SemaphoreDemo{  
    public static void main(String[] args)  {     
        Semaphore semaphore = new Semaphore(3);//模擬3個停車位          
        for (int i = 1; i <=6; i++) //模擬6部汽車     {       
             new Thread(() -> {          
                 try{            
                     semaphore.acquire();            
                     System.out.println(Thread.currentThread().getName()+"\t 搶到了車位");            
                     TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                     System.out.println(Thread.currentThread().getName()+"\t------- 離開");          
                } catch (InterruptedException e) { 
                     e.printStackTrace();          
                }finally {            
                     semaphore.release();          
                }       
             }, String.valueOf(i)).start();     
        }       
    }
} 

  

CompletableFuture

1.創建異步對象

CompletableFuture提供了四個靜態方法來創建一個異步操作

public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
}

public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
}

a.以run開頭的方法都是沒有返回結果的,supply都是可以獲取到返回結果的

b.有Executor參數的方法是可以傳入自定義的線程池,沒有的就使用默認的線程池

1) runAsync方法

 

 

 2)supplyAsync 方法

 

 

 2.計算完成時回調方法

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
1)whenComplete方法

    返回異常處理結果 

    

 

 

    使用handle方法執行完成后的處理

   

   2)whenCompleteAsync 

whenComplete 可以處理正常和異常的計算結果,exceptionally處理異常情況
whenComplete 和 whenCompleteAsync 的區別
whenComplete :是執行當前任務的線程繼續執行whenComplete的任務
whenCompleteAsync :是執行的當前任務繼續提交給線程池來執行(可能是同一個線程(是相同的線程池)執行,也可能是其他線程執行)

 

 

 

3.線程串行化方法(上個任務執行完,才能處理下一步任務的)

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);public CompletableFuture<Void> thenAccept(Consumer<? super T> action);public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);public CompletableFuture<Void> thenRun(Runnable action);public CompletableFuture<Void> thenRunAsync(Runnable action);public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);

  1).加了async的方法,是執行完上一個任務,會再開另一個線程來處理下一個任務,默認是異步執行的,否則是跟上一個任務用同一個線程

  2).thenAccept方法,消費處理結果,接收上一個任務的結果進行處理

  3).thenRun方法,無法獲取上一步的執行結果

  4).thenApply方法,當一個線程依賴另一個線程時,獲取上一個任務的結果,並返回當前任務的返回值 

4.兩任務組合(都需求完成后)

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor);public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

  1)thenCombine:組合兩個future,獲取兩個future的返回結果,並返回當前任務的返回值 

  2)thenAcceptBoth:組合兩個future,獲取兩個future任務的返回結果,然后處理任務,沒有返回值

  3)runAfterBoth:組合兩個future,不需要獲取future的結果,只需兩個future處理完任務后,處理該任務

5.兩個任務組合有一個完成

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor);public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor);public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

  1)applyToEither:兩個任務有一個執行完成,獲取它的返回值 ,並處理任務並有新的返回值 

  2)acceptEither:兩個任務有一個執行完成,獲取它的返回值,處理任務,沒有新的返回值

  3)runAfterEither:兩個任務有一個執行完成,不需要獲取future的結果,處理任務,沒有返回值

6.多任務組合

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

  1)allOf:等待所有任務完成 

  2)anyOf:只要有一個任務完成 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM