1.mailbox
Akka的每個actor默認有一個mailbox,按照FIFO順序單線程處理。在拋出異常導致父actor根據設置的監管策略執行重啟或恢復操作時,會從觸發異常的消息的后續消息開始處理,郵箱並不會被清空。如果你想重新處理那個觸發異常的消息,可以通過重寫preRestart方法來訪問該消息,java 中的preRestart參數為(Throwable reason, Option<Object> message),message.get()可以獲得該消息(因為是從Option對象中get,所以可能為空),可以將該消息再次發給自己或做其它處理。
默認郵箱的大小沒有限制,也就是內存的上限。可以設置bounded郵箱來限定大小,還可以設置郵箱以文件形式持久存儲。
2.監管策略設置
1)在actor類中重寫supervisorStrategy()
2)創建父actor時在Props參數中使用FromConfig.getInstance().withSupervisorStrategy(strategy).props(XXX)
可以使用下面的類來方便設置:

import akka.actor.AllForOneStrategy; import akka.actor.OneForOneStrategy; import akka.actor.SupervisorStrategy; import akka.japi.Function; import scala.concurrent.duration.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import static akka.actor.SupervisorStrategy.escalate; /** * Created by fyk on 16-4-2. */ public class StrategySetter { private Map<Class<? extends Throwable>, SupervisorStrategy.Directive> map; private boolean oneForOne; private int maxNrOfRetries=5; private Duration withinTimeRange=Duration.create(1, TimeUnit.MINUTES);//Duration.create("1 minute") public StrategySetter(boolean oneForOne) { this.oneForOne=oneForOne; map=new HashMap<Class<? extends Throwable>, SupervisorStrategy.Directive>(); } public void setOptParam(int maxNrOfRetries,Duration withinTimeRange){ this.maxNrOfRetries=maxNrOfRetries; this.withinTimeRange=withinTimeRange; } public void put(Class<? extends Throwable> t, SupervisorStrategy.Directive action){ map.put(t,action); } /** * 設定監管策略並返回 * cls.isInstance(yourObject) * instead of using the instanceof operator, which can only be used if you know the class at compile time. */ public SupervisorStrategy getSupervisorStrategy(){ SupervisorStrategy strategy=null; if(oneForOne){ strategy=new OneForOneStrategy(maxNrOfRetries, withinTimeRange, new Function<Throwable, SupervisorStrategy.Directive>() { @Override public SupervisorStrategy.Directive apply(Throwable t) { for(Class c:map.keySet()){ if(c.isInstance(t)) return map.get(c); } return escalate();//提交給上一級監管 } }); }else{ strategy=new AllForOneStrategy(maxNrOfRetries, withinTimeRange, new Function<Throwable, SupervisorStrategy.Directive>() { @Override public SupervisorStrategy.Directive apply(Throwable t) { for(Class c:map.keySet()){ if(c.isInstance(t)) return map.get(c); } return escalate();//提交給上一級監管 } }); } return strategy; } }
注意在進行某一個actor的重啟時會調用postStop、構造函數與preStart、preRestart等,在重寫父類的方法時記得在第一句調用父類的方法(會對子actor進行一些操作)。如果你在actor中創建了子actor,重啟時也會重啟子actor,如果在重寫preStart中沒有調用父類的preStart會導致子actor重復創建,由於akka不能創建同名的actor,會拋出name not unique的異常信息。
3.actor Monitor
監管策略中指定了時間區間內重啟或恢復等操作的上限,達到指定出錯頻率后actor被停止,以后再也不運轉了。
也許你想要監視actor的生命狀態,當它發現有actor停止時進行一些操作,如發郵件通知你,或簡單粗暴的重新創建運行。
生命周期monitor:
/** * Created on 16-4-10. * 此生命周期monitor與設置了監管策略的supervisor都可以對遠程actor進行監督 */ public class MonitorActor extends UntypedActor { Logger log = LoggerFactory.getLogger(MonitorActor.class); Map<ActorRef, ActorRef> monitoredActors =//<worker,supervisor> new HashMap<ActorRef, ActorRef>(); @Override public void onReceive(Object message) throws Exception { if (message instanceof Terminated) { final Terminated t = (Terminated) message; if (monitoredActors.containsKey(t.getActor())) { ActorPath path=t.getActor().path(); log.info("Received Worker Actor Termination Message ->{}", path); log.info("Sending message to Supervisor"); monitoredActors.get(t.getActor()).tell( new DeadWorker(path),self()); } } else if (message instanceof RegisterWorker) { RegisterWorker msg = (RegisterWorker) message; //下面這句是關鍵的注冊語句,當被觀察的actor結束時,本actor會收到akka.actor.Terminated消息 getContext().watch(msg.getWorker()); monitoredActors.put(msg.getWorker(), msg.getSupervisor()); } else { unhandled(message); } } }
使用:在worker actor初始化時向monitor發送RegisterWorker消息,包含對actor本身以及supervisor(一般為父actor)的引用。
4.默認管理策略
- ActorInitializationException will stop the failing child actor
- ActorKilledException will stop the failing child actor
- Exception will restart the failing child actor
- Other types of Throwable will be escalated to parent actor
如果想將自己的部分管理策略與默認管理策略結合,可以在一些異常處理中使用super.supervisorStrategy.decider.applyOrElse(t, (_: Any) => Escalate)
注意,當actor被重啟時並不會發送Terminated消息給monitor,而context.stop(actor)才會
5.Akka-remote注意事項
Akka在接收到消息時根據郵箱地址也就是actor的地址來遞送消息,本地的地址形式如/user/parent/childActor,遠程的akka.tcp://RemoteSystem@host:port/user/xxx。尋址按照字符串匹配的形式,所以hostname與ip地址不能亂寫,即使是指向相同的ip地址。
在向遠程actor主動發送消息時需要使用ActorSelection的tell方法。而在接收到遠程actor發來的消息后回復時使用sender.tell不一定能夠發送成功,並且也沒有發送到DeadLetter。最好使用ActorSelection(sender.path())來發送。
x.continue...