Akka源碼分析-ask模式


  在我之前的博文中,已經介紹過要慎用Actor的ask。這里我們要分析一下ask的源碼,看看它究竟是怎么實現的。

  開發時,如果要使用ask方法,必須要引入akka.pattern._,這樣才能使用ask(或者?)方法,那么想必ask是在akka.pattern._對應的包里面實現的。

/*
 * Implementation class of the “ask” pattern enrichment of ActorRef
 */
final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {

  /**
   * INTERNAL API: for binary compatibility
   */
  protected def ask(message: Any, timeout: Timeout): Future[Any] =
    internalAsk(message, timeout, ActorRef.noSender)

  def ask(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] =
    internalAsk(message, timeout, sender)

  /**
   * INTERNAL API: for binary compatibility
   */
  protected def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
    internalAsk(message, timeout, ActorRef.noSender)

  def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] =
    internalAsk(message, timeout, sender)

  /**
   * INTERNAL API: for binary compatibility
   */
  private[pattern] def internalAsk(message: Any, timeout: Timeout, sender: ActorRef) = actorRef match {
    case ref: InternalActorRef if ref.isTerminated ⇒
      actorRef ! message
      Future.failed[Any](new AskTimeoutException(s"""Recipient[$actorRef] had already been terminated. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
    case ref: InternalActorRef ⇒
      if (timeout.duration.length <= 0)
        Future.failed[Any](new IllegalArgumentException(s"""Timeout length must be positive, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
      else {
        val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, sender)
        actorRef.tell(message, a)
        a.result.future
      }
    case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
  }

}

   上面是通過定位ask(或?)找到的實現源碼,我們發現,這是一個隱式轉換,在akka.pattern.AskSupport中我們找到了隱式轉換對應的函數。

implicit def ask(actorRef: ActorRef): AskableActorRef = new AskableActorRef(actorRef)

   通過AskableActorRef源碼我們知道最終調用了internalAsk函數,該函數有三個參數:待發送的消息、超時時間、消息發送者。函數創建了一個PromiseActorRef,又把消息原樣的通過原actor的tell函數發送給了原actor,然后用新創建的PromiseActorRef作為新的sender傳遞,再用PromiseActorRef的result.future作為最終的Future返回。下面我們來看一下PromiseActorRef的創建過程。

def apply(provider: ActorRefProvider, timeout: Timeout, targetName: Any, messageClassName: String,
            sender: ActorRef = Actor.noSender, onTimeout: String ⇒ Throwable = defaultOnTimeout): PromiseActorRef = {
    val result = Promise[Any]()
    val scheduler = provider.guardian.underlying.system.scheduler
    val a = new PromiseActorRef(provider, result, messageClassName)
    implicit val ec = a.internalCallingThreadExecutionContext
    val f = scheduler.scheduleOnce(timeout.duration) {
      result tryComplete Failure(
        onTimeout(s"""Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]. Sender[$sender] sent message of type "${a.messageClassName}"."""))
    }
    result.future onComplete { _ ⇒ try a.stop() finally f.cancel() }
    a
  }

   很明顯,PromiseActorRef持有了一個Promise[Any],但是上面的代碼只顯示了在超時的時候通過onTimeout賦了值,並沒有不超時賦值的邏輯,且Promise[Any]一旦完成就調用PromiseActorRef的stop方法和cancel方法。那么成功時賦值的邏輯應該在哪里呢?

  如果你對ask的使用方式比較熟悉的話,一定會找出其中的端倪的。我們來梳理一下這其中的使用細節。其實,在ask的使用與tell,並沒有太大的區別,至少對於server端的Actor來說沒有任何區別,都是正常的接收消息,然后處理,最后通過sender把消息使用tell返回。在ask時,消息的sender是什么,就是PromiseActorRef啊,那PromiseActorRef的!方法具體怎么實現的呢?

override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = state match {
    case Stopped | _: StoppedWithPath ⇒ provider.deadLetters ! message
    case _ ⇒
      if (message == null) throw InvalidMessageException("Message is null")
      if (!(result.tryComplete(
        message match {
          case Status.Success(r) ⇒ Success(r)
          case Status.Failure(f) ⇒ Failure(f)
          case other             ⇒ Success(other)
        }))) provider.deadLetters ! message
  }

   很明顯,接收消息的Actor會通過!返回對應的消息,消息的處理一般會命中 case other,這其實就是給result賦值,在超時之前的賦值。如果在!方法內部給result賦值的時候,剛好已經超時或已經賦過值,會把返回的消息發送給deadLetters。

  其實result,也就是Promise[Any]的賦值邏輯已經解釋清楚。不過如果小伙伴對Promise不熟悉的話,此處還是有點難理解的。如果說Future是一個只讀的,值還沒計算的占位符。那么Promise就是一個可寫的,單次指派的容器,也就是說Promise一旦賦值,就無法再次賦值,且與之關聯的future也就計算完畢,返回的值就是固定的。當然了通過ask(也就是?)返回的還只是一個future,如果要取出future最終的值,還是需要Await.ready等語義來支持的,這里就不再詳細解釋了。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM