Java 並發異步編程,原來十個接口的活現在只需要一個接口就搞定!


  • 引言
  • 多線程並發執行任務,取結果歸集
    • 狀態
    • 隊列
    • CAS操作
  • 實戰演練
  • 總結
  • 小甜點

什么?對你沒有聽錯,也沒有看錯 ..多線程並發執行任務,取結果歸集~~ 

引言

先來看一些APP的獲取數據,諸如此類,一個頁面獲取N多個,多達10個左右的一個用戶行為數據,比如:點贊數,發布文章數,點贊數,消息數,關注數,收藏數,粉絲數,卡券數,紅包數........... 真的是多~ 我們看些圖:

 

 

 

 

平時要10+接口的去獲取數據(因為當你10+個查詢寫一起,那估計到半分鍾才能響應了),一個頁面上N多接口,真是累死前端的寶寶了,前端開啟多線程也累啊,我們做后端的要體量一下前端的寶寶們,畢竟有句話叫"程序員何苦為難程序員~ "

今天我們也可以一個接口將這些數據返回~ 還賊TM快,解決串行編程,阻塞編程帶來的苦惱~

多線程並發執行任務,取結果歸集

今天豬腳就是 Future、FutureTask、ExecutorService...

  • 用上FutureTask任務獲取結果老少皆宜,就是CPU有消耗。FutureTask也可以做閉鎖(實現了Future的語義,表示一種抽象的可計算的結果)。通過把Callable(相當於一個可生成結果的Runnable)作為一個屬性,進而把它自己作為一個執行器去繼承Runnable,FutureTask 實際上就是一個支持取消行為的異步任務執行器 。
  • Callable就是一個回調接口,可以泛型聲明返回類型,而Runnable是線程去執行的方法.這個很簡單~大家想深入了解就進去看源碼好了~ 因為真的很簡單~
  • FutureTask實現了Future,提供了start, cancel, query等功能,並且實現了Runnable接口,可以提交給線程執行。
  • Java並發工具類的三板斧    狀態,隊列,CAS

狀態

 /**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions: //可能發生的狀態過度過程
* NEW -> COMPLETING -> NORMAL // 創建-->完成-->正常
* NEW -> COMPLETING -> EXCEPTIONAL // 創建-->完成-->異常
* NEW -> CANCELLED // 創建-->取消
* NEW -> INTERRUPTING -> INTERRUPTED // 創建-->中斷中-->中斷結束
*/

private volatile int state; // 執行器狀態

private static final int NEW = 0; // 初始值 由構造函數保證
private static final int COMPLETING = 1; // 完成進行時 正在設置任務結果
private static final int NORMAL = 2; // 正常結束 任務正常執行完畢
private static final int EXCEPTIONAL = 3; // 發生異常 任務執行過程中發生異常
private static final int CANCELLED = 4; // 已經取消 任務已經取消
private static final int INTERRUPTING = 5; // 中斷進行時 正在中斷運行任務的線程
private static final int INTERRUPTED = 6; // 中斷結束 任務被中斷

/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

還不明白就看圖:

 

 

 

 

public interface Future<T> {
/**
*取消任務
*@param mayInterruptIfRunning
*是否允許取消正在執行卻沒有執行完畢的任務,如果設置true,則表示可以取消正在執行過程中的任務
*如果任務正在執行,則返回true
*如果任務還沒有執行,則無論mayInterruptIfRunning為true還是false,返回true
*如果任務已經完成,則無論mayInterruptIfRunning為true還是false,返回false
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
*任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true
*/
boolean isCancelled();
/**
*任務是否完成
*/
boolean isDone();
/**
*通過阻塞獲取執行結果
*/
T get() throws InterruptedException, ExecutionException;
/**
*通過阻塞獲取執行結果。如果在指定的時間內沒有返回,則返回null
*/
T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

  • Future

      (1)cancle    可以停止任務的執行 但不一定成功 看返回值true or  false
    (2)get 阻塞獲取callable的任務結果,即get阻塞住調用線程,直至計算完成返回結果
    (3)isCancelled 是否取消成功
    (4)isDone 是否完成

重點說明:

Furture.get()獲取執行結果的值,取決於執行的狀態,如果任務完成,會立即返回結果,否則一直阻塞直到任務進入完成狀態,然后返回結果或者拋出異常。

“運行完成”表示計算的所有可能結束的狀態,包含正常結束,由於取消而結束和由於異常而結束。當進入完成狀態,他會停止在這個狀態上,只要state不處於 NEW 狀態,就說明任務已經執行完畢。

FutureTask負責將計算結果從執行任務的線程傳遞到調用這個線程的線程,而且確保了,傳遞過程中結果的安全發布

UNSAFE 無鎖編程技術,確保了線程的安全性~ 為了保持無鎖編程CPU的消耗,所以用狀態標記,減少空轉的時候CPU的壓力

  • 任務本尊:callable
  • 任務的執行者:runner
  • 任務的結果:outcome
  • 獲取任務的結果:state + outcome + waiters
  • 中斷或者取消任務:state + runner + waiters

run方法

  1. 檢查state,非NEW,說明已經啟動,直接返回;否則,設置runner為當前線程,成功則繼續,否則,返回。
  2. 調用Callable.call()方法執行任務,成功則調用set(result)方法,失敗則調用setException(ex)方法,最終都會設置state,並調用finishCompletion()方法,喚醒阻塞在get()方法上的線程們。
  3. 如注釋所示,如果省略ran變量,並把"set(result);" 語句移動到try代碼塊"ran = true;" 語句處,會怎樣呢?首先,從代碼邏輯上看,是沒有問題的,但是,考慮到"set(result);"方法萬一拋出異常甚至是錯誤了呢?set()方法最終會調用到用戶自定義的done()方法,所以,不可省略。
  4. 如果state為INTERRUPTING, 則主動讓出CPU,自旋等待別的線程執行完中斷流程。見handlePossibleCancellationInterrupt(int s) 方法。
public void run() {
// UNSAFE.compareAndSwapObject, CAS保證Callable任務只被執行一次 無鎖編程
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable; // 拿到執行任務
if (c != null && state == NEW) { // 任務不為空,並且執行器狀態是初始值,才會執行;如果取消就不執行了
V result;
boolean ran; // 記錄是否執行成功
try {
result = c.call(); // 執行任務
ran = true; // 成功
} catch (Throwable ex) {
result = null; // 異常,清空結果
ran = false; // 失敗
setException(ex); // 記錄異常
}
if (ran) // 問題:ran變量可以省略嗎,把set(result);移到try塊里面?
set(result); // 設置結果
}
} finally {
runner = null; // 直到set狀態前,runner一直都是非空的,為了防止並發調用run()方法。
int s = state;
if (s >= INTERRUPTING) // 有別的線程要中斷當前線程,把CPU讓出去,自旋等一下
handlePossibleCancellationInterrupt(s);
}
}

private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING) // 當state為INTERRUPTING時
while (state == INTERRUPTING) // 表示有線程正在中斷當前線程
Thread.yield(); // 讓出CPU,自旋等待中斷
}

再啰嗦下: run方法重點做了以下幾件事:

將runner屬性設置成當前正在執行run方法的線程
調用callable成員變量的call方法來執行任務
設置執行結果outcome, 如果執行成功, 則outcome保存的就是執行結果;如果執行過程中發生了異常, 則outcome中保存的就是異常,設置結果之前,先將state狀態設為中間態
對outcome的賦值完成后,設置state狀態為終止態(NORMAL或者EXCEPTIONAL)
喚醒Treiber棧中所有等待的線程
善后清理(waiters, callable,runner設為null)
檢查是否有遺漏的中斷,如果有,等待中斷狀態完成。

怎么能少了get 方法呢,一直阻塞獲取參見:awaitDone

    public V get() throws InterruptedException, ExecutionException {
int s = state; // 執行器狀態
if (s <= COMPLETING) // 如果狀態小於等於COMPLETING,說明任務正在執行,需要等待
s = awaitDone(false, 0L); // 等待
return report(s); // 報告結果
}

順便偷偷看下get(long, TimeUnit) ,就是get的方法擴展,增加了超時時間,超時后我還沒拿到就生氣拋異常....

public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null) // 參數校驗
throw new NullPointerException();
int s = state; // 執行器狀態
if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) // 如果狀態小於等於COMPLETING,說明任務正在執行,需要等待;等待指定時間,state依然小於等於COMPLETING
throw new TimeoutException(); // 拋出超時異常
return report(s); // 報告結果
}

那么再看awaitDone ,要知道會寫死循環while(true)|for (;;)的都是高手~

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L; // 計算deadline
WaitNode q = null; // 等待結點
boolean queued = false; // 是否已經入隊
for (;;) {
if (Thread.interrupted()) { // 如果當前線程已經標記中斷,則直接移除此結點,並拋出中斷異常
removeWaiter(q);
throw new InterruptedException();
}

int s = state; // 執行器狀態
if (s > COMPLETING) { // 如果狀態大於COMPLETING,說明任務已經完成,或者已經取消,直接返回
if (q != null)
q.thread = null; // 復位線程屬性
return s; // 返回
} else if (s == COMPLETING) // 如果狀態等於COMPLETING,說明正在整理結果,自旋等待一會兒
Thread.yield();
else if (q == null) // 初始,構建結點
q = new WaitNode();
else if (!queued) // 還沒入隊,則CAS入隊
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
else if (timed) { // 是否允許超時
nanos = deadline - System.nanoTime(); // 計算等待時間
if (nanos <= 0L) { // 超時
removeWaiter(q); // 移除結點
return state; // 返回結果
}
LockSupport.parkNanos(this, nanos); // 線程阻塞指定時間
} else
LockSupport.park(this); // 阻塞線程
}
}


至此,線程安排任務和獲取我就不啰嗦了~~~~

隊列

接着我們來看隊列,在FutureTask中,隊列的實現是一個單向鏈表,它表示所有等待任務執行完畢的線程的集合。我們知道,FutureTask實現了Future接口,可以獲取“Task”的執行結果,那么如果獲取結果時,任務還沒有執行完畢怎么辦呢?那么獲取結果的線程就會在一個等待隊列中掛起,直到任務執行完畢被喚醒。這一點有點類似於AQS中的sync queue,在下文的分析中,大家可以自己對照它們的異同點。

我們前面說過,在並發編程中使用隊列通常是將當前線程包裝成某種類型的數據結構扔到等待隊列中,我們先來看看隊列中的每一個節點是怎么個結構:

static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}


可見,相比於AQS的sync queue所使用的雙向鏈表中的Node,這個WaitNode要簡單多了,它只包含了一個記錄線程的thread屬性和指向下一個節點的next屬性。

值得一提的是,FutureTask中的這個單向鏈表是當做棧來使用的,確切來說是當做Treiber棧來使用的,不了解Treiber棧是個啥的可以簡單的把它當做是一個線程安全的棧,它使用CAS來完成入棧出棧操作(想進一步了解的話可以看這篇文章)。為啥要使用一個線程安全的棧呢,因為同一時刻可能有多個線程都在獲取任務的執行結果,如果任務還在執行過程中,則這些線程就要被包裝成WaitNode扔到Treiber棧的棧頂,即完成入棧操作,這樣就有可能出現多個線程同時入棧的情況,因此需要使用CAS操作保證入棧的線程安全,對於出棧的情況也是同理。

由於FutureTask中的隊列本質上是一個Treiber(驅動)棧,那么使用這個隊列就只需要一個指向棧頂節點的指針就行了,在FutureTask中,就是waiters屬性:

/** Treiber stack of waiting threads */
private volatile WaitNode waiters;


事實上,它就是整個單向鏈表的頭節點。

綜上,FutureTask中所使用的隊列的結構如下:

 

 

CAS操作

CAS操作大多數是用來改變狀態的,在FutureTask中也不例外。我們一般在靜態代碼塊中初始化需要CAS操作的屬性的偏移量:

    // Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}


從這個靜態代碼塊中我們也可以看出,CAS操作主要針對3個屬性,包括state、runner和waiters,說明這3個屬性基本是會被多個線程同時訪問的。其中state屬性代表了任務的狀態,waiters屬性代表了指向棧頂節點的指針,這兩個我們上面已經分析過了。runner屬性代表了執行FutureTask中的“Task”的線程。為什么需要一個屬性來記錄執行任務的線程呢?這是為了中斷或者取消任務做准備的,只有知道了執行任務的線程是誰,我們才能去中斷它。

定義完屬性的偏移量之后,接下來就是CAS操作本身了。在FutureTask,CAS操作最終調用的還是Unsafe類的compareAndSwapXXX方法,關於Unsafe,由於帶薪碼文 這里不再贅述。

實戰演練

一切沒有例子的講解都是耍流氓`               >>>      `蔥姜切沫~~加入生命的源泉....


實戰項目以springboot為項目腳手架,github地址 : github.com/leaJone/myb…

1.MyFutureTask實現類

內部定義一個線程池進行任務的調度和線程的管理以及線程的復用,大家可以根據自己的實際項目情況進行配置

其中線程調度示例:核心線程 8 最大線程 20 保活時間30s 存儲隊列 10 有守護線程 拒絕策略:將超負荷任務回退到調用者 說明 : 默認使用核心線程(8)數執行任務,任務數量超過核心線程數就丟到隊列,隊列(10)滿了就再開啟新的線程,新的線程數最大為20,當任務執行完,新開啟的線程將存活30s,若沒有任務就消亡,線程池回到核心線程數量.

import com.boot.lea.mybot.dto.UserBehaviorDataDTO;
import com.boot.lea.mybot.service.UserService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.*;


/**
* @author Lijing
* @date 2019年7月29日
*/
@Slf4j
@Component
public class MyFutureTask {


@Resource
UserService userService;

/**
* 核心線程 8 最大線程 20 保活時間30s 存儲隊列 10 有守護線程 拒絕策略:將超負荷任務回退到調用者
*/
private static ExecutorService executor = new ThreadPoolExecutor(8, 20,
30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10),
new ThreadFactoryBuilder().setNameFormat("User_Async_FutureTask-%d").setDaemon(true).build(),
new ThreadPoolExecutor.CallerRunsPolicy());


@SuppressWarnings("all")
public UserBehaviorDataDTO getUserAggregatedResult(final Long userId) {

System.out.println("MyFutureTask的線程:" + Thread.currentThread());


long fansCount = 0, msgCount = 0, collectCount = 0,
followCount = 0, redBagCount = 0, couponCount = 0;

// fansCount = userService.countFansCountByUserId(userId);
// msgCount = userService.countMsgCountByUserId(userId);
// collectCount = userService.countCollectCountByUserId(userId);
// followCount = userService.countFollowCountByUserId(userId);
// redBagCount = userService.countRedBagCountByUserId(userId);
// couponCount = userService.countCouponCountByUserId(userId);

try {

Future<Long> fansCountFT = executor.submit(() -> userService.countFansCountByUserId(userId));
Future<Long> msgCountFT = executor.submit(() -> userService.countMsgCountByUserId(userId));
Future<Long> collectCountFT = executor.submit(() -> userService.countCollectCountByUserId(userId));
Future<Long> followCountFT = executor.submit(() -> userService.countFollowCountByUserId(userId));
Future<Long> redBagCountFT = executor.submit(() -> userService.countRedBagCountByUserId(userId));
Future<Long> couponCountFT = executor.submit(() -> userService.countCouponCountByUserId(userId));

//get阻塞
fansCount = fansCountFT.get();
msgCount = msgCountFT.get();
collectCount = collectCountFT.get();
followCount = followCountFT.get();
redBagCount = redBagCountFT.get();
couponCount = couponCountFT.get();

} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
log.error(">>>>>>聚合查詢用戶聚合信息異常:" + e + "<<<<<<<<<");
}
UserBehaviorDataDTO userBehaviorData =
UserBehaviorDataDTO.builder().fansCount(fansCount).msgCount(msgCount)
.collectCount(collectCount).followCount(followCount)
.redBagCount(redBagCount).couponCount(couponCount).build();
return userBehaviorData;
}


}


2.service業務方法

常規業務查詢方法,為了特效,以及看出實際的效果,我們每個方法做了延時

import com.boot.lea.mybot.mapper.UserMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class UserServiceImpl implements UserService {

@Autowired
UserMapper userMapper;

@Override
public long countFansCountByUserId(Long userId) {
try {
Thread.sleep(10000);
System.out.println("獲取FansCount===睡眠:" + 10 + "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("UserService獲取FansCount的線程 " + Thread.currentThread().getName());
return 520;
}

@Override
public long countMsgCountByUserId(Long userId) {
System.out.println("UserService獲取MsgCount的線程 " + Thread.currentThread().getName());
try {
Thread.sleep(10000);
System.out.println("獲取MsgCount===睡眠:" + 10 + "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 618;
}

@Override
public long countCollectCountByUserId(Long userId) {
System.out.println("UserService獲取CollectCount的線程 " + Thread.currentThread().getName());
try {
Thread.sleep(10000);
System.out.println("獲取CollectCount==睡眠:" + 10 + "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 6664;
}

@Override
public long countFollowCountByUserId(Long userId) {
System.out.println("UserService獲取FollowCount的線程 " + Thread.currentThread().getName());
try {
Thread.sleep(10000);
System.out.println("獲取FollowCount===睡眠:" + 10+ "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return userMapper.countFollowCountByUserId(userId);
}

@Override
public long countRedBagCountByUserId(Long userId) {
System.out.println("UserService獲取RedBagCount的線程 " + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(4);
System.out.println("獲取RedBagCount===睡眠:" + 4 + "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 99;
}

@Override
public long countCouponCountByUserId(Long userId) {
System.out.println("UserService獲取CouponCount的線程 " + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(8);
System.out.println("獲取CouponCount===睡眠:" + 8+ "s");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 66;
}
}



3.controller調用

/**
* @author LiJing
* @ClassName: UserController
* @Description: 用戶控制器
* @date 2019/7/29 15:16
*/
@RestController
@RequestMapping("user/")
public class UserController {


@Autowired
private UserService userService;


@Autowired
private MyFutureTask myFutureTask;


@GetMapping("/index")
@ResponseBody
public String index() {
return "啟動用戶模塊成功~~~~~~~~";
}

//http://localhost:8080/api/user/get/data?userId=4

@GetMapping("/get/data")
@ResponseBody
public UserBehaviorDataDTO getUserData(Long userId) {
System.out.println("UserController的線程:" + Thread.currentThread());
long begin = System.currentTimeMillis();
UserBehaviorDataDTO userAggregatedResult = myFutureTask.getUserAggregatedResult(userId);
long end = System.currentTimeMillis();
System.out.println("===============總耗時:" + (end - begin) /1000.0000+ "秒");
return userAggregatedResult;
}


}


我們啟動項目:開啟調用 http://localhost:8080/api/user/get/data?userId=4

當我們線程池配置為:核心線程 8 最大線程 20 保活時間30s 存儲隊列 10 的時候,我們測試的結果如下:

 

 

 

 

結果 :我們看到每個server method的執行線程都是從線程池中發起的線程名:User_Async_FutureTask-%d, 總耗時從累計的52秒縮短到10秒,即取決於最耗時的方法查詢時間.

那我們再將注釋代碼放開,進行串行查詢進行測試:

 

 

 

 

 

 

 

結果 :我們使用串行的方式進行查詢,結果匯總將達到52秒,那太可怕了~~

總結

使用FutureTask的時候,就是將任務runner以caller的方式進行回調,阻塞獲取,最后我們將結果匯總,即完成了開啟多線程異步調用我們的業務方法.

            Future<Long> fansCountFT = executor.submit(new Callable<Long>() {
@Override
public Long call() throws Exception {
return userService.countFansCountByUserId(userId);
}
});

這里使用的只是一個簡單的例子,具體項目可以定義具體的業務方法進行歸並處理,其實在JDK1.8以后,又有了ExecutorCompletionService,ForkJoinTask,CompletableFuture這些都可以實現上述的方法,我們后續會做一些這些方法使用的案例,期望大家的關注,文章中有不足之處,歡迎指正~

小甜點

 

 

所以:我們要用到親愛的Spring的異步編程,異步編程有很多種方式:比如常見的Future的sync,CompletableFuture.supplyAsync,,@Async,哈哈 其實都離不開Thread.start()...,等等我說個笑話:

老爸有倆孩子:小紅和小明。老爸想喝酒了,他讓小紅去買酒,小紅出去了。然后老爸突然想吸煙了,於是老爸讓小明去買煙。在面對對象的思想中,一般會把買東西,然后買回來這件事作為一個方法,如果按照順序結構或者使用多線程同步的話,小明想去買煙就必須等小紅這個買東西的操作進行完。這樣無疑增加了時間的開銷(萬一老爸尿憋呢?)。異步就是為了解決這樣的問題。你可以分別給小紅小明下達指令,讓他們去買東西,然后你就可以自己做自己的事,等他們買回來的時候接收結果就可以了。

package com.boot.lea.mybot.futrue;

/**
* @ClassName: TestFuture
* @Description: 演示異步編程
* @author LiJing
* @date 2019/8/5 15:16
*/
@SuppressWarnings("all")
public class TestFuture {
static ExecutorService executor = Executors.newFixedThreadPool(2);

public static void main(String[] args) throws InterruptedException {
//兩個線程的線程池
//小紅買酒任務,這里的future2代表的是小紅未來發生的操作,返回小紅買東西這個操作的結果
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("爸:小紅你去買瓶酒!");
try {
System.out.println("小紅出去買酒了,女孩子跑的比較慢,估計5s后才會回來...");
Thread.sleep(5000);
return "我買回來了!";
} catch (InterruptedException e) {
System.err.println("小紅路上遭遇了不測");
return "來世再見!";
}
}, executor);

//小明買煙任務,這里的future1代表的是小明未來買東西會發生的事,返回值是小明買東西的結果
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("爸:小明你去買包煙!");
try {
System.out.println("小明出去買煙了,可能要3s后回來...");
Thread.sleep(3000);

throw new InterruptedException();
// return "我買回來了!";
} catch (InterruptedException e) {
System.out.println("小明路上遭遇了不測!");
return "這是我托人帶來的口信,我已經不在了。";
}
}, executor);

//獲取小紅買酒結果,從小紅的操作中獲取結果,把結果打印
future2.thenAccept((e) -> {
System.out.println("小紅說:" + e);
});
//獲取小明買煙的結果
future1.thenAccept((e) -> {
System.out.println("小明說:" + e);
});

System.out.println("爸:等啊等 西湖美景三月天嘞......");
System.out.println("爸: 我覺得無聊甚至去了趟廁所。");
Thread.currentThread().join(9 * 1000);
System.out.println("爸:終於給老子買來了......huo 酒");
//關閉線程池
executor.shutdown();
}
}

運行結果:

 

 




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM