Akka(6): become/unbecome:運算行為切換


   通過一段時間的學習了解,加深了一些對Akka的認識,特別是對於Akka在實際編程中的用途方面。我的想法,或者我希望利用Akka來達到的目的是這樣的:作為傳統方式編程的老兵,我們已經習慣了直線流程方式一口氣實現完整的功能。如果使用Akka,我們可以把這個完整的功能分切成多個能產生中間臨時結果的小功能然后把這些功能放到不同的Actor上分別獨立運算,再通過消息來連接這些功能集合成最終結果。如此我們就輕易得到了一個多線程並發程序。由於Akka是軟件工具(Tool),沒有軟件架構(Framework)對編程方式的特別要求,Actor的構建和使用非常方便,我們甚至不需要多少修改就可以直接把原來的一段代碼移到Actor上。如果遇到一些重復的運算,我們還可以用Routing來實現並行運算。當然,把Actor當作簡單的行令運算器可能還不夠,如果能實現一些具體運算之上的高層次程序邏輯和流程就更加完善。我們可以用這樣的高層次Actor去解析程序邏輯、執行流程、把具體的運算分配給其它各種運算Actor或者一組Routees並行運算從而取得整體程序的高效率運行。具備了這些功能后,也許我們就可以完全用Actor模式來替代傳統單線程行令編程了。Akka可以通過Actor的動態行為轉換來實現同一Actor在不同情況下提供不同的功能支持。我們前面提到Actor的功能是在receive函數內實現的。那么轉換功能是否就是切換不同的receive函數呢?答案是確定的,Akka是通過Actor的context.become(rcvFunc)來實現receive函數切換的,我們看看下面這個示范:

import akka.actor._ object FillSeasons { case object HowYouFeel def props = Props(new FillSeasons) } class FillSeasons extends Actor with ActorLogging { import FillSeasons._ override def receive: Receive = spring def Winter: Receive = { case HowYouFeel => log.info("It's freezing cold!") } def summer: Receive = { case HowYouFeel => log.info("It's hot hot hot!") } def spring: Receive = { case HowYouFeel => log.info("It feels so goooood!") } } object Becoming extends App { val demoSystem = ActorSystem("demoSystem") val feelingsActor = demoSystem.actorOf(FillSeasons.props,"feelings") feelingsActor ! FillSeasons.HowYouFeel }

在FeelingsActor里我們定義了三個receive函數,對共同的HowYouFeel消息采取了不同的反應。默認行為是spring。那么應該如何在三種行為中切換呢?用context.become(???),如下:

import akka.actor._ object FillSeasons { case object HowYouFeel case object ToSummer case object ToSpring case object ToWinter def props = Props(new FillSeasons) } class FillSeasons extends Actor with ActorLogging { import FillSeasons._ override def receive: Receive = spring def winter: Receive = { case HowYouFeel => log.info("It's freezing cold!") case ToSummer => context.become(summer) case ToSpring => context.become(spring) } def summer: Receive = { case HowYouFeel => log.info("It's hot hot hot!") case ToSpring => context.become(spring) case ToWinter => context.become(winter) } def spring: Receive = { case HowYouFeel => log.info("It feels so goooood!") case ToSummer => context.become(summer) case ToWinter => context.become(winter) } } object Becoming extends App { val demoSystem = ActorSystem("demoSystem") val feelingsActor = demoSystem.actorOf(FillSeasons.props,"feelings") feelingsActor ! FillSeasons.HowYouFeel feelingsActor ! FillSeasons.ToSummer feelingsActor ! FillSeasons.HowYouFeel feelingsActor ! FillSeasons.ToWinter feelingsActor ! FillSeasons.HowYouFeel feelingsActor ! FillSeasons.ToSpring feelingsActor ! FillSeasons.HowYouFeel scala.io.StdIn.readLine() demoSystem.terminate() }

我們增加了三個消息來切換receive。運算結果如下:

[INFO] [06/08/2017 17:51:46.013] [demoSystem-akka.actor.default-dispatcher-3] [akka://demoSystem/user/feelings] It feels so goooood!
[INFO] [06/08/2017 17:51:46.019] [demoSystem-akka.actor.default-dispatcher-4] [akka://demoSystem/user/feelings] It's hot hot hot!
[INFO] [06/08/2017 17:51:46.028] [demoSystem-akka.actor.default-dispatcher-4] [akka://demoSystem/user/feelings] It's freezing cold!
[INFO] [06/08/2017 17:51:46.028] [demoSystem-akka.actor.default-dispatcher-4] [akka://demoSystem/user/feelings] It feels so goooood!


Process finished with exit code 0

就這樣在幾個receive里竄來竄去的好像已經能達到我們設想的目的了。看看Akka源代碼中become和unbecome發現這樣的做法是不正確的:

  def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit = behaviorStack = behavior :: (if (discardOld && behaviorStack.nonEmpty) behaviorStack.tail else behaviorStack) def become(behavior: Procedure[Any]): Unit = become(behavior, discardOld = true) def become(behavior: Procedure[Any], discardOld: Boolean): Unit = become({ case msg ⇒ behavior.apply(msg) }: Actor.Receive, discardOld) def unbecome(): Unit = { val original = behaviorStack behaviorStack =
      if (original.isEmpty || original.tail.isEmpty) actor.receive :: emptyBehaviorStack else original.tail }

從上面的代碼可以發現:調用become(x)實際上是把x壓進了一個堆棧里。如果像我們這樣不斷調用become轉來轉去的,在堆棧上留下舊的行為函數實例最終會造成StackOverFlowError。所以Akka提供了unbecome,這是個堆棧彈出函數,把上一個become壓進的行為函數再彈出來,釋放一個堆棧空間。所以我們應該用unbecome來解決堆棧溢出問題。但是,如果在多個receive函數之間轉換來實現行為變化的話,就難以正確掌握堆棧的壓進,彈出沖抵配對,並且無法避免所謂的意大利面代碼造成的混亂邏輯。所以,become/unbecome最好使用在兩個功能之間的轉換。我們再設計一個例子來示范:

sealed trait DBOperations case class DBWrite(sql: String) extends DBOperations case class DBRead(sql: String) extends DBOperations sealed trait DBStates case object Connected extends DBStates case object Disconnected extends DBStates

DBoperations代表數據庫讀寫操作。DBState代表數據庫當前狀態:連線Connected或斷線Disconnected。只有數據庫在Connected狀態下才能進行數據庫操作。順理成章,我們需要兩個receive函數:

import akka.actor._ sealed trait DBOperations case class DBWrite(sql: String) extends DBOperations case class DBRead(sql: String) extends DBOperations sealed trait DBStates case object Connected extends DBStates case object Disconnected extends DBStates object DBOActor { def props = Props(new DBOActor) } class DBOActor extends Actor with ActorLogging { override def receive: Receive = disconnected def disconnected: Receive = { case Connected => log.info("Logon to DB.") context.become(connected) } def connected: Receive = { case Disconnected => log.info("Logoff from DB.") context.unbecome() case DBWrite(sql) => log.info(s"Writing to DB: $sql") case DBRead(sql) => log.info(s"Reading from DB: $sql") } } object BecomeDB extends App { val dbSystem = ActorSystem("dbSystem") val dbActor = dbSystem.actorOf(DBOActor.props,"dbActor") dbActor ! Connected dbActor ! DBWrite("Update table x") dbActor ! DBRead("Select from table x") dbActor ! Disconnected scala.io.StdIn.readLine() dbSystem.terminate() }

 

運算結果顯示如下:

[INFO] [06/09/2017 11:44:40.093] [dbSystem-akka.actor.default-dispatcher-3] [akka://dbSystem/user/dbActor] Logon to DB.
[INFO] [06/09/2017 11:44:40.106] [dbSystem-akka.actor.default-dispatcher-3] [akka://dbSystem/user/dbActor] Writing to DB: Update table x
[INFO] [06/09/2017 11:44:40.107] [dbSystem-akka.actor.default-dispatcher-3] [akka://dbSystem/user/dbActor] Reading from DB: Select from table x
[INFO] [06/09/2017 11:44:40.107] [dbSystem-akka.actor.default-dispatcher-3] [akka://dbSystem/user/dbActor] Logoff from DB.

以上是按正確順序向dbActor發出數據庫操作指令后產生的結果。但是,我們是在一個多線程消息驅動的環境里。發送給dbActor的消息收到時間無法預料。我們試着調換一下指令到達順序:

  dbActor ! DBWrite("Update table x") dbActor ! Connected dbActor ! DBRead("Select from table x") dbActor ! Disconnected

運算結果:

[INFO] [06/09/2017 11:54:57.264] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Logon to DB.
[INFO] [06/09/2017 11:54:57.273] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Reading from DB: Select from table x
[INFO] [06/09/2017 11:54:57.273] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Logoff from DB.

漏掉了DBWrite操作。可以理解,所有connected狀態之前的任何操作都不會真正生效。Akka提供了個Stash trait能把一個receive函數未處理的消息都存起來。然后用unstash()可以把存儲的消息都轉移到本Actor的郵箱里。我們可以用Stash來解決這個消息遺失問題:

  def disconnected: Receive = { case Connected => log.info("Logon to DB.") context.become(connected) unstashAll() case _ => stash() }

所有消息遺失都是在Disconnected狀態內發生的。在disconnected里我們用stash把所有非Connected消息存起來,然后在轉換成Connected狀態時把這些消息轉到信箱。再看看運算結果:

object BecomeDB extends App { val dbSystem = ActorSystem("dbSystem") val dbActor = dbSystem.actorOf(DBOActor.props,"dbActor") dbActor ! DBWrite("Update table x") dbActor ! Connected dbActor ! DBRead("Select from table x") dbActor ! Disconnected scala.io.StdIn.readLine() dbSystem.terminate() }

[INFO] [06/09/2017 12:01:54.518] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Logon to DB.
[INFO] [06/09/2017 12:01:54.528] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Writing to DB: Update table x
[INFO] [06/09/2017 12:01:54.528] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Reading from DB: Select from table x
[INFO] [06/09/2017 12:01:54.528] [dbSystem-akka.actor.default-dispatcher-4] [akka://dbSystem/user/dbActor] Logoff from DB.

顯示結果正確。下面就是整個示范的源代碼:

import akka.actor._ sealed trait DBOperations case class DBWrite(sql: String) extends DBOperations case class DBRead(sql: String) extends DBOperations sealed trait DBStates case object Connected extends DBStates case object Disconnected extends DBStates object DBOActor { def props = Props(new DBOActor) } class DBOActor extends Actor with ActorLogging with Stash { override def receive: Receive = disconnected def disconnected: Receive = { case Connected => log.info("Logon to DB.") context.become(connected) unstashAll() case _ => stash() } def connected: Receive = { case Disconnected => log.info("Logoff from DB.") context.unbecome() case DBWrite(sql) => log.info(s"Writing to DB: $sql") case DBRead(sql) => log.info(s"Reading from DB: $sql") } } object BecomeDB extends App { val dbSystem = ActorSystem("dbSystem") val dbActor = dbSystem.actorOf(DBOActor.props,"dbActor") dbActor ! DBWrite("Update table x") dbActor ! Connected dbActor ! DBRead("Select from table x") dbActor ! Disconnected scala.io.StdIn.readLine() dbSystem.terminate() }

 

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM