Akka Source Code Analysis: A Closer Look at ActorRef & ActorPath


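In the previous section we took a close look at ActorRef and the concepts around it, but the relationship between ActorRef and ActorPath still needs further explanation. The official documentation actually explains it most clearly: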

“A path in an actor system represents a “place” which might be occupied by a living actor. Initially (apart from system initialized actors) a path is empty. When actorOf() is called it assigns an incarnation of the actor described by the passed Props to the given path. An actor incarnation is identified by the path and a UID.”

“A restart only swaps the Actor instance defined by the Props but the incarnation and hence the UID remains the same. As long as the incarnation is same, you can keep using the same ActorRef. Restart is handled by the Supervision Strategy of actor’s parent actor, and there is more discussion about what restart means.

The lifecycle of an incarnation ends when the actor is stopped. At that point the appropriate lifecycle events are called and watching actors are notified of the termination. After the incarnation is stopped, the path can be reused again by creating an actor with actorOf(). In this case the name of the new incarnation will be the same as the previous one but the UIDs will differ. An actor can be stopped by the actor itself, another actor or the ActorSystem.”

“An ActorRef always represents an incarnation (path and UID) not just a given path. Therefore if an actor is stopped and a new one with the same name is created an ActorRef of the old incarnation will not point to the new one.”

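Let us summarize the official description. Every Actor that a developer creates through actorOf is assigned a UID. The actor's path (its position in the hierarchy) plus the UID uniquely identifies one actor incarnation, and that pair is exactly what an ActorRef stands for.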

  @tailrec final def newUid(): Int = {
    // Note that this uid is also used as hashCode in ActorRef, so be careful
    // to not break hashing if you change the way uid is generated
    val uid = ThreadLocalRandom.current.nextInt()
    if (uid == undefinedUid) newUid
    else uid
  }

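Above is the method that generates the UID. It is simply a random Int, redrawn if it happens to equal undefinedUid. Randomness does not strictly guarantee uniqueness, but it makes it very unlikely that two successive incarnations at the same path receive the same UID. According to the comment in the source, this uid also serves as the hashCode of the ActorRef.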
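
To make the "random, but not guaranteed unique" point concrete, here is a minimal standalone sketch. It does not depend on Akka: `UidSketch` and `collisionProbability` are names invented for illustration, and the value 0 for `undefinedUid` is an assumption mirroring `ActorCell.undefinedUid`. The probability helper uses the usual birthday-bound approximation over the 2^32 possible Int values.

```scala
import java.util.concurrent.ThreadLocalRandom
import scala.annotation.tailrec

object UidSketch {
  // Assumption: 0 is the reserved "no uid" value, mirroring ActorCell.undefinedUid.
  val undefinedUid: Int = 0

  // Draw random Ints until one differs from the reserved value.
  @tailrec def newUid(): Int = {
    val uid = ThreadLocalRandom.current.nextInt()
    if (uid == undefinedUid) newUid() else uid
  }

  // Birthday-bound estimate of the chance that n independent random uids
  // contain at least one duplicate: 1 - exp(-n(n-1) / 2^33).
  def collisionProbability(n: Long): Double =
    1.0 - math.exp(-(n.toDouble * (n - 1)) / math.pow(2.0, 33))
}
```

For two successive incarnations at one path the estimate is tiny, which is the case that matters for telling an old ActorRef from a new one; across a very large number of uids a duplicate somewhere becomes likely, which is why the uid is only meaningful together with the path.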

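Throughout an actor's whole lifecycle, that is, as long as it has not been terminated, the UID does not change, even if the actor is restarted. Note what restart means here: the actor threw an exception while processing a message, and its supervisor handled the failure and called the restart method. This is completely different from stopping the actor and then creating it again with actorOf. Re-creating an actor under the same name with actorOf gives it a new UID, so an old ActorRef will not point to the newly created actor, even though the actor's path (its position in the hierarchy) is the same.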

  final override def hashCode: Int = {
    if (path.uid == ActorCell.undefinedUid) path.hashCode
    else path.uid
  }

  /**
   * Equals takes path and the unique id of the actor cell into account.
   */
  final override def equals(that: Any): Boolean = that match {
    case other: ActorRef ⇒ path.uid == other.path.uid && path == other.path
    case _               ⇒ false
  }

The definitions of hashCode and equals in abstract class ActorRef, shown above, make the role of the UID clear, and they match our analysis. Now let us see how ActorPath defines equals.

  override def equals(other: Any): Boolean = {
    @tailrec
    def rec(left: ActorPath, right: ActorPath): Boolean =
      if (left eq right) true
      else if (left.isInstanceOf[RootActorPath]) left equals right
      else if (right.isInstanceOf[RootActorPath]) right equals left
      else left.name == right.name && rec(left.parent, right.parent)

    other match {
      case p: ActorPath ⇒ rec(this, p)
      case _            ⇒ false
    }
  }

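A quick look at the code above shows that, apart from the reference-identity shortcut (left eq right) and the comparison of the root paths, ActorPath equality just recursively compares the name of each path element; the UID plays no part. ActorPath does carry a uid value, and the actor's uid is in fact stored in the ActorPath, but that uid is an internal field intended only for ActorRef.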
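
The difference between the two equality definitions can be reproduced with a small model. This is only a sketch: `Path` and `Ref` are hypothetical stand-ins for ActorPath and ActorRef, the path is flattened to a list of names, and 0 is assumed to be the undefined uid.

```scala
object RefEqualitySketch {
  // Hypothetical stand-in for ActorPath: equality looks at the names only.
  final class Path(val elements: List[String], val uid: Int) {
    override def equals(other: Any): Boolean = other match {
      case p: Path => elements == p.elements // the uid is deliberately ignored
      case _       => false
    }
    override def hashCode: Int = elements.hashCode
  }

  // Hypothetical stand-in for ActorRef: equality needs the path AND the uid.
  final class Ref(val path: Path) {
    override def equals(other: Any): Boolean = other match {
      case r: Ref => path.uid == r.path.uid && path == r.path
      case _      => false
    }
    // Assumption: 0 plays the role of the undefined uid.
    override def hashCode: Int = if (path.uid == 0) path.hashCode else path.uid
  }

  // Two incarnations created one after the other at the same path.
  val first  = new Ref(new Path(List("user", "worker"), 101))
  val second = new Ref(new Path(List("user", "worker"), 202))
}
```

In this model `first.path == second.path` holds while `first == second` does not, which is the behaviour the quoted documentation describes for a stopped actor and its same-named successor.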

Now let us look at how an actor is restarted.

  final def invoke(messageHandle: Envelope): Unit = {
    val influenceReceiveTimeout = !messageHandle.message.isInstanceOf[NotInfluenceReceiveTimeout]
    try {
      currentMessage = messageHandle
      if (influenceReceiveTimeout)
        cancelReceiveTimeout()
      messageHandle.message match {
        case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle)
        case msg                      ⇒ receiveMessage(msg)
      }
      currentMessage = null // reset current message after successful invocation
    } catch handleNonFatalOrInterruptedException { e ⇒
      handleInvokeFailure(Nil, e)
    } finally {
      if (influenceReceiveTimeout)
        checkReceiveTimeout // Reschedule receive timeout
    }
  }

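By now invoke should be familiar: every user message delivered to the mailbox is processed through a call to invoke. Clearly, if receiveMessage throws and the error is not fatal, handleInvokeFailure is called to deal with it.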

  final def handleInvokeFailure(childrenNotToSuspend: immutable.Iterable[ActorRef], t: Throwable): Unit = {
    // prevent any further messages to be processed until the actor has been restarted
    if (!isFailed) try {
      suspendNonRecursive()
      // suspend children
      val skip: Set[ActorRef] = currentMessage match {
        case Envelope(Failed(_, _, _), child) ⇒ { setFailed(child); Set(child) }
        case _                                ⇒ { setFailed(self); Set.empty }
      }
      suspendChildren(exceptFor = skip ++ childrenNotToSuspend)
      t match {
        // tell supervisor
        case _: InterruptedException ⇒
          // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
          parent.sendSystemMessage(Failed(self, new ActorInterruptedException(t), uid))
        case _ ⇒
          // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
          parent.sendSystemMessage(Failed(self, t, uid))
      }
    } catch handleNonFatalOrInterruptedException { e ⇒
      publish(Error(e, self.path.toString, clazz(actor),
        "emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t)))
      try children foreach stop
      finally finishTerminate()
    }
  }

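We have analyzed handleInvokeFailure before as well: it suspends the actor and its children, then sends a Failed system message to the parent actor to report that this child has failed.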

Failed is a system message, so it is handled in invokeAll, which evidently calls handleFailure to process the failure.

  final protected def handleFailure(f: Failed): Unit = {
    currentMessage = Envelope(f, f.child, system)
    getChildByRef(f.child) match {
      /*
       * only act upon the failure, if it comes from a currently known child;
       * the UID protects against reception of a Failed from a child which was
       * killed in preRestart and re-created in postRestart
       */
      case Some(stats) if stats.uid == f.uid ⇒
        if (!actor.supervisorStrategy.handleFailure(this, f.child, f.cause, stats, getAllChildStats)) throw f.cause
      case Some(stats) ⇒
        publish(Debug(self.path.toString, clazz(actor),
          "dropping Failed(" + f.cause + ") from old child " + f.child + " (uid=" + stats.uid + " != " + f.uid + ")"))
      case None ⇒
        publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + f.cause + ") from unknown child " + f.child))
    }
  }

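handleFailure uses the ActorRef of the failed actor to look up the matching child record (its ChildRestartStats) and, if the UIDs match, hands the failure to the supervisorStrategy of the current actor, that is, the parent. If the strategy returns false because it cannot handle the failure, the cause is thrown again.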
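
The three branches of the match above can be modelled without Akka. In this sketch the child is looked up by name rather than by ActorRef, and `Failed`, `ChildStats` and the `Outcome` values are simplified, hypothetical types; only the uid comparison is taken from the code shown.

```scala
object StaleFailureSketch {
  // Hypothetical, simplified stand-ins for Failed and ChildRestartStats.
  final case class Failed(childName: String, cause: Throwable, uid: Int)
  final case class ChildStats(childName: String, uid: Int)

  sealed trait Outcome
  case object Supervised extends Outcome          // passed on to the supervisor strategy
  case object DroppedOldChild extends Outcome     // same name, but an earlier incarnation
  case object DroppedUnknownChild extends Outcome // no child with that name

  // Mirrors the shape of the match in handleFailure: act only when the uid
  // in the Failed message equals the uid of the currently known child.
  def handleFailure(children: Map[String, ChildStats], f: Failed): Outcome =
    children.get(f.childName) match {
      case Some(stats) if stats.uid == f.uid => Supervised
      case Some(_)                           => DroppedOldChild
      case None                              => DroppedUnknownChild
    }
}
```

A Failed message that carries the uid of a previous incarnation is dropped even though the name matches, which is the protection the source comment describes for a child killed in preRestart and re-created in postRestart.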

  def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
    val directive = decider.applyOrElse(cause, escalateDefault)
    directive match {
      case Resume ⇒
        logFailure(context, child, cause, directive)
        resumeChild(child, cause)
        true
      case Restart ⇒
        logFailure(context, child, cause, directive)
        processFailure(context, true, child, cause, stats, children)
        true
      case Stop ⇒
        logFailure(context, child, cause, directive)
        processFailure(context, false, child, cause, stats, children)
        true
      case Escalate ⇒
        logFailure(context, child, cause, directive)
        false
    }
  }

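The current supervisor strategy decides how to handle the failure. With the default decider, an ordinary Exception results in Restart (a few special cases such as ActorInitializationException and ActorKilledException result in Stop instead), so processFailure is called with restart = true. The default supervisor strategy is a OneForOneStrategy.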
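
The call decider.applyOrElse(cause, escalateDefault) is worth a closer look: the decider is a partial function, and anything it does not match falls through to Escalate. The following standalone sketch redefines the four directives locally, and the two exception cases in the decider are arbitrary examples, not Akka's defaults.

```scala
object DeciderSketch {
  // Local re-definitions for illustration; not the Akka classes themselves.
  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Stop extends Directive
  case object Escalate extends Directive

  type Decider = PartialFunction[Throwable, Directive]

  // Hypothetical user-supplied decider that covers only two exception types.
  val decider: Decider = {
    case _: ArithmeticException      => Resume
    case _: IllegalArgumentException => Stop
  }

  // Fallback used for any Throwable the decider is not defined for.
  val escalateDefault: Throwable => Directive = _ => Escalate

  def decide(cause: Throwable): Directive =
    decider.applyOrElse(cause, escalateDefault)

  // true means "handled at this level"; false means the caller rethrows the cause.
  def handled(cause: Throwable): Boolean = decide(cause) != Escalate
}
```

Here `decide(new NullPointerException)` yields Escalate because the decider has no matching case, so `handled` returns false, matching the `Escalate ⇒ false` branch of handleFailure.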

  def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
    if (restart && stats.requestRestartPermission(retriesWindow))
      restartChild(child, cause, suspendFirst = false)
    else
      context.stop(child) //TODO optimization to drop child here already?
  }

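Above is OneForOneStrategy's implementation of processFailure. When a restart is requested and requestRestartPermission allows it, it calls restartChild; otherwise it stops the child.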

  final def restartChild(child: ActorRef, cause: Throwable, suspendFirst: Boolean): Unit = {
    val c = child.asInstanceOf[InternalActorRef]
    if (suspendFirst) c.suspend()
    c.restart(cause)
  }

restartChild in turn calls the restart method of the failed actor, and it does so through the ActorRef. From our earlier analysis we know that this ActorRef ends up being a RepointableActorRef.

def restart(cause: Throwable): Unit = underlying.restart(cause)

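Above is the definition of restart, which simply delegates to restart on underlying. What is underlying? It is the ActorCell that the ActorRef refers to. Yet the ActorCell source itself contains no restart implementation; the method turns out to live in the Dispatch trait that ActorCell mixes in.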

final def restart(cause: Throwable): Unit = try dispatcher.systemDispatch(this, Recreate(cause)) catch handleException

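It is simple: it uses the dispatcher to send a Recreate system message to the current ActorCell. From the earlier look at invokeAll we know that receiving Recreate leads to faultRecreate, a function we have also analyzed: it calls aroundPreRestart on the old Actor and then calls finishRecreate.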

  private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = {
    // need to keep a snapshot of the surviving children before the new actor instance creates new ones
    val survivors = children

    try {
      try resumeNonRecursive()
      finally clearFailed() // must happen in any case, so that failure is propagated

      val freshActor = newActor()
      actor = freshActor // this must happen before postRestart has a chance to fail
      if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.

      freshActor.aroundPostRestart(cause)
      if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))

      // only after parent is up and running again do restart the children which were not stopped
      survivors foreach (child ⇒
        try child.asInstanceOf[InternalActorRef].restart(cause)
        catch handleNonFatalOrInterruptedException { e ⇒
          publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child))
        })
    } catch handleNonFatalOrInterruptedException { e ⇒
      clearActorFields(actor, recreate = false) // in order to prevent preRestart() from happening again
      handleInvokeFailure(survivors, PostRestartException(self, e, cause))
    }
  }

finishRecreate calls newActor to produce a new Actor instance, calls aroundPostRestart on that instance, and finally loops over the surviving child actors and calls restart on each of them.

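Nowhere in the restart process is an ActorPath or ActorRef modified or replaced, and the uid never changes. This means that an ActorRef stays valid across a restart, and so does the ActorPath. Recall the actorOf flow: one of its steps calls makeChild, which calls newUid to produce a new uid for the ActorRef. So after an actor is stopped and re-created with actorOf, the old ActorRef naturally becomes invalid.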

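Put simply, ActorRef = “ActorPathString” + UID. An Actor class written by a developer is a static concept; creating it through actorOf produces an Actor instance. If that actor fails for some reason and is restarted by the system, a new Actor instance is created but its UID stays the same, so the ActorRef still points to the actor at that path. An ActorPath identifies the actor's position in the tree; it can be used to find whatever instance lives at that path, regardless of whether the UID is still the same.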
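
The contrast between a restart and a stop followed by actorOf can be played through with a toy model. Everything here is hypothetical: `Behavior` stands in for the Actor instance, `Cell` for the ActorCell behind an ActorRef, and `Parent` for the bookkeeping done in makeChild. A counter replaces the random uid so that the example is deterministic.

```scala
object IncarnationSketch {
  // Hypothetical stand-ins: Behavior plays the Actor instance, Cell the ActorCell.
  final class Behavior

  final class Cell(val name: String, val uid: Int) {
    var behavior: Behavior = new Behavior
    // A restart swaps the Actor instance only; name and uid are untouched.
    def restart(): Unit = behavior = new Behavior
  }

  final class Parent {
    private var nextUid = 0 // assumption: a counter instead of a random uid
    private var children = Map.empty[String, Cell]

    // Creating a child always assigns a fresh uid, even for a reused name.
    def actorOf(name: String): Cell = {
      require(!children.contains(name), s"actor name [$name] is not unique")
      nextUid += 1
      val cell = new Cell(name, nextUid)
      children += (name -> cell)
      cell
    }

    def stop(name: String): Unit = children -= name
    def lookup(name: String): Option[Cell] = children.get(name)
  }
}
```

After `restart()` the same `Cell` is still registered under the name, only with a fresh `Behavior`. After `stop` and a second `actorOf` with the same name, `lookup` returns a different `Cell` with a different uid, so a reference held to the first one no longer identifies the actor that now lives at that path.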

