https://code.csdn.NET/DOC_Scala/chinese_scala_offical_document/file/Futures-and-Promises-cn.md#anchor_0
Philipp Haller, Aleksandar Prokopec, Heather Miller, Viktor Klang, Roland Kuhn, and Vojin Jovanovic著
簡介
Future提供了一套高效便捷的非阻塞並行操作管理方案。其基本思想很簡單,所謂Future,指的是一類占位符對象,用於指代某些尚未完成的計算的結果。一般來說,由Future指代的計算都是並行執行的,計算完畢后可另行獲取相關計算結果。以這種方式組織並行任務,便可以寫出高效、異步、非阻塞的並行代碼。
默認情況下,future和promise並不采用一般的阻塞操作,而是依賴回調進行非阻塞操作。為了在語法和概念層面更加簡明扼要地使用這些回調,Scala還提供了flatMap、foreach和filter等算子,使得我們能夠以非阻塞的方式對future進行組合。當然,future仍然支持阻塞操作——必要時,可以阻塞等待future(不過並不鼓勵這樣做)。
Future
所謂Future,是一種用於指代某個尚未就緒的值的對象。而這個值,往往是某個計算過程的結果:
- 若該計算過程尚未完成,我們就說該Future未就位;
- 若該計算過程正常結束,或中途拋出異常,我們就說該Future已就位。
Future的就位分為兩種情況:
- 當Future帶着某個值就位時,我們就說該Future攜帶計算結果成功就位。
- 當Future因對應計算過程拋出異常而就緒,我們就說這個Future因該異常而失敗。
Future的一個重要屬性在於它只能被賦值一次。一旦給定了某個值或某個異常,future對象就變成了不可變對象——無法再被改寫。
創建future對象最簡單的方法是調用future方法,該future方法啟用異步(asynchronous)計算並返回保存有計算結果的futrue,一旦該future對象計算完成,其結果就變的可用。
注意_Future[T]_ 是表示future對象的類型,而future是方法,該方法創建和調度一個異步計算,並返回隨着計算結果而完成的future對象。
這最好通過一個例子予以說明。
假設我們使用某些流行的社交網絡的假定API獲取某個用戶的朋友列表,我們將打開一個新對話(session),然后發送一個請求來獲取某個特定用戶的好友列表。
import scala.concurrent._ import ExecutionContext.Implicits.global val session = socialNetwork.createSessionFor("user", credentials) val session = socialNetwork.createSessionFor("user", credentials) session.getFriends() }
以上,首先導入scala.concurrent 包使得Future類型和future構造函數可見。我們將馬上解釋第二個導入。
然后我們初始化一個session變量來用作向服務器發送請求,用一個假想的 createSessionFor 方法來返回一個List[Friend]。為了獲得朋友列表,我們必須通過網絡發送一個請求,這個請求可能耗時很長。這能從調用getFriends方法得到解釋。為了更好的利用CPU,響應到達前不應該阻塞(block)程序的其他部分執行,於是在計算中使用異步。future方法就是這樣做的,它並行地執行指定的計算塊,在這個例子中是向服務器發送請求和等待響應。
一旦服務器響應,future f 中的好友列表將變得可用。
未成功的嘗試可能會導致一個異常(exception)。在下面的例子中,session的值未被正確的初始化,於是在future的計算中將拋出NullPointerException,future f 不會圓滿完成,而是以此異常失敗。
val session = null val session = socialNetwork.createSessionFor("user", credentials) session.getFriends }
import ExecutionContext.Implicits.global
上面的線條導入默認的全局執行上下文(global execution context),執行上下文執行執行提交給他們的任務,也可把執行上下文看作線程池,這對於future方法來說是必不可少的,因為這可以處理異步計算如何及何時被執行。我們可以定義自己的執行上下文,並在future上使用它,但是現在只需要知道你能夠通過上面的語句導入默認執行上下文就足夠了。
我們的例子是基於一個假定的社交網絡API,此API的計算包含發送網絡請求和等待響應。提供一個涉及到你能試着立即使用的異步計算的例子是公平的。假設你有一個文本文件,你想找出一個特定的關鍵字第一次出現的位置。當磁盤正在檢索此文件內容時,這種計算可能會陷入阻塞,因此並行的執行該操作和程序的其他部分是合理的(make sense)。
val firstOccurrence: Future[Int] = future {
val source = scala.io.Source.fromFile("myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
Callbacks(回調函數)
現在我們知道如何開始一個異步計算來創建一個新的future值,但是我們沒有展示一旦此結果變得可用后如何來使用,以便我們能夠用它來做一些有用的事。我們經常對計算結果感興趣而不僅僅是它的副作用。
在許多future的實現中,一旦future的client對future的結果感興趣,它不得不阻塞它自己的計算直到future完成——然后才能使用future的值繼續它自己的計算。雖然這在Scala的Future API(在后面會展示)中是允許的,但是從性能的角度來看更好的辦法是一種完全非阻塞的方法,即在future中注冊一個回調,future完成后這個回調稱為異步回調。如果當注冊回調時future已經完成,則回調可能是異步執行的,或在相同的線程中循序執行。
注冊回調最通常的形式是使用OnComplete方法,即創建一個Try[T] => U
類型的回調函數。如果future成功完成,回調則會應用到Success[T]類型的值中,否則應用到Failure[T]
類型的值中。
Try[T]
和Option[T]
或 Either[T, S]
相似,因為它是一個可能持有某種類型值的單子。然而,它是特意設計來保持一個值或某個可拋出(throwable)對象。Option[T]
既可以是一個值(如:Some[T]
)也可以是完全無值(如:None
),如果Try[T]
獲得一個值則它為Success[T]
,否則為Failure[T]
的異常。 Failure[T]
獲得更多的關於為什么這兒沒值的信息,而不僅僅是None。同時也可以把Try[T]
看作一種特殊版本的Either[Throwable, T]
,專門用於左值為可拋出類型(Throwable)的情形。
回到我們的社交網絡的例子,假設我們想要獲取我們最近的帖子並顯示在屏幕上,我們通過調用getRecentPosts方法獲得一個返回值List[String]——一個近期帖子的列表文本:
val f: Future[List[String]] = future { session.getRecentPosts } f onComplete { case Success(posts) => for (post <- posts) println(post) case Success(posts) => for (post <- posts) println(post) }
onComplete方法一般在某種意義上它允許客戶處理future計算出的成功或失敗的結果。對於僅僅處理成功的結果,onSuccess 回調使用如下(該回調以一個偏函數(partial function)為參數):
val f: Future[List[String]] = future { session.getRecentPosts } f onSuccess { case posts => for (post <- posts) println(post) }
對於處理失敗結果,onFailure回調使用如下:
val f: Future[List[String]] = future { session.getRecentPosts } f onFailure { case t => println("An error has occured: " + t.getMessage) } f onSuccess { case posts => for (post <- posts) println(post) }
如果future失敗,即future拋出異常,則執行onFailure回調。
因為偏函數具有 isDefinedAt方法, onFailure方法只有在特定的Throwable類型對象中被定義才會觸發。下面例子中的onFailure回調永遠不會被觸發:
val f = future { 2 / 0 } f onFailure { case npe: NullPointerException => println("I'd be amazed if this printed out.") }
回到前面查找某個關鍵字第一次出現的例子,我們想要在屏幕上打印出此關鍵字的位置:
val firstOccurrence: Future[Int] = future {
val source = scala.io.Source.fromFile("myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
firstOccurrence onSuccess {
case idx => println("The keyword first appears at position: " + idx)
}
firstOccurrence onFailure {
case t => println("Could not process file: " + t.getMessage)
}
onComplete,、onSuccess 和 onFailure 方法都具有Unit的結果類型,這意味着不能鏈接使用這些方法的回調。注意這種設計是為了避免暗示而刻意為之的,因為鏈接回調也許暗示着按照一定的順序執行注冊回調(回調注冊在同一個future中是無序的)。
也就是說,我們現在應討論論何時調用callback。因為callback需要future的值是可用的,所有回調只能在future完成之后被調用。然而,不能保證callback在完成future的線程或創建callback的線程中被調用。反而, 回調(callback)會在future對象完成之后的一些線程和一段時間內執行。所以我們說回調(callback)最終會被執行。
此外,回調(callback)執行的順序不是預先定義的,甚至在相同的應用程序中callback的執行順序也不盡相同。事實上,callback也許不是一個接一個連續的調用,但是可能會在同一時間同時執行。這意味着在下面的例子中,變量totalA也許不能在計算上下文中被設置為正確的大寫或者小寫字母。
@volatile var totalA = 0 val text = future { "na" * 16 + "BATMAN!!!" } text onSuccess { case txt => totalA += txt.count(_ == 'a') } text onSuccess { case txt => totalA += txt.count(_ == 'a') }
以上,這兩個回調(callbacks)可能是一個接一個地執行的,這樣變量totalA得到的預期值為18。然而,它們也可能是並發執行的,於是totalA最終可能是16或2,因為+= 是一個不可分割的操作符(即它是由一個讀和一個寫的步驟組成,這樣就可能使其與其他的讀和寫任意交錯執行)。
考慮到完整性,回調的使用情景列在這兒:
-
在future中注冊onComplete回調的時候要確保最后future執行完成之后調用相應的終止回調。
-
注冊onSuccess或者onFailure回調時也和注冊onComplete一樣,不同之處在於future執行成功或失敗分別調用onSuccess或onSuccess的對應的閉包。
-
注冊一個已經完成的future的回調最后將導致此回調一直處於執行狀態(1所隱含的)。
-
在future中注冊多個回調的情況下,這些回調的執行順序是不確定的。事實上,這些回調也許是同時執行的,然而,特定的ExecutionContext執行可能導致明確的順序。
-
在一些回調拋出異常的情況下,其他的回調的執行不受影響。
-
在一些情況下,回調函數永遠不能結束(例如,這些回調處於無限循環中),其他回調可能完全不會執行。在這種情況下,對於那些潛在的阻塞回調要使用阻塞的構造(例子如下)。
-
一旦執行完,回調將從future對象中移除,這樣更適合JVM的垃圾回收機制(GC)。
函數組合(Functional Composition)和For解構(For-Comprehensions)
盡管前文所展示的回調機制已經足夠把future的結果和后繼計算結合起來的,但是有些時候回調機制並不易於使用,且容易造成冗余的代碼。我們可以通過一個例子來說明。假設我們有一個用於進行貨幣交易服務的API,我們想要在有盈利的時候購進一些美元。讓我們先來看看怎樣用回調來解決這個問題:
val rateQuote = future { connection.getCurrentValue(USD) } rateQuote onSuccess { case quote => val purchase = future { if (isProfitable(quote)) connection.buy(amount, quote) else throw new Exception("not profitable") } purchase onSuccess { case _ => println("Purchased " + amount + " USD") } }
首先,我們創建一個名為rateQuote的future對象並獲得當前的匯率。在服務器返回了匯率且該future對象成功完成了之后,計算操作才會從onSuccess回調中執行,這時我們就可以開始判斷買還是不買了。所以我們創建了另一個名為purchase的future對象,用來在可盈利的情況下做出購買決定,並在稍后發送一個請求。最后,一旦purchase運行結束,我們會在標准輸出中打印一條通知消息。
這確實是可行的,但是有兩點原因使這種做法並不方便。其一,我們不得不使用onSuccess,且不得不在其中嵌套purchase future對象。試想一下,如果在purchase執行完成之后我們可能會想要賣掉一些其他的貨幣。這時我們將不得不在onSuccess的回調中重復這個模式,從而可能使代碼過度嵌套,過於冗長,並且難以理解。
其二,purchase只是定義在局部范圍內--它只能被來自onSuccess內部的回調響應。這也就是說,這個應用的其他部分看不到purchase,而且不能為它注冊其他的onSuccess回調,比如說賣掉些別的貨幣。
為解決上述的兩個問題,futures提供了組合器(combinators)來使之具有更多易用的組合形式。映射(map)是最基本的組合器之一。試想給定一個future對象和一個通過映射來獲得該future值的函數,映射方法將創建一個新Future對象,一旦原來的Future成功完成了計算操作,新的Future會通過該返回值來完成自己的計算。你能夠像理解容器(collections)的map一樣來理解future的map。
讓我們用map的方法來重構一下前面的例子:
val rateQuote = future { connection.getCurrentValue(USD) } val purchase = rateQuote map { quote => if (isProfitable(quote)) connection.buy(amount, quote) else throw new Exception("not profitable") } purchase onSuccess { case _ => println("Purchased " + amount + " USD") }
通過對rateQuote的映射我們減少了一次onSuccess的回調,更重要的是避免了嵌套。這時如果我們決定出售一些貨幣就可以再次使用purchase方法上的映射了。
可是如果isProfitable方法返回了false將會發生些什么?會引發異常?這種情況下,purchase的確會因為異常而失敗。不僅僅如此,想象一下,鏈接的中斷和getCurrentValue方法拋出異常會使rateQuote的操作失敗。在這些情況下映射將不會返回任何值,而purchase也會會自動的以和rateQuote相同的異常而執行失敗。
總之,如果原Future的計算成功完成了,那么返回的Future將會使用原Future的映射值來完成計算。如果映射函數拋出了異常則Future也會帶着該異常完成計算。如果原Future由於異常而計算失敗,那么返回的Future也會包含相同的異常。這種異常的傳導方式也同樣適用於其他的組合器(combinators)。
使之能夠在For-comprehensions原則下使用,是設計Future的目的之一。也正是因為這個原因,Future還擁有flatMap,filter和foreach等組合器。其中flatMap方法可以構造一個函數,它可以把值映射到一個姑且稱為g的新future,然后返回一個隨g的完成而完成的Future對象。
讓我們假設我們想把一些美元兌換成瑞士法郎。我們必須為這兩種貨幣報價,然后再在這兩個報價的基礎上確定交易。下面是一個在for-comprehensions中使用flatMap和withFilter的例子:
val usdQuote = future { connection.getCurrentValue(USD) } val chfQuote = future { connection.getCurrentValue(CHF) } val purchase = for { usd <- usdQuote chf <- chfQuote if isProfitable(usd, chf) } yield connection.buy(amount, chf) purchase onSuccess { case _ => println("Purchased " + amount + " CHF") }
purchase只有當usdQuote和chfQuote都完成計算以后才能完成-- 它以其他兩個Future的計算值為前提所以它自己的計算不能更早的開始。
上面的for-comprhension將被轉換為:
val purchase = usdQuote flatMap { usd => chfQuote .withFilter(chf => isProfitable(usd, chf)) .map(chf => connection.buy(amount, chf)) }
這的確是比for-comprehension稍微難以把握一些,但是我們這樣分析有助於您更容易的理解flatMap的操作。FlatMap操作會把自身的值映射到其他future對象上,並隨着該對象計算完成的返回值一起完成計算。在我們的例子里,flatMap用usdQuote的值把chfQuote的值映射到第三個futrue對象里,該對象用於發送一定量瑞士法郎的購入請求。只有當通過映射返回的第三個future對象完成了計算,purchase才能完成計算。
這可能有些難以置信,但幸運的是faltMap操作在for-comprhensions模式以外很少使用,因為for-comprehensions本身更容易理解和使用。
再說說filter,它可以用於創建一個新的future對象,該對象只有在滿足某些特定條件的前提下才會得到原始future的計算值,否則就會拋出一個NoSuchElementException的異常而失敗。調用了filter的future,其效果與直接調用withFilter完全一樣。
作為組合器的collect同filter之間的關系有些類似容器(collections)API里的那些方法之間的關系。
值得注意的是,調用foreach組合器並不會在計算值可用的時候阻塞當前的進程去獲取計算值。恰恰相反,只有當future對象成功計算完成了,foreach所迭代的函數才能夠被異步的執行。這意味着foreach與onSuccess回調意義完全相同。
由於Future trait(譯注: trait有點類似Java中的接口(interface)的概念)從概念上看包含兩種類型的返回值(計算結果和異常),所以組合器會有一個處理異常的需求。
比方說我們准備在rateQuote的基礎上決定購入一定量的貨幣,那么connection.buy
方法需要知道購入的數量和期望的報價值,最終完成購買的數量將會被返回。假如報價值偏偏在這個節骨眼兒改變了,那buy方法將會拋出一個QuoteChangedExecption
,並且不會做任何交易。如果我們想讓我們的Future對象返回0而不是拋出那個該死的異常,那我們需要使用recover組合器:
val purchase: Future[Int] = rateQuote map {
quote => connection.buy(amount, quote)
} recover {
case QuoteChangedException() => 0
}
這里用到的recover能夠創建一個新future對象,該對象當計算完成時持有和原future對象一樣的值。如果執行不成功則偏函數的參數會被傳遞給使原Future失敗的那個Throwable異常。如果它把Throwable映射到了某個值,那么新的Future就會成功完成並返回該值。如果偏函數沒有定義在Throwable中,那么最終產生結果的future也會失敗並返回同樣的Throwable。
組合器recoverWith能夠創建一個新future對象,當原future對象成功完成計算時,新future對象包含有和原future對象相同的計算結果。若原future失敗或異常,偏函數將會返回造成原future失敗的相同的Throwable異常。如果此時Throwable又被映射給了別的future,那么新Future就會完成並返回這個future的結果。recoverWith同recover的關系跟flatMap和map之間的關系很像。
fallbackTo組合器生成的future對象可以在該原future成功完成計算時返回結果,如果原future失敗或異常返回future參數對象的成功值。在原future和參數future都失敗的情況下,新future對象會完成並返回原future對象拋出的異常。正如下面的例子中,本想打印美元的匯率,但是在獲取美元匯率失敗的情況下會打印出瑞士法郎的匯率:
val usdQuote = future { connection.getCurrentValue(USD) } map { usd => "Value: " + usd + "$" } val chfQuote = future { connection.getCurrentValue(CHF) } map { chf => "Value: " + chf + "CHF" } al anyQuote = usdQuote fallbackTo chfQuote anyQuote onSuccess { println(_) }
組合器andThen的用法是出於純粹的side-effecting目的。經andThen返回的新Future無論原Future成功或失敗都會返回與原Future一模一樣的結果。一旦原Future完成並返回結果,andThen后跟的代碼塊就會被調用,且新Future將返回與原Future一樣的結果,這確保了多個andThen調用的順序執行。正如下例所示,這段代碼可以從社交網站上把近期發出的帖子收集到一個可變集合里,然后把它們都打印在屏幕上:
val allposts = mutable.Set[String]() future { session.getRecentPosts } andThen { posts => allposts ++= posts } andThen { posts => clearAll() for (post <- allposts) render(post) }
綜上所述,Future的組合器功能是純函數式的,每種組合器都會返回一個與原Future相關的新Future對象。
投影(Projections)
為了確保for解構(for-comprehensions)能夠返回異常,futures也提供了投影(projections)。如果原future對象失敗了,失敗的投影(projection)會返回一個帶有Throwable類型返回值的future對象。如果原Future成功了,失敗的投影(projection)會拋出一個NoSuchElementException異常。下面就是一個在屏幕上打印出異常的例子:
val f = future { 2 / 0 } for (exc <- f.failed) println(exc)
下面的例子不會在屏幕上打印出任何東西:
val f = future { 4 / 2 } for (exc <- f.failed) println(exc)
Future的擴展
用更多的實用方法來對Futures API進行擴展支持已經被提上了日程,這將為很多外部框架提供更多專業工具。
Blocking
正如前面所說的,在future的blocking非常有效地緩解性能和預防死鎖。雖然在futures中使用這些功能方面的首選方式是Callbacks和combinators,但在某些處理中也會需要用到blocking,並且它也是被Futures and Promises API所支持的。
在之前的並發交易(concurrency trading)例子中,在應用的最后有一處用到block來確定是否所有的futures已經完成。這有個如何使用block來處理一個future結果的例子:
import scala.concurrent._ import scala.concurrent.duration._ def main(args: Array[String]) { val rateQuote = future { connection.getCurrentValue(USD) } val purchase = rateQuote map { quote => if (isProfitable(quote)) connection.buy(amount, quote) else throw new Exception("not profitable") } Await.result(purchase, 0 nanos) }
在這種情況下這個future是不成功的,這個調用者轉發出了該future對象不成功的異常。它包含了失敗的投影(projection)-- 阻塞(blocking)該結果將會造成一個NoSuchElementException異常在原future對象被成功計算的情況下被拋出。
相反的,調用Await.ready
來等待這個future直到它已完成,但獲不到它的結果。同樣的方式,調用那個方法時如果這個future是失敗的,它將不會拋出異常。
The Future trait實現了Awaitable trait還有其ready()
和result()
方法。這些方法不能被客戶端直接調用,它們只能通過執行環境上下文來進行調用。
為了允許程序調用可能是阻塞式的第三方代碼,而又不必實現Awaitable特質,原函數可以用如下的方式來調用:
blocking { potentiallyBlockingCall() }
這段blocking代碼也可以拋出一個異常。在這種情況下,這個異常會轉發給調用者。
異常(Exceptions)
當異步計算拋出未處理的異常時,與那些計算相關的futures就失敗了。失敗的futures存儲了一個Throwable的實例,而不是返回值。Futures提供onFailure回調方法,它用一個PartialFunction去表示一個Throwable。下列特殊異常的處理方式不同:
scala.runtime.NonLocalReturnControl[_]
--此異常保存了一個與返回相關聯的值。通常情況下,在方法體中的返回結構被調用去拋出這個異常。相關聯的值將會存儲到future或一個promise中,而不是一直保存在這個異常中。
ExecutionException-當因為一個未處理的中斷異常、錯誤或者scala.util.control.ControlThrowable
導致計算失敗時會被存儲起來。這種情況下,ExecutionException會為此具有未處理的異常。這些異常會在執行失敗的異步計算線程中重新拋出。這樣做的目的,是為了防止正常情況下沒有被客戶端代碼處理過的那些關鍵的、與控制流相關的異常繼續傳播下去,同時告知客戶端其中的future對象是計算失敗的。
更精確的語義描述請參見 [NonFatal]。
Promises
到目前為止,我們僅考慮了通過異步計算的方式創建future對象來使用future的方法。盡管如此,futures也可以使用promises來創建。
如果說futures是為了一個還沒有存在的結果,而當成一種只讀占位符的對象類型去創建,那么promise就被認為是一個可寫的,可以實現一個future的單一賦值容器。這就是說,promise通過這種success方法可以成功去實現一個帶有值的future。相反的,因為一個失敗的promise通過failure方法就會實現一個帶有異常的future。
一個promise p通過p.future方式返回future。 這個futrue對象被指定到promise p。根據這種實現方式,可能就會出現p.future與p相同的情況。
考慮下面的生產者 - 消費者的例子,其中一個計算產生一個值,並把它轉移到另一個使用該值的計算。這個傳遞中的值通過一個promise來完成。
import scala.concurrent.{ future, promise } import scala.concurrent.ExecutionContext.Implicits.global val p = promise[T] val f = p.future val producer = future { val r = produceSomething() p success r continueDoingSomethingUnrelated() } val consumer = future { startDoingSomething() f onSuccess { case r => doSomethingWithResult() } }
在這里,我們創建了一個promise並利用它的future方法獲得由它實現的Future。然后,我們開始了兩種異步計算。第一種做了某些計算,結果值存放在r中,通過執行promise p,這個值被用來完成future對象f。第二種做了某些計算,然后讀取實現了future f的計算結果值r。需要注意的是,在生產者完成執行continueDoingSomethingUnrelated()
方法這個任務之前,消費者可以獲得這個結果值。
正如前面提到的,promises具有單賦值語義。因此,它們僅能被實現一次。在一個已經計算完成的promise或者failed的promise上調用success方法將會拋出一個IllegalStateException異常。
下面的這個例子顯示了如何fail a promise。
val p = promise[T] val f = p.future val producer = future { val r = someComputation if (isInvalid(r)) p failure (new IllegalStateException) else { val q = doSomeMoreComputation(r) p success q } }
如上,生產者計算出一個中間結果值r,並判斷它的有效性。如果它不是有效的,它會通過返回一個異常實現promise p的方式fails the promise,關聯的future f是failed。否則,生產者會繼續它的計算,最終使用一個有效的結果值實現future f,同時實現 promise p。
Promises也能通過一個complete方法來實現,這個方法采用了一個potential value Try[T]
,這個值要么是一個類型為Failure[Throwable]
的失敗的結果值,要么是一個類型為Success[T]
的成功的結果值。
類似success方法,在一個已經完成(completed)的promise對象上調用failure方法和complete方法同樣會拋出一個IllegalStateException異常。
應用前面所述的promises和futures方法的一個優點是,這些方法是單一操作的並且是沒有副作用(side-effects)的,因此程序是具有確定性的(deterministic)。確定性意味着,如果該程序沒有拋出異常(future的計算值被獲得),無論並行的程序如何調度,那么程序的結果將會永遠是一樣的。
在一些情況下,客戶端也許希望能夠只在promise沒有完成的情況下完成該promise的計算(例如,如果有多個HTTP請求被多個不同的futures對象來執行,並且客戶端只關心地一個HTTP應答(response),該應答對應於地一個完成該promise的future)。因為這個原因,future提供了tryComplete,trySuccess和tryFailure方法。客戶端需要意識到調用這些的結果是不確定的,調用的結果將以來從程序執行的調度。
completeWith方法將用另外一個future完成promise計算。當該future結束的時候,該promise對象得到那個future對象同樣的值,如下的程序將打印1:
val f = future { 1 } val p = promise[Int] p completeWith f p.future onSuccess { case x => println(x) }
當讓一個promise以異常失敗的時候,三總子類型的Throwable異常被分別的處理。如果中斷該promise的可拋出(Throwable)一場是scala.runtime.NonLocalReturnControl
,那么該promise將以對應的值結束;如果是一個Error的實例,InterruptedException
或者scala.util.control.ControlThrowable
,那么該可拋出(Throwable)異常將會封裝一個ExecutionException異常,該ExectionException將會讓該promise以失敗結束。
通過使用promises,futures的onComplete方法和future的構造方法,你能夠實現前文描述的任何函數式組合組合器(compition combinators)。讓我們來假設一下你想實現一個新的組合起,該組合器首先使用兩個future對象f和,產生第三個future,該future能夠用f或者g來完成,但是只在它能夠成功完成的情況下。
這里有個關於如何去做的實例:
def first[T](f: Future[T], g: Future[T]): Future[T] = { val p = promise[T] f onSuccess { case x => p.trySuccess(x) } g onSuccess { case x => p.trySuccess(x) } p.future }
注意,在這種實現方式中,如果f與g都不是成功的,那么first(f, g)
將不會實現(即返回一個值或者返回一個異常)。
工具(Utilities)
為了簡化在並發應用中處理時序(time)的問題,scala.concurrent
引入了Duration抽象。Duration不是被作為另外一個通常的時間抽象存在的。他是為了用在並發(concurrency)庫中使用的,Duration位於scala.concurrent
包中。
Duration是表示時間長短的基礎類,其可以是有限的或者無限的。有限的duration用FiniteDuration類來表示,並通過時間長度(length)
和java.util.concurrent.TimeUnit
來構造。無限的durations,同樣擴展了Duration,只在兩種情況下存在,Duration.Inf
和Duration.MinusInf
。庫中同樣提供了一些Durations的子類用來做隱式的轉換,這些子類不應被直接使用。
抽象的Duration類包含了如下方法:
到不同時間單位的轉換(toNanos, toMicros, toMillis, toSeconds, toMinutes, toHours, toDays and toUnit(unit: TimeUnit))
。 durations的比較(<,<=,>和>=)
。 算術運算符(+, -, *, / 和單值運算_-)
duration的最大最小方法(min,max)
。 測試duration是否是無限的方法(isFinite)
。 Duration能夠用如下方法實例化(instantiated)
:
隱式的通過Int和Long類型轉換得來 val d = 100 millis
。 通過傳遞一個Long length
和java.util.concurrent.TimeUnit
。例如val d = Duration(100, MILLISECONDS)
。 通過傳遞一個字符串來表示時間區間,例如 val d = Duration("1.2 µs")
。 Duration也提供了unapply方法,因此可以i被用於模式匹配中,例如:
import scala.concurrent.duration._ import java.util.concurrent.TimeUnit._ // instantiation val d1 = Duration(100, MILLISECONDS) // from Long and TimeUnit val d2 = Duration(100, "millis") // from Long and String val d3 = 100 millis // implicitly from Long, Int or Double val d4 = Duration("1.2 µs") // from String // pattern matching val Duration(length, unit) = 5 millis
更多詳細內容參考官網:http://docs.scala-lang.org/overviews/core/futures.html