前言
最近一直在看JUC下面的一些東西,發現很多東西都是以前用過,但是真是到原理層面自己還是很欠缺。
剛好趁這段時間不太忙,回來了便一點點學習總結。
前言
最近一直在看JUC下面的一些東西,發現很多東西都是以前用過,但是真是到原理層面自己還是很欠缺。
剛好趁這段時間不太忙,回來了便一點點學習總結。
由於自己水平有限,可能存在大量漏洞和思考不周到的地方,不吝賜教。
Future 模式
一種非常經典的設計模式,這種設計模式主要就利用空間換時間得到概念,也就是說異步執行(需要開啟一個新的線程)。
在互聯網高並發的應用服務中,我們隨處可見這種理念和代碼,主要就是使用了這種模式。
Future模式非常適合在處理耗時很長的業務邏輯時進行使用,可以有效的減小系統的響應時間,提高系統的吞吐量。
Future代碼示例:
/**
* @Description:
* @Author: wangmeng
* @Date: 2018/12/17-20:44
*/
public class UseFuture implements Callable<String> {
private String param;
public UseFuture(String param) {
this.param = param;
}
@Override
public String call() throws Exception {
//模擬執行業務邏輯的耗時
TimeUnit.SECONDS.sleep(3);
String result = this.param + " 處理完成!";
return result;
}
public static void main(String[] args) throws Exception{
String queryStr = "query1";
String queryStr2 = "query2";
FutureTask<String> future1 = new FutureTask<String>(new UseFuture(queryStr));
FutureTask<String> future2 = new FutureTask<String>(new UseFuture(queryStr2));
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(future1);//異步操作
executorService.submit(future2);//異步操作
System.out.println("執行中...");
TimeUnit.SECONDS.sleep(2);//處理其他相關的任務。
String result1 = future1.get();
String result2 = future2.get();
System.out.println("數據處理完成。。" + result1);
System.out.println("數據處理完成。。" + result2);
}
}
應用場景
Future模式有點類似於商品訂單,比如在網購時,當看中某一件商品時,就可以提交訂單,當訂單處理完成后,在家等待商品送貨上門即可。
或者說更形象的我們發送Ajax請求的時候,頁面是異步的進行后台處理,用戶無需一直等待請求的結果,可以繼續瀏覽或操作其他內容。
Future實現原理
看到上面示例代碼,我們是通過executorService.submit(future1) 來提交線程的,進一步看看里面具體的邏輯。
1、 AbstractExecutorService
中submit()源碼:
2、FutureTask
中run()源碼:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
這個是核心代碼,首先我們需要知道FutureTask中有一個volatile state全局變量,通過這個值來界定任務是否已經執行完畢。
將上面run方法一點點拆解如下:
先判斷state狀態,如果不是NEW說明執行完畢,直接return掉。
后面使用CAS操作,判斷這個任務是否已經執行,這里FutureTask有個全局的volatile runner字段,這里通過cas將當前線程指定給runner。
這里可以防止callable被執行多次。
接着往下看:
查看set方法具體實現:
繼續往下跟,查看finishCompletion方法:
FutureTask中有一個WaiteNode單鏈表,當執行futureTask.get()方法時,多個線程會將等待的線程的next指向下一個想要get獲取結果的線程。
finishCompletion主要就是使用Unsafe.unpark()進行喚醒操作。
3,FutureTask.get() 源碼
get() 方法會進行自旋操作等待,直到FutureTask中的state狀態大於NORMAL(表示自行完成),然后才會通過FutureTask的outcome獲取返回值。
接着往下跟awaitDone方法:
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
還是老樣子,一點點分析:
不知道自己理解的是否有偏差,有問題歡迎大家隨時指出,感謝備至。
到了這里 就不再講解了,后面還有report、cancel等方法,大家可以自行參閱源碼。
總結
結合上述分析可得 FutureTask 執行活動圖如下:
同時也可以看出,在 FutureTask 中內部維護了一個單向鏈表 waiters , 在執行 get 的時候會向其中添加節點:
最后特別感謝掘金此位博主的分享,參考如下:
FutureTask 源碼分析