Akka(1):Actor - 靠消息驅動的運算器


  Akka是由各種角色和功能的Actor組成的,工作的主要原理是把一項大的計算任務分割成小環節,再按各環節的要求構建相應功能的Actor,然后把各環節的運算托付給相應的Actor去獨立完成。Akka是個工具庫(Tools-Library),不是一個軟件架構(Software-Framework),我們不需要按照Akka的框架格式去編寫程序,而是直接按需要構建Actor去異步運算一項完整的功能,這樣讓用戶在不知不覺中自然的實現了多線程並發軟件編程(concurrent programming)。按這樣的描述,Actor就是一種靠消息驅動(Message-driven)的運算器,我們可以直接調用它來運算一段程序。消息驅動模式的好處是可以實現高度的松散耦合(loosely-coupling),因為系統部件之間不用軟件接口,而是通過消息來進行系統集成的。消息驅動模式支持了每個Actor的獨立運算環境,又可以在運行時按需要靈活的對系統Actor進行增減,伸縮自如,甚至可以在運行時(runtime)對系統部署進行調配。Akka的這些鮮明的特點都是通過消息驅動來實現的。

曾經看到一個關於Actor模式的觀點:認為Actor並不適合並發(concurrency)編程,更應該是維護內部狀態的運算工具。聽起來好像很無知,畢竟Actor模式本身就是並發模式,如果不適合並發編程,豈不與Akka的發明意願相左。再仔細研究了一下這個觀點的論述后就完全認同了這種看法。在這里我們分析一下這種論述,先看看下面這段Actor用法偽代碼:

 class QueryActor extends Actor { override def receive: Receive = { case GetResult(query) => val x = db.RunQuery(query) val y = getValue(x) sender() ! computeResult(x,y) } }
 val result: Future[Any] = QueryActor ? GetResult(...)

這段代碼中QueryActor沒有任何內部狀態。通過Future傳遞計算結果能實現不阻塞(non-blocking)運算。下面我們用QueryActor來實現並發運算:

  val r1 = QueryActor ! request1 val r2 = QueryActor ! request2 for { x <- r1 y <- r2 } yield combineValues(x,y)

乍眼看r1和r2貌似能實現並行運算,但不要忘記Actor運算環境是單線程的,而Actor信箱又是按序的(Ordered),所以這兩個運算只能按順序運行,最多也就是能在另一個線程里異步進行而已,r1運算始終會阻塞r2的運行。如此還不如直接使用Future,能更好的實現並發程序的並行運算。同樣的要求如果用Future來實現的話可以用下面的偽代碼:

  def fuQuery(query: DBQuery): Future[FResult] = Future { val x = db.RunQuery(query) val y = getValue(x) computeResults(x,y) } val r1 = fuQuery(query1) val r2 = fuQuery(query2) for { x <- r1 y <- r2 } yield combineValues(x,y)

在這個例子里r1和r2就真正是並行運算的。從這個案例中我的結論是盡量把Akka Actor使用在需要維護內部狀態的應用中。如果為了實現non-blocking只需要把程序分布到不同的線程里運行的話就應該直接用Future,這樣自然的多。但使用Future是完全無法維護內部狀態的。

好了,回到正題:從功能上Actor是由實例引用(ActorRef),消息郵箱(Mailbox),內部狀態(State),運算行為(Behavior),子類下屬(Child-Actor),監管策略(Supervision/Monitoring)幾部分組成。Actor的物理結構由ActorRef、Actor Instance(runtime實例)、Mailbox、dispatcher(運算器)組成。我們在本篇先介紹一下ActorRef,Mailbox,State和Behavior。

1、ActorRef:Akka系統是一個樹形層級式的結構,每個節點由一個Actor代表。每一個Actor在結構中都可以用一個路徑(ActorPath)來代表它在系統結構里的位置。我們可以重復用這個路徑來構建Actor,但每次構建都會產生新的ActorRef。所以ActorRef是唯一的,代表了某個路徑指向位置上的一個運行時的Actor實例,我們只能用ActorRef來向Actor發送消息

2、Mailbox:可以說成是一個運算指令隊列(command queque)。Actor從外部接收的消息都是先存放在Mailbox里的。系統默認Mailbox中無限數量的消息是按時間順序排列的,但用戶可以按照具體需要定制Mailbox,比如有限容量信箱、按消息優先排序信箱等。

3、Behavior:簡單來說就是對Mailbox里消息的反應方式。Mailbox中臨時存放了從外界傳來的指令,如何運算這些指令、產生什么結果都是由這些指令的運算函數來確定。所以這些函數的功能就代表着Actor的行為模式。Actor的運算行為可以通過become來替換默認的receive函數,用unbecome來恢復默認行為。

4、State:Actor內部狀態,由一組變量值表示。當前內部狀態即行為函數最后一次運算所產生的變量值

下面我們就用個例子來示范Actor:模擬一個吝嗇人的錢包,他總是會把付出放在最次要的位置。如此我們可以用消息優先排序信箱UnboundedPriorityMailbox來實現。按照Akka程序標准格式,我們先把每個Actor所需要處理的消息和Props構建放在它的伴生對象里:

  object Wallet { sealed trait WalletMsg case object ZipUp extends WalletMsg    //鎖錢包
    case object UnZip extends WalletMsg    //開錢包
    case class PutIn(amt: Double) extends WalletMsg   //存入
    case class DrawOut(amt: Double) extends WalletMsg //取出 
    case object CheckBalance extends WalletMsg  //查看余額
 def props = Props(new Wallet) }

下面是Actor wallet的定義,必須繼承Actor以及override receive函數:

    class Wallet extends Actor { import Wallet._ var balance: Double = 0
      var zipped: Boolean = true

      override def receive: Receive = { case ZipUp => zipped = true println("Zipping up wallet.") case UnZip => zipped = false println("Unzipping wallet.") case PutIn(amt) =>
          if (zipped) { self ! UnZip         //無論如何都要把錢存入
            self ! PutIn(amt) } else { balance += amt println(s"$amt put-in wallet.") } case DrawOut(amt) =>
          if (zipped)  //如果錢包沒有打開就算了
            println("Wallet zipped, Cannot draw out!") else
            if ((balance - amt) < 0) println(s"$amt is too much, not enough in wallet!") else { balance -= amt println(s"$amt drawn out of wallet.") } case CheckBalance => println(s"You have $balance in your wallet.") } }

我們可以看到這個Actor的內部狀態分別是:var balance, var zipped。下面是定制Mailbox定義:

  class PriorityMailbox(settings: ActorSystem.Settings, config: Config) extends UnboundedPriorityMailbox ( PriorityGenerator { case Wallet.ZipUp => 0        
      case Wallet.UnZip => 0
      case Wallet.PutIn(_) => 0
      case Wallet.DrawOut(_) => 2
      case Wallet.CheckBalance => 4
      case PoisonPill => 4
      case otherwise => 4 } )

PriorityMailbox需要繼承UnboundedPriorityMailbox並且提供對比函數PriorityGenerator。ZipUp,UnZip和PutIn都是最優先的。然后在application.conf登記dispatcher的配置:

prio-dispatcher { mailbox-type = "PriorityMailbox" }

下面的代碼可以用來試運行Actor wallet:

object Actor101 extends App { val system = ActorSystem("actor101-demo",ConfigFactory.load) val wallet = system.actorOf(Wallet.props.withDispatcher( "prio-dispatcher"),"mean-wallet") wallet ! Wallet.UnZip wallet ! Wallet.PutIn(10.50) wallet ! Wallet.PutIn(20.30) wallet ! Wallet.DrawOut(10.00) wallet ! Wallet.ZipUp wallet ! Wallet.PutIn(100.00) wallet ! Wallet.CheckBalance Thread.sleep(1000) system.terminate() }

由於需要解析application.conf里的配置,所以使用了ActorSystem(name, config)方式。構建Actor時用.withDispatcher把application.conf里的dispatcher配置prio-dispatcher傳入。

運算的結果如下:

Unzipping wallet. 10.5 put-in wallet. 20.3 put-in wallet. 100.0 put-in wallet. Zipping up wallet. Wallet zipped, Cannot draw out! You have 130.8 in your wallet. Process finished with exit code 0

下面是本次示范的完整代碼:

application.conf:

prio-dispatcher { mailbox-type = "PriorityMailbox" }

Actor101.scala:

import akka.actor._ import akka.dispatch.PriorityGenerator import akka.dispatch.UnboundedPriorityMailbox import com.typesafe.config._ object Wallet { sealed trait WalletMsg case object ZipUp extends WalletMsg    //鎖錢包
    case object UnZip extends WalletMsg    //開錢包
    case class PutIn(amt: Double) extends WalletMsg   //存入
    case class DrawOut(amt: Double) extends WalletMsg //取出
    case object CheckBalance extends WalletMsg  //查看余額
 def props = Props(new Wallet) } class PriorityMailbox(settings: ActorSystem.Settings, config: Config) extends UnboundedPriorityMailbox ( PriorityGenerator { case Wallet.ZipUp => 0
      case Wallet.UnZip => 0
      case Wallet.PutIn(_) => 0
      case Wallet.DrawOut(_) => 2
      case Wallet.CheckBalance => 4
      case PoisonPill => 4
      case otherwise => 4 } ) class Wallet extends Actor { import Wallet._ var balance: Double = 0
      var zipped: Boolean = true

      override def receive: Receive = { case ZipUp => zipped = true println("Zipping up wallet.") case UnZip => zipped = false println("Unzipping wallet.") case PutIn(amt) =>
          if (zipped) { self ! UnZip         //無論如何都要把錢存入
            self ! PutIn(amt) } else { balance += amt println(s"$amt put-in wallet.") } case DrawOut(amt) =>
          if (zipped)  //如果錢包沒有打開就算了
            println("Wallet zipped, Cannot draw out!") else
            if ((balance - amt) < 0) println(s"$amt is too much, not enough in wallet!") else { balance -= amt println(s"$amt drawn out of wallet.") } case CheckBalance => println(s"You have $balance in your wallet.") } } object Actor101 extends App { val system = ActorSystem("actor101-demo",ConfigFactory.load) val wallet = system.actorOf(Wallet.props.withDispatcher( "prio-dispatcher"),"mean-wallet") wallet ! Wallet.UnZip wallet ! Wallet.PutIn(10.50) wallet ! Wallet.PutIn(20.30) wallet ! Wallet.DrawOut(10.00) wallet ! Wallet.ZipUp wallet ! Wallet.PutIn(100.00) wallet ! Wallet.CheckBalance Thread.sleep(1000) system.terminate() }

 

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM