java的線程是通過java.lang.Thread類來實現的。
在Java當中,線程通常都有五種狀態,創建、就緒、運行、阻塞和死亡。
第一是創建狀態。在生成線程對象,並沒有調用該對象的start方法,這是線程處於創建狀態。
第二是就緒狀態。當調用了線程對象的start方法之后,該線程就進入了就緒狀態,但是此時線程調度程序還沒有把該線程設置為當前線程,此時處於就緒狀態。在線程運行之后,從等待或者睡眠中回來之后,也會處於就緒狀態。
第三是運行狀態。線程調度程序將處於就緒狀態的線程設置為當前線程,此時線程就進入了運行狀態,開始運行run函數當中的代碼。
第四是阻塞狀態。線程正在運行的時候,被暫停,通常是為了等待某個時間的發生(比如說某項資源就緒)之后再繼續運行。sleep,suspend,wait等方法都可以導致線程阻塞。
第五是死亡狀態。如果一個線程的run方法執行結束或者調用stop方法后,該線程就會死亡。對於已經死亡的線程,無法再使用start方法令其進入就緒。
線程的初始化
1)繼承Thread

run()與start()方法的區別
a.start()方法來啟動線程,真正實現了多線程運行。這時無需等待run方法體代碼執行完畢,可以直接繼續執行下面的代碼;通過調用Thread類的start()方法來啟動一個線程, 這時此線程是處於就緒狀態, 並沒有運行。 然后通過此Thread類調用方法run()來完成其運行操作的, 這里方法run()稱為線程 體,它包含了要執行的這個線程的內容, Run方法運行結束, 此線程終止。然后CPU再調度其它線程。
b.run()方法當作普通方法的方式調用。程序還是要順序執行,要等待run方法體執行完畢后,才可繼續執行下面的代碼; 程序中只有主線程——這一個線程, 其程序執行路徑還是只有一條, 這樣就沒有達到寫線程的目的。
2)實現Runnable接口

3)實現Callable接口 + FutureTask

可以拿到返回結果,可以處理異常
多線程運行FutureTask,只會運行一次,其他線程會引用第一次計算的結果
4)線程池
方式一:啟動線程池(Executors可根據相關需求創建線程方法)
//執行長期任務性能好,創建一個線程池,一池有N個固定的線程,有固定線程數的線程
ExecutorService service = Executors.newCachedThreadPool(10);
//一個任務一個任務的執行,一池一線程
ExecutorService service = Executors.newSingleThreadExecutor()
//執行很多短期異步任務,線程池根據需要創建新線程,但在先前構建的線程可用時將重用它們。可擴容,遇強則強
ExecutorService service = Executors.newCachedThreadPool()
方式二:啟動線程池
int corePoolSize = 10;//核心線程數;線程池啟動后默認創建的線程數量【除非設置了allowCoreThreadTimeOut(回收核心線程)】,等待調用去執行異步任務 int maximumPoolSize = 200;//最大線程數量;控制資源 long keepAliveTime = 10;//存活時間;在當前的線程數 > 核心線程數時,釋放空閑線程的時間 TimeUnit unit = TimeUnit.SECONDS;//時間單位 BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>();//阻塞隊列,如果任務有很多就會放在這里,有空閑線程就會取出該隊列里的任務(隊列方式有多種,看相關接口介紹,設置隊列的數量根據系統測試后能承受的最大數量,否則會導致內存不夠) ThreadFactory threadFactory = Executors.defaultThreadFactory();//線程創建工廠(可以根據自己相關需求,重寫這個創建工廠) RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();//如果隊列滿了,按照指定的拒絕策略執行任務(根據相關需求,制定一些相關拒絕策略) ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
工作流程:
a.線程池創建,啟動好核心的線程數量,准備接受任務
b.新的任務來了,用先用核心空閑線程去執行,如果核心線程都在運行狀態 ,就將任務放置到阻塞隊列中,核心線程空閑后去阻塞隊列獲取任務執行
c.阻塞線程滿了,就創建新的線程執行(創建新的線程最大只能開到maximumPoolSize指定的數量)
d.如最大線程都滿了,且 就執行handler(拒絕策略)
e.線程把所有任務都執行完,有空閑線程,就會在指定存活的時間,釋放空閑線程
submit()與execute()區別
submit()是可以取得返回值
以上四種方法的區別
a.Thread與Runnable不能得到返回值,使用Callable+FutrueTask可以取得
b.Thread、Runnable、Callable+FutrueTask不能控制資源,線程池可以
wait與sleep的區別
功能都是當前線程暫停
wait會放開手去睡,放開手里的鎖
sleep握緊手去睡,醒了手里還有鎖
CountDownLatchDemo(減少計數)
CountDownLatch主要有兩個方法,當一個或多個線程調用await方法時,這些線程會阻塞。
其它線程調用countDown方法會將計數器減1(調用countDown方法的線程不會阻塞)
當計數器的值變為0時,因await方法阻塞的線程會被喚醒,繼續執行。
public class CountDownLatchDemo{
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
//6個上自習的同學,各自離開教室的時間不一致
for (int i = 1; i <=6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"\t 號同學離開教室");
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"\t****** 班長關門走人,main線程是班長");
}
} `
CyclicBarrier(循環柵欄)
* 讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,
* 直到最后一個線程到達屏障時,屏障才會開門,所有
* 被屏障攔截的線程才會繼續干活。
* 線程進入屏障通過CyclicBarrier的await()方法。
public class CyclicBarrierDemo{
private static final int NUMBER = 7;
public static void main(String[] args) {
//CyclicBarrier(int parties, Runnable barrierAction)
CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, ()->{
System.out.println("*****集齊7顆龍珠就可以召喚神龍");
}) ;
for (int i = 1; i <= 7; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+"\t 星龍珠被收集 ");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}
Semaphore(信號燈)
* acquire(獲取) 當一個線程調用acquire操作時,它要么通過成功獲取信號量(信號量減1),
* 要么一直等下去,直到有線程釋放信號量,或超時。
* release(釋放)實際上會將信號量的值加1,然后喚醒等待的線程。
* 信號量主要用於兩個目的,一個是用於多個共享資源的互斥使用,另一個用於並發線程數的控制。
public class SemaphoreDemo{
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);//模擬3個停車位
for (int i = 1; i <=6; i++) //模擬6部汽車 {
new Thread(() -> {
try{
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"\t 搶到了車位");
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println(Thread.currentThread().getName()+"\t------- 離開");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
CompletableFuture
1.創建異步對象
CompletableFuture提供了四個靜態方法來創建一個異步操作
public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); }
a.以run開頭的方法都是沒有返回結果的,supply都是可以獲取到返回結果的
b.有Executor參數的方法是可以傳入自定義的線程池,沒有的就使用默認的線程池
1) runAsync方法

2)supplyAsync 方法

2.計算完成時回調方法
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor); public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
1)whenComplete方法

返回異常處理結果

使用handle方法執行完成后的處理

2)whenCompleteAsync
whenComplete 可以處理正常和異常的計算結果,exceptionally處理異常情況
whenComplete 和 whenCompleteAsync 的區別
whenComplete :是執行當前任務的線程繼續執行whenComplete的任務
whenCompleteAsync :是執行的當前任務繼續提交給線程池來執行(可能是同一個線程(是相同的線程池)執行,也可能是其他線程執行)
3.線程串行化方法(上個任務執行完,才能處理下一步任務的)
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);public CompletableFuture<Void> thenAccept(Consumer<? super T> action);public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);public CompletableFuture<Void> thenRun(Runnable action);public CompletableFuture<Void> thenRunAsync(Runnable action);public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);
1).加了async的方法,是執行完上一個任務,會再開另一個線程來處理下一個任務,默認是異步執行的,否則是跟上一個任務用同一個線程
2).thenAccept方法,消費處理結果,接收上一個任務的結果進行處理
3).thenRun方法,無法獲取上一步的執行結果
4).thenApply方法,當一個線程依賴另一個線程時,獲取上一個任務的結果,並返回當前任務的返回值
4.兩任務組合(都需求完成后)
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor);public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
1)thenCombine:組合兩個future,獲取兩個future的返回結果,並返回當前任務的返回值
2)thenAcceptBoth:組合兩個future,獲取兩個future任務的返回結果,然后處理任務,沒有返回值
3)runAfterBoth:組合兩個future,不需要獲取future的結果,只需兩個future處理完任務后,處理該任務
5.兩個任務組合有一個完成
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor);public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor);public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
1)applyToEither:兩個任務有一個執行完成,獲取它的返回值 ,並處理任務並有新的返回值
2)acceptEither:兩個任務有一個執行完成,獲取它的返回值,處理任務,沒有新的返回值
3)runAfterEither:兩個任務有一個執行完成,不需要獲取future的結果,處理任務,沒有返回值
6.多任務組合
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs); public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
1)allOf:等待所有任務完成
2)anyOf:只要有一個任務完成
