Akka (3): Actor Supervision - A Closer Look at BackoffSupervisor


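In the previous post we discussed supervision: in Akka it is a tree of direct parent-child relationships, in which a parent Actor handles the exceptions raised by its direct children. At the time we treated BackoffSupervisor as just one form of parent-child supervision. In fact, BackoffSupervisor differs from an Actor that defines its own supervisorStrategy. It is better to think of BackoffSupervisor as a single, self-contained Actor, even though it is implemented as a parent-child pair. The supervision strategy (SupervisorStrategy) is implemented inside BackoffSupervisor. From the outside, BackoffSupervisor looks like one Actor: the business logic is defined in the child Actor, while the so-called parent Actor does nothing besides supervision. There is not even a place to define behavior for the parent; its only job, built into BackoffSupervisor, is to forward the messages it receives to the child. So although we send messages to BackoffSupervisor, we are really talking to its child. Consider the following example: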

package backoffSupervisorDemo

import akka.actor._
import akka.pattern._
import backoffSupervisorDemo.InnerChild.TestMessage
import scala.concurrent.duration._

object InnerChild {
  case class TestMessage(msg: String)
  class ChildException extends Exception
  def props = Props[InnerChild]
}

class InnerChild extends Actor with ActorLogging {
  import InnerChild._
  override def receive: Receive = {
    case TestMessage(msg) => // simulate the child's work
      log.info(s"Child received message: ${msg}")
  }
}

object Supervisor {
  def props: Props = { // the supervision strategy and the child Actor are defined here
    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
      case _: InnerChild.ChildException => SupervisorStrategy.Restart
    }
    val options = Backoff.onFailure(InnerChild.props, "innerChild", 1.second, 5.seconds, 0.0)
      .withManualReset
      .withSupervisorStrategy(
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds)(
          decider.orElse(SupervisorStrategy.defaultDecider)
        )
      )
    BackoffSupervisor.props(options)
  }
}

// Note: the actor below is the parent of Supervisor, not of InnerChild
object ParentalActor {
  case class SendToSupervisor(msg: InnerChild.TestMessage)
  case class SendToInnerChild(msg: InnerChild.TestMessage)
  case class SendToChildSelection(msg: InnerChild.TestMessage)
  def props = Props[ParentalActor]
}

class ParentalActor extends Actor with ActorLogging {
  import ParentalActor._
  // build the child Actor supervisor here
  val supervisor = context.actorOf(Supervisor.props, "supervisor")
  supervisor ! BackoffSupervisor.GetCurrentChild // ask supervisor for its current child Actor
  var innerChild: Option[ActorRef] = None // the current child's ActorRef, once it has been returned
  val selectedChild = context.actorSelection("/user/parent/supervisor/innerChild")
  override def receive: Receive = {
    case BackoffSupervisor.CurrentChild(ref) => // reply carrying the child Actor
      innerChild = ref
    case SendToSupervisor(msg) => supervisor ! msg
    case SendToChildSelection(msg) => selectedChild ! msg
    case SendToInnerChild(msg) => innerChild foreach (child => child ! msg)
  }
}

object BackoffSupervisorDemo extends App {
  import ParentalActor._
  val testSystem = ActorSystem("testSystem")
  val parent = testSystem.actorOf(ParentalActor.props, "parent")
  Thread.sleep(1000) // wait for BackoffSupervisor.CurrentChild(ref) to be received
  parent ! SendToSupervisor(TestMessage("Hello message 1 to supervisor"))
  parent ! SendToInnerChild(TestMessage("Hello message 2 to innerChild"))
  parent ! SendToChildSelection(TestMessage("Hello message 3 to selectedChild"))
  scala.io.StdIn.readLine()
  testSystem.terminate()
}

In the example above we send messages to supervisor, innerChild and selectedChild respectively, yet all of them are handled by InnerChild, as the log shows:

[INFO] [05/29/2017 16:11:48.167] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 1 to supervisor
[INFO] [05/29/2017 16:11:48.177] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 2 to innerChild
[INFO] [05/29/2017 16:11:48.179] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 3 to selectedChild

Above we sent supervisor a BackoffSupervisor.GetCurrentChild message to obtain the child Actor. BackoffSupervisor handles this and a few other special messages as follows:

private[akka] trait HandleBackoff { this: Actor ⇒
  def childProps: Props
  def childName: String
  def reset: BackoffReset

  var child: Option[ActorRef] = None
  var restartCount = 0

  import BackoffSupervisor._
  import context.dispatcher

  override def preStart(): Unit = startChild()

  def startChild(): Unit = {
    if (child.isEmpty) {
      child = Some(context.watch(context.actorOf(childProps, childName)))
    }
  }

  def handleBackoff: Receive = {
    case StartChild ⇒
      startChild()
      reset match {
        case AutoReset(resetBackoff) ⇒
          val _ = context.system.scheduler.scheduleOnce(resetBackoff, self, ResetRestartCount(restartCount))
        case _ ⇒ // ignore
      }

    case Reset ⇒
      reset match {
        case ManualReset ⇒ restartCount = 0
        case msg         ⇒ unhandled(msg)
      }

    case ResetRestartCount(current) ⇒
      if (current == restartCount) {
        restartCount = 0
      }

    case GetRestartCount ⇒
      sender() ! RestartCount(restartCount)

    case GetCurrentChild ⇒
      sender() ! CurrentChild(child)

    case msg if child.contains(sender()) ⇒
      // use the BackoffSupervisor as sender
      context.parent ! msg

    case msg ⇒ child match {
      case Some(c) ⇒ c.forward(msg)
      case None    ⇒ context.system.deadLetters.forward(msg)
    }
  }
}

The handling of these messages can be found in the handleBackoff function.
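To make the order of those cases easier to follow, the routing can be modelled as a pure function. This is only an illustrative sketch, not Akka code: BackoffRoutingSketch, Route and route are hypothetical names, the message objects are stand-ins rather than Akka's own types, and StartChild, Reset and ResetRestartCount are deliberately left out.

```scala
object BackoffRoutingSketch {
  // Stand-ins for two of the supervisor's query messages (NOT Akka's types)
  sealed trait Query
  case object GetCurrentChild extends Query
  case object GetRestartCount extends Query

  // Where a message ends up in this simplified model
  sealed trait Route
  case object AnsweredBySupervisor extends Route
  case object SentToParent extends Route
  case object ForwardedToChild extends Route
  case object DeadLetters extends Route

  // Mirrors the order of the cases in handleBackoff:
  // queries first, then messages coming from the child, then everything else.
  def route(msg: Any, fromChild: Boolean, childAlive: Boolean): Route = msg match {
    case _: Query                     => AnsweredBySupervisor
    case _ if fromChild && childAlive => SentToParent
    case _ if childAlive              => ForwardedToChild
    case _                            => DeadLetters // no child to forward to
  }

  def main(args: Array[String]): Unit = {
    println(route("hello", fromChild = false, childAlive = true))
    println(route("hello", fromChild = false, childAlive = false))
    println(route(GetCurrentChild, fromChild = false, childAlive = false))
  }
}
```

The last case is the one worth remembering: in this model, an ordinary message that arrives while no child exists is not queued but goes to dead letters.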

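When building the Props for Supervisor in the example above, we defined a supervision strategy (SupervisorStrategy) that restarts InnerChild when it throws ChildException. Let's modify InnerChild so that it throws exceptions at random: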

import scala.util.Random

object InnerChild {
  case class TestMessage(msg: String)
  class ChildException(val errmsg: TestMessage) extends Exception
  object CException { // extractor so that a class with a parameter can be pattern matched
    def apply(msg: TestMessage) = new ChildException(msg)
    def unapply(cex: ChildException) = Some(cex.errmsg)
  }
  def props = Props[InnerChild]
}

class InnerChild extends Actor with ActorLogging {
  import InnerChild._
  context.parent ! BackoffSupervisor.Reset // reset backoff counts
  override def receive: Receive = {
    case TestMessage(msg) => // simulate the child's work
      if (Random.nextBoolean()) // throw an exception at random
        throw new ChildException(TestMessage(msg))
      else
        log.info(s"Child received message: ${msg}")
  }
}

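We use Random.nextBoolean to throw exceptions at random. Note that we also changed ChildException into a class with a parameter, because we may want to retrieve the message that caused the exception before restarting, like this: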

    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
      case InnerChild.CException(tmsg) =>
        println(s"Message causing exception: ${tmsg.msg}") // we can extract message here
        SupervisorStrategy.Restart
    }
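The extractor trick used by the decider can be tried without any Actors. The sketch below repeats the article's TestMessage, ChildException and CException definitions inside a hypothetical ExtractorSketch object; the describe helper is an assumption added purely for illustration.

```scala
object ExtractorSketch {
  final case class TestMessage(msg: String)

  // A plain (non-case) exception class that carries the message it failed on
  class ChildException(val errmsg: TestMessage) extends Exception

  // unapply is what allows ChildException to appear in a pattern
  object CException {
    def apply(msg: TestMessage): ChildException = new ChildException(msg)
    def unapply(cex: ChildException): Option[TestMessage] = Some(cex.errmsg)
  }

  // Hypothetical helper: inspects a failure the way a decider could
  def describe(t: Throwable): String = t match {
    case CException(tmsg) => s"failed on: ${tmsg.msg}"
    case other            => s"unrelated failure: ${other.getClass.getSimpleName}"
  }

  def main(args: Array[String]): Unit = {
    println(describe(CException(TestMessage("hello"))))
    println(describe(new IllegalStateException("boom")))
  }
}
```

Because unapply takes a ChildException, the pattern only matches throwables of that type; anything else falls through to the next case, which is why the decider can safely be chained with a default via orElse.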

All messages can now simply be sent to supervisor:

class ParentalActor extends Actor with ActorLogging {
  import ParentalActor._
  // build the child Actor supervisor here
  val supervisor = context.actorOf(Supervisor.props, "supervisor")
  override def receive: Receive = {
    case msg => supervisor ! msg // pass everything on to supervisor
  }
}

object BackoffSupervisorDemo extends App {
  import ParentalActor._
  import InnerChild._
  val testSystem = ActorSystem("testSystem")
  val parent = testSystem.actorOf(ParentalActor.props, "parent")
  parent ! TestMessage("Hello message 1 to supervisor")
  parent ! TestMessage("Hello message 2 to supervisor")
  parent ! TestMessage("Hello message 3 to supervisor")
  parent ! TestMessage("Hello message 4 to supervisor")
  parent ! TestMessage("Hello message 5 to supervisor")
  parent ! TestMessage("Hello message 6 to supervisor")
  scala.io.StdIn.readLine()
  testSystem.terminate()
}

Running this shows that once an exception occurs, all subsequent messages become DeadLetters:

[INFO] [05/29/2017 18:22:11.689] [testSystem-akka.actor.default-dispatcher-5] [akka://testSystem/user/parent/supervisor/innerChild] Message [backoffSupervisorDemo.InnerChild$TestMessage] from Actor[akka://testSystem/user/parent#2140150413] to Actor[akka://testSystem/user/parent/supervisor/innerChild#-1047097634] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
....

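This also shows that BackoffSupervisor handles Restart differently. It appears to stop InnerChild outright rather than suspend it as an ordinary restart would, destroying its ActorRef and mailbox, so every message sent to InnerChild before the new instance has started ends up in the DeadLetter queue. In other words, we lose not only the message that caused the exception but also every message that arrives during the restart window.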
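How long that window lasts depends on the backoff settings passed to Backoff.onFailure. Akka's own calculation is internal, so the function below is only a simplified sketch under the assumption that the delay doubles from the minimum on each restart, is capped at the maximum, and is then stretched by a random factor. BackoffDelaySketch, delayFor and the jitter parameter are hypothetical names introduced for this illustration.

```scala
import scala.concurrent.duration._

object BackoffDelaySketch {
  // Simplified model of an exponential backoff delay; NOT Akka's own code.
  // `jitter` stands in for a random draw in [0, 1) so the function stays deterministic.
  def delayFor(restartCount: Int,
               minBackoff: FiniteDuration,
               maxBackoff: FiniteDuration,
               randomFactor: Double,
               jitter: Double = 0.0): FiniteDuration = {
    // Double the minimum delay once per previous restart...
    val doubled = minBackoff.toMillis.toDouble * math.pow(2.0, restartCount.toDouble)
    // ...never exceed the configured maximum...
    val capped = math.min(doubled, maxBackoff.toMillis.toDouble)
    // ...then stretch it by up to `randomFactor` (0.0 means no jitter at all).
    (capped * (1.0 + jitter * randomFactor)).toLong.millis
  }

  def main(args: Array[String]): Unit = {
    // Same settings as the article: min 1 second, max 5 seconds, randomFactor 0.0
    (0 to 4).foreach { n =>
      println(s"restart #$n -> ${delayFor(n, 1.second, 5.seconds, 0.0).toMillis} ms")
    }
  }
}
```

Note that InnerChild sends BackoffSupervisor.Reset every time it starts and the supervisor is configured with withManualReset, so in this demo the restart count presumably returns to 0 after each restart and the delay would stay close to the 1-second minimum.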

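Now let's solve the problem of the lost messages. First, how do we resend the message that caused the exception? We can send it again from the supervision strategy, just before the restart: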

    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
      case InnerChild.CException(tmsg) =>
        println(s"Message causing exception: ${tmsg.msg}") // we can extract message here
        BackoffSupervisorDemo.sendToParent(tmsg) // resend message
        SupervisorStrategy.Restart
    }

The sendToParent function is declared in BackoffSupervisorDemo:

  def sendToParent(msg: TestMessage) = parent ! msg

Next we need a way to fish out the DeadLetters. We can subscribe to messages of type DeadLetter on Akka's eventStream:

object DeadLetterMonitor {
  def props(parentRef: ActorRef) = Props(new DeadLetterMonitor(parentRef))
}

class DeadLetterMonitor(receiver: ActorRef) extends Actor with ActorLogging {
  import InnerChild._
  import context.dispatcher
  override def receive: Receive = {
    // match only TestMessage so that other dead letters cannot cause a ClassCastException
    case DeadLetter(msg: TestMessage, _, _) =>
      // wait till InnerChild finishes restart then resend
      context.system.scheduler.scheduleOnce(1.second, receiver, msg)
  }
}

object BackoffSupervisorDemo extends App {
  import ParentalActor._
  import InnerChild._
  def sendToParent(msg: TestMessage) = parent ! msg
  val testSystem = ActorSystem("testSystem")
  val parent = testSystem.actorOf(ParentalActor.props, "parent")
  val deadLetterMonitor = testSystem.actorOf(DeadLetterMonitor.props(parent), "dlmonitor")
  testSystem.eventStream.subscribe(deadLetterMonitor, classOf[DeadLetter]) // listen to DeadLetter
  parent ! TestMessage("Hello message 1 to supervisor")
  parent ! TestMessage("Hello message 2 to supervisor")
  parent ! TestMessage("Hello message 3 to supervisor")
  parent ! TestMessage("Hello message 4 to supervisor")
  parent ! TestMessage("Hello message 5 to supervisor")
  parent ! TestMessage("Hello message 6 to supervisor")
  scala.io.StdIn.readLine()
  testSystem.terminate()
}

A test run shows that InnerChild successfully processed all 6 messages.

Below is the complete sample code for this discussion:

package backoffSupervisorDemo

import akka.actor._
import akka.pattern._
import scala.util.Random
import scala.concurrent.duration._

object InnerChild {
  case class TestMessage(msg: String)
  class ChildException(val errmsg: TestMessage) extends Exception
  object CException { // extractor so that a class with a parameter can be pattern matched
    def apply(msg: TestMessage) = new ChildException(msg)
    def unapply(cex: ChildException) = Some(cex.errmsg)
  }
  def props = Props[InnerChild]
}

class InnerChild extends Actor with ActorLogging {
  import InnerChild._
  context.parent ! BackoffSupervisor.Reset // reset backoff counts
  override def receive: Receive = {
    case TestMessage(msg) => // simulate the child's work
      if (Random.nextBoolean()) // throw an exception at random
        throw new ChildException(TestMessage(msg))
      else
        log.info(s"Child received message: ${msg}")
  }
}

object Supervisor {
  def props: Props = { // the supervision strategy and the child Actor are defined here
    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
      case InnerChild.CException(tmsg) =>
        println(s"Message causing exception: ${tmsg.msg}") // we can extract message here
        BackoffSupervisorDemo.sendToParent(tmsg) // resend message
        SupervisorStrategy.Restart
    }
    val options = Backoff.onFailure(InnerChild.props, "innerChild", 1.second, 5.seconds, 0.0)
      .withManualReset
      .withSupervisorStrategy(
        OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds)(
          decider.orElse(SupervisorStrategy.defaultDecider)
        )
      )
    BackoffSupervisor.props(options)
  }
}

// Note: the actor below is the parent of Supervisor, not of InnerChild
object ParentalActor {
  case class SendToSupervisor(msg: InnerChild.TestMessage)
  case class SendToInnerChild(msg: InnerChild.TestMessage)
  case class SendToChildSelection(msg: InnerChild.TestMessage)
  def props = Props[ParentalActor]
}

class ParentalActor extends Actor with ActorLogging {
  import ParentalActor._
  // build the child Actor supervisor here
  val supervisor = context.actorOf(Supervisor.props, "supervisor")
  override def receive: Receive = {
    case msg => supervisor ! msg // pass everything on to supervisor
  }
}

object DeadLetterMonitor {
  def props(parentRef: ActorRef) = Props(new DeadLetterMonitor(parentRef))
}

class DeadLetterMonitor(receiver: ActorRef) extends Actor with ActorLogging {
  import InnerChild._
  import context.dispatcher
  override def receive: Receive = {
    // match only TestMessage so that other dead letters cannot cause a ClassCastException
    case DeadLetter(msg: TestMessage, _, _) =>
      // wait till InnerChild finishes restart then resend
      context.system.scheduler.scheduleOnce(1.second, receiver, msg)
  }
}

object BackoffSupervisorDemo extends App {
  import ParentalActor._
  import InnerChild._
  def sendToParent(msg: TestMessage) = parent ! msg
  val testSystem = ActorSystem("testSystem")
  val parent = testSystem.actorOf(ParentalActor.props, "parent")
  val deadLetterMonitor = testSystem.actorOf(DeadLetterMonitor.props(parent), "dlmonitor")
  testSystem.eventStream.subscribe(deadLetterMonitor, classOf[DeadLetter]) // listen to DeadLetter
  parent ! TestMessage("Hello message 1 to supervisor")
  parent ! TestMessage("Hello message 2 to supervisor")
  parent ! TestMessage("Hello message 3 to supervisor")
  parent ! TestMessage("Hello message 4 to supervisor")
  parent ! TestMessage("Hello message 5 to supervisor")
  parent ! TestMessage("Hello message 6 to supervisor")
  scala.io.StdIn.readLine()
  testSystem.terminate()
}
