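In the previous section we looked closely at ActorRef, the concepts around it and how they relate. The relationship between ActorRef and ActorPath, however, still needs more analysis, and the official documentation actually explains it quite clearly.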
“A path in an actor system represents a “place” which might be occupied by a living actor. Initially (apart from system initialized actors) a path is empty. When actorOf() is called it assigns an incarnation of the actor described by the passed Props to the given path. An actor incarnation is identified by the path and a UID.”
“A restart only swaps the Actor instance defined by the Props but the incarnation and hence the UID remains the same. As long as the incarnation is same, you can keep using the same ActorRef. Restart is handled by the Supervision Strategy of actor’s parent actor, and there is more discussion about what restart means.
The lifecycle of an incarnation ends when the actor is stopped. At that point the appropriate lifecycle events are called and watching actors are notified of the termination. After the incarnation is stopped, the path can be reused again by creating an actor with actorOf(). In this case the name of the new incarnation will be the same as the previous one but the UIDs will differ. An actor can be stopped by the actor itself, another actor or the ActorSystem.”
“An ActorRef always represents an incarnation (path and UID) not just a given path. Therefore if an actor is stopped and a new one with the same name is created an ActorRef of the old incarnation will not point to the new one.”
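Here is a summary of the documentation. When a developer-defined Actor is created with actorOf, it is assigned a UID. The actor's path (its position in the hierarchy) plus that UID uniquely identifies one incarnation of the actor, and that incarnation is exactly what an ActorRef represents.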
```scala
@tailrec final def newUid(): Int = {
  // Note that this uid is also used as hashCode in ActorRef, so be careful
  // to not break hashing if you change the way uid is generated
  val uid = ThreadLocalRandom.current.nextInt()
  if (uid == undefinedUid) newUid
  else uid
}
```
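This is how the UID is generated. It is simply a random Int, drawn again if it happens to equal undefinedUid. A random value makes it very unlikely that two UIDs collide, but it does not strictly guarantee uniqueness. As the comment in the code says, this uid is also used as the hashCode of ActorRef.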
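The retry-on-sentinel idea can be tried out on its own. Below is a minimal stand-alone sketch. It assumes the sentinel undefinedUid is 0, which is the value in Akka's ActorCell companion, so check it against your Akka version. The object name UidSketch is hypothetical. The sketch shows that a random Int makes collisions unlikely but does not rule them out.

```scala
import java.util.concurrent.ThreadLocalRandom
import scala.annotation.tailrec

object UidSketch {
  // Sentinel meaning "no uid assigned yet"; assumed to be 0, as in Akka's ActorCell.
  val undefinedUid: Int = 0

  // Draw random Ints until one differs from the sentinel.
  @tailrec
  def newUid(): Int = {
    val uid = ThreadLocalRandom.current().nextInt()
    if (uid == undefinedUid) newUid() else uid
  }
}

// A batch of uids: none is the sentinel; duplicates are possible in principle, just rare.
val sample = Vector.fill(1000)(UidSketch.newUid())
println(s"distinct uids: ${sample.distinct.size} of ${sample.size}")
```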
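Over an actor's whole lifecycle, that is, until it is terminated, its UID does not change, even when the actor is restarted. Be precise about what restart means here. The actor threw an exception while processing a message, and its supervisor handled the failure and restarted it. That is completely different from stopping the actor and then creating it again with actorOf. Re-creating an actor under the same name with actorOf gives it a new UID. The old ActorRef therefore does not point to the newly created actor, even though the actor's path (its position in the hierarchy) is the same.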
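The identity rule just described can be modeled with two small classes. This is a hypothetical sketch: SimplePath, SimpleRef and UidConstants are not Akka types. Their hashCode and equals have the same shape as ActorRef's. The uid doubles as the hash code, and equality requires both the same uid and the same path.

```scala
// Hypothetical, simplified stand-ins for ActorPath/ActorRef (not Akka's classes).
object UidConstants {
  val Undefined: Int = 0 // assumed sentinel, like ActorCell.undefinedUid
}

final case class SimplePath(elements: List[String]) // path equality knows nothing about uids

final class SimpleRef(val path: SimplePath, val uid: Int) {
  // Like ActorRef.hashCode: the uid doubles as the hash code when it is defined.
  override def hashCode: Int =
    if (uid == UidConstants.Undefined) path.hashCode else uid

  // Like ActorRef.equals: same incarnation means same uid AND same path.
  override def equals(that: Any): Boolean = that match {
    case other: SimpleRef => uid == other.uid && path == other.path
    case _                => false
  }
}

val workerPath = SimplePath(List("user", "worker"))
val original = new SimpleRef(workerPath, 1234)
val sameIncarnation = new SimpleRef(workerPath, 1234) // e.g. still valid after a restart
val recreated = new SimpleRef(workerPath, 5678)       // after stop + actorOf with the same name
```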
```scala
final override def hashCode: Int = {
  if (path.uid == ActorCell.undefinedUid) path.hashCode
  else path.uid
}

/**
 * Equals takes path and the unique id of the actor cell into account.
 */
final override def equals(that: Any): Boolean = that match {
  case other: ActorRef ⇒ path.uid == other.path.uid && path == other.path
  case _               ⇒ false
}
```
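The way abstract class ActorRef defines hashCode and equals makes the role of the UID clear, and it matches the analysis above. Two ActorRefs are equal only if both their uid and their path are equal. Now let's see how ActorPath defines equals.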
```scala
override def equals(other: Any): Boolean = {
  @tailrec
  def rec(left: ActorPath, right: ActorPath): Boolean =
    if (left eq right) true
    else if (left.isInstanceOf[RootActorPath]) left equals right
    else if (right.isInstanceOf[RootActorPath]) right equals left
    else left.name == right.name && rec(left.parent, right.parent)

  other match {
    case p: ActorPath ⇒ rec(this, p)
    case _            ⇒ false
  }
}
```
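A quick look at this code shows how ActorPath equality works. Apart from a reference-equality shortcut (eq) and special handling of RootActorPath, it just compares the name at each level recursively, up to the root. The UID plays no part. ActorPath does have a uid field, and the actor's uid is in fact stored in its ActorPath, but that uid is internal and is only used by ActorRef.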
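Here is a minimal model of this path equality. It assumes a simplified two-class hierarchy: RootSketchPath and ChildSketchPath are hypothetical, not Akka's RootActorPath and ChildActorPath. Each child node stores a uid, but equals and hashCode never read it. Two incarnations at the same position in the tree therefore compare equal.

```scala
import scala.annotation.tailrec

// Hypothetical, simplified path model (not Akka's classes).
sealed trait SketchPath

final case class RootSketchPath(address: String) extends SketchPath

final class ChildSketchPath(val parent: SketchPath, val name: String, val uid: Int) extends SketchPath {
  // Like ActorPath.equals: compare names level by level up to the root; uid is ignored.
  override def equals(other: Any): Boolean = {
    @tailrec
    def rec(left: SketchPath, right: SketchPath): Boolean =
      if (left eq right) true
      else left match {
        case l: RootSketchPath => l == right // case-class equality: false against a child
        case l: ChildSketchPath => right match {
          case r: RootSketchPath  => r == l
          case r: ChildSketchPath =>
            if (l.name != r.name) false else rec(l.parent, r.parent)
        }
      }
    other match {
      case p: SketchPath => rec(this, p)
      case _             => false
    }
  }

  // Keep hashCode consistent with equals: built from names only, never from uid.
  override def hashCode: Int = 31 * parent.hashCode + name.hashCode
}

val root = RootSketchPath("akka://demo")
val user = new ChildSketchPath(root, "user", 0)
val oldWorker = new ChildSketchPath(user, "worker", 1234) // first incarnation
val newWorker = new ChildSketchPath(user, "worker", 5678) // re-created under the same name
```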
Now let's look at how an actor is restarted.
```scala
final def invoke(messageHandle: Envelope): Unit = {
  val influenceReceiveTimeout = !messageHandle.message.isInstanceOf[NotInfluenceReceiveTimeout]
  try {
    currentMessage = messageHandle
    if (influenceReceiveTimeout)
      cancelReceiveTimeout()
    messageHandle.message match {
      case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle)
      case msg                      ⇒ receiveMessage(msg)
    }
    currentMessage = null // reset current message after successful invocation
  } catch handleNonFatalOrInterruptedException { e ⇒
    handleInvokeFailure(Nil, e)
  } finally {
    if (influenceReceiveTimeout)
      checkReceiveTimeout // Reschedule receive timeout
  }
}
```
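By now you should know what invoke does: every user message delivered to the mailbox is processed through a call to invoke. If receiveMessage throws an exception that is non-fatal (or an InterruptedException), handleInvokeFailure is called to deal with it.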
```scala
final def handleInvokeFailure(childrenNotToSuspend: immutable.Iterable[ActorRef], t: Throwable): Unit = {
  // prevent any further messages to be processed until the actor has been restarted
  if (!isFailed) try {
    suspendNonRecursive()
    // suspend children
    val skip: Set[ActorRef] = currentMessage match {
      case Envelope(Failed(_, _, _), child) ⇒ { setFailed(child); Set(child) }
      case _                                ⇒ { setFailed(self); Set.empty }
    }
    suspendChildren(exceptFor = skip ++ childrenNotToSuspend)
    t match {
      // tell supervisor
      case _: InterruptedException ⇒
        // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
        parent.sendSystemMessage(Failed(self, new ActorInterruptedException(t), uid))
      case _ ⇒
        // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
        parent.sendSystemMessage(Failed(self, t, uid))
    }
  } catch handleNonFatalOrInterruptedException { e ⇒
    publish(Error(e, self.path.toString, clazz(actor),
      "emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t)))
    try children foreach stop
    finally finishTerminate()
  }
}
```
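We have looked at handleInvokeFailure before. It suspends the actor and its children, then sends a Failed message to the parent actor to report that one of its children has failed. Note that this message carries the failing actor's uid.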
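The path from invoke to the parent can be sketched in plain Scala. This is a simplified, hypothetical model. SketchCell and FailedMsg stand in for ActorCell and Failed, a boolean flag stands in for mailbox suspension, and child suspension is left out. The key point is that the failure report includes the child's uid.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical stand-ins for ActorCell internals; the names are illustrative only.
final case class FailedMsg(child: String, cause: Throwable, uid: Int)

final class SketchCell(val name: String, val uid: Int,
                       behavior: Any => Unit,
                       parentSystemQueue: mutable.Queue[FailedMsg]) {
  private var failed = false
  def isFailed: Boolean = failed

  // Like invoke: run the user behavior and route failures to the failure handler.
  // A failed cell ignores further messages, standing in for a suspended mailbox.
  def invoke(msg: Any): Unit =
    if (!failed) {
      try behavior(msg)
      catch {
        case e: InterruptedException => handleInvokeFailure(e) // NonFatal excludes this
        case NonFatal(e)             => handleInvokeFailure(e)
      }
    }

  // Like handleInvokeFailure: mark as failed, then tell the parent, including our uid.
  private def handleInvokeFailure(t: Throwable): Unit = {
    failed = true
    parentSystemQueue.enqueue(FailedMsg(name, t, uid))
  }
}

val parentQueue = mutable.Queue.empty[FailedMsg]
val worker = new SketchCell("worker", 42, {
  case "boom" => throw new IllegalStateException("boom")
  case _      => ()
}, parentQueue)

worker.invoke("hello") // handled normally
worker.invoke("boom")  // fails: a FailedMsg is queued for the parent
worker.invoke("later") // ignored while failed
```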
Failed is a system message, so it is processed through invokeAll, which calls handleFailure to deal with the failure.
```scala
final protected def handleFailure(f: Failed): Unit = {
  currentMessage = Envelope(f, f.child, system)
  getChildByRef(f.child) match {
    /*
     * only act upon the failure, if it comes from a currently known child;
     * the UID protects against reception of a Failed from a child which was
     * killed in preRestart and re-created in postRestart
     */
    case Some(stats) if stats.uid == f.uid ⇒
      if (!actor.supervisorStrategy.handleFailure(this, f.child, f.cause, stats, getAllChildStats)) throw f.cause
    case Some(stats) ⇒
      publish(Debug(self.path.toString, clazz(actor),
        "dropping Failed(" + f.cause + ") from old child " + f.child + " (uid=" + stats.uid + " != " + f.uid + ")"))
    case None ⇒
      publish(Debug(self.path.toString, clazz(actor),
        "dropping Failed(" + f.cause + ") from unknown child " + f.child))
  }
}
```
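In handleFailure, the parent uses the failed child's ActorRef to look up that child's restart statistics (ChildRestartStats). It acts only if the uid recorded there matches the uid in the Failed message. A Failed sent by an older incarnation of a child with the same name is therefore dropped. When the uids match, the parent's own supervisorStrategy decides how to handle the failure. If the strategy cannot handle it (it escalates), the exception is rethrown.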
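The uid check in handleFailure can be illustrated with a small parent-side model. SketchParent, FailedMsg and the Outcome cases are hypothetical. The strategy is reduced to a Throwable => Boolean function that returns false to escalate. A Failed whose uid does not match the child currently registered under that name is dropped, as in the Debug branches above.

```scala
// Hypothetical model of the parent side of handleFailure; not Akka's API.
final case class FailedMsg(child: String, cause: Throwable, uid: Int)

sealed trait Outcome
case object Handled extends Outcome
final case class DroppedStale(currentUid: Int) extends Outcome
case object DroppedUnknown extends Outcome

// `strategy` returns false when it cannot handle the cause (i.e. it escalates).
final class SketchParent(strategy: Throwable => Boolean) {
  private var childUids = Map.empty[String, Int] // name -> uid of the live incarnation

  def register(name: String, uid: Int): Unit = { childUids = childUids.updated(name, uid) }

  def handleFailure(f: FailedMsg): Outcome = childUids.get(f.child) match {
    case Some(uid) if uid == f.uid =>
      if (!strategy(f.cause)) throw f.cause // escalate to our own parent
      Handled
    case Some(uid) => DroppedStale(uid)     // Failed from an older incarnation
    case None      => DroppedUnknown        // no child with that name any more
  }
}

val parent = new SketchParent(_ => true)
parent.register("worker", 1)
parent.register("worker", 2) // "worker" re-created: a new incarnation with a new uid
```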
```scala
def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable,
                  stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
  val directive = decider.applyOrElse(cause, escalateDefault)
  directive match {
    case Resume ⇒
      logFailure(context, child, cause, directive)
      resumeChild(child, cause)
      true
    case Restart ⇒
      logFailure(context, child, cause, directive)
      processFailure(context, true, child, cause, stats, children)
      true
    case Stop ⇒
      logFailure(context, child, cause, directive)
      processFailure(context, false, child, cause, stats, children)
      true
    case Escalate ⇒
      logFailure(context, child, cause, directive)
      false
  }
}
```
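The current supervisor strategy's decider determines how the failure is handled. Under the default decider, an ordinary Exception leads to Restart, while a few special cases (ActorInitializationException, ActorKilledException, DeathPactException) lead to Stop. In the usual case processFailure is therefore called. The default supervisor strategy is a OneForOneStrategy.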
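The decider lookup can be sketched as follows. The directive objects and InitFailure are hypothetical stand-ins for Akka's akka.actor directives and ActorInitializationException. The default decider here has the same shape as Akka's but is not copied from it. The lookup itself uses the standard PartialFunction.applyOrElse, as handleFailure does.

```scala
// Hypothetical mirror of Akka's supervision directives; not the real akka.actor types.
sealed trait Directive
case object Resume extends Directive
case object Restart extends Directive
case object Stop extends Directive
case object Escalate extends Directive

// Hypothetical exception standing in for Akka's ActorInitializationException.
final class InitFailure(msg: String) extends Exception(msg)

// Shaped like Akka's defaultDecider: special cases stop, any other Exception restarts.
val defaultDecider: PartialFunction[Throwable, Directive] = {
  case _: InitFailure => Stop
  case _: Exception   => Restart
}

// Whatever the decider does not cover (for example an Error) escalates.
val escalateDefault: Throwable => Directive = _ => Escalate

// The same lookup handleFailure performs: decider.applyOrElse(cause, escalateDefault).
def decide(decider: PartialFunction[Throwable, Directive], cause: Throwable): Directive =
  decider.applyOrElse(cause, escalateDefault)

// A custom decider that resumes on bad input and otherwise falls back to the default.
val resumeOnBadInput: PartialFunction[Throwable, Directive] = {
  case _: IllegalArgumentException => Resume
}
val lenient = resumeOnBadInput orElse defaultDecider
```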
```scala
def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable,
                   stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
  if (restart && stats.requestRestartPermission(retriesWindow))
    restartChild(child, cause, suspendFirst = false)
  else
    context.stop(child) //TODO optimization to drop child here already?
}
```
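That is OneForOneStrategy's implementation of processFailure. If a restart is requested and the retry window allows it (requestRestartPermission), it calls restartChild. Otherwise it stops the child.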
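The restart-or-stop decision in processFailure depends on a retry budget. The sketch below is a simplified, hypothetical version: RestartBudget is not an Akka class. It allows at most maxRetries restarts per time window, and once the budget is used up the child is stopped. Akka's ChildRestartStats.requestRestartPermission follows the same general idea, but its exact bookkeeping differs. A manual clock keeps the example deterministic.

```scala
// Simplified, hypothetical restart budget; Akka's ChildRestartStats follows the same
// general idea but differs in detail (e.g. how the limits and window are encoded).
final class RestartBudget(maxRetries: Int, windowMillis: Long, now: () => Long) {
  private var windowStart = -1L
  private var restartsInWindow = 0

  // true = a restart is allowed; false = the caller should stop the child instead.
  def requestRestartPermission(): Boolean = {
    val t = now()
    if (windowStart < 0 || t - windowStart > windowMillis) { // start a fresh window
      windowStart = t
      restartsInWindow = 0
    }
    restartsInWindow += 1
    restartsInWindow <= maxRetries
  }
}

// Shaped like OneForOneStrategy.processFailure: restart if permitted, otherwise stop.
def processFailure(budget: RestartBudget, restart: Boolean): String =
  if (restart && budget.requestRestartPermission()) "restart" else "stop"

var fakeClock = 0L // manual clock so the example is deterministic
val budget = new RestartBudget(maxRetries = 2, windowMillis = 1000L, now = () => fakeClock)
val burst = List.fill(3)(processFailure(budget, restart = true))
fakeClock = 5000L // the window has expired
val afterWindow = processFailure(budget, restart = true)
val stopDirective = processFailure(budget, restart = false)
```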
```scala
final def restartChild(child: ActorRef, cause: Throwable, suspendFirst: Boolean): Unit = {
  val c = child.asInstanceOf[InternalActorRef]
  if (suspendFirst) c.suspend()
  c.restart(cause)
}
```
restartChild ends up calling restart on the failed actor, and it does so through that actor's ActorRef. From the earlier analysis we know that this ActorRef is ultimately a RepointableActorRef.
```scala
def restart(cause: Throwable): Unit = underlying.restart(cause)
```
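That is the definition of restart, and all it does is call restart on underlying. Quite a roundabout path! What is underlying? It is the cell behind the ActorRef, which here is the ActorCell. Yet ActorCell's own code contains no restart implementation. It turns up instead in Dispatch, a trait that ActorCell mixes in!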
```scala
final def restart(cause: Throwable): Unit =
  try dispatcher.systemDispatch(this, Recreate(cause))
  catch handleException
```
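This is simple: the dispatcher sends a Recreate system message to the current ActorCell. From our earlier look at invokeAll, we know that a Recreate message leads to faultRecreate. We analyzed that function as well: it calls aroundPreRestart on the old Actor instance and then calls finishRecreate.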
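The delegation chain traced so far can be summarized in a small model: restartChild calls the ref's restart, which calls the cell's restart, which enqueues a Recreate system message. All classes here are hypothetical stand-ins. The point is that restart rebuilds nothing inline. It only enqueues Recreate, and the cell processes that message later on its own turn.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the restart call chain; names mirror Akka's, classes do not.
sealed trait SystemMessage
final case class Recreate(cause: Throwable) extends SystemMessage

final class SketchCell {
  val systemQueue = mutable.Queue.empty[SystemMessage]
  var recreations = 0

  // Like Dispatch.restart: no inline restart, just enqueue a Recreate system message.
  def restart(cause: Throwable): Unit = systemQueue.enqueue(Recreate(cause))

  // Like the Recreate branch of invokeAll: runs later, on the cell's own turn.
  def processSystemMessages(): Unit =
    while (systemQueue.nonEmpty) systemQueue.dequeue() match {
      case Recreate(_) => recreations += 1 // faultRecreate/finishRecreate would run here
    }
}

// Like RepointableActorRef.restart: forward to the underlying cell.
final class SketchRef(val underlying: SketchCell) {
  def restart(cause: Throwable): Unit = underlying.restart(cause)
}

// Like SupervisorStrategy.restartChild with suspendFirst = false: restart through the ref.
def restartChild(child: SketchRef, cause: Throwable): Unit = child.restart(cause)

val cell = new SketchCell
val ref = new SketchRef(cell)
restartChild(ref, new RuntimeException("boom"))
val pendingBeforeProcessing = cell.systemQueue.size
cell.processSystemMessages()
```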
```scala
private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = {
  // need to keep a snapshot of the surviving children before the new actor instance creates new ones
  val survivors = children

  try {
    try resumeNonRecursive()
    finally clearFailed() // must happen in any case, so that failure is propagated

    val freshActor = newActor()
    actor = freshActor // this must happen before postRestart has a chance to fail
    if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.

    freshActor.aroundPostRestart(cause)
    if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))

    // only after parent is up and running again do restart the children which were not stopped
    survivors foreach (child ⇒
      try child.asInstanceOf[InternalActorRef].restart(cause)
      catch handleNonFatalOrInterruptedException { e ⇒
        publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child))
      })
  } catch handleNonFatalOrInterruptedException { e ⇒
    clearActorFields(actor, recreate = false) // in order to prevent preRestart() from happening again
    handleInvokeFailure(survivors, PostRestartException(self, e, cause))
  }
}
```
finishRecreate calls newActor to produce a fresh Actor instance, installs it in the cell, and calls its aroundPostRestart. Finally, it restarts each surviving child actor, if there are any.
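A compact model shows what finishRecreate changes and what it leaves alone. SketchActor, SketchCell and SketchRef are hypothetical. preRestart and postRestart stand in for aroundPreRestart and aroundPostRestart, and child handling is left out. After recreate, the cell holds a new actor instance, while its uid and the ref pointing to it are unchanged.

```scala
import scala.collection.mutable

// Hypothetical model; hook names follow Akka's Actor API, but nothing here is Akka.
final class SketchActor(val id: Int, events: mutable.Buffer[String]) {
  def preRestart(cause: Throwable): Unit = events += s"preRestart#$id"
  def postRestart(cause: Throwable): Unit = events += s"postRestart#$id"
}

final class SketchCell(val uid: Int, newActor: () => SketchActor) {
  var actor: SketchActor = newActor()

  // Condensed faultRecreate + finishRecreate: old hook, fresh instance, new hook.
  // Note what is NOT touched: uid (and hence any ref built on it) stays the same.
  def recreate(cause: Throwable): Unit = {
    val failedActor = actor
    failedActor.preRestart(cause)
    val freshActor = newActor()
    actor = freshActor // installed before postRestart, as in finishRecreate
    freshActor.postRestart(cause)
  }
}

final class SketchRef(val path: String, val cell: SketchCell) {
  def uid: Int = cell.uid
}

val events = mutable.ArrayBuffer.empty[String]
var instancesCreated = 0
val cell = new SketchCell(42, () => { instancesCreated += 1; new SketchActor(instancesCreated, events) })
val ref = new SketchRef("/user/worker", cell)
val instanceBefore = cell.actor
cell.recreate(new RuntimeException("boom"))
```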
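Nothing in the restart process modifies or updates the ActorPath or the ActorRef, and the uid is never changed. Restarting an actor therefore does not invalidate its ActorRef, let alone its ActorPath. Recall the actorOf process: one of its steps calls makeChild, which calls newUid to generate a fresh uid for the new ActorRef. That is why the old ActorRef is no longer valid once an actor is stopped and re-created with actorOf.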
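The contrast with stop followed by actorOf can be modeled with a tiny child table. SketchParent and SketchRef are hypothetical. A fixed iterator replaces Akka's random newUid so the outcome is reproducible. Re-creating a child under the same name yields a ref with the same path but a different uid, so the old ref no longer matches the live incarnation.

```scala
// Hypothetical model of a parent's child table; not Akka's API.
final case class SketchRef(path: String, uid: Int) // case-class equality: path AND uid

final class SketchParent(parentPath: String, uidSource: Iterator[Int]) {
  private var childUids = Map.empty[String, Int] // name -> uid of the live incarnation

  private def nameOf(ref: SketchRef): String = ref.path.stripPrefix(parentPath + "/")

  // Like actorOf -> makeChild: every creation draws a fresh uid (a new incarnation).
  def actorOf(name: String): SketchRef = {
    require(!childUids.contains(name), s"actor name [$name] is not unique")
    val uid = uidSource.next()
    childUids = childUids.updated(name, uid)
    SketchRef(s"$parentPath/$name", uid)
  }

  // Stopping ends the incarnation and frees the name for reuse.
  def stop(ref: SketchRef): Unit =
    if (isLive(ref)) childUids = childUids - nameOf(ref)

  // A ref is live only if its uid matches the incarnation currently at its path.
  def isLive(ref: SketchRef): Boolean = childUids.get(nameOf(ref)).contains(ref.uid)
}

// Akka draws random uids; a fixed sequence keeps this example reproducible.
val parent = new SketchParent("/user", Iterator(101, 202))
val oldRef = parent.actorOf("worker")
parent.stop(oldRef)
val newRef = parent.actorOf("worker")
```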
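A simple way to think about it is ActorRef = “ActorPathString” + UID. A developer-defined Actor class is a static concept. Creating it with actorOf produces an incarnation with its own UID, backed by an Actor instance. If that actor fails and its supervisor restarts it, the system creates a new Actor instance, but the incarnation and its UID stay the same. The ActorRef therefore keeps pointing to the actor at that path. An ActorPath identifies a position in the actor tree. You can use it to find whatever incarnation currently lives at that path, but it does not care whether that incarnation's UID is the same as before.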