概述
正如其它RPC或者RMI框架那樣,Akka也提供了遠程調用的能力。服務端在監聽的端口上接收客戶端的調用。本文將在《Spring與Akka的集成》一文的基礎上介紹Akka的remote調用,本文很多代碼和例子來源於Akka官網的代碼示例,也包含了一些適用於Spring集成的改造,本文旨在介紹Akka的遠程調用的開發過程。
服務端開發
配置
Akka的默認配置文件為application.conf,如果不特別指明,Akka System都會默認加載此配置。如果你想自定義符合你習慣的名字,可以使用如下代碼:
final ActorSystem system = ActorSystem.create("YourSystem", ConfigFactory.load("yourconf"));
上述代碼中的yourconf不包含文件后綴名,在你的資源路徑下實際是yourconf.conf。
我不太想自定義加載的配置文件,而是繼續使用application.conf,這里先列出其配置:
include "common" akka { # LISTEN on tcp port 2552 remote.netty.tcp.port = 2552 }
這里的remote.netty.tcp.port配置屬性表示使用Netty框架在TCP層的監聽端口是2552。include與java里的import或者jsp頁面中的include標簽的作用類似,表示引用其他配置文件中的配置。由於common.conf中包含了Akka的一些公共配置,所以可以這樣引用,common.conf的配置如下:
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { netty.tcp { hostname = "127.0.0.1" } } }
common配置中的provider屬性表示Actor的引用提供者是akka.remote.RemoteActorRefProvider,即遠程ActorRef的提供者。這里的hostname屬性表示服務器的主機名。從common配置我們還可以看出Akka的配置有點類似於json,也是一種嵌套結構。此外,Akka還可以采用一種扁平的配置方式,例如:
akka.actor.provider = "..." akka.remote.netty.tcp.hostname = "127.0.0.1"
它們所代表的作用是一樣的。至於選擇扁平還是嵌套的,一方面依據你的個人習慣,一方面依據配置的多寡——隨着配置項的增多,你會發現嵌套會讓你的配置文件更加清晰。
服務端
由於官網的例子比較簡潔並能說明問題,所以本文對Akka官網的例子進行了一些改造來介紹服務端與客戶端之間的遠程調用。服務端的配置已在上一小節列出,本小節着重介紹服務端的實現。
我們的服務端是一個簡單的提供基本的加、減、乘、除的服務的CalculatorActor,這些運算都直接封裝在CalculatorActor的實現中(在實際的業務場景中,Actor應該只接收、回復及調用具體的業務接口,這里的加減乘除運算應當由指定的Service接口實現,特別是在J2EE或者與Spring集成后),CalculatorActor的實現見代碼清單1。
代碼清單1
@Named("CalculatorActor") @Scope("prototype") public class CalculatorActor extends UntypedActor { private static Logger logger = LoggerFactory.getLogger(CalculatorActor.class); @Override public void onReceive(Object message) { if (message instanceof Op.Add) { Op.Add add = (Op.Add) message; logger.info("Calculating " + add.getN1() + " + " + add.getN2()); Op.AddResult result = new Op.AddResult(add.getN1(), add.getN2(), add.getN1() + add.getN2()); getSender().tell(result, getSelf()); } else if (message instanceof Op.Subtract) { Op.Subtract subtract = (Op.Subtract) message; logger.info("Calculating " + subtract.getN1() + " - " + subtract.getN2()); Op.SubtractResult result = new Op.SubtractResult(subtract.getN1(), subtract.getN2(), subtract.getN1() - subtract.getN2()); getSender().tell(result, getSelf()); } else if (message instanceof Op.Multiply) { Op.Multiply multiply = (Op.Multiply) message; logger.info("Calculating " + multiply.getN1() + " * " + multiply.getN2()); Op.MultiplicationResult result = new Op.MultiplicationResult(multiply.getN1(), multiply.getN2(), multiply.getN1() * multiply.getN2()); getSender().tell(result, getSelf()); } else if (message instanceof Op.Divide) { Op.Divide divide = (Op.Divide) message; logger.info("Calculating " + divide.getN1() + " / " + divide.getN2()); Op.DivisionResult result = new Op.DivisionResult(divide.getN1(), divide.getN2(), divide.getN1() / divide.getN2()); getSender().tell(result, getSelf()); } else { unhandled(message); } } }
Add、Subtract、Multiply、Divide都繼承自MathOp,這里只列出MathOp和Add的實現,見代碼清單2所示。
代碼清單2
public interface MathOp extends Serializable { } public static class Add implements MathOp { private static final long serialVersionUID = 1L; private final int n1; private final int n2; public Add(int n1, int n2) { this.n1 = n1; this.n2 = n2; } public int getN1() { return n1; } public int getN2() { return n2; } }
服務端應當啟動CalculatorActor實例,以提供服務,代碼如下:
logger.info("Start calculator"); final ActorRef calculator = actorSystem.actorOf(springExt.props("CalculatorActor"), "calculator"); actorMap.put("calculator", calculator); logger.info("Started calculator");
客戶端
客戶端調用遠程CalculatorActor提供的服務后,還要接收其回復信息,因此也需要監聽端口。客戶端和服務端如果在同一台機器節點上,那么客戶端的監聽端口不能與服務端沖突,我給出的配置示例如下:
include "common" akka { remote.netty.tcp.port = 2553 }
客戶端通過遠程Actor的路徑獲得ActorSelection,然后向遠程的Akka System獲取遠程CalculatorActor的ActorRef,進而通過此引用使用遠端CalculatorActor提供的服務。在詳細的說明實現細節之前,先來看看LookupActor的實現,見代碼清單3所示。
代碼清單3
@Named("LookupActor") @Scope("prototype") public class LookupActor extends UntypedActor { private static Logger logger = LoggerFactory.getLogger(LookupActor.class); private final String path; private ActorRef calculator = null; public LookupActor(String path) { this.path = path; sendIdentifyRequest(); } private void sendIdentifyRequest() { getContext().actorSelection(path).tell(new Identify(path), getSelf()); getContext().system().scheduler().scheduleOnce(Duration.create(3, SECONDS), getSelf(), ReceiveTimeout.getInstance(), getContext().dispatcher(), getSelf()); } @Override public void onReceive(Object message) throws Exception { if (message instanceof ActorIdentity) { calculator = ((ActorIdentity) message).getRef(); if (calculator == null) { logger.info("Remote actor not available: " + path); } else { getContext().watch(calculator); getContext().become(active, true); } } else if (message instanceof ReceiveTimeout) { sendIdentifyRequest(); } else { logger.info("Not ready yet"); } } Procedure<Object> active = new Procedure<Object>() { @Override public void apply(Object message) { if (message instanceof Op.MathOp) { // send message to server actor calculator.tell(message, getSelf()); } else if (message instanceof Op.AddResult) { Op.AddResult result = (Op.AddResult) message; logger.info(String.format("Add result: %d + %d = %d\n", result.getN1(), result.getN2(), result.getResult())); ActorRef sender = getSender(); logger.info("Sender is: " + sender); } else if (message instanceof Op.SubtractResult) { Op.SubtractResult result = (Op.SubtractResult) message; logger.info(String.format("Sub result: %d - %d = %d\n", result.getN1(), result.getN2(), result.getResult())); ActorRef sender = getSender(); logger.info("Sender is: " + sender); } else if (message instanceof Terminated) { logger.info("Calculator terminated"); sendIdentifyRequest(); getContext().unbecome(); } else if (message instanceof ReceiveTimeout) { // ignore } else { unhandled(message); } } }; }
LookupActor的構造器需要傳遞遠端CalculatorActor的路徑,並且調用了sendIdentifyRequest方法,sendIdentifyRequest的作用有兩個:
- 通過向ActorSelection向遠端的Akka System發送Identify消息,並獲取遠程CalculatorActor的ActorRef;
- 啟動定時調度,3秒后向CalculatorActor的執行上下文發送ReceiveTimeout消息,而LookupActor處理ReceiveTimeout消息時,再次調用了sendIdentifyRequest方法。
- 如果收到MathOp的消息,說明是加減乘除的消息,則將消息進一步告知遠端的CalculatorActor並由其進行處理;
- 如果收到AddResult或者SubtractResult,這說明CalculatorActor已經處理完了加或者減的處理,並回復了處理結果,因此對計算結果進行使用(本例只是簡單的打印);
- 如果收到了Terminated消息,說明遠端的CalculatorActor停止或者重啟了,因此需要重新調用sendIdentifyRequest獲取最新的CalculatorActor的ActorRef。最后還需要取消active,恢復為默認接收消息的狀態;
logger.info("start lookup"); final String path = "akka.tcp://metadataAkkaSystem@127.0.0.1:2552/user/calculator"; final ActorRef lookup = actorSystem.actorOf(springExt.props("LookupActor", path), "lookup"); final Random r = new Random(); actorSystem.scheduler().schedule(Duration.create(1, SECONDS), Duration.create(1, SECONDS), new Runnable() { @Override public void run() { if (r.nextInt(100) % 2 == 0) { lookup.tell(new Op.Add(r.nextInt(100), r.nextInt(100)), null); } else { lookup.tell(new Op.Subtract(r.nextInt(100), r.nextInt(100)), null); } } }, actorSystem.dispatcher());
Actor遠端調用模型

運行結果
我的客戶端和服務端都運行於本地,客戶端tcp監聽端口是2553,服務端監聽端口是2552,由於本例子的代碼較為健壯,所以客戶端、服務端可以以任意順序啟動。客戶端運行后的日志如下圖所示:
服務端的運行日志如下圖所示:
總結
Akka的遠端調用是大家在使用時最常用的特性之一,掌握起來不是什么難事,如何實現處理多種消息,並考慮其穩定性、健壯性是需要詳細考慮的。
