在scala中是沒有原生線程的,其底層使用的是java的Thread機制。但是在scala中對java Thread進行了封裝,實現了更便於操作線程的Future。
官方文檔: Futures provide a way to reason about performing many operations in parallel– in an efficient and non-blocking way.
在使用的時候只需要通過object Future 的apply方法傳入執行體即可啟動,那么future是如何開始運行的呢?又是如何把運行體加入到線程的執行體中的呢?其底層運行機制又是什么呢?下面就逐步看一下。
先看一段代碼.注意在代碼中導入的global,其類型為global: ExecutionContext,這里暫時不進行解釋,留意一下后面會用到。
package zpj.future import org.scalatest.FunSuite import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global /** * Created by PerkinsZhu on 2018/3/18 11:34 **/ class Test extends FunSuite { test("future demo 1") { Future { println("hello world !!!") } sleep } val sleep = Thread.sleep(1000) }
直接運行代碼會打印出“hello world !!!”。我們知道,如果使用java的Thread,則必須調用.start()方法來啟動線程的運行,可是在這里我們並沒有主動觸發start()方法,而線程體卻執行了。下面進入源碼中看一下。在這之前注意打開idea的Structure窗口,留意每個方法是屬於哪個class、object或者trait中。這樣便於理解整個Future 的結構關系。
進入Future.apply()函數:
def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = unit.map(_ => body)
可以看到在body便是傳入的線程體,在這里使用unit調用了map方法,那么這個unit又是什么呢?
/** A Future which is always completed with the Unit value. */ val unit: Future[Unit] = successful(())
一個值為unit 的已完成future。這里調用的successful(())函數。注意傳入的() ,這個就是該future的值:Unit 。可以看一下()的類型:
很明顯()就是上面注釋所說的 Unit value.
繼續我們進入successful(())看一下是怎么實現的:
/** Creates an already completed Future with the specified result. * * @tparam T the type of the value in the future * @param result the given successful value * @return the newly created `Future` instance */ def successful[T](result: T): Future[T] = Promise.successful(result).future
先看一下參數部分,result:T,還記得上面傳入的()嗎,在這里便賦值給result。那么后面的Promise.successful(result).future
又是什么意思呢?我們先看前半部分Promise.successful(result),這里調用的是Promise的succeful(),進入看一下:
/** Creates an already completed Promise with the specified result. * * @tparam T the type of the value in the promise * @return the newly created `Promise` object */ def successful[T](result: T): Promise[T] = fromTry(Success(result))
到這里看到Success(result)大概就明白了,這就是用來構建future的結果值,其結果便是Success(()) 。【疑問1】同時注意一下這里返回的結果類型為Promise[T],而其調用出接收的卻是Future,這兩處是如何對接的呢?我們暫時放一下,先看下面。那fromTry又是做什么呢?
/** Creates an already completed Promise with the specified result or exception. * * @tparam T the type of the value in the promise * @return the newly created `Promise` object */ def fromTry[T](result: Try[T]): Promise[T] = impl.Promise.KeptPromise[T](result)
這里通過KeptPromise創建了一個Promise的實例,繼續進入KeptPromise.apply():
def apply[T](result: Try[T]): scala.concurrent.Promise[T] = resolveTry(result) match { case s @ Success(_) => new Successful(s) case f @ Failure(_) => new Failed(f) }
1、注意這里的Successful(s)和Failed(f),這兩個是繼承了Promise的私有類,看一下這里的繼承結構:
private[this] sealed trait Kept[T] extends Promise[T]
private[this] final class Successful[T](val result: Success[T]) extends Kept[T] private[this] final class Failed[T](val result: Failure[T]) extends Kept[T]
2、resolveTry是對result進行進一步處理,判斷result是否失敗,並解析出其Exception,只是對future中的結果做一個細分化。
private def resolveTry[T](source: Try[T]): Try[T] = source match { case Failure(t) => resolver(t) case _ => source } private def resolver[T](throwable: Throwable): Try[T] = throwable match { case t: scala.runtime.NonLocalReturnControl[_] => Success(t.value.asInstanceOf[T]) case t: scala.util.control.ControlThrowable => Failure(new ExecutionException("Boxed ControlThrowable", t)) case t: InterruptedException => Failure(new ExecutionException("Boxed InterruptedException", t)) case e: Error => Failure(new ExecutionException("Boxed Error", e)) case t => Failure(t) }
走到這里,就明白了Promise.successful(result).future中的 前半部分的執行機。還記得上面拋出的一個疑問嗎?這里就對【疑問1】解釋一下。
def successful[T](result: T): Future[T] = Promise.successful(result).future接收的是Future,而Promise.successful(result)返回的是一個Promise,這兩個類型怎么對接呢?后面調用了future ,我們進入看一下
trait Promise[T] {
def future: Future[T]
...
...
該函數是定義在特質scala.concurrent.Promise中的一個抽象函數(注意這里的包路徑)。上面我們知道Promise.successful(result)返回的是一個Successful,那么future應該會在Successful中進行實現了:
進去之后發現並沒有,那么會不會在其父類中實現了呢?我們繼續進入Kept看看:
發現Kept中也沒有,那么久繼續向上找,private[this] sealed trait Kept[T] extends Promise[T],(注意這里的Promise是scala.concurrent.impl中的Promise,不是剛才的scala.concurrent.Promis)這里我們進入scala.concurrent.Promise看一下:
private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] { def future: this.type = this
會發現在 scala.concurrent.impl.Promise[T] extends scala.concurrent.Promise[T],且兩者都是特質(注意區分這兩個Promise)。在下面可以看到 future 在這里被實現了def future: this.type = this。對於這里該如何理解呢?
future返回的結果應該是Future[T]類型的,那么這里的this.type 應該就是Promise類型,而this就應該是上面的Successful(())。這里可能有些不太容易理解,事實上 scala.concurrent.impl.Promise繼承了Promise 混合了Future ,注意看上面的繼承關系:
private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T]
這里的with混合了scala.concurrent.Future特質,通過def future: this.type = this把Promise類型轉化為Future返回給了調用處。
走到這里unit的構建就清晰了,其實質就是一個已經完成了的Future
回到Future.apply()方法中,unit就明白了其構建過程,而對於map呢?該如何理解?
def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = unit.map(_ => body)
繼續進入map的實現源碼:
def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = transform(_ map f)
def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S]
一路跟進來之后會進入scala.concurrent.Future#transform的抽象方法中。上面我們知道這里的unit是scala.concurrent.impl.Promise.KeptPromise.Successful的實例,根據上面的經驗一層一層的向上找transform的實現位置,會發現在scala.concurrent.impl.Promise#transform中進行了實現。看一下這里的實現代碼:
override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = { val p = new DefaultPromise[S]() onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) } p.future }
在這里我們逐一分析一下這三行代碼:
1、val p = new DefaultPromise[S]()。創建了 一個scala.concurrent.impl.Promise.DefaultPromise實例,進入DefaultPromise的構造器中看一下:
class DefaultPromise[T] extends AtomicReference[AnyRef](Nil) with Promise[T]
會發現DefaultPromise依舊混合了scala.concurrent.impl.Promise特質,同時還繼承了java.util.concurrent.atomic.AtomicReference且向其構造器中傳入了Nil空列表。這里先掛起,分析第二行代碼。
2、onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) },在理解這行代碼的時候需要注意scala的參數類型,明確其傳入的是函數還是參數值。
我們進入onComplete 發現是一個scala.concurrent.Future#onComplete的抽象方法。那么找到其實現處:scala.concurrent.impl.Promise.KeptPromise.Kept#onComplete,看一下源碼:
override def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = (new CallbackRunnable(executor.prepare(), func)).executeWithValue(result)
這里終於看到開啟線程的代碼了,每個future開啟一個線程的代碼應該就是這里了。
注意這里new CallbackRunnable(executor.prepare(), func)) 傳入的對象 executor,和func,這里的executor是從上面一路帶過來的(implicit executor: ExecutionContext),也就是我們上面剛開始導入的import scala.concurrent.ExecutionContext.Implicits.global;在看func,回溯上面會發現func就是scala.concurrent.Promise#complete方法,根據名字可以指定是在Future 完成之后的回調,接收的參數就是Future.apply()的函數體。
進入scala.concurrent.impl.CallbackRunnable看一起源碼:
private final class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable with OnCompleteRunnable { // must be filled in before running it var value: Try[T] = null override def run() = { require(value ne null) // must set value to non-null before running! try onComplete(value) catch { case NonFatal(e) => executor reportFailure e } } def executeWithValue(v: Try[T]): Unit = { require(value eq null) // can't complete it twice value = v // Note that we cannot prepare the ExecutionContext at this point, since we might // already be running on a different thread! try executor.execute(this) catch { case NonFatal(t) => executor reportFailure t } } }
注意如下幾點:
1、繼承關系可以發現CallbackRunnable是java.lang.Runnable的實現類,因此其實一個可以在java Threa中運行的線程。 CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable
2、注意其構造器參數,executor是一個全局線程池,onComplete: Try[T] => Any是一個函數。函數是可以調用的代碼塊,可以傳參的(理解scala的函數式編程)。
3、注意其run方法中執行的代碼塊,其中是調用了onComplete的,且傳入的結果是一個Value。
4、注意executeWithValue的參數v,其把v賦值給Value。賦值之后調用了 executor.execute(this);該命令再熟悉不過了,調用線程池執行線程,這里的this就是CallbackRunnable實例。
通過這四點可以明白:
scala.concurrent.impl.Promise.KeptPromise.Kept#onComplete 是在單獨的線程中執行的,結合上面的 onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) }這塊代碼,發現onComplete執行的就是scala.concurrent.Promise#complete的代碼邏輯。
再看一下scala.concurrent.impl.Promise#transform的源碼:
override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = { val p = new DefaultPromise[S]() onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) } p.future }
注意這里面的參數類型,f: Try[T] => Try[S]是一個函數,然而注意這里: p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) ,看一下 p.complete()方法接收的參數類型是什么:
def complete(result: Try[T]): this.type = if (tryComplete(result)) this else throw new IllegalStateException("Promise already completed.")
一個結果參數,不是一個函數。再看上面的f(result),其實質在調用f()函數,傳入的參數就是result,然后計算出結果之后把結果值傳入scala.concurrent.Promise#complete。仔細體會一下這里的調用邏輯。也就是說在調用scala.concurrent.Promise#complete之前f()函數已經進行了調用,這里的f()函數也就是Future.apply()的函數體。
匯總上面再理一下調用邏輯:
override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = { val p = new DefaultPromise[S]() onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) } p.future }
在onComplete ()中開啟線程,並執行線程體。在線程執行過程中,調用p.complete()函數,而在調用p.complete()之前會觸發f()函數的調用,這樣便觸發了Future.apply()的執行,於是便執行了 println("hello world !!!") 代碼塊。
因此Future.apply()中的代碼塊是在單獨的一個線程中執行的,這便是scala 中Future自動開啟線程執行代碼塊的機制。
這里不太容易理解的就是這個函數的調用時機。搞清楚Future是如何把Future.apply()代碼塊加載到java Thread中運行之后,Future的核心便易於理解了。
注意這里還有一個result的傳入時機:
onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) }
這個result 是從哪里過來的呢?我們知道future是可以組合上一個future的結果的。例如:
Future { 10 }.map( _ + 10).map(_ * 10)
這里執行邏輯時機上是(10+10)* 10 結果就是200 ,那么這里的10如何傳給第二個map函數的呢?又是如何把20傳給第三個map函數的呢?
我們再看一下scala.concurrent.impl.Promise.KeptPromise.Kept#onComplete的實現源碼:
override def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = (new CallbackRunnable(executor.prepare(), func)).executeWithValue(result)
注意這里的result,調用executeWithValue()之后會把該result賦值給scala.concurrent.impl.CallbackRunnable#value的參數,在run運行過程中,調用onComlete會把該繼續把該result傳給p.complete()
override def run() = { require(value ne null) // must set value to non-null before running! try onComplete(value) catch { case NonFatal(e) => executor reportFailure e } } override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = { val p = new DefaultPromise[S]() onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) } p.future }
這里的result便是線程run方法中傳入的Value,那么在(new CallbackRunnable(executor.prepare(), func)).executeWithValue(result)這里的result又是哪里來的呢?
看一下onComplete的源碼:
private[this] sealed trait Kept[T] extends Promise[T] { def result: Try[T] override def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = (new CallbackRunnable(executor.prepare(), func)).executeWithValue(result)
發現result是一個抽象值,那么我們就去找Kept的實現類scala.concurrent.impl.Promise.KeptPromise.Successful。看一下構造器:
private[this] final class Successful[T](val result: Success[T]) extends Kept[T]
在這里可以發現其實result是通過構造器傳入的,那么是哪里調用構造器傳入的呢?還記得我們看unit實現邏輯嗎?其中有一部分這樣的代碼:
def apply[T](result: Try[T]): scala.concurrent.Promise[T] = resolveTry(result) match { case s @ Success(_) => new Successful(s) case f @ Failure(_) => new Failed(f) }
這里的S便是傳入的result,而在構建unit的時候,這里的S是一個Unit值,這也是初始Future的值。
那么我們上面說的10、20分別是如何通過map傳入的呢?
這里我們回想一下前面的unit,unit是通過scala.concurrent.impl.Promise.KeptPromise.Successful構造的,其混入的是scala.concurrent.impl.Promise.KeptPromise.Kept因此看下面
override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = { val p = new DefaultPromise[S]() onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) } p.future }
unit在調用transform的時候,執行的 onComplete 是scala.concurrent.impl.Promise.KeptPromise.Kept#onComplete。而看第三行返回的結果: p.future,也即是說第一個Future返回的對象是DefaultPromise()實例的future。結合代碼:
Future { 10 }.map( _ + 10).map(_ * 10)
這里返回的future是DefaultPromise()的future,所以調用map的也是DefaultPromise()的future。那么,進入map方法之后,我們會發現又進入了scala.concurrent.Future#transform
def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S]
override def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S] = { val p = new DefaultPromise[S]() onComplete { result => p.complete(try f(result) catch { case NonFatal(t) => Failure(t) }) } p.future }
注意這里調用transform的不再是KeptPromise()了,而是DefaultPromise()的實例在調用。所以 在調用onComplete()的時候進入的就是scala.concurrent.impl.Promise.DefaultPromise#onComplete,而不再是scala.concurrent.impl.Promise.KeptPromise.Kept#onComplete了
下面看一下scala.concurrent.impl.Promise.DefaultPromise#onComplete的源碼:
final def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = dispatchOrAddCallback(new CallbackRunnable[T](executor.prepare(), func))
注意這里只是new 了一個CallbackRunnable,並沒有啟動。不啟動的原因就是不確定上一個Future是否執行成功。可能需要等待,由此可以猜到dispatchOrAddCallback()的目的就是對調用者future進行判斷和等待的邏輯。看一下scala.concurrent.impl.Promise.DefaultPromise#dispatchOrAddCallback的源碼:
/** Tries to add the callback, if already completed, it dispatches the callback to be executed. * Used by `onComplete()` to add callbacks to a promise and by `link()` to transfer callbacks * to the root promise when linking two promises together. */ @tailrec private def dispatchOrAddCallback(runnable: CallbackRunnable[T]): Unit = { get() match { case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]]) case dp: DefaultPromise[_] => compressedRoot(dp).dispatchOrAddCallback(runnable) case listeners: List[_] => if (compareAndSet(listeners, runnable :: listeners)) () else dispatchOrAddCallback(runnable) } } /** * Gets the current value. * * @return the current value */ public final V get() {// 注意該方法的路徑:java.util.concurrent.atomic.AtomicReference#get return value; }
注意如下幾點:
1、scala.concurrent.impl.Promise.DefaultPromise#dispatchOrAddCallback是一個遞歸方法,注意注釋@tailrec
2、case r: Try[_] 該分支說明調用者future已經結束,啟動該future的線程,執行map中的操作。
3、為什么會調用的get()方法呢?因為DefaultPromise混入了AtomicReference:
class DefaultPromise[T] extends AtomicReference[AnyRef](Nil) with Promise[T]
注意這里傳入的是Nil ,這也是為什么會有case listeners: List[_]分支的原因。
scala在進行debug的時候不像java那么方便,需要深入理解函數式編程的邏輯,函數的調用邏輯。
=========================================
=========================================
-------end