本文內容
- 簡介
- Futures
- 阻塞
- 異常
- Promises
- 工具
最近看了《七周七語言:理解多種編程泛型》,介紹了七種語言(四種編程范型)的主要特性:基本語法,集合,並行/並發,其中就有 Scala。你不能指望這種書全面介紹,因為其中任何一門語言都夠寫一本書了~
我比較關注並行/並發,但是書中關於 Scala 的並發部分——Actor,可代碼編譯不通過,官網標注“Deprecated”,哎,這書點不負責,示例代碼也不寫采用編譯器的版本~
Java 8 之前,函數式編程實在太弱了,不然也不會出現像 Scala 這樣在 JVM 上運行、能夠與 Java 完美配合的語言(估計,Java 在函數式編程方面的落后,已經讓社區等不急了)。
本文來自 Scala 官網,完整示例代碼幾乎沒有,大部分是理論,雖然講解得很詳細,但看起來實在有點費勁。因此,你最好找點這方面完整示例再看看。
官網其實也有中文翻譯,但卻是機器翻譯的。
簡介
Future提供了一套高效非阻塞(non-blocking)的方式完成並行操作。其基本思想很簡單,所謂 Future,指的是一類占位符對象(placeholder object),用於指代某些尚未完成計算的結果。一般,由Future的計算結果都是並行執行的,計算完后再使用。以這種方式組織並行任務,便可以寫出高效、異步、非阻塞的並行代碼。
默認情況,future 和 promise 利用回調(callback)的非阻塞方式,並不是采用典型的阻塞方式。為了在語法和概念層面簡化回調的使用,Scala 提供了 flatMap、foreach 和 filter 等算子(combinator),使得我們能夠以非阻塞的方式對future進行組合。當然,future 對於那些必須使用阻塞的情況仍然支持阻塞操作,可以阻塞等待future(不過不鼓勵這樣做)。
一個典型的 future 如下所示:
val inverseFuture:Future[Matrix]=Future{fatMatrix.inverse()// non-blocking long lasting computation
}(executionContext)
或是更常用的:
implicit val ec:ExecutionContext=...
val inverseFuture :Future[Matrix]=Future{
fatMatrix.inverse()
}// ec is implicitly passed
這兩個代碼片段把 fatMatrix.inverse() 的執行委托給 ExecutionContext,在 inverseFuture
中體現計算結果。
Futures
所謂 Future,是一種用於指代某個尚未就緒的值的對象。這個值通常是某個計算過程的結果:
- 若該計算過程尚未完成,我們就說該Future未完成;
- 若該計算過程正常結束,或中途拋出異常,我們就說該Future已完成。
Future 完成分為兩種情況:
- 當Future帶着某個值而完成時,我們就說該Future帶着計算結果成功完成。
- 當Future帶着異常而完成時,計算過程中拋出的異常,我們就說Future因異常而失敗。
Future 具有一個重要的屬性——只能被賦值一次。一旦給定了某個值或某個異常,future對象就變成了不可變對象——無法再被改寫。
創建future對象最簡單的方法是調用future方法,開始異步(asynchronous)計算,並返回保存有計算結果的futrue。一旦該future計算完成,其結果就變的可用。
注意,Future[T] 是一個類型,表示future對象,而future是一個方法,創建和調度一個異步計算,並返回一個帶有計算結果的future對象。
下面通過一個例子來展示。
假設,我們使用某個社交網絡假想的API獲取某個用戶的朋友列表,我們將打開一個新對話(session),然后發送一個獲取特定用戶朋友列表的請求。
import scala.concurrent._
importExecutionContext.Implicits.globalval session = socialNetwork.createSessionFor("user", credentials)
val f:Future[List[Friend]]=Future{session.getFriends()}
上面,首先導入 scala.concurrent 包。然后,通過一個假想的 createSessionFor 方法初始化一個向服務器發送請求 session 變量。這個請求是通過網絡發送的,所以可能耗時很長。調用 getFriends 方法返回 List[Friend]。為了更好的利用CPU,知道響應到達,不應該阻塞(block)程序的其他部分,這個計算應該被異步調度。future方法就是這樣做的,它並行地執行指定的計算塊,在這個例子中,向服務器發送請求,等待響應。
一旦服務器響應,future f 中的好友列表將變得可用。
失敗可能會導致一個 exception。在下面的例子中,session 的值未被正確的初始化,於是,future 塊中計算將拋出一個 NullPointerException。這樣,future f 失敗了。
val session =null
val f:Future[List[Friend]]=Future{
session.getFriends
}
上面的 import ExecutionContext.Implicits.global
導入默認的全局執行上下文(global execution context)。執行上下文執行提交給他們的任務,你也可把執行上下文看作線程池,這對future方法是必不可少的,因為,它們處理如何和何時執行異步計算。你可以定義自己的執行上下文,並用 future 使用,但現在,只需要知道你能夠通過上面的語句導入默認執行上下文就足夠了。
我們的例子是基於一個假想的社交網絡 API,計算包含了發送網絡請求和等待響應。下面,假設你有一個文本文件,想找出一個特定詞第一次出現的位置。當磁盤正在檢索此文件時,這個計算過程可能會陷入阻塞,因此,並行執行程序的剩余部分將很有意義。
val firstOccurrence:Future[Int]=Future{
val source = scala.io.Source.fromFile("e:\scala\myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
回調函數
現在,我們知道如何開始一個異步計算來創建一個新的future值,但是我們沒有演示一旦此結果變得可用后如何使用。我們經常對計算結果感興趣而不僅僅是它的副作用(side-effects)。
在許多future的實現中,一旦future的客戶端對結果感興趣,它必須阻塞它自己的計算,並等待直到future完成——然后才能使用future的值繼續它自己的計算。雖然這在Scala Future API(在后面會展示)中是允許的,但從性能角度來看更好的辦法是完全非阻塞,即在future中注冊一個回調。一旦future完成,就異步調用回調。如果當注冊回調,future已經完成,那么,回調或是異步執行,或在相同的線程中循序執行。
注冊回調最通常的形式,是使用OnComplete方法,即創建一個Try[T] => U
類型的回調函數。如果future成功完成,回調則會應用到Success[T]類型的值中,否則應用到 Failure[T]
類型的值中。
Try[T]
跟 Option[T]
或 Either[T, S]
相似,因為它是一個可能持有某種類型值的單子(monda)。然而,它是為持有一個值或異常對象特殊設計的。Option[T]
既可以是一個值(如:Some[T]
)也可以完全不是值(如:None
),如果Try[T]
獲得一個值是,那么它是 Success[T]
,否則為持有異常的 Failure[T]
。 Failure[T]
有很多信息,不僅僅是關於為什么沒有值 None。同時,也可以把 Try[T]
看作一種特殊版本的 Either[Throwable, T]
,特別是當左邊值為一個 Throwable 的情形。
“一個單子(Monad)說白了不過就是自函子范疇上的一個幺半群而已。”這句話出自Haskell大神Philip Wadler,也是他提議把Monad引入Haskell。
回到我們社交網絡的例子,假設,我們想獲取最近的帖子並顯示在屏幕上,可以通過調用 getRecentPosts 方法,它返回 List[String]:
import scala.util.{Success,Failure}
val f:Future[List[String]]=Future{
session.getRecentPosts
}
f onComplete {
caseSuccess(posts)=>for(post <- posts) println(post)
caseFailure(t)=> println("An error has occured: "+ t.getMessage)
}
onComplete 方法允許客戶處理失敗或成功的future 結果。對於成功,onSuccess 回調使用如下:
val f:Future[List[String]]=Future{
session.getRecentPosts
}
f
onSuccess {
case posts =>for(post <- posts
)
println(post)
}
對於失敗,onFailure 回調使用如下:
val f:Future[List[String]]=Future{
session.getRecentPosts
}
f onFailure {
case t => println("An error has occured: "+ t.getMessage)
}
f onSuccess {
case posts =>for(post <- posts) println(post)
}
onFailure 回調只有在 future 失敗,也就是包含一個異常時才會執行。
因為部分函數(partial functions)具有 isDefinedAt 方法, 所以,onFailure
方法只有為了特定 Throwable 而定義才會觸發。下面的例子,已注冊的 onFailure
回調永遠不會被觸發:
val f =Future{
2/0
}
f onFailure {
case npe:NullPointerException=>
println("I'd be amazed if this printed out.")
}
部分函數(Partial functions),假設有一個數學函數 f(a,b,c),partial(f,1)返回的是數學函數 f(1,b,c),函數的參數 a 已經被代入。
回到前面例子,查找某個第一次出現的關鍵字,在屏幕上輸出該關鍵字的位置:
val firstOccurrence:Future[Int]=Future{
val source = scala.io.Source.fromFile("myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
firstOccurrence onSuccess {
case idx => println("The keyword first appears at position: "+ idx)
}
firstOccurrence onFailure {
case t => println("Could not process file: "+ t.getMessage)
}
onComplete,、onSuccess 和 onFailure 方法都具有結果類型 Unit,這意味着這些回調方法不能被鏈接。注意,這種設計是為了避免鏈式調用可能隱含在已注冊回調上一個順序的執行(同一個 future 中注冊的回調是無序的)。
也就是說,我們現在應討論論何時調用回調。因為回調需要future 中的值是可用的,只有future完成后才能被調用。然而,不能保證被完成 future 的線程或創建回調的線程調用。反而, 回調有時會在future對象完成后被某個線程調用。我們可以說,回調最終會被執行。
更進一步,回調被執行的順序不是預先定義的,甚至在同一個應用程序。事實上,回調也許不是一個接一個連續調用的,但在同一時間並發調用。這意味着,下面例子中,變量 totalA 也許不能從計算的文本中得到大小寫字母數量的正確值。
@volatilevar totalA =0
val text =Future{
"na"*16+"BATMAN!!!"
}
text onSuccess {
case txt => totalA += txt.count(_ =='a')
}
text onSuccess {
case txt => totalA += txt.count(_ =='A')
}
上面,兩個回調可能一個接一個地執行,變量 totalA 得到的預期值為 18。然而,它們也可能是並發執行,於是,totalA 最終可能是16或2,因為+= 不是一個原子操作(即它是由一個讀和一個寫的步驟組成,這樣就可能使其與其他的讀和寫任意交錯執行)。
考慮到完整性,回調的語義如下:
- 在 future 上注冊 onComplete 回調,要確保 future 執行完成后,相應的閉包(closure)最終被調用。
- 注冊 onSuccess 或 onFailure 回調,與 onComplete 語義一樣,不同的是,只有在 future 成功地或失敗地執行完后,才會調用。
- 在 future 上注冊一個已經完成的回調,將導致回調最終被執行。將最終導致一直處於執行狀態的回調(上面 1 所隱含的)。
- 在 future 上注冊多個回調時,這些回調的執行順序是不確定的。事實上,這些回調可能是並行執行的,然而,某個 ExecutionContext 執行可能導致明確的執行順序。
- 在某些回調拋出異常時,其他回調的執行不受影響。
- 在某些回調無法永遠無法結束時(例如,回調包含一個無限循環),其他回調可能完全不會執行。這種情況下,那些潛在的阻塞的回調需要使用阻塞結構。將在后面“阻塞”小節說明。
- 一旦執行完后,回調會從 future 對象中移除,這對垃圾回收機制(GC)很合適。
函數組合(Functional Composition)和For解構(For-Comprehensions)
盡管前文所展示的回調機制已經足夠把future的結果和后繼計算結合起來的,但是有時回調機制並不易於使用,且容易造成冗余的代碼。可以通過一個例子來說明。假設,我們有一個用於進行貨幣交易服務的API,想要在有盈利的時候購進一些美元。讓我們先來看看怎樣用回調來解決這個問題:
val rateQuote =Future{
connection.getCurrentValue(USD)
}
rateQuote onSuccess {case quote =>
val purchase =Future{
if(isProfitable(quote)) connection.buy(amount, quote)
else throw new Exception("not profitable")
}
purchase onSuccess {
case _ => println("Purchased "+ amount +" USD")
}
}
首先,我們創建一個名為rateQuote的future對象並獲得當前的匯率。從服務器返回匯率且該future對象成功完成之后,在onSuccess回調判斷買還是不買。所以,我們創建了另一個名為purchase的future對象,用來在可盈利的情況下做出購買決定,並在發送一個請求。最后,一旦purchase運行結束,會輸出一條通知消息。
這確實是可行,但有兩點原因使這種做法並不方便。其一,不得不使用onSuccess,且不得不在其中嵌套另一個purchase future對象。試想一下,如果在purchase執行完成之后我們可能想要賣掉一些其他的貨幣。這時將不得不在onSuccess的回調中重復這個模式,從而可能使代碼過度嵌套,冗長且難以理解。
其二,purchase只是定義在局部范圍,只能被來自onSuccess內部的回調響應。也就是說,這個應用的其他部分看不到purchase,而且不能為它注冊其他的onSuccess回調,比如說賣掉些其他貨幣。
為解決上述的兩個問題,futures提供了組合器(combinators)使之具有更多易用的組合。映射(map)是最基本的組合器之一。試想,給定一個future對象和一個通過映射來獲得該future值的函數,映射方法將創建一個新Future對象,一旦原來的Future成功完成了計算,新Future會通過該返回值來完成自己的計算。你能夠像理解容器(collections)的map一樣來理解future的map。
讓我們用map的方法來重構一下前面的例子:
val rateQuote =Future{
connection.getCurrentValue(USD)
}
val purchase = rateQuote map { quote =>
if(isProfitable(quote)) connection.buy(amount, quote)
elsethrownewException("not profitable")
}
purchase onSuccess {
case _ => println("Purchased "+ amount +" USD")
}
通過對rateQuote的映射我們減少了一次onSuccess回調,更重要的是避免了嵌套。這時如果我們決定出售一些貨幣就可以再次使用purchase方法上的映射了。
可是,如果isProfitable方法返回了false將會發生些什么?會引發異常?這種情況下,purchase的確會因異常而失敗。不僅僅如此,想象一下,鏈接的中斷和getCurrentValue方法拋出異常會使rateQuote的操作失敗。在這些情況下,映射將不會返回任何值,而purchase也會自動以和rateQuote相同的異常而執行失敗。
總之,如果原Future的計算成功完成了,那么返回的Future將會使用原Future的映射值來完成計算。如果映射函數拋出了異常,則Future也會帶着該異常完成計算。如果原Future由於異常而計算失敗,那么返回的Future也會包含相同的異常。這種異常的傳導方式也同樣適用於其他的組合器(combinators)。
Future的這個設計目的是使之能夠在 For-comprehensions 下使用。也正是因為這,Future還擁有flatMap,filter和foreach等組合器。其中,flatMap方法可以構造一個函數,可以把值映射到一個姑且稱為g的新future,然后返回一個隨g的完成而完成的Future對象。
讓我們假設,想把一些美元兌換成法郎。必須為這兩種貨幣報價,然后再在這兩個報價的基礎上確定交易。下面是一個在 for-comprehensions 中使用flatMap和withFilter的例子:
val usdQuote =Future{ connection.getCurrentValue(USD)}
val chfQuote =Future{ connection.getCurrentValue(CHF)}
val purchase =for{
usd <- usdQuote
chf <- chfQuote
if isProfitable(usd, chf)
}yield connection.buy(amount, chf)
purchase onSuccess {
case _ => println("Purchased "+ amount +" CHF")
}
purchase只有當usdQuote和chfQuote都完成計算后才能完成,它以其他兩個Future的計算值為前提,所以它自己的計算不能更早開始。
上面的 for-comprhension 將被轉換為:
val purchase = usdQuote flatMap {
usd =>
chfQuote
.withFilter(chf => isProfitable(usd, chf))
.map(chf => connection.buy(amount, chf))
}
這的確是比for-comprehension稍微難以把握一些,但是我們這樣分析有助於更容易的理解flatMap操作。FlatMap操作會把自身的值映射到其他future對象上,並隨着該對象計算完成的返回值一起完成計算。在例子中,flatMap用usdQuote的值把chfQuote的值映射到第三個futrue對象里,該對象用於發送一定量法郎的購入請求。只有當通過映射返回的第三個future對象完成了計算,purchase才能完成計算。
這可能有些難以置信,但幸運的是faltMap操作在for-comprhensions模式以外很少使用,因為for-comprehensions本身更容易理解和使用。
再說說filter,它可以用於創建一個新的future對象,該對象只有在滿足某些特定條件的前提下才會得到原始future的計算值,否則就會拋出一個NoSuchElementException的異常而失敗。調用了filter的future,其效果與直接調用withFilter完全一樣。
collect 和 filter 組合器之間的關系有些類似容器(collections)API里那些方法之間的關系。
值得注意的是,調用foreach組合器並不會在計算值可用的時候阻塞當前的進程去獲取計算值。恰恰相反,只有當future對象成功計算完成后,foreach所迭代的函數才能夠被異步執行。這意味着foreach與onSuccess回調意義完全相同。
由於Future trait(trait 類似 java 中的接口 interface)從概念上看包含兩種類型的返回值(計算結果和異常),所以組合器會有一個處理異常的需求。
比如,我們准備在rateQuote的基礎上決定購入一定量的貨幣,那么connection.buy
方法需要知道購入的數量和期望的報價值,最終完成購買的數量將會被返回。假如報價值偏偏在這個時候改變了,那么,buy方法將會拋出一個QuoteChangedExecption
,並且不會做任何交易。如果我們想讓我們的Future對象返回0,而不是拋出那個該死的異常,那我們需要使用recover組合器:
val purchase:Future[Int]= rateQuote map {
quote => connection.buy(amount, quote)
} recover {
caseQuoteChangedException()=>0
}
recover能夠創建一個新future對象,該對象當計算完成時持有和原future對象一樣的值。如果執行不成功,則偏函數的參數會被傳遞給使原Future失敗的那個Throwable異常。如果它把Throwable映射到了某個值,那么新的Future就會成功完成並返回該值。如果偏函數沒有定義在Throwable中,那么最終產生結果的future也會失敗並返回同樣的Throwable。
組合器recoverWith能夠創建一個新future對象,當原future對象成功完成計算時,新future對象包含有和原future對象相同的計算結果。若原future失敗或異常,偏函數將會返回造成原future失敗的相同的Throwable異常。如果此時Throwable又被映射給了別的future,那么新Future就會完成並返回這個future的結果。recoverWith 和 recover的關系與flatMap和map之間的關系很像。
fallbackTo組合器生成的future對象可以在該原future成功完成計算時返回結果,如果原future失敗或異常返回future參數對象的成功值。在原future和參數future都失敗的情況下,新future對象會完成並返回原future對象拋出的異常。正如下面例子,本想打印美元的匯率,但在獲取美元匯率失敗的情況下會打印法郎的匯率:
val usdQuote =Future{
connection.getCurrentValue(USD)
} map {
usd =>"Value: "+ usd +"$"
}
val chfQuote =Future{
connection.getCurrentValue(CHF)
} map {
chf =>"Value: "+ chf +"CHF"
}
val anyQuote = usdQuote fallbackTo chfQuote
anyQuote onSuccess { println(_)}
組合器andThen的用法是出於純粹的side-effecting目的。經andThen返回的新Future,無論原Future成功或失敗都會返回與原Future一樣的結果。一旦原Future完成並返回結果,andThen后跟的代碼塊就會被調用,且新Future將返回與原Future一樣的結果,這確保了多個andThen調用的順序執行。正如下例所示,這段代碼可以從社交網站上把近期發出的帖子收集到一個可變集合里,然后把它們都打印在屏幕上:
val allposts =mutable.Set[String]()
Future{
session.getRecentPosts
} andThen {
posts => allposts ++= posts
} andThen {
posts =>
clearAll()
for(post <- allposts) render(post)
}
綜上所述,Future的組合器功能是純函數式的,每種組合器都會返回一個與原Future相關的新Future對象。
投影(Projections)
為了確保 for-comprehensions 能夠返回異常,futures也提供了投影(projections)。如果原future對象失敗了,失敗的投影(projection)會返回一個帶有Throwable類型返回值的future對象。如果原Future成功了,失敗的投影(projection)會拋出一個NoSuchElementException異常。下面是一個在屏幕上打印出異常的例子:
val f =Future{
2/0
}
for(exc <- f.failed) println(exc)
下面例子不會在屏幕上打印出任何東西:
val f =Future{
4/2
}
for(exc <- f.failed) println(exc)
Future 擴展
更多的實用方法對Futures API進行了擴展支持,這將為很多外部框架提供更多專業工具。
阻塞
future 一般都是異步的,不會阻塞潛在的執行線程。然而,在某些情況下,阻塞是必要的。我們區分了兩種阻塞執行線程的形式:future 內的阻塞,以及 future 外的阻塞,等待直到 future 完成。
Future 內的阻塞
As seen with the global ExecutionContext
, it is possible to notify an ExecutionContext
of a blocking call with the blocking
construct. The implementation is however at the complete discretion of the ExecutionContext
. While some ExecutionContext
such as ExecutionContext.global
implement blocking
by means of a ManagedBlocker
, some execution contexts such as the fixed thread pool:
正如全局 ExecutionContext,它可以通知一個具有阻塞結構的阻塞調用的 ExecutionContext。然而,實現是很慎重的。當某些 ExecutionContext 通過 ManagedBlocker 實現阻塞,一些執行上下文,如固定的線程池:
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(x))
下面代碼將什么都不做:
implicit val ec =ExecutionContext.fromExecutor(
Executors.newFixedThreadPool(4))
Future{
blocking { blockingStuff()}
}
下面效果一樣:
Future{ blockingStuff()}
阻塞的代碼也可能拋出異常。在這種情況下,這個異常會轉發給調用者。
future 外阻塞
正如前面所說,在 future 上阻塞是不鼓勵的,因為會出現性能和死鎖。回調(Callbacks)和組合器(combinators)才是首選方法,但在某些情況中阻塞也是需要的,並且Futures 和 Promises API 也支持。
在之前的並發交易例子中,在最后有一處用到阻塞來確定是否所有的futures都已完成。下面是如何使用block來處理一個future結果的例子:
import scala.concurrent._
import scala.concurrent.duration._
def main(args:Array[String]){
val rateQuote =Future{
connection.getCurrentValue(USD)
}
val purchase = rateQuote map { quote =>
if(isProfitable(quote)) connection.buy(amount, quote)
elsethrownewException("not profitable")
}
Await.result(purchase,0 nanos)
}
在這種情況下,future失敗了,調用者轉發出了該future 失敗的異常。它包含了失敗的投影(projection)——阻塞該結果,將會造成一個NoSuchElementException異常,若原future對象被成功完成。
相反的,調用Await.ready
來等待,知道這個future完成,但獲不到結果。同樣,如果 future 失敗,調用不會拋出異常的方法。
Future trait 用ready()
和result()
方法實現了Awaitable trait。這些方法不能被客戶端直接調用——它們只能被執行上下文調用。
異常
當異步計算拋出未處理的異常時,與那些計算相關的futures就失敗了。失敗的futures存儲了一個Throwable的實例,而不是返回值。Futures提供onFailure回調方法,它用一個PartialFunction去表示一個Throwable。下列特殊異常的處理方式不同:
scala.runtime.NonLocalReturnControl[_]
–此異常保存了一個與返回相關聯的值。通常情況下,在方法體中的返回結構被調用去拋出這個異常。相關聯的值將會存儲到future或一個promise中,而不是一直保存在這個異常中。
ExecutionException-當因為一個未處理的中斷異常、錯誤或者scala.util.control.ControlThrowable
導致計算失敗時會被存儲起來。這種情況下,ExecutionException會為此具有未處理的異常。這些異常會在執行失敗的異步計算線程中重新拋出。這樣做的目的,是為了防止正常情況下沒有被客戶端代碼處理過的那些關鍵的、與控制流相關的異常繼續傳播下去,同時告知客戶端其中的future對象是計算失敗的。
更精確的語義描述請參見 [NonFatal]。
Promises
到目前為止,我們僅考慮了通過異步計算的方式創建future對象來使用future的方法。盡管如此,futures也可以使用promises來創建。
如果說futures是為了一個還沒有存在的結果,而當成一種只讀占位符的對象類型去創建,那么promise就被認為是一個可寫的,可以實現一個future的單一賦值容器。這就是說,promise通過這種success方法可以成功去實現一個帶有值的future。相反的,因為一個失敗的promise通過failure方法就會實現一個帶有異常的future。
一個promise p通過p.future方式返回future。 這個futrue對象被指定到promise p。根據這種實現方式,可能就會出現p.future與p相同的情況。
考慮下面的生產者 - 消費者的例子,其中一個計算產生一個值,並把它轉移到另一個使用該值的計算。這個傳遞中的值通過一個promise來完成。
import scala.concurrent.{ future, promise }
import scala.concurrent.ExecutionContext.Implicits.global
val p = promise[T]
val f = p.future
val producer = future {
val r = produceSomething()
p success r
continueDoingSomethingUnrelated()
}
val consumer = future {
startDoingSomething()
f onSuccess {
case r => doSomethingWithResult()
}
}
這里,我們創建了一個promise,並利用它的future方法獲得一個 Future。然后,開始兩個異步計算。第一種,做些計算,將結果放在 r 中,通過執行promise p,這個值被用來完成future對象f。第二個,也做某寫計算,然后讀取實現了 future f的計算結果值 r。注意,在生產者完成執行 continueDoingSomethingUnrelated()
方法之前,消費者可以獲得這個結果值。
正如前面提到的,promises 具有單賦值語義。因此,它們僅能被實現一次。在一個已經計算完成的promise或者failed的promise上調用success方法將會拋出一個IllegalStateException 異常。
下面例子顯示了如何失敗一個 promise。
val p = promise[T]
val f = p.future
val producer = future {
val r = someComputation
if(isInvalid(r))
p failure (newIllegalStateException)
else{
val q = doSomeMoreComputation(r)
p success q
}
}
如上,生產者計算出一個中間結果 r,並判斷它的有效性。如果無效,它會通過返回一個異常實現 promise p 的方式失敗這個 promise,關聯的future f是 failed。否則,生產者會繼續它的計算,最終使用一個有效的結果值實現 future f,同時實現 promise p。
Promises也能通過一個 complete 方法來實現,這個方法采用了一個 potential value Try[T]
,這個值要么是一個類型為 Failure[Throwable]
的失敗結果,要么是一個類型為 Success[T]
的成功結果。
類似 success 方法,在一個已經完成的 promise 對象上調用 failure 和 complete 方法同樣會拋出一個 IllegalStateException 異常。
前面所述的promises和futures方法的一個優點是,這些方法是單一操作的,並且是沒有副作用(side-effects)的,因此程序是確定性的(deterministic)。確定性意味着,如果該程序沒有拋出異常(future的計算值被獲得),無論並行的程序如何調度,那么程序的結果將會永遠是一樣的。
在一些情況下,客戶端也許希望能夠只在 promie 沒有完成的情況下完成該 promise的計算(例如,如果有多個HTTP請求被多個不同的futures對象來執行,並且客戶端只關心一個HTTP應答,該應答對應於第一個完成該 promise的 future)。因此,future 提供了 tryComplete,trySuccess 和 tryFailure 方法。客戶端需要意識到調用這些的結果是不確定的,調用的結果將依賴程序的調度。
completeWith 方法將用另外一個 future 完成 promise 計算。當該future結束的時候,該promise對象得到那個future對象同樣的值,如下的程序將打印 1:
val f = future {1}
val p = promise[Int]
p completeWith f
p.future onSuccess {
case x => println(x)
}
當讓一個 promise 以異常失敗的時候,三種子類型的 Throwable 異常被分別的處理。如果中斷該promise的可拋出(Throwable)一場是scala.runtime.NonLocalReturnControl
,那么該promise將以對應的值結束;如果是一個Error的實例,InterruptedException
或者scala.util.control.ControlThrowable
,那么該可拋出(Throwable)異常將會封裝一個 ExecutionException 異常,該 ExectionException 將會讓該 promise 以失敗結束。
通過使用promises,futures的onComplete方法和future的構造方法,你能夠實現前文描述的任何函數式競爭組合器(compition combinators)。假設,你想實現一個新的組合器,該組合器首先使用兩個future對象 f 和 g,並產生第三個future,該 future 是通過 f 或者 g 完成的,但只在成功完成的情況下。
下面是關於這個的例子:
def first[T](f:Future[T], g:Future[T]):Future[T]={
val p = promise[T]
f onSuccess {
case x => p.trySuccess(x)
}
g onSuccess {
case x => p.trySuccess(x)
}
p.future
注意,在這種實現方式中,如果f與g都不成功,那么 first(f, g)
將永遠不會完成(即返回一個值或者返回一個異常)。
工具(Utilities)
為了簡化在並發應用中處理時序的問題,scala.concurrent
引入了 Duration 抽象。Duration 不是被作為另外一個通常的時間抽象存在的。他是為了用在並發庫中,Duration位於scala.concurrent
包中。
Duration是表示時間長短的基礎類,其可以是有限的或者無限的。有限的duration用FiniteDuration類來表示,並通過時間長度(length)
和java.util.concurrent.TimeUnit
來構造。無限的durations,同樣擴展了Duration,只在兩種情況下存在,Duration.Inf
和 Duration.MinusInf
。庫中同樣提供了一些Durations的子類用來做隱式的轉換,這些子類不應被直接使用。
抽象的 Duration 類包含了如下方法:
- 轉換到不同的四件單位(toNanos, toMicros, toMillis, toSeconds, toMinutes, toHours, toDays and toUnit(unit: TimeUnit));
- durations 比較(<, <=, > 和 >=);
- 算術運算符(+, -, *, / 和 unary_-);
- Minimum and maximum between this duration and the one supplied in the argument (min, max).
- 檢查 duration 是否無限(isFinite)。
Duration 能夠用如下方法實例化(instantiated):
- 隱式的通過Int和Long類型轉換得來
val d = 100 millis
。 - 通過傳遞一個
Long length
和java.util.concurrent.TimeUnit
。例如val d = Duration(100, MILLISECONDS)
。 - 通過傳遞一個字符串來表示時間區間,例如
val d = Duration("1.2 µs")
。
Duration也提供了unapply方法,可以被用於模式匹配構造(pattern matching constructs)中,例如:
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit._
// instantiation
val d1 =Duration(100, MILLISECONDS)// from Long and TimeUnit
val d2 =Duration(100,"millis")// from Long and String
val d3 =100 millis // implicitly from Long, Int or Double
val d4 =Duration("1.2 µs")// from String
// pattern matching
val Duration(length, unit)=5 millis