[轉] Scala 中的異步事件處理


在任何並發性應用程序中,異步事件處理都至關重要。無論事件的來源是什么(不同的計算任務、I/O 操作或與外部系統的交互),您的代碼都必須跟蹤事件,協調為響應它們而執行的操作。應用程序可以采用兩種基本方法之一來實現異步事件處理:

  • 阻塞:一個等待事件的協調線程。
  • 非阻塞:事件向應用程序生成某種形式的通知,而沒有線程顯式等待它。

合成事件

scala.concurrent.Promise 和 scala.concurrent.Future 類為 Scala 開發人員提供了一些與 Java 8 開發人員的 CompletableFuture 使用方式類似的選項。具體地講,Future 同時提供了阻塞和非阻塞的事件完成方式。但是,盡管在此級別上很相似,但用於處理兩種 future 的技術是不同的。

我們先來看一個並發任務設置:

任務和排序

在一個特定操作中,應用程序通常必須執行多個處理步驟。例如,在向用戶返回結果之前,Web 應用程序可能需要:

  1. 在一個數據庫中查找用戶的信息
  2. 使用查找到的信息來執行 Web 服務調用,並執行另一次數據庫查詢。
  3. 根據從前兩個操作中獲得的結果來執行數據庫更新。
    圖 1 演示了這種結構類型。

圖 1. 應用程序任務流

圖 1 將處理過程分解為 4 個不同的任務,它們通過表示順序依賴關系的箭頭相連接。任務 1 可以直接執行,任務 2 和任務 3 都在任務 1 完成后執行,任務 4 在任務 2 和任務 3 都完成后執行。

建模異步事件

在真實的系統中,異步事件的來源一般是並行計算或一種形式的 I/O 操作。但是,使用簡單的時間延遲來建模這種系統會更容易一些,這也是這里所采用的方法。清單 1 顯示了我用於生成事件的基本的賦時事件 (timed-event) 代碼,這些事件采用了已完成的 Future 格式。

清單 1. 賦時事件代碼

import java.util.Timer
import java.util.TimerTask
 
import scala.concurrent._
 
object TimedEvent {
  val timer = new Timer
 
  /** Return a Future which completes successfully with the supplied value after secs seconds. */
  def delayedSuccess[T](secs: Int, value: T): Future[T] = {
    val result = Promise[T]
    timer.schedule(new TimerTask() {
      def run() = {
        result.success(value)
      }
    }, secs * 1000)
    result.future
  }
 
  /** Return a Future which completes failing with an IllegalArgumentException after secs
    * seconds. */
  def delayedFailure(secs: Int, msg: String): Future[Int] = {
    val result = Promise[Int]
    timer.schedule(new TimerTask() {
      def run() = {
        result.failure(new IllegalArgumentException(msg))
      }
    }, secs * 1000)
    result.future
  }

清單 1 中的 Scala 代碼使用一個 java.util.Timer 來安排 java.util.TimerTask 在一個延遲之后執行。每個 TimerTask 在運行時完成一個有關聯的 future。delayedSuccess 函數定制了一個任務,在運行時成功完成一個 Scala Future[T],然后將該 future 返回給調用方。delayedSuccess 函數返回相同類型的 future,但使用了一個在完成 future 時發生 IllegalArgumentException 異常的失敗任務。

清單 2 展示了如何使用 清單 1 中的代碼創建 Future[Int] 格式的事件,使之與 圖 1 中的 4 個任務相匹配。(此代碼來自示例代碼中的 AsyncHappy 類。)

清單 2. 示例任務的事件

// task definitions
def task1(input: Int) = TimedEvent.delayedSuccess(1, input + 1)
def task2(input: Int) = TimedEvent.delayedSuccess(2, input + 2)
def task3(input: Int) = TimedEvent.delayedSuccess(3, input + 3)
def task4(input: Int) = TimedEvent.delayedSuccess(1, input + 4)

清單 2 中 4 個任務方法中的每一個都為該任務的完成時刻使用了特定的延遲值:task1 為 1 秒,task2 為 2 秒,task3 為 3 秒,task4 重新變為 1 秒。每個任務還接受一個輸入值,是該輸入加上任務編號作為 future 的(最終)結果值。這些方法都使用了 future 的成功形式;稍后您會看到一些使用失敗形式的例子。

這些任務要求您按 圖 1 中所示的順序運行它們,向每個任務傳遞上一個任務返回的結果值(或者對於 task4,傳遞前兩個任務結果的和)。如果中間兩個任務同時執行,總的執行時間大約為 5 秒(1 秒 + (2 秒、3 秒中的最大值)+ 1 秒。如果 task1 的輸入為 1,那么結果為 2。如果該結果被傳遞給 task2 和 task3,那么結果將為 4 和 5。如果這兩個結果的和 (9) 被作為輸入傳遞給 task4,那么最終結果將為 13。

阻塞等待

在設定好操作環境之后,是時候來查看 Scala 如何處理事件的完成情況了。與上一期的 Java 代碼中一樣,協調 4 個任務的執行的最簡單的方法是使用阻塞等待:主要線程等待每個任務依次完成。清單 3(同樣來自示例代碼中的 AsyncHappy 類)給出了此方法。

清單 3. 阻塞等待任務執行

def runBlocking() = {
  val v1 = Await.result(task1(1), Duration.Inf)
  val future2 = task2(v1)
  val future3 = task3(v1)
  val v2 = Await.result(future2, Duration.Inf)
  val v3 = Await.result(future3, Duration.Inf)
  val v4 = Await.result(task4(v2 + v3), Duration.Inf)
  val result = Promise[Int]
  result.success(v4)
  result.future
}

清單 3 使用 Scala scala.concurrent.Await 對象的 result() 方法來完成阻塞等待。該代碼首先等待 task1 的結果,然后同時創建 task2 和 task3 future,並等待兩個任務依次返回 future,最后等待 task4 的結果。最后 3 行(創建和設置 result)使得該方法能夠返回一個 Future[Int]。返回該 future,讓此方法與我接下來展示的非阻塞形式一致,但該 future 將在該方法返回之前完成。

組合 future

清單 4(同樣來自示例代碼中的 AsyncHappy 類)展示了一種將 future 聯系在一起的方式,以便按正確順序並使用正確的依賴關系執行任務,而不使用阻塞。

清單 4. 使用 onSuccess() 處理事件的完成

def runOnSuccess() = {
  val result = Promise[Int]
  task1(1).onSuccess(v => v match {
    case v1 => {
      val a = task2(v1)
      val b = task3(v1)
      a.onSuccess(v => v match {
        case v2 =>
          b.onSuccess(v => v match {
            case v3 => task4(v2 + v3).onSuccess(v4 => v4 match {
              case x => result.success(x)
            })
          })
      })
    }
  })
  result.future
}

清單 4 代碼使用 onSuccess() 方法將一個函數(技術上講是一個部分函數,因為它僅處理成功完成的情況)設置為在每個 future 完成時返回。因為 onSuccess() 調用是嵌套式的,所以它們將按順序執行(即使 future 未完全按順序完成)。

清單 4 的代碼比較容易理解,但很冗長。清單 5 展示了一種使用 flatMap() 方法處理這種情況的更簡單的方法。

清單 5. 使用 flatMap() 處理事件的完成

def runFlatMap() = {
  task1(1) flatMap {v1 =>
    val a = task2(v1)
    val b = task3(v1)
    a flatMap { v2 =>
      b flatMap { v3 => task4(v2 + v3) }}
  }
}

清單 5 中的代碼實際上執行了與 清單 4 相同的事情,但 清單 5 使用了 flatMap() 方法從每個 future 中提取單一結果值。使用 flatMap() 消除了 清單 4 中所需的 match / case 結構,提供了一種更簡潔的格式,但采用了同樣的逐步執行路線。

試用示例

示例代碼使用了一個 Scala App 來依次運行事件代碼的每個版本,並確保完成事件(約 5 秒)和結果 (13) 是正確的。您可以使用 Maven 從命令行運行此代碼,如清單 6 所示(刪除了無關的 Maven 輸出):

清單 6. 運行事件代碼

dennis@linux-9qea:~/devworks/scala4/code> mvn scala:run -Dlauncher=happypath
...
[INFO] launcher 'happypath' selected => com.sosnoski.concur.article4.AsyncHappy
Starting runBlocking
runBlocking returned 13 in 5029 ms.
Starting runOnSuccess
runOnSuccess returned 13 in 5011 ms.
Starting runFlatMap
runFlatMap returned 13 in 5002 ms.

不順利的道路

目前為止,您看到了以 future 形式協調事件的代碼,這些代碼總是能夠成功完成。在真實應用程序中,不能寄希望於事情總是這么順利。處理任務過程中可能會出現問題,而且在 JVM 語言術語中,這些問題通常表示為 Throwable。

更改 清單 2 中的任務定義很容易,只需使用 delayedFailure() 代替 delayedSuccess() 方法,如這里的 task4 所示:

def task4(input: Int) = TimedEvent.delayedFailure(1, "This won't work!")

如果運行僅將 task4 修改為完成時拋出異常的 清單 3,那么您會得到 task4 上的 Await.result() 調用所拋出的預期的 IllegalArgumentException。如果在 runBlocking() 方法中沒有捕獲該問題,該異常會在調用鏈中一直傳遞,直到最終捕獲問題(如果未捕獲問題,則會終止線程)。幸運的是,修改該代碼很容易,因此,如果任何任務完成時拋出異常,該異常會通過返回的 future 傳遞給調用方來處理。清單 7 展示了這一更改。

清單 7. 具有異常的阻塞等待

def runBlocking() = {
  val result = Promise[Int]
  try {
    val v1 = Await.result(task1(1), Duration.Inf)
    val future2 = task2(v1)
    val future3 = task3(v1)
    val v2 = Await.result(future2, Duration.Inf)
    val v3 = Await.result(future3, Duration.Inf)
    val v4 = Await.result(task4(v2 + v3), Duration.Inf)
    result.success(v4)
  } catch {
    case t: Throwable => result.failure(t)
  }
  result.future
}

清單 7 非常淺顯易懂,最初的代碼包裝在一個 try/catch 中,catch 在返回的 future 完成時傳回異常。此方法稍微復雜一些,但任何 Scala 開發人員應該仍然很容易理解它。

那么,清單 4 和清單 5 中的事件處理代碼的非阻塞變形是怎樣的?從名稱可以看出,清單 4 中使用的 onSuccess() 方法僅 適用於 future 的成功完成類型。如果想要同時處理成功和失敗完成類型,則必須使用 onComplete() 方法,檢查哪種完成例行適用。清單 8 展示了此技術如何用在事件處理代碼中。

清單 8. 成功和失敗的 onComplete() 處理

def runOnComplete() = {
  val result = Promise[Int]
  task1(1).onComplete(v => v match {
    case Success(v1) => {
      val a = task2(v1)
      val b = task3(v1)
      a.onComplete(v => v match {
        case Success(v2) =>
          b.onComplete(v => v match {
            case Success(v3) => task4(v2 + v3).onComplete(v4 => v4 match {
              case Success(x) => result.success(x)
              case Failure(t) => result.failure(t)
            })
            case Failure(t) => result.failure(t)
          })
        case Failure(t) => result.failure(t)
      })
    }
    case Failure(t) => result.failure(t)
  })
  result.future
}

清單 8 看起來很凌亂,幸運的是還有一種簡單得多的替代方法:使用 清單 5 中的 flatMap() 代碼代替。flatMap() 方法同時處理成功和失敗完成類型,無需執行任何更改。

使用 async

最新的 Scala 版本包含在編譯期間使用宏 轉換代碼的能力。目前實現的一個最有用的宏是 async,它在編譯期間將使用 future 的看似順序的代碼轉換為異步代碼。清單 9 展示了 async 如何簡化本教程中使用的任務代碼。

清單 9. 結合使用 future 與 async {}

def runAsync(): Future[Int] = {
  async {
    val v1 = await(task1(1))
    val a = task2(v1)
    val b = task3(v1)
    await(task4(await(a) + await(b)))
  }
}

清單 9 中封裝的 async {...} 調用了 async 宏。此調用將該代碼塊聲明為異步執行的代碼,並在默認情況下異步執行它,然后返回一個 future 表示該代碼塊的執行結果。在該代碼塊中,await() 方法(實際上是該宏的一個關鍵字,而不是一個真正的方法)顯示了何處需要一個 future 的結果。async 宏在編譯期間修改了 Scala 程序的抽象語法樹 (AST),以便將該代碼塊轉換為使用回調的代碼,這大體相當於 清單 4 的代碼。

除了 async {...} 包裝器之外,清單 9 中的代碼還與 清單 3 中最初的阻塞代碼很相似。這主要是這個宏的成就,它抽象化了異步事件的所有復雜性,使它看起來像您在編寫簡單的線性代碼。在幕后,這涉及到大量復雜性。

async 內部原理

如果查看 Scala 編譯器從源代碼生成的類,就會看到一些具有類似 AsyncHappy$$anonfun$1.class 的名稱的內部類。從名稱可以猜到,這些類由編譯器為異步函數而生成(比如傳遞給 onSuccess() 或 flatMap() 方法的語句。)

使用 Scala 2.11.1 編譯器和 Async 0.9.2 實現,您還會看到一個名為 AsyncUnhappy$stateMachine$macro$1$1.class 的類。這是 async 宏生成的實際實現代碼,采用狀態機的形式來處理異步任務。清單 10 給出了此類的一個部分地方進行了反編譯(decompiled)的視圖。

清單 10. 反編譯后的 AsyncUnhappy$stateMachine$macro$1$1.class

public class AsyncUnhappy$stateMachine$macro$1$1
  implements Function1<Try<Object>, BoxedUnit>, Function0.mcV.sp
{
  private int state;
  private final Promise<Object> result;
  private int await$macro$3$macro$13;
  private int await$macro$7$macro$14;
  private int await$macro$5$macro$15;
  private int await$macro$11$macro$16;
  ...
  public void resume() {
    ...
  }
 
  public void apply(Try<Object> tr) {
    int i = this.state;
    switch (i) {
      default:
        throw new MatchError(BoxesRunTime.boxToInteger(i));
      case 3:
        if (tr.isFailure()) {
          result().complete(tr);
        } else {
          this.await$macro$11$macro$16 = BoxesRunTime.unboxToInt(tr.get());
          this.state = 4;
          resume();
        }
        break;
      case 2:
        if (tr.isFailure()) {
          result().complete(tr);
        } else {
          this.await$macro$7$macro$14 = BoxesRunTime.unboxToInt(tr.get());
          this.state = 3;
          resume();
        }
        break;
      case 1:
        if (tr.isFailure()) {
          result().complete(tr);
        } else {
          this.await$macro$5$macro$15 = BoxesRunTime.unboxToInt(tr.get());
          this.state = 2;
          resume();
        }
        break;
      case 0:
        if (tr.isFailure()) {
          result().complete(tr);
        } else {
          this.await$macro$3$macro$13 = BoxesRunTime.unboxToInt(tr.get());
          this.state = 1;
          resume();
        }
        break;
    }
  } 
  ...
}

清單 10 中的 apply() 方法處理實際的狀態更改,估算一個 future 的結果並將輸出狀態更改為匹配。輸入狀態會告訴該代碼正在估算哪個 future;每個狀態值對應於 async 代碼塊中一個特定的 future。從 清單 10 的部分代碼很難了解這一點,但查看其他一些字節碼,就可以看到狀態代碼是與任務匹配的,所以狀態 0 表示 task1 的結果符合預期,狀態 1 表示 task2 的結果符合預期,依此類推。

resume() 方法並未顯示在 清單 10 中,因為反編譯器無法確定如何將它轉換為 Java 代碼。我也不打算探討這個過程,但通過查看字節碼,可以確定 resume() 方法執行了與狀態代碼上的 Java switch 相似的工作。對於每個非最終狀態,resume() 執行適當的代碼段來設置下一個預期的 future,最終將 AsyncUnhappy$stateMachine$macro$1$1 實例設置為 future 的 onComplete() 方法的目標。對於最終狀態,resume() 將會設置結果值並履行對最終結果的承諾。

您實際上並不需要深入分析生成的代碼來理解 async(但它可能很有趣)。關於 async 工作原理的完整描述,請查閱 SIP-22 - Async 提案。

async 限制
由於 async 宏將代碼轉換為狀態機類的方式,該宏的使用有一些限制。最明顯的限制是,不能將 await() 嵌套在 async 代碼塊中的另一個對象或閉包內(包括一個函數定義)。也不能將 await() 嵌套在一個 try 或 catch 內。

除了這些使用限制之外,async 的最大問題是:在調試時,您同樣會體驗到一些通常與異步代碼有關的問題回調,在這種情況下,需要嘗試理解沒有反映明顯的代碼結構的調用堆棧。不幸的是,目前的調試器設計無法解決這些問題。這是 Scala 中一個新的工作區域(請參閱 反思調試器。)與此同時,您可以禁用 async 代碼塊的異步執行,讓調試變得更輕松(假設您嘗試修復的問題在按順序執行操作時仍然存在)。

最后,Scala 宏仍是一項我們正在開展的工作。async 有望在未來的版本中成為 Scala 語言的一個正式部分,但只有在 Scala 語言團隊對宏的工作方式感到滿意時,這種情況才會出現。到那時,無法確保 async 的格式不會發生改變。

結束語

一些處理異步事件的 Scala 方法與 Java 代碼存在很大的區別。借助 flatMap() 和 async 宏,Scala 提供了整潔而且容易理解的技術。async 特別有趣,您可以編寫看似正常的順序的代碼,但編譯的代碼會並發地執行。Scala 不是提供這種方法的惟一語言,但基於宏的實現為其他方法提供了極高的靈活性。

本文轉自:https://www.ibm.com/developerworks/cn/java/j-jvmc4/index.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM