Akka(2):Actor生命周期管理 - 監控和監視


  在開始討論Akka中對Actor的生命周期管理前,我們先探討一下所謂的Actor編程模式。對比起我們習慣的行令式(imperative)編程模式,Actor編程模式更接近現實中的應用場景和功能測試模式。這是因為Actor是靠消息來驅動的,每種消息代表一項功能的運算指令。由於消息驅動式的程序是松散耦合的,每項功能都是在獨立的線程中運算,互不干擾依賴,所以我們可以很自然的分開來實現各項功能以及獨立測試每項功能。雖然Akka同時提供了Java和Scala兩種API,但可能由於Akka本身是用Scala開發的,所以感覺用Scala來開發Akka程序會更自然些:籠統來講,Actor編程主要就是對receive函數的實現。而receive函數就是幾個普通的功能函數用模式匹配的方式按消息類型進行調用。receive函數所調用的功能函數可以是任何JVM兼容語言函數,由於每個Actor的運算都在自己獨立的線程里進行,所以我們不必擔心Actor函數在運行中的交叉調用問題。Akka程序本就是一種原生的多線程程序,每個Actor都在一個自己的線程內獨立運算它的receive函數。除此之外Actor的運算環境可以在任何不同的JVM里,只要Akka信息發送能實現跨JVM投遞的話,實現分布式程序也是自然而然的事了。所以,理論上Akka編程初學者應該把主要注意力放在這個receive函數的實現上來,按照一種模版式的方式來編寫Akka程序就可以了,如下面演示的這個模版例子:

import akka.actor._ object MyActor { //在這個伴生對象里申明MyActor所支持的功能指令
  sealed trait ActorCommands case object RunFuncA extends ActorCommands case object RunFuncB extends ActorCommands } //假設有funcA,funcB. 它們可以從任何JVM函數庫里調用
val funcA : () => Any = ??? val funcB : () => Any = ???
class MyActor extends Actor { import MyActor._ var stateValue: Any = _    //內部狀態,代表這個Actor的當前運算結果
  override def receive: Receive = { case RunFuncA => stateValue = funcA   //運算funcA,更新stateValue
    case RunFuncB => stateValue = funcB    //運算funcB,更新stateValue
 ... } }

以上是一個Actor需要實現的功能編程樣板。可以說其它Akka編程部分也都不過是標准的鋪墊代碼而已。在我來看就是把原來的一個完整程序按功能(應該是按程序狀態)切分開來並按上面的模板套入各種Actor的receive函數里。想想看,如此這般我們就可以實現一個分布式的多線程程序了。

行令式(imperative)程序的運算流程是按代碼順序進行的。這種流程模式不方便運算流程控制,這個缺點在進行異常處理時更加明顯。對於一段我們認為在運算中可能發生異常的代碼,我們只能用try-catch來把這段代碼包裹起來。那么對於一個安全考慮的比較詳細的程序來講就會出現許多try-catch代碼段混合在運算流程里造成整體程序邏輯紊亂,不利對程序的理解和維護。再試想如果容許重試異常運算的話會是怎樣的一個場景。而這個問題在Akka編程中得到了完美的解決。在Akka編程里我們可以把每段可能產生異常的代碼放到一個獨立的Actor中去運算。Akka的Actor組織是一個層級結構。下層Actor是由直接上一層Actor產生,形成一種父子Actor關系。父級Actor除維護自身狀態之外還必須負責處理下一層子級Actor所發生的異常,形成一種樹形父子層級監管結構。任何子級Actor在運算中發生異常后立即將自己和自己的子級Actor運算掛起,並將下一步行動交付給自己的父級Actor決定。父級Actor對發生異常的子級Actor有以下幾種處理方式:

1、恢復運算(Resume):不必理會異常,保留當前狀態,跳過當前異常消息,照常繼續處理其它消息

2、重新啟動(Restart):清除當前狀態,保留郵箱及內容,終止當前Actor,再重新構建一個新的Actor實例,沿用原來的消息地址ActorRef繼續工作

3、徹底終止(Stop):銷毀當前Actor及ActorRef郵箱,把所有消息導向DeadLetter隊列。

4、向上提交(Esculate):如果父級無法處理子級異常,則這種情況也視為父級出現的異常。按照規定,父級會將自己和子級Actor運算暫停掛起並把子級Actor實際產生的異常當作自己發生的異常提交給上一層父級處理(也就是說異常信息的發送者sender變成了父級Actor)。

Akka處理異常的方式簡單直接:如果發生異常就先暫停掛起然后交給直屬父級Actor去處理。這就把異常封閉在這個Actor的監管鏈條里。Akka系統的監管鏈條實際代表一個功能的分散封閉運算,所以一個監管鏈條里發生的異常不會影響其它監管鏈條。換句話說就是Actor發生異常是封閉在它所屬的功能內部的,一個功能發生異常不會影響其它功能。而在行令式程序中,如果沒有try-catch,任何一段產生異常的代碼都會導致整個程序中斷。

Akka提供了OneForOneStrategy和AllForOneStrategy兩種對待異常Actor的策略配置,策略中定義了對下屬子級發生的各種異常的處理方式。異常處理策略是以策略施用對象分類的,如下:

OneForOneStrategy:只針對發生異常的Actor施用策略

AllForOneStrategy:雖然一個直屬子級Actor發生了異常,監管父級Actor把它當作所有下屬子級同時發生了相同異常,對所有子級Actor施用策略

下面是一個典型的策略例子:

OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: ArithmeticException      => Resume case _: SomeMinerExecption       => Resume case _: NullPointerException     => Restart case _: IllegalArgumentException => Stop case _: Exception                => Escalate }

Akka對待這種父子監管的原則保證了在Akka系統中不會出現任何孤兒,也就是說保證不會出現斷裂的監管樹。這就要求當任何一個Actor在暫停掛起前都要保證先暫停掛起它的所有直屬子級Actor,而子級則必須先暫停掛起它們的直屬子級,如此遞歸。同樣,任何Actor在重啟(Restart)時也必須遞歸式地重啟直屬子級,因為重啟一個Actor需要先停止再啟動,我們必須肯定在停止時不會產生孤兒Actor。如果一個父級Actor無法處理子級異常需要向上提交(Esculate)的話,首先它需要采取遞歸方式來暫停掛起自身以下的監管鏈條。它的直屬父級Actor會按自己的異常處理策略來對待提交上來的異常,處理的結果將會遞歸式沿着監管樹影響屬下的所有子子孫孫。但如果這個級別的Actor異常處理策略還是無法覆蓋這個異常時,它又會掛起自己,再向上提交(Esculate)。那么如果到達了頂級Actor又如何向上提交呢?Akka系統最終的異常處理策略可以在config文件里配置:

# The guardian "/user" will use this class to obtain its supervisorStrategy. # It needs to be a subclass of akka.actor.SupervisorStrategyConfigurator. # In addition to the default there is akka.actor.StoppingSupervisorStrategy. guardian-supervisor-strategy = "akka.actor.DefaultSupervisorStrategy"

默認策略是DefaultSupervisorStrategy。以下是Akka提供的默認策略:

/** * When supervisorStrategy is not specified for an actor this * `Decider` is used by default in the supervisor strategy. * The child will be stopped when [[akka.actor.ActorInitializationException]], * [[akka.actor.ActorKilledException]], or [[akka.actor.DeathPactException]] is * thrown. It will be restarted for other `Exception` types. * The error is escalated if it's a `Throwable`, i.e. `Error`. */ final val defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: DeathPactException ⇒ Stop case _: Exception ⇒ Restart } /** * When supervisorStrategy is not specified for an actor this * is used by default. OneForOneStrategy with decider defined in * [[#defaultDecider]]. */ f inal val defaultStrategy: SupervisorStrategy = { OneForOneStrategy()(defaultDecider) }  

我們看到前面三種異常直屬父級直接終止子級Actor,其它類型重啟。當然我們可以在這個默認策略之上再添加自定義的一些異常處理策略:

override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: ArithmeticException => Resume case _: MyException => Restart case t => super.supervisorStrategy.decider.applyOrElse(t, (_: Any) => Escalate) }

上面提到Akka絕對不容許有孤兒Actor存在(斷裂的監管樹),所以停止任何一個Actor,它下屬的子子孫孫都會自下而上依次停止運算。為了更好的理解Actor的監管策略,我們必須先從了解Actor的生命周期(lift-cycle)開始。一個Actor從構建產生ActorRef開始到徹底終止為整個生命周期。其中可以發生多次重啟(Restart)。我們在下面對Actor的開始、終止、重啟這三個環節中發生的事件進行描述:

1、開始

   當Akka通過Props構建一個Actor后,這個Actor可以立即開始處理消息,進入開始(started)狀態。Akka提供了針對開始狀態的事件接口(event hooks)preStart如下:

/** * User overridable callback. * <p/> * Is called when an Actor is started. * Actors are automatically started asynchronously when created. * Empty default implementation. */ @throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest //#lifecycle-hooks
  def preStart(): Unit = ()

我們可以重載preStart在Actor開始處理消息前進行一些初始化准備工作,如下:

  override def preStart={ log.info ("Starting storage actor...") initDB }

2、終止

   一個Actor可能因為完成運算、發生異常又或者人為通過發送Kill,PoisonPill強行終止等而進入停止(stopping)狀態。在停止過程中這個Actor會先以遞歸方式停止它屬下的所有子孫Actor然后停止處理消息並將所有發給它的消息導向DeadLetter隊列。Akka提供了事件接口postStop:

/** * User overridable callback. * <p/> * Is called asynchronously after 'actor.stop()' is invoked. * Empty default implementation. */ @throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest //#lifecycle-hooks
  def postStop(): Unit = ()

我們可以重載postStop來進行一些事后清理工作:

  override def postStop={ log.info ("Stopping storage actor...") db.release }

3、重啟

   重啟是Actor生命周期里一個最重要的環節。在一個Actor的生命周期里可能因為多種原因發生重啟(Restart)。造成一個Actor需要重啟的原因可能有下面幾個:

1、在處理某特定消息時造成了系統性的異常,必須通過重啟來清理系統錯誤

2、內部狀態毀壞,必須通過重啟來重新構建狀態

3、在處理消息時無法使用到一些依賴資源,需要重啟來重新配置資源

重啟是一個先停止再開始的過程。父級Actor通過遞歸方式先停止下面的子孫Actor,那么在啟動過程中這些停止的子孫Actor是否會自動構建呢?這里需要特別注意:因為父級Actor是通過Props重新構建的,如果子級Actor的構建是在父級Actor的類構建器內而不是在消息處理函數內構建的,那么子級Actor會自動構建。Akka提供了preRestart和postRestart兩個事件接口。preRestart發生在停止之前,postRestart發生在開始前,如下:

  /** * Scala API: User overridable callback: '''By default it disposes of all children and then calls `postStop()`.''' * @param reason the Throwable that caused the restart to happen * @param message optionally the current message the actor processed when failing, if applicable * <p/> * Is called on a crashed Actor right BEFORE it is restarted to allow clean * up of resources before Actor is terminated. */ @throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest //#lifecycle-hooks
  def preRestart(reason: Throwable, message: Option[Any]): Unit = { context.children foreach { child ⇒ context.unwatch(child) context.stop(child) } postStop() } //#lifecycle-hooks

  /** * User overridable callback: By default it calls `preStart()`. * @param reason the Throwable that caused the restart to happen * <p/> * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash. */ @throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest //#lifecycle-hooks
  def postRestart(reason: Throwable): Unit = { preStart() }

可以看到:Akka提供給Actor的默認事件接口preRestart先將所有直屬子級Actor全部停止並把它們從監視清單里剔除,然后調用postStop執行事后清理。所以如果我們需要重載preRestart應該注意調用super.preRestart才能保留這些動作,如下:

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = { log.info(s"Parent restarting with error ${message}...") doSomeWorkBeforeStopping super.preRestart(reason, message) }

postRestart發生在開始之前,調用了事件接口preStart。如果我們重載了preStart進行初始化,那么在重載postRestart時可以選擇是否在重啟時需要再進行初始化,如果需要則必須調用super.postRestart:

  override def postRestart(reason: Throwable): Unit = { log.info("need to initialize too ...") doSomeExtraInit super.postRestart(reason) }

我們知道:很多時候由於外界原因,Actor的重啟無法保證一次成功。這種現象在使用依賴資源如數據庫、網絡連接等最為明顯。我們前面介紹過的異常處理策略中就包含了重試(retry)次數及最長重試時間,如下:

/** * Applies the fault handling `Directive` (Resume, Restart, Stop) specified in the `Decider` * to the child actor that failed, as opposed to [[akka.actor.AllForOneStrategy]] that applies * it to all children. * * @param maxNrOfRetries the number of times a child actor is allowed to be restarted, negative value means no limit, * if the limit is exceeded the child actor is stopped * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window * @param decider mapping from Throwable to [[akka.actor.SupervisorStrategy.Directive]], you can also use a * [[scala.collection.immutable.Seq]] of Throwables which maps the given Throwables to restarts, otherwise escalates. * @param loggingEnabled the strategy logs the failure if this is enabled (true), by default it is enabled */
case class OneForOneStrategy( maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf, override val loggingEnabled: Boolean  = true)(val decider: SupervisorStrategy.Decider) extends SupervisorStrategy {...}

為了應付更復雜的重啟方式,Akka提供了一種逐步延時重啟策略(BackoffSupervisor)。BackoffSupervisor的定義如下:

/** * Back-off supervisor that stops and starts a child actor using a back-off algorithm when the child actor stops. * This back-off supervisor is created by using `akka.pattern.BackoffSupervisor.props` * with `Backoff.onStop`. */ final class BackoffSupervisor( val childProps: Props, val childName: String, minBackoff: FiniteDuration, maxBackoff: FiniteDuration, val reset: BackoffReset, randomFactor: Double, strategy: SupervisorStrategy) extends Actor with HandleBackoff {...} /** * Props for creating a [[BackoffSupervisor]] actor from [[BackoffOptions]]. * * @param options the [[BackoffOptions]] that specify how to construct a backoff-supervisor. */ def props(options: BackoffOptions): Props = options.props /** * Builds back-off options for creating a back-off supervisor. * You can pass `BackoffOptions` to `akka.pattern.BackoffSupervisor.props`. * An example of creating back-off options: * {{{ * Backoff.onFailure(childProps, childName, minBackoff, maxBackoff, randomFactor) * .withManualReset * .withSupervisorStrategy( * OneforOneStrategy(){ * case e: GivingUpException => Stop * case e: RetryableException => Restart * } * ) * * }}} */
object Backoff { /** * Back-off options for creating a back-off supervisor actor that expects a child actor to restart on failure. * * This explicit supervisor behaves similarly to the normal implicit supervision where * if an actor throws an exception, the decider on the supervisor will decide when to * `Stop`, `Restart`, `Escalate`, `Resume` the child actor. * * When the `Restart` directive is specified, the supervisor will delay the restart * using an exponential back off strategy (bounded by minBackoff and maxBackoff). * * This supervisor is intended to be transparent to both the child actor and external actors. * Where external actors can send messages to the supervisor as if it was the child and the * messages will be forwarded. And when the child is `Terminated`, the supervisor is also * `Terminated`. * Transparent to the child means that the child does not have to be aware that it is being * supervised specifically by this actor. Just like it does * not need to know when it is being supervised by the usual implicit supervisors. * The only caveat is that the `ActorRef` of the child is not stable, so any user storing the * `sender()` `ActorRef` from the child response may eventually not be able to communicate with * the stored `ActorRef`. In general all messages to the child should be directed through this actor. * * An example of where this supervisor might be used is when you may have an actor that is * responsible for continuously polling on a server for some resource that sometimes may be down. * Instead of hammering the server continuously when the resource is unavailable, the actor will * be restarted with an exponentially increasing back off until the resource is available again. * * '''*** * This supervisor should not be used with `Akka Persistence` child actors. * `Akka Persistence` actors shutdown unconditionally on `persistFailure()`s rather * than throw an exception on a failure like normal actors. * [[#onStop]] should be used instead for cases where the child actor * terminates itself as a failure signal instead of the normal behavior of throwing an exception. * ***''' * You can define another * supervision strategy by using `akka.pattern.BackoffOptions.withSupervisorStrategy` on [[akka.pattern.BackoffOptions]]. * * @param childProps the [[akka.actor.Props]] of the child actor that * will be started and supervised * @param childName name of the child actor * @param minBackoff minimum (initial) duration until the child actor will * started again, if it is terminated * @param maxBackoff the exponential back-off is capped to this duration * @param randomFactor after calculation of the exponential back-off an additional * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. * In order to skip this additional delay pass in `0`. */ def onFailure( childProps: Props, childName: String, minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double): BackoffOptions = BackoffOptionsImpl(RestartImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor) /** * Back-off options for creating a back-off supervisor actor that expects a child actor to stop on failure. * * This actor can be used to supervise a child actor and start it again * after a back-off duration if the child actor is stopped. * * This is useful in situations where the re-start of the child actor should be * delayed e.g. in order to give an external resource time to recover before the * child actor tries contacting it again (after being restarted). * * Specifically this pattern is useful for persistent actors, * which are stopped in case of persistence failures. * Just restarting them immediately would probably fail again (since the data * store is probably unavailable). It is better to try again after a delay. * * It supports exponential back-off between the given `minBackoff` and * `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and * `maxBackoff` 30 seconds the start attempts will be delayed with * 3, 6, 12, 24, 30, 30 seconds. The exponential back-off counter is reset * if the actor is not terminated within the `minBackoff` duration. * * In addition to the calculated exponential back-off an additional * random delay based the given `randomFactor` is added, e.g. 0.2 adds up to 20% * delay. The reason for adding a random delay is to avoid that all failing * actors hit the backend resource at the same time. * * You can retrieve the current child `ActorRef` by sending `BackoffSupervisor.GetCurrentChild` * message to this actor and it will reply with [[akka.pattern.BackoffSupervisor.CurrentChild]] * containing the `ActorRef` of the current child, if any. * * The `BackoffSupervisor`delegates all messages from the child to the parent of the * `BackoffSupervisor`, with the supervisor as sender. * * The `BackoffSupervisor` forwards all other messages to the child, if it is currently running. * * The child can stop itself and send a [[akka.actor.PoisonPill]] to the parent supervisor * if it wants to do an intentional stop. * * Exceptions in the child are handled with the default supervisionStrategy, which can be changed by using * [[BackoffOptions#withSupervisorStrategy]] or [[BackoffOptions#withDefaultStoppingStrategy]]. A * `Restart` will perform a normal immediate restart of the child. A `Stop` will * stop the child, but it will be started again after the back-off duration. * * @param childProps the [[akka.actor.Props]] of the child actor that * will be started and supervised * @param childName name of the child actor * @param minBackoff minimum (initial) duration until the child actor will * started again, if it is terminated * @param maxBackoff the exponential back-off is capped to this duration * @param randomFactor after calculation of the exponential back-off an additional * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. * In order to skip this additional delay pass in `0`. */ def onStop( childProps: Props, childName: String, minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double): BackoffOptions = BackoffOptionsImpl(StopImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor) } /** * Configures a back-off supervisor actor. Start with `Backoff.onStop` or `Backoff.onFailure`. * BackoffOptions is immutable, so be sure to chain methods like: * {{{ * val options = Backoff.onFailure(childProps, childName, minBackoff, maxBackoff, randomFactor) * .withManualReset * context.actorOf(BackoffSupervisor.props(options), name) * }}} */ trait BackoffOptions { /** * Returns a new BackoffOptions with automatic back-off reset. * The back-off algorithm is reset if the child does not crash within the specified `resetBackoff`. * @param resetBackoff The back-off is reset if the child does not crash within this duration. */ def withAutoReset(resetBackoff: FiniteDuration): BackoffOptions /** * Returns a new BackoffOptions with manual back-off reset. The back-off is only reset * if the child sends a `BackoffSupervisor.Reset` to its parent (the backoff-supervisor actor). */ def withManualReset: BackoffOptions /** * Returns a new BackoffOptions with the supervisorStrategy. * @param supervisorStrategy the supervisorStrategy that the back-off supervisor will use. * The default supervisor strategy is used as fallback if the specified supervisorStrategy (its decider) * does not explicitly handle an exception. As the BackoffSupervisor creates a separate actor to handle the * backoff process, only a [[OneForOneStrategy]] makes sense here. */ def withSupervisorStrategy(supervisorStrategy: OneForOneStrategy): BackoffOptions /** * Returns a new BackoffOptions with a default `SupervisorStrategy.stoppingStrategy`. * The default supervisor strategy is used as fallback for throwables not handled by `SupervisorStrategy.stoppingStrategy`. */ def withDefaultStoppingStrategy: BackoffOptions /** * Returns the props to create the back-off supervisor. */
  private[akka] def props: Props }

注意以上源代碼中Backoff.onFailure和Backoff.onStop的使用說明:當一個預設為永生的子級Actor由於某些原因而停止后再重啟時用onStop、當一個子級Actor因為異常造成失敗中斷再重啟時用onFailure。所以在處理異常時我們應該使用onFailure。
我們看到BackoffSupervior提供了更詳細的重啟方式支持。下面是使用BackoffSupervisor的一個典型例子:

val childProps = Props(classOf[EchoActor]) val supervisor = BackoffSupervisor.props( Backoff.onFailure( childProps, childName = "myEcho", minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
 ).withManualReset .withSupervisorStrategy( OneforOneStrategy(){ case _: GivingUpException => Stop case _: RetryableException => Restart case _: MinorException => Resume } ) ) system.actorOf(supervisor, name = "echoSupervisor")

以上的withManualReset是個BackoffOption的方法:要求子級Actor在成功重啟后手動發送akka.pattern.BackoffSupervisor.Reset給它的監管父級Actor使其可以清除那些計數器,Akka源代碼中是這樣處理的:

    case Reset ⇒ reset match { case ManualReset ⇒ restartCount = 0
        case msg ⇒ unhandled(msg) }

我們也可以用自動方式withAutoReset(3.seconds):

  /** * Returns a new BackoffOptions with automatic back-off reset. * The back-off algorithm is reset if the child does not crash within the specified `resetBackoff`. * @param resetBackoff The back-off is reset if the child does not crash within this duration. */ def withAutoReset(resetBackoff: FiniteDuration): BackoffOptions /** * Returns a new BackoffOptions with manual back-off reset. The back-off is only reset * if the child sends a `BackoffSupervisor.Reset` to its parent (the backoff-supervisor actor). */ def withManualReset: BackoffOptions

現在我們發現:異常處理策略沒有包括對下屬正常終止(termination)信息的監聽。那么如何捕捉Actor終止的信息呢?Akka提供了context.watch和context.unwatch來設置通過ActorRef對任何Actor的終止狀態監視,無須父子級別關系要求。下面是Akka提供的這兩個函數:

/** * Have this FunctionRef watch the given Actor. This method must not be * called concurrently from different threads, it should only be called by * its parent Actor. * * Upon receiving the Terminated message, unwatch() must be called from a * safe context (i.e. normally from the parent Actor). */ def watch(actorRef: ActorRef): Unit = { watching += actorRef actorRef.asInstanceOf[InternalActorRef].sendSystemMessage(Watch(actorRef.asInstanceOf[InternalActorRef], this)) } /** * Have this FunctionRef unwatch the given Actor. This method must not be * called concurrently from different threads, it should only be called by * its parent Actor. */ def unwatch(actorRef: ActorRef): Unit = { watching -= actorRef actorRef.asInstanceOf[InternalActorRef].sendSystemMessage(Unwatch(actorRef.asInstanceOf[InternalActorRef], this)) }

被監視對象的終止事件是通過Terminate消息獲取的。典型的監視方式示范如下:

class DeathPactExceptionParentActor extends Actor with ActorLogging{ def receive={ case "create_child"=> { log.info ("creating child") val child=context.actorOf(Props[DeathPactExceptionChildActor]) context.watch(child) //watch child's death
      child!"stop" } case "someMessage" => log.info ("some message") case Terminated(_) => context.stop(self)  //child has stopped
 } }

講了這么多,還是感到有許多疑問,可能還是用一些代碼來了解一下這些策略的具體用法。我想,無可否認的BackoffSupervisor應該是個最硬的骨頭,我們先設計一個場景來示范BackoffSupervisor的用法和效果:假設一個咖啡餐廳場景,其中有個廚房,廚房內有大廚,這幾個環節都可以用Actor來表達。其中餐廳是頂層Actor,直屬子級是廚房,而大廚則是廚房的直屬子級。我們可以把廚房Actor作為一個BackoffSupervisor,這樣當大廚Actor出現任何異常時廚房Actor可以用一種逐步延時的方式來重啟大廚Actor。我們先定義這個大廚Actor:

object Chef { sealed trait Cooking case object CookSpecial extends Cooking class ChefBusy(msg: String) extends Exception(msg) def props = Props(new Chef) } class Chef extends Actor with ActorLogging { import Chef._ log.info(s"Chef actor created at ${System.currentTimeMillis()}") override def receive: Receive = { case _ => throw new ChefBusy("Chef is busy cooking!") } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { super.preRestart(reason, message) log.info(s"Restarting Chef for $message") } override def postRestart(reason: Throwable): Unit = { super.postRestart(reason) log.info(s"Chef restarted for ${reason.getMessage}") } override def postStop(): Unit = { log.info("Chef stopped!") } }

以上還包括了Chef的生命周期跟蹤。現在Chef的唯一功能就是收到消息就立即產生異常ChefBusy,控制馬上交到直屬父級Actor。Chef的直屬父級Actor是Kitchen:

object Kitchen { def kitchenProps = { import Chef._ val options = Backoff.onFailure(Chef.props, "chef", 200 millis, 10 seconds, 0.0) .withSupervisorStrategy(OneForOneStrategy(maxNrOfRetries = 4, withinTimeRange = 30 seconds) { case _: ChefBusy => SupervisorStrategy.Restart }) BackoffSupervisor.props(options) } } class Kitchen extends Actor with ActorLogging { override def receive: Receive = { case x => context.children foreach {child => child ! x} } }

上面示范了BackoffSupervisor的Props定義方法。Chef Actor的實例構建(ActorRef產生)應該在Backoff.onFailure()函數里。現在我們了解了BackoffSupervisor只容許獨子,所以context.children 只有一個child: "chef"。我們必須給每個需要逐步延緩監管的Actor設置獨立的BackoffSupervisor監管父級。

下面我們試試BackoffSupervisor的具體效果:

object Cafe extends App { import Kitchen._ val cafeSystem = ActorSystem("cafe") val kitchen = cafeSystem.actorOf(kitchenProps,"kitchen") println(s"Calling chef at ${System.currentTimeMillis()}") kitchen ! "CookCook" println(s"Calling chef at ${System.currentTimeMillis()}") Thread.sleep(1000) println(s"Calling chef at ${System.currentTimeMillis()}") kitchen ! "CookCook" Thread.sleep(1000) kitchen ! "CookCook" Thread.sleep(1000) kitchen ! "CookCook" Thread.sleep(1000) kitchen ! "CookCook" Thread.sleep(1000 * 30) cafeSystem.terminate()

}

測試運行結果中其中一輪顯示:

Calling chef at 1495108529380 [INFO] [05/18/2017 19:55:29.384] [cafe-akka.actor.default-dispatcher-2] [akka://cafe/user/kitchen/chef] Chef actor created at 1495108529382
[ERROR] [05/18/2017 19:55:29.392] [cafe-akka.actor.default-dispatcher-3] [akka://cafe/user/kitchen/chef] Chef is busy cooking!
Chef$ChefBusy: Chef is busy cooking! at Chef$$anonfun$receive$1.applyOrElse(Cafe.scala:24) ... [INFO] [05/18/2017 19:55:29.394] [cafe-akka.actor.default-dispatcher-2] [akka://cafe/user/kitchen/chef] Chef stopped!
[INFO] [05/18/2017 19:55:29.614] [cafe-akka.actor.default-dispatcher-4] [akka://cafe/user/kitchen/chef] Chef actor created at 1495108529614
Calling chef at 1495108530382 [ERROR] [05/18/2017 19:55:30.382] [cafe-akka.actor.default-dispatcher-3] [akka://cafe/user/kitchen/chef] Chef is busy cooking!

我們看到Chef被重啟過程。值得注意的是:生命周期監控函數中只有postStop被調用過,preRestart和postRestart都沒引用。如果這樣的話BackoffSupervisor就是一錘子買賣,是正真的let it crash模式體現了。那如果需要重新處理造成異常的消息又怎么辦呢?看來只好試試SupervisorStrategy了。我們用下面的例子來示范一下:

import akka.actor._ import scala.util.Random import scala.concurrent.duration._ object ChildActor { class RndException(msg: String) extends Exception(msg) def props = Props[ChildActor] } class ChildActor extends Actor with ActorLogging { import ChildActor._ override def receive: Receive = { case msg: String => {   //任意產生一些RndExcption
      if (Random.nextBoolean()) throw new RndException("Any Exception!") else log.info(s"Processed message: $msg !!!") } } override def preStart(): Unit = { log.info("ChildActor Started.") super.preStart() } //在重啟時preRestart是在原來的Actor實例上調用preRestart的
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = { log.info(s"Restarting ChildActor for ${reason.getMessage}...") message match { case Some(msg) => log.info(s"Exception message: ${msg.toString}") self ! msg       //把異常消息再擺放到信箱最后
      case None => } super.preRestart(reason, message) } override def postRestart(reason: Throwable): Unit = { super.postRestart(reason) log.info(s"Restarted ChildActor for ${reason.getMessage}...") } override def postStop(): Unit = { log.info(s"Stopped ChildActor.") super.postStop() } } //監管父級
class Parent extends Actor with ActorLogging { def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = { case _: ChildActor.RndException => SupervisorStrategy.Restart } override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 30, withinTimeRange = 3 seconds) { decider.orElse(SupervisorStrategy.defaultDecider) } val childActor = context.actorOf(ChildActor.props,"childActor") override def receive: Receive = { case msg@ _ => childActor ! msg    //把所有收到的消息都轉給childActor
 } }

以上就是一個SupervisorStrategy的父子結構例子。特別要注意的是OneForOneStrategy參數maxNrOfRetries,這個是一次性次數設置,每次重啟成功后不會重設。在整體程序運行時這個次數會不斷增加直到設置數,之后發生異常直接終止被監管Actor。下面是這個例子的運行示范:

object TestMyActor extends App { val system = ActorSystem("testSystem") val parentActor = system.actorOf(Props[Parent],"parentActor") parentActor ! "Hello 1" parentActor ! "Hello 2" parentActor ! "Hello 3" parentActor ! "Hello 4" parentActor ! "Hello 5" Thread.sleep(5000) system.terminate() }

運算結果顯示所有消息都得到處理,只是順序變得混亂了。

好了,明白了如何使用BackoffSupervior,我們還是把整個例子完善一下吧:還是這個Cafe場景。Cafe里分廚房Kitchen、收款員Cashier幾個部分。上面已經介紹過Kitchen下還有Chef,而Cashier之下還有收據打印機ReceiptPrinter。整個工作流程大致如下:

1、一個客人進店要求一杯特價咖啡

2、Cafe要求廚房在三種咖啡中即時選擇任意一款作為特價咖啡

3、Cafe同時要求Cashier按廚房提供的特價咖啡價錢收款並打印收據

4、以上2,3成功后完成一單完整銷售,更新銷售額

5、完成銷售目標后通知廚房打烊

6、收款員看到廚房打烊后停業關門

另外,可能出現幾種異常情況:廚房的大廚可能忙不過來准備特價咖啡、收據打印機有可能卡紙。遇到這幾種情況直接通知客人遲點再來光顧。

很明顯Cafe需要維護內部狀態即當前銷售額salesAmount,Kitchen的狀態是當前特餐currentSpecial,Cashier的狀態是paperJammed代表收據打印機是否卡紙。

我們先從Chef,Kitchen及BackoffSupervisor監管開始:

object Chef { sealed trait Order  //消息類型
  case object MakeSpecial extends Order  //烹制特飲
  class ChefBusy(msg: String) extends Exception(msg)  //異常類型
  def props = Props[Chef] } class Chef extends Actor with ActorLogging { import Chef._ log.info("Chef says: I am ready to work ...")   //構建成功信息 //內部狀態
  var currentSpecial: Cafe.Coffee = Cafe.Original var chefBusy: Boolean = false val specials = Map(0 -> Cafe.Original,1 -> Cafe.Espresso, 2 -> Cafe.Cappuccino) override def receive: Receive = { case MakeSpecial => { if ((Random.nextInt(6) % 6) == 0) {  //任意產生異常 2/6
        log.info("Chef is busy ...") chefBusy = true
        throw new ChefBusy("Busy!") } else { currentSpecial = randomSpecial     //選出當前特飲
        log.info(s"Chef says: Current special is ${currentSpecial.toString}.") sender() ! currentSpecial } } } def randomSpecial = specials(Random.nextInt(specials.size)) //選出當前特飲

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = { log.info(s"Restarting Chef for ${reason.getMessage}...") super.preRestart(reason, message) } override def postRestart(reason: Throwable): Unit = { log.info(s"Restarted Chef for ${reason.getMessage}.") context.parent ! BackoffSupervisor.Reset super.postRestart(reason) } override def postStop(): Unit = { log.info("Stopped Chef.") super.postStop() } } //Kitchen只是Chef的Backoff監管,沒有任何其它功能
class Kitchen extends Actor with ActorLogging { override def receive: Receive = { //context.children.size == 1,就是chef。 直接把所有消息轉發到Chef
    case msg@_ =>  //注意,無法使用Chef ?因為sender不明
      context.children foreach ( chef => chef forward msg) } override def postStop(): Unit = { log.info("Kitchen close!") super.postStop() } } object Kitchen { //指定的異常處理策略
  val kitchenDecider: PartialFunction[Throwable, SupervisorStrategy.Directive] = { case _: Chef.ChefBusy => SupervisorStrategy.Restart } def kitchenProps: Props = {  //定義BackoffSupervisor strategy
    val option = Backoff.onFailure(Chef.props,"chef",1 seconds, 5 seconds, 0.0) .withManualReset .withSupervisorStrategy { OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds) { kitchenDecider.orElse(SupervisorStrategy.defaultDecider) } } BackoffSupervisor.props(option) } }

Kitchen是存粹為監管Chef而設置的,沒有任何其它功能。收到任何消息就直接forward給Chef。這里值得注意的是當我們用?發消息給Kitchen再forward給Chef時,sender()是不確定的。所以必須想法子直接 ? Chef

Chef的唯一功能就是烹制當前特飲。如果太忙無法接單,產生ChefBusy異常。

Cashier和ReceiptPrinter同樣是一種父子監管關系。我們用SupervisorStrategy來實現這兩個Actor:

object ReceiptPrinter { case class PrintReceipt(sendTo: ActorRef, receipt: Cafe.Receipt)  //print command
  class PaperJamException extends Exception def props = Props[ReceiptPrinter] } class ReceiptPrinter extends Actor with ActorLogging { import ReceiptPrinter._ var paperJammed: Boolean = false
  override def receive: Receive = { case PrintReceipt(customer, receipt) =>    //打印收據並發送給顧客
      if ((Random.nextInt(6) % 6) == 0) { log.info("Printer jammed paper ...") paperJammed = true
        throw new PaperJamException } else { log.info(s"Printing receipt $receipt and sending to ${customer.path.name}") customer ! receipt } } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { log.info(s"Restarting ReceiptPrinter for ${reason.getMessage}...") super.preRestart(reason, message) } override def postRestart(reason: Throwable): Unit = { log.info(s"Started ReceiptPrinter for ${reason.getMessage}.") super.postRestart(reason) } override def postStop(): Unit = { log.info("Stopped ReceiptPrinter.") super.postStop() } } object Cashier { case class RingRegister(cup: Cafe.Coffee, customer: ActorRef)  //收款並出具收據
 def props(kitchen: ActorRef) = Props(classOf[Cashier],kitchen) } class Cashier(kitchen: ActorRef) extends Actor with ActorLogging { import Cashier._ import ReceiptPrinter._ context.watch(kitchen) //監視廚房。如果打烊了就關門歇業
  val printer = context.actorOf(ReceiptPrinter.props,"printer") //打印機卡紙后重啟策略
  def cashierDecider: PartialFunction[Throwable,SupervisorStrategy.Directive] = { case _: PaperJamException => SupervisorStrategy.Restart } override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){ cashierDecider.orElse(SupervisorStrategy.defaultDecider) } val menu = Map[Cafe.Coffee,Double](Cafe.Original -> 5.50, Cafe.Cappuccino -> 12.95, Cafe.Espresso -> 11.80) override def receive: Receive = { case RingRegister(coffee, customer) => //收款並出具收據
      log.info(s"Producing receipt for a cup of ${coffee.toString}...") val amt = menu(coffee)    //計價
      val rcpt = Cafe.Receipt(coffee.toString,amt) printer ! PrintReceipt(customer,rcpt)  //打印收據。可能出現卡紙異常
      sender() ! Cafe.Sold(rcpt)  //通知Cafe銷售成功 sender === Cafe
    case Terminated(_) => log.info("Cashier says: Oh, kitchen is closed. Let's make the end of day!") context.system.terminate() //廚房打烊,停止營業。
 } }

Cashier必須確定成功打印收據后才通知Cafe銷售成功完成。另一個功能是監視廚房打烊情況,廚房打烊則關門停止營業。

下面是Cafe和Customer的實現代碼:

object Cafe { sealed trait Coffee  //咖啡種類
  case object Original extends Coffee case object Espresso extends Coffee case object Cappuccino extends Coffee case class Receipt(item: String, amt: Double) sealed trait Routine case object PlaceOrder extends Routine case class Sold(receipt: Receipt) extends Routine } class Cafe extends Actor with ActorLogging { import Cafe._ import Cashier._ import context.dispatcher implicit val timeout = Timeout(1 seconds) var totalAmount: Double = 0.0 val kitchen = context.actorOf(Kitchen.kitchenProps,"kitchen") //Chef可能重啟,但path不變。必須直接用chef ? msg,否則經Kitchen轉發無法獲取正確的sender
  val chef = context.actorSelection("/user/cafe/kitchen/chef") val cashier = context.actorOf(Cashier.props(kitchen),"cashier") var customer: ActorRef = _     //當前客戶

  override def receive: Receive = { case Sold(rcpt) => totalAmount += rcpt.amt log.info(s"Today's sales is up to $totalAmount") customer ! Customer.OrderServed(rcpt)   //send him the order
      if (totalAmount > 100.00) { log.info("Asking kichen to clean up ...") context.stop(kitchen) } case PlaceOrder => customer = sender()     //send coffee to this customer
      (for { item <- (chef ? Chef.MakeSpecial).mapTo[Coffee] sales <- (cashier ? RingRegister(item,sender())).mapTo[Sold] } yield(Sold(sales.receipt))).mapTo[Sold] .recover { case _: AskTimeoutException => Customer.ComebackLater }.pipeTo(self) //send receipt to be added to totalAmount
 } } object Customer { sealed trait CustomerOrder case object OrderSpecial extends CustomerOrder case class OrderServed(rcpt: Cafe.Receipt) extends CustomerOrder case object ComebackLater extends CustomerOrder def props(cafe: ActorRef) = Props(new Customer(cafe)) } class Customer(cafe: ActorRef) extends Actor with ActorLogging { import Customer._ import context.dispatcher override def receive: Receive = { case OrderSpecial => log.info("Customer place an order ...") cafe ! Cafe.PlaceOrder case OrderServed(rcpt) => log.info(s"Customer says: Oh my! got my order ${rcpt.item} for ${rcpt.amt}") case ComebackLater => log.info("Customer is not so happy! says: I will be back later!") context.system.scheduler.scheduleOnce(1 seconds){cafe ! Cafe.PlaceOrder} } }

在這里由於使用了?來發送消息,所以具體的發送主體sender有可能出現混亂情況。?產生Future,Future是個monad,如果需要串聯多個Future可以用flatMap或者for-comprehension。

下面就是整個例子的測試運行代碼:

object MyCafe extends App { import Cafe._ import Customer._ import scala.concurrent.ExecutionContext.Implicits.global val cafeSys = ActorSystem("cafeSystem") val cafe = cafeSys.actorOf(Props[Cafe],"cafe") val customer = cafeSys.actorOf(Customer.props(cafe),"customer") cafeSys.scheduler.schedule(1 second, 1 second, customer, OrderSpecial) }

運行結果顯示所有功能按需求實現。
再次經歷這種Actor模式編程使我有了更多的體會。Actor模式的確跟現實場景很匹配。在編程的過程中我可以分別獨立考慮一個Actor的功能而不需要擔心其它Actor可能造成什么影響。最后在總體功能集成時如有需要,再調整這些Actor的消息。

這,對於像我這樣的人來說,的確是一種全新的編程方式!

下面就是這個例子完整的示范源代碼:

package mycafe import akka.actor._ import scala.concurrent.duration._ import scala.util.Random import akka.pattern._ import akka.util.Timeout import scala.concurrent._ object Chef { sealed trait Order  //消息類型
  case object MakeSpecial extends Order  //烹制特飲
  class ChefBusy(msg: String) extends Exception(msg)  //異常類型
  def props = Props[Chef] } class Chef extends Actor with ActorLogging { import Chef._ log.info("Chef says: I am ready to work ...")   //構建成功信息 //內部狀態
  var currentSpecial: Cafe.Coffee = Cafe.Original var chefBusy: Boolean = false val specials = Map(0 -> Cafe.Original,1 -> Cafe.Espresso, 2 -> Cafe.Cappuccino) override def receive: Receive = { case MakeSpecial => { if ((Random.nextInt(6) % 6) == 0) {  //任意產生異常 2/6
        log.info("Chef is busy ...") chefBusy = true
        throw new ChefBusy("Busy!") } else { currentSpecial = randomSpecial     //選出當前特飲
        log.info(s"Chef says: Current special is ${currentSpecial.toString}.") sender() ! currentSpecial } } } def randomSpecial = specials(Random.nextInt(specials.size)) //選出當前特飲

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = { log.info(s"Restarting Chef for ${reason.getMessage}...") super.preRestart(reason, message) } override def postRestart(reason: Throwable): Unit = { log.info(s"Restarted Chef for ${reason.getMessage}.") context.parent ! BackoffSupervisor.Reset super.postRestart(reason) } override def postStop(): Unit = { log.info("Stopped Chef.") super.postStop() } } //Kitchen只是Chef的Backoff監管,沒有任何其它功能
class Kitchen extends Actor with ActorLogging { override def receive: Receive = { //context.children.size == 1,就是chef。 直接把所有消息轉發到Chef
    case msg@_ =>  //注意,無法使用Chef ?因為sender不明
      context.children foreach ( chef => chef forward msg) } override def postStop(): Unit = { log.info("Kitchen close!") super.postStop() } } object Kitchen { //指定的異常處理策略
  val kitchenDecider: PartialFunction[Throwable, SupervisorStrategy.Directive] = { case _: Chef.ChefBusy => SupervisorStrategy.Restart } def kitchenProps: Props = {  //定義BackoffSupervisor strategy
    val option = Backoff.onFailure(Chef.props,"chef",1 seconds, 5 seconds, 0.0) .withManualReset .withSupervisorStrategy { OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds) { kitchenDecider.orElse(SupervisorStrategy.defaultDecider) } } BackoffSupervisor.props(option) } } object ReceiptPrinter { case class PrintReceipt(sendTo: ActorRef, receipt: Cafe.Receipt)  //print command
  class PaperJamException extends Exception def props = Props[ReceiptPrinter] } class ReceiptPrinter extends Actor with ActorLogging { import ReceiptPrinter._ var paperJammed: Boolean = false
  override def receive: Receive = { case PrintReceipt(customer, receipt) =>    //打印收據並發送給顧客
      if ((Random.nextInt(6) % 6) == 0) { log.info("Printer jammed paper ...") paperJammed = true
        throw new PaperJamException } else { log.info(s"Printing receipt $receipt and sending to ${customer.path.name}") customer ! receipt } } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { log.info(s"Restarting ReceiptPrinter for ${reason.getMessage}...") super.preRestart(reason, message) } override def postRestart(reason: Throwable): Unit = { log.info(s"Started ReceiptPrinter for ${reason.getMessage}.") super.postRestart(reason) } override def postStop(): Unit = { log.info("Stopped ReceiptPrinter.") super.postStop() } } object Cashier { case class RingRegister(cup: Cafe.Coffee, customer: ActorRef)  //收款並出具收據
 def props(kitchen: ActorRef) = Props(classOf[Cashier],kitchen) } class Cashier(kitchen: ActorRef) extends Actor with ActorLogging { import Cashier._ import ReceiptPrinter._ context.watch(kitchen) //監視廚房。如果打烊了就關門歇業
  val printer = context.actorOf(ReceiptPrinter.props,"printer") //打印機卡紙后重啟策略
  def cashierDecider: PartialFunction[Throwable,SupervisorStrategy.Directive] = { case _: PaperJamException => SupervisorStrategy.Restart } override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){ cashierDecider.orElse(SupervisorStrategy.defaultDecider) } val menu = Map[Cafe.Coffee,Double](Cafe.Original -> 5.50, Cafe.Cappuccino -> 12.95, Cafe.Espresso -> 11.80) override def receive: Receive = { case RingRegister(coffee, customer) => //收款並出具收據
      log.info(s"Producing receipt for a cup of ${coffee.toString}...") val amt = menu(coffee)    //計價
      val rcpt = Cafe.Receipt(coffee.toString,amt) printer ! PrintReceipt(customer,rcpt)  //打印收據。可能出現卡紙異常
      sender() ! Cafe.Sold(rcpt)  //通知Cafe銷售成功 sender === Cafe
    case Terminated(_) => log.info("Cashier says: Oh, kitchen is closed. Let's make the end of day!") context.system.terminate() //廚房打烊,停止營業。
 } } object Cafe { sealed trait Coffee  //咖啡種類
  case object Original extends Coffee case object Espresso extends Coffee case object Cappuccino extends Coffee case class Receipt(item: String, amt: Double) sealed trait Routine case object PlaceOrder extends Routine case class Sold(receipt: Receipt) extends Routine } class Cafe extends Actor with ActorLogging { import Cafe._ import Cashier._ import context.dispatcher implicit val timeout = Timeout(1 seconds) var totalAmount: Double = 0.0 val kitchen = context.actorOf(Kitchen.kitchenProps,"kitchen") //Chef可能重啟,但path不變。必須直接用chef ? msg,否則經Kitchen轉發無法獲取正確的sender
  val chef = context.actorSelection("/user/cafe/kitchen/chef") val cashier = context.actorOf(Cashier.props(kitchen),"cashier") var customer: ActorRef = _     //當前客戶

  override def receive: Receive = { case Sold(rcpt) => totalAmount += rcpt.amt log.info(s"Today's sales is up to $totalAmount") customer ! Customer.OrderServed(rcpt)   //send him the order
      if (totalAmount > 100.00) { log.info("Asking kichen to clean up ...") context.stop(kitchen) } case PlaceOrder => customer = sender()     //send coffee to this customer
      (for { item <- (chef ? Chef.MakeSpecial).mapTo[Coffee] sales <- (cashier ? RingRegister(item,sender())).mapTo[Sold] } yield(Sold(sales.receipt))).mapTo[Sold] .recover { case _: AskTimeoutException => Customer.ComebackLater }.pipeTo(self) //send receipt to be added to totalAmount
 } } object Customer { sealed trait CustomerOrder case object OrderSpecial extends CustomerOrder case class OrderServed(rcpt: Cafe.Receipt) extends CustomerOrder case object ComebackLater extends CustomerOrder def props(cafe: ActorRef) = Props(new Customer(cafe)) } class Customer(cafe: ActorRef) extends Actor with ActorLogging { import Customer._ import context.dispatcher override def receive: Receive = { case OrderSpecial => log.info("Customer place an order ...") cafe ! Cafe.PlaceOrder case OrderServed(rcpt) => log.info(s"Customer says: Oh my! got my order ${rcpt.item} for ${rcpt.amt}") case ComebackLater => log.info("Customer is not so happy! says: I will be back later!") context.system.scheduler.scheduleOnce(1 seconds){cafe ! Cafe.PlaceOrder} } } object MyCafe extends App { import Cafe._ import Customer._ import scala.concurrent.ExecutionContext.Implicits.global val cafeSys = ActorSystem("cafeSystem") val cafe = cafeSys.actorOf(Props[Cafe],"cafe") val customer = cafeSys.actorOf(Customer.props(cafe),"customer") cafeSys.scheduler.schedule(1 second, 1 second, customer, OrderSpecial) }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM