Akka是由各種角色和功能的Actor組成的,工作的主要原理是把一項大的計算任務分割成小環節,再按各環節的要求構建相應功能的Actor,然后把各環節的運算托付給相應的Actor去獨立完成。Akka是個工具庫(Tools-Library),不是一個軟件架構(Software-Framework),我們不需要按照Akka的框架格式去編寫程序,而是直接按需要構建Actor去異步運算一項完整的功能,這樣讓用戶在不知不覺中自然的實現了多線程並發軟件編程(concurrent programming)。按這樣的描述,Actor就是一種靠消息驅動(Message-driven)的運算器,我們可以直接調用它來運算一段程序。消息驅動模式的好處是可以實現高度的松散耦合(loosely-coupling),因為系統部件之間不用軟件接口,而是通過消息來進行系統集成的。消息驅動模式支持了每個Actor的獨立運算環境,又可以在運行時按需要靈活的對系統Actor進行增減,伸縮自如,甚至可以在運行時(runtime)對系統部署進行調配。Akka的這些鮮明的特點都是通過消息驅動來實現的。
曾經看到一個關於Actor模式的觀點:認為Actor並不適合並發(concurrency)編程,更應該是維護內部狀態的運算工具。聽起來好像很無知,畢竟Actor模式本身就是並發模式,如果不適合並發編程,豈不與Akka的發明意願相左。再仔細研究了一下這個觀點的論述后就完全認同了這種看法。在這里我們分析一下這種論述,先看看下面這段Actor用法偽代碼:
class QueryActor extends Actor { override def receive: Receive = { case GetResult(query) => val x = db.RunQuery(query) val y = getValue(x) sender() ! computeResult(x,y) } } val result: Future[Any] = QueryActor ? GetResult(...)
這段代碼中QueryActor沒有任何內部狀態。通過Future傳遞計算結果能實現不阻塞(non-blocking)運算。下面我們用QueryActor來實現並發運算:
val r1 = QueryActor ! request1 val r2 = QueryActor ! request2 for { x <- r1 y <- r2 } yield combineValues(x,y)
乍眼看r1和r2貌似能實現並行運算,但不要忘記Actor運算環境是單線程的,而Actor信箱又是按序的(Ordered),所以這兩個運算只能按順序運行,最多也就是能在另一個線程里異步進行而已,r1運算始終會阻塞r2的運行。如此還不如直接使用Future,能更好的實現並發程序的並行運算。同樣的要求如果用Future來實現的話可以用下面的偽代碼:
def fuQuery(query: DBQuery): Future[FResult] = Future { val x = db.RunQuery(query) val y = getValue(x) computeResults(x,y) } val r1 = fuQuery(query1) val r2 = fuQuery(query2) for { x <- r1 y <- r2 } yield combineValues(x,y)
在這個例子里r1和r2就真正是並行運算的。從這個案例中我的結論是盡量把Akka Actor使用在需要維護內部狀態的應用中。如果為了實現non-blocking只需要把程序分布到不同的線程里運行的話就應該直接用Future,這樣自然的多。但使用Future是完全無法維護內部狀態的。
好了,回到正題:從功能上Actor是由實例引用(ActorRef),消息郵箱(Mailbox),內部狀態(State),運算行為(Behavior),子類下屬(Child-Actor),監管策略(Supervision/Monitoring)幾部分組成。Actor的物理結構由ActorRef、Actor Instance(runtime實例)、Mailbox、dispatcher(運算器)組成。我們在本篇先介紹一下ActorRef,Mailbox,State和Behavior。
1、ActorRef:Akka系統是一個樹形層級式的結構,每個節點由一個Actor代表。每一個Actor在結構中都可以用一個路徑(ActorPath)來代表它在系統結構里的位置。我們可以重復用這個路徑來構建Actor,但每次構建都會產生新的ActorRef。所以ActorRef是唯一的,代表了某個路徑指向位置上的一個運行時的Actor實例,我們只能用ActorRef來向Actor發送消息
2、Mailbox:可以說成是一個運算指令隊列(command queque)。Actor從外部接收的消息都是先存放在Mailbox里的。系統默認Mailbox中無限數量的消息是按時間順序排列的,但用戶可以按照具體需要定制Mailbox,比如有限容量信箱、按消息優先排序信箱等。
3、Behavior:簡單來說就是對Mailbox里消息的反應方式。Mailbox中臨時存放了從外界傳來的指令,如何運算這些指令、產生什么結果都是由這些指令的運算函數來確定。所以這些函數的功能就代表着Actor的行為模式。Actor的運算行為可以通過become來替換默認的receive函數,用unbecome來恢復默認行為。
4、State:Actor內部狀態,由一組變量值表示。當前內部狀態即行為函數最后一次運算所產生的變量值
下面我們就用個例子來示范Actor:模擬一個吝嗇人的錢包,他總是會把付出放在最次要的位置。如此我們可以用消息優先排序信箱UnboundedPriorityMailbox來實現。按照Akka程序標准格式,我們先把每個Actor所需要處理的消息和Props構建放在它的伴生對象里:
object Wallet { sealed trait WalletMsg case object ZipUp extends WalletMsg //鎖錢包
case object UnZip extends WalletMsg //開錢包
case class PutIn(amt: Double) extends WalletMsg //存入
case class DrawOut(amt: Double) extends WalletMsg //取出
case object CheckBalance extends WalletMsg //查看余額
def props = Props(new Wallet) }
下面是Actor wallet的定義,必須繼承Actor以及override receive函數:
class Wallet extends Actor { import Wallet._ var balance: Double = 0
var zipped: Boolean = true
override def receive: Receive = { case ZipUp => zipped = true println("Zipping up wallet.") case UnZip => zipped = false println("Unzipping wallet.") case PutIn(amt) =>
if (zipped) { self ! UnZip //無論如何都要把錢存入
self ! PutIn(amt) } else { balance += amt println(s"$amt put-in wallet.") } case DrawOut(amt) =>
if (zipped) //如果錢包沒有打開就算了
println("Wallet zipped, Cannot draw out!") else
if ((balance - amt) < 0) println(s"$amt is too much, not enough in wallet!") else { balance -= amt println(s"$amt drawn out of wallet.") } case CheckBalance => println(s"You have $balance in your wallet.") } }
我們可以看到這個Actor的內部狀態分別是:var balance, var zipped。下面是定制Mailbox定義:
class PriorityMailbox(settings: ActorSystem.Settings, config: Config) extends UnboundedPriorityMailbox ( PriorityGenerator { case Wallet.ZipUp => 0
case Wallet.UnZip => 0
case Wallet.PutIn(_) => 0
case Wallet.DrawOut(_) => 2
case Wallet.CheckBalance => 4
case PoisonPill => 4
case otherwise => 4 } )
PriorityMailbox需要繼承UnboundedPriorityMailbox並且提供對比函數PriorityGenerator。ZipUp,UnZip和PutIn都是最優先的。然后在application.conf登記dispatcher的配置:
prio-dispatcher { mailbox-type = "PriorityMailbox" }
下面的代碼可以用來試運行Actor wallet:
object Actor101 extends App { val system = ActorSystem("actor101-demo",ConfigFactory.load) val wallet = system.actorOf(Wallet.props.withDispatcher( "prio-dispatcher"),"mean-wallet") wallet ! Wallet.UnZip wallet ! Wallet.PutIn(10.50) wallet ! Wallet.PutIn(20.30) wallet ! Wallet.DrawOut(10.00) wallet ! Wallet.ZipUp wallet ! Wallet.PutIn(100.00) wallet ! Wallet.CheckBalance Thread.sleep(1000) system.terminate() }
由於需要解析application.conf里的配置,所以使用了ActorSystem(name, config)方式。構建Actor時用.withDispatcher把application.conf里的dispatcher配置prio-dispatcher傳入。
運算的結果如下:
Unzipping wallet. 10.5 put-in wallet. 20.3 put-in wallet. 100.0 put-in wallet. Zipping up wallet. Wallet zipped, Cannot draw out! You have 130.8 in your wallet. Process finished with exit code 0
下面是本次示范的完整代碼:
application.conf:
prio-dispatcher { mailbox-type = "PriorityMailbox" }
Actor101.scala:
import akka.actor._ import akka.dispatch.PriorityGenerator import akka.dispatch.UnboundedPriorityMailbox import com.typesafe.config._ object Wallet { sealed trait WalletMsg case object ZipUp extends WalletMsg //鎖錢包
case object UnZip extends WalletMsg //開錢包
case class PutIn(amt: Double) extends WalletMsg //存入
case class DrawOut(amt: Double) extends WalletMsg //取出
case object CheckBalance extends WalletMsg //查看余額
def props = Props(new Wallet) } class PriorityMailbox(settings: ActorSystem.Settings, config: Config) extends UnboundedPriorityMailbox ( PriorityGenerator { case Wallet.ZipUp => 0
case Wallet.UnZip => 0
case Wallet.PutIn(_) => 0
case Wallet.DrawOut(_) => 2
case Wallet.CheckBalance => 4
case PoisonPill => 4
case otherwise => 4 } ) class Wallet extends Actor { import Wallet._ var balance: Double = 0
var zipped: Boolean = true
override def receive: Receive = { case ZipUp => zipped = true println("Zipping up wallet.") case UnZip => zipped = false println("Unzipping wallet.") case PutIn(amt) =>
if (zipped) { self ! UnZip //無論如何都要把錢存入
self ! PutIn(amt) } else { balance += amt println(s"$amt put-in wallet.") } case DrawOut(amt) =>
if (zipped) //如果錢包沒有打開就算了
println("Wallet zipped, Cannot draw out!") else
if ((balance - amt) < 0) println(s"$amt is too much, not enough in wallet!") else { balance -= amt println(s"$amt drawn out of wallet.") } case CheckBalance => println(s"You have $balance in your wallet.") } } object Actor101 extends App { val system = ActorSystem("actor101-demo",ConfigFactory.load) val wallet = system.actorOf(Wallet.props.withDispatcher( "prio-dispatcher"),"mean-wallet") wallet ! Wallet.UnZip wallet ! Wallet.PutIn(10.50) wallet ! Wallet.PutIn(20.30) wallet ! Wallet.DrawOut(10.00) wallet ! Wallet.ZipUp wallet ! Wallet.PutIn(100.00) wallet ! Wallet.CheckBalance Thread.sleep(1000) system.terminate() }