Akka和μJavaActorsμJavaActors均是java的Actor庫,其中Akka提供了叫為完整的Actor開發框架,較為龐大,學習成本很高,μJavaActors 是一個輕量級Actor庫,大約僅有1200行代碼,比較適合入門。
一.Akka Demo
Akka是一個相當成熟、強大的庫,github上download下的是Akka的源碼,應該使用sbt構建的工程,如果沒有使用sbt經驗,想導出jar還挺不容易的,推薦Akka官網下載akka各個組件的jar去使用,簡單介紹一下helloworld 級別Akka的demo。
1.Akka的主要組件
akka-actor.jar : Actor核心組件了,定義了Acotr核心類
akka-slf4f.jar : SLF4F Logger的支持,一個打log的組件,不用太關注
akka-remote.jar : Actor做遠程調用的jar,類似RFC吧
akka-cluster : actor做集群管理組件
akka-camel.jar : 對Apache Camel 集成接口
scala-library-2.11.8.jar : akka核心應該是Scala寫的,這個組件就是對akka的核心支持
Akka還有很多組件,不過對於hello world級的程序簡單了解幾個就ok了。工程是基於eclipse的,需要包含下面幾個基礎的組件:
編寫兩個Actor:
package demo02; import akka.actor.UntypedActor; /* * UntypedAcotr是無類型Actor的一個抽象類,繼承與核心類Actor */ public class Greeter extends UntypedActor { public static enum Msg{ GREET , DONE; } /** * 每個Actor必須實現OnReceive,當該Actor收到消息調用該方法 */ @Override public void onReceive(Object msg) throws Throwable { if(msg == Msg.GREET){ System.out.println("Hello world"); /** * 這里吐槽一下Akka對於發消息的設計,發送消息的設計竟然是: * receiver.tell(msg , sender) * 也許沒理解akka設計的理念,但是正常人設計不應該是: * sender.tell(msg , receiver) * 汗…… */ getSender().tell(Msg.DONE, getSelf()); }else{ unhandled(msg); } } }
package demo02; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; public class HelloWorld extends UntypedActor { @Override public void preStart(){ final ActorRef greeter = getContext().actorOf(Props.create(Greeter.class)); greeter.tell(Greeter.Msg.GREET, getSelf()); } @Override public void onReceive(Object msg) throws Throwable { if(msg == Greeter.Msg.DONE){ getContext().stop(getSelf()); }else{ unhandled(msg); } } }
下面是Main方法:
package demo02; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.Terminated; import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; public class Main { public static void main(String[] args) { //ActorSystem 相當於ActorManager,管理各種Acor調度、線程管理等 ActorSystem system = ActorSystem.create("hello"); //創建一個HelloWorld 類型的Actor,在Actor啟動前,會調preStart(),此時會想Greeter發消息 ActorRef actor = system.actorOf(Props.create(HelloWorld.class)); //添加結束終結Actor,當ActorSystem調Stop時,會向每個Actor發送Terminated消息 system.actorOf(Props.create(Terminator.class, actor), "terminator"); } public static class Terminator extends UntypedActor{ private final LoggingAdapter log = Logging.getLogger(getContext().system(),this); private ActorRef actorRef = null; public Terminator(ActorRef ref){ System.out.println("Terminator Init !!!"); actorRef = ref; getContext().watch(actorRef); } @Override public void onReceive(Object msg) throws Throwable { if (msg instanceof Terminated) { log.info("{} has terminated, shutting down system", actorRef.path()); getContext().system().terminate(); } else { unhandled(msg); } } } }
上面代碼在akka的源碼中sample都可以找到的,從上面看Akka對消息的識別是根據類型處理的,在我這種菜鳥看來,並不是很合適,當我消息類型較多時,消息類豈不是要爆炸,當然也可以做分級Actor,再加一層轉發層解決這個問題哈
二.μJavaActors
μJavaActors 是一個十分輕量級的Actor庫,實現核心的Actor調度,不涉及復雜的框架,簡單分析一下它的源碼吧
1.Actor核心接口
Actor:定義了一個標准的Actor應該具有行為
ActorManager:Actor管理器接口,提供線程管理,Actor調度等
Messager : Actor相互間傳遞傳遞的消息接口,當然附帶的接口還有MessageEvent和MessageListener
簡單引用作者對這個概念的描述:
Actor 是一個執行單元,一次處理一條消息。Actor 具有以下關鍵行為或特征:
- 每個 actor 有一個
name,該名稱在每個ActorManager中必須是惟一的。 - 每個 actor 屬於一個
category;類別是一種向一組 actor 中的一個成員發送消息的方式。一個 actor 一次只能屬於一個類別。 - 只要
ActorManager可以提供一個執行 actor 的線程,系統就會調用receive()。為了保持最高效率,actor 應該迅速處理消息,而不要進入漫長的等待狀態(比如等待人為輸入)。 willReceive()允許 actor 過濾潛在的消息主題。peek()允許該 actor 和其他 actor 查看是否存在掛起的消息(或許是為了選擇主題)。remove()允許該 actor 和其他 actor 刪除或取消任何尚未處理的消息。getMessageCount()允許該 actor 和其他 actor 獲取掛起的消息數量。getMaxMessageCount()允許 actor 限制支持的掛起消息數量;此方法可用於預防不受控制地發送。
大部分程序都有許多 actor,這些 actor 常常具有不同的類型。actor 可在程序啟動時創建或在程序執行時創建(和銷毀)。本文中的 actor 包 包含一個名為 AbstractActor 的抽象類,actor 實現基於該類。
ActorManager 是一個 actor 管理器。它負責向 actor 分配線程(進而分配處理器)來處理消息。ActorManager 擁有以下關鍵行為或特征:
createActor()創建一個 actor 並將它與此管理器相關聯。startActor()啟動一個 actor。detachActor()停止一個 actor 並將它與此管理器斷開。send()/broadcast()將一條消息發送給一個 actor、一組 actor、一個類別中的任何 actor 或所有 actor。
在大部分程序中,只有一個 ActorManager,但如果您希望管理多個線程和/或 actor 池,也可以有多個 ActorManager。此接口的默認實現是 DefaultActorManager。
消息 是在 actor 之間發送的消息。Message 是 3 個(可選的)值和一些行為的容器:
source是發送 actor。subject是定義消息含義的字符串(也稱為命令)。data是消息的任何參數數據;通常是一個映射、列表或數組。參數可以是要處理和/或其他 actor 要與之交互的數據。subjectMatches()檢查消息主題是否與字符串或正則表達式匹配。
μJavaActors 包的默認消息類是 DefaultMessage。
ActorManager其實只要簡單瀏覽一下μJavaActors源碼就可以理解Actor設計思路啦,主要分析一下ActorManager中的Actor調度源碼:
public class ActorRunnable implements Runnable { public boolean hasThread; public AbstractActor actor; public void run() { // logger.trace("procesNextActor starting"); int delay = 1; while (running) { try { if (!procesNextActor()) { // logger.trace("procesNextActor waiting on actor"); // sleep(delay * 1000); synchronized (actors) { // TOOD: adjust this delay; possible parameter // we want to minizmize overhead (make bigger); // but it has a big impact on message processing // rate (makesmaller) // actors.wait(delay * 1000); actors.wait(100); } delay = Math.max(5, delay + 1); } else { delay = 1; } } catch (InterruptedException e) { } catch (Exception e) { logger.error("procesNextActor exception", e); } } // logger.trace("procesNextActor ended"); } protected boolean procesNextActor() { boolean run = false, wait = false, res = false; actor = null; synchronized (actors) { for (String key : runnables.keySet()) { actor = runnables.remove(key); break; } } if (actor != null) { // first run never started run = true; actor.setHasThread(true); hasThread = true; try { actor.run(); } finally { actor.setHasThread(false); hasThread = false; } } else { synchronized (actors) { for (String key : waiters.keySet()) { actor = waiters.remove(key); break; } } if (actor != null) { // then waiting for responses wait = true; actor.setHasThread(true); hasThread = true; try { res = actor.receive(); if (res) { incDispatchCount(); } } finally { actor.setHasThread(false); hasThread = false; } } } // if (!(!run && wait && !res) && a != null) { // logger.trace("procesNextActor %b/%b/%b: %s", run, wait, res, a); // } return run || res; } }

![4YQ$GDJYW$F]Q~]K`XCL@YU 4YQ$GDJYW$F]Q~]K`XCL@YU](/image/aHR0cHM6Ly9pbWFnZXMyMDE1LmNuYmxvZ3MuY29tL2Jsb2cvNzI3NTcxLzIwMTYwOS83Mjc1NzEtMjAxNjA5MDIxNTU4MDIxNjgtMTgwNDg0MDU4Ny5wbmc=.png)