一、線程回顧
1、初始化線程的 4 種方式
1)、繼承 Thread
2)、實現 Runnable 接口
3)、實現 Callable 接口 + FutureTask (可以拿到返回結果,可以處理異常)
4)、線程池
方式 1 和方式 2:主進程無法獲取線程的運算結果。不適合當前場景
方式 3:主進程可以獲取線程的運算結果,但是不利於控制服務器中的線程資源。可以導致
服務器資源耗盡。
方式 4:通過如下兩種方式初始化線程池
Executors.newFiexedThreadPool(3);
//或者
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit unit,
workQueue, threadFactory, handler);
通過線程池性能穩定,也可以獲取執行結果,並捕獲異常。但是,在業務復雜情況下,一
個異步調用可能會依賴於另一個異步調用的執行結果。
2、線程池的七大參數
corePoolSize:池中一直保持的線程的數量,即使線程空閑。除非設置了 allowCoreThreadTimeOut
maximumPoolSize:池中允許的最大的線程數
keepAliveTime:當線程數大於核心線程數的時候,線程在最大多長時間沒有接到新任務就會終止釋放,
最終線程池維持在 corePoolSize 大小
unit:時間單位
workQueue:阻塞隊列,用來存儲等待執行的任務,如果當前對線程的需求超過了 corePoolSize
大小,就會放在這里等待空閑線程執行。
threadFactory:創建線程的工廠,比如指定線程名等
handler:拒絕策略,如果線程滿了,線程池就會使用拒絕策略。
運行流程:
1、線程池創建,准備好 core 數量的核心線程,准備接受任務
2、新的任務進來,用 core 准備好的空閑線程執行。
(1) 、core 滿了,就將再進來的任務放入阻塞隊列中。空閑的 core 就會自己去阻塞隊
列獲取任務執行
(2) 、阻塞隊列滿了,就直接開新線程執行,最大只能開到 max 指定的數量
(3) 、max 都執行好了。Max-core 數量空閑的線程會在 keepAliveTime 指定的時間后自
動銷毀。最終保持到 core 大小
(4) 、如果線程數開到了 max 的數量,還有新任務進來,就會使用 reject 指定的拒絕策
略進行處理
3、所有的線程創建都是由指定的 factory 創建的。
3、常見的 4 種線程池
newCachedThreadPool
創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
newFixedThreadPool
創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。
newScheduledThreadPool
創建一個定長線程池,支持定時及周期性任務執行。
newSingleThreadExecutor
創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
4、開發中為什么使用線程池
降低資源的消耗
通過重復利用已經創建好的線程降低線程的創建和銷毀帶來的損耗
提高響應速度
因為線程池中的線程數沒有超過線程池的最大上限時,有的線程處於等待分配任務的狀態,當任務來時無需創建新的線程就能執行
提高線程的可管理性
線程池會根據當前系統特點對池內的線程進行優化處理,減少創建和銷毀線程帶來的系統開銷。無限的創建和銷毀線程不僅消耗系統資源,還降低系統的穩定性,使用線程池進行統一分配
二、CompletableFuture 異步編排
業務場景:
查詢商品詳情頁的邏輯比較復雜,有些數據還需要遠程調用,必然需要花費更多的時間。
假如商品詳情頁的每個查詢,需要如下標注的時間才能完成
那么,用戶需要 5.5s 后才能看到商品詳情頁的內容。很顯然是不能接受的。
如果有多個線程同時完成這 6 步操作,也許只需要 1.5s 即可完成響應。
Future 是 Java 5 添加的類,用來描述一個異步計算的結果。你可以使用`isDone`方法檢查計算是否完成,或者使用`get`阻塞住調用線程,直到計算完成返回結果,你也可以使用`cancel`方法停止任務的執行。
CompletableFuture 和 FutureTask 同屬於 Future 接口的實現類,都可以獲取線程的執行結果。
1、創建異步對象
CompletableFuture 提供了四個靜態方法來創建一個異步操作。
runXxxx 都是沒有返回結果的,supplyXxx 都是可以獲取返回結果的
可以傳入自定義的線程池,否則就用默認的線程池;
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("當前線程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("運行結果:" + i); }, executor);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("當前線程:" + Thread.currentThread().getId()); int i = 10 / 0; System.out.println("運行結果:" + i); return i; }, executor); Integer result = future.get();
2、計算完成時回調方法
whenComplete 可以處理正常和異常的計算結果,exceptionally 處理異常情況。
whenComplete 和 whenCompleteAsync 的區別:
whenComplete:是執行當前任務的線程執行繼續執行 whenComplete 的任務。
whenCompleteAsync:是執行把 whenCompleteAsync 這個任務繼續提交給線程池
來進行執行。
方法不以 Async 結尾,意味着 Action 使用相同的線程執行,而 Async 可能會使用其他線程
執行(如果是使用相同的線程池,也可能會被同一個線程選中執行)
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("當前線程:" + Thread.currentThread().getId()); int i = 10 / 0; System.out.println("運行結果:" + i); return i; }, executor).whenComplete((res,exception) -> { //雖然能得到異常信息,但是沒法修改返回數據 System.out.println("異步任務成功完成了...結果是:" + res + "異常是:" + exception); }).exceptionally(throwable -> { //可以感知異常,同時返回默認值 return 10; });
3、handle 方法
和 complete 一樣,可對結果做最后的處理(可處理異常),可改變返回值。
/** * 方法執行完后端處理 */ CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("當前線程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("運行結果:" + i); return i; }, executor).handle((result,thr) -> { if (result != null) { return result * 2; } if (thr != null) { System.out.println("異步任務成功完成了...結果是:" + result + "異常是:" + thr); return 0; } return 0; });
4、線程串行化方法
thenApply 方法:當一個線程依賴另一個線程時,獲取上一個任務返回的結果,並返回當前任務的返回值。
thenAccept 方法:消費處理結果。接收任務的處理結果,並消費處理,無返回結果。
thenRun 方法:只要上面的任務執行完成,就開始執行 thenRun,只是處理完任務后,執行thenRun 的后續操作
帶有 Async 默認是異步執行的。同之前。
以上都要前置任務成功完成。
Function<? super T,? extends U>
T:上一個任務返回結果的類型U:當前任務的返回值類型
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("當前線程:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("運行結果:" + i); return i; }, executor).thenApplyAsync(res -> { System.out.println("任務2啟動了..." + res); return "Hello" + res; }, executor); System.out.println("main......end....." + future.get());
5、兩任務組合 - 都要完成

兩個任務必須都完成,觸發該任務。
thenCombine:組合兩個 future,獲取兩個 future 的返回結果,並返回當前任務的返回值
thenAcceptBoth:組合兩個 future,獲取兩個 future 任務的返回結果,然后處理任務,沒有返回值。
runAfterBoth:組合兩個 future,不需要獲取 future 的結果,只需兩個 future 處理完任務后,
處理該任務。
6、兩任務組合 - 一個完成

當兩個任務中,任意一個 future 任務完成的時候,執行任務。
applyToEither:兩個任務有一個執行完成,獲取它的返回值,處理任務並有新的返回值。
acceptEither:兩個任務有一個執行完成,獲取它的返回值,處理任務,沒有新的返回值。
runAfterEither:兩個任務有一個執行完成,不需要獲取 future 的結果,處理任務,也沒有返回值。
7、多任務組合
allOf:等待所有任務完成
anyOf:只要有一個任務完成
三.案例
1.MyThreadConfig.java
//@EnableConfigurationProperties(ThreadPoolConfigProperties.class) @Configuration public class MyThreadConfig { @Bean public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) { return new ThreadPoolExecutor( pool.getCoreSize(), pool.getMaxSize(), pool.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingDeque<>(100000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); } }
2.ThreadPoolConfigProperties.java
@ConfigurationProperties(prefix = "gulimall.thread") @Component @Data public class ThreadPoolConfigProperties { private Integer coreSize; private Integer maxSize; private Integer keepAliveTime; }
3.使用異步線程
@Service("skuInfoService")
public class SkuInfoServiceImpl extends ServiceImpl<SkuInfoDao, SkuInfoEntity> implements SkuInfoService {
@Resource
private ThreadPoolExecutor executor;
@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
SkuItemVo skuItemVo = new SkuItemVo();
CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
//1、sku基本信息的獲取 pms_sku_info
SkuInfoEntity info = this.getById(skuId);
skuItemVo.setInfo(info);
return info;
}, executor);
CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {
//3、獲取spu的銷售屬性組合
List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId());
skuItemVo.setSaleAttr(saleAttrVos);
}, executor);
CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
//4、獲取spu的介紹 pms_spu_info_desc
SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
skuItemVo.setDesc(spuInfoDescEntity);
}, executor);
CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync((res) -> {
//5、獲取spu的規格參數信息
List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());
skuItemVo.setGroupAttrs(attrGroupVos);
}, executor);
// Long spuId = info.getSpuId();
// Long catalogId = info.getCatalogId();
//2、sku的圖片信息 pms_sku_images
CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId);
skuItemVo.setImages(imagesEntities);
}, executor);
CompletableFuture<Void> seckillFuture = CompletableFuture.runAsync(() -> {
//3、遠程調用查詢當前sku是否參與秒殺優惠活動
R skuSeckilInfo = seckillFeignService.getSkuSeckilInfo(skuId);
if (skuSeckilInfo.getCode() == 0) {
//查詢成功
SeckillSkuVo seckilInfoData = skuSeckilInfo.getData("data", new TypeReference<SeckillSkuVo>() {
});
skuItemVo.setSeckillSkuVo(seckilInfoData);
if (seckilInfoData != null) {
long currentTime = System.currentTimeMillis();
if (currentTime > seckilInfoData.getEndTime()) {
skuItemVo.setSeckillSkuVo(null);
}
}
}
}, executor);
//等到所有任務都完成
CompletableFuture.allOf(saleAttrFuture,descFuture,baseAttrFuture,imageFuture,seckillFuture).get();
return skuItemVo;
}
}
推薦博客:https://blog.csdn.net/sermonlizhi/article/details/123356877
