概述
正如其它RPC或者RMI框架那樣,Akka也提供了遠程調用的能力。服務端在監聽的端口上接收客戶端的調用。本文將在《Spring與Akka的集成》一文的基礎上介紹Akka的remote調用,本文很多代碼和例子來源於Akka官網的代碼示例,也包含了一些適用於Spring集成的改造,本文旨在介紹Akka的遠程調用的開發過程。
服務端開發
配置
Akka的默認配置文件為application.conf,如果不特別指明,Akka System都會默認加載此配置。如果你想自定義符合你習慣的名字,可以使用如下代碼:
final ActorSystem system = ActorSystem.create("YourSystem", ConfigFactory.load("yourconf"));
上述代碼中的yourconf不包含文件后綴名,在你的資源路徑下實際是yourconf.conf。
我不太想自定義加載的配置文件,而是繼續使用application.conf,這里先列出其配置:
include "common"
akka {
# LISTEN on tcp port 2552
remote.netty.tcp.port = 2552
}
這里的remote.netty.tcp.port配置屬性表示使用Netty框架在TCP層的監聽端口是2552。include與java里的import或者jsp頁面中的include標簽的作用類似,表示引用其他配置文件中的配置。由於common.conf中包含了Akka的一些公共配置,所以可以這樣引用,common.conf的配置如下:
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
netty.tcp {
hostname = "127.0.0.1"
}
}
}
common配置中的provider屬性表示Actor的引用提供者是akka.remote.RemoteActorRefProvider,即遠程ActorRef的提供者。這里的hostname屬性表示服務器的主機名。從common配置我們還可以看出Akka的配置有點類似於json,也是一種嵌套結構。此外,Akka還可以采用一種扁平的配置方式,例如:
akka.actor.provider = "..." akka.remote.netty.tcp.hostname = "127.0.0.1"
它們所代表的作用是一樣的。至於選擇扁平還是嵌套的,一方面依據你的個人習慣,一方面依據配置的多寡——隨着配置項的增多,你會發現嵌套會讓你的配置文件更加清晰。
服務端
由於官網的例子比較簡潔並能說明問題,所以本文對Akka官網的例子進行了一些改造來介紹服務端與客戶端之間的遠程調用。服務端的配置已在上一小節列出,本小節着重介紹服務端的實現。
我們的服務端是一個簡單的提供基本的加、減、乘、除的服務的CalculatorActor,這些運算都直接封裝在CalculatorActor的實現中(在實際的業務場景中,Actor應該只接收、回復及調用具體的業務接口,這里的加減乘除運算應當由指定的Service接口實現,特別是在J2EE或者與Spring集成后),CalculatorActor的實現見代碼清單1。
代碼清單1
@Named("CalculatorActor")
@Scope("prototype")
public class CalculatorActor extends UntypedActor {
private static Logger logger = LoggerFactory.getLogger(CalculatorActor.class);
@Override
public void onReceive(Object message) {
if (message instanceof Op.Add) {
Op.Add add = (Op.Add) message;
logger.info("Calculating " + add.getN1() + " + " + add.getN2());
Op.AddResult result = new Op.AddResult(add.getN1(), add.getN2(), add.getN1() + add.getN2());
getSender().tell(result, getSelf());
} else if (message instanceof Op.Subtract) {
Op.Subtract subtract = (Op.Subtract) message;
logger.info("Calculating " + subtract.getN1() + " - " + subtract.getN2());
Op.SubtractResult result = new Op.SubtractResult(subtract.getN1(), subtract.getN2(),
subtract.getN1() - subtract.getN2());
getSender().tell(result, getSelf());
} else if (message instanceof Op.Multiply) {
Op.Multiply multiply = (Op.Multiply) message;
logger.info("Calculating " + multiply.getN1() + " * " + multiply.getN2());
Op.MultiplicationResult result = new Op.MultiplicationResult(multiply.getN1(), multiply.getN2(),
multiply.getN1() * multiply.getN2());
getSender().tell(result, getSelf());
} else if (message instanceof Op.Divide) {
Op.Divide divide = (Op.Divide) message;
logger.info("Calculating " + divide.getN1() + " / " + divide.getN2());
Op.DivisionResult result = new Op.DivisionResult(divide.getN1(), divide.getN2(),
divide.getN1() / divide.getN2());
getSender().tell(result, getSelf());
} else {
unhandled(message);
}
}
}
Add、Subtract、Multiply、Divide都繼承自MathOp,這里只列出MathOp和Add的實現,見代碼清單2所示。
代碼清單2
public interface MathOp extends Serializable {
}
public static class Add implements MathOp {
private static final long serialVersionUID = 1L;
private final int n1;
private final int n2;
public Add(int n1, int n2) {
this.n1 = n1;
this.n2 = n2;
}
public int getN1() {
return n1;
}
public int getN2() {
return n2;
}
}
服務端應當啟動CalculatorActor實例,以提供服務,代碼如下:
logger.info("Start calculator");
final ActorRef calculator = actorSystem.actorOf(springExt.props("CalculatorActor"), "calculator");
actorMap.put("calculator", calculator);
logger.info("Started calculator");
客戶端
客戶端調用遠程CalculatorActor提供的服務后,還要接收其回復信息,因此也需要監聽端口。客戶端和服務端如果在同一台機器節點上,那么客戶端的監聽端口不能與服務端沖突,我給出的配置示例如下:
include "common"
akka {
remote.netty.tcp.port = 2553
}
客戶端通過遠程Actor的路徑獲得ActorSelection,然后向遠程的Akka System獲取遠程CalculatorActor的ActorRef,進而通過此引用使用遠端CalculatorActor提供的服務。在詳細的說明實現細節之前,先來看看LookupActor的實現,見代碼清單3所示。
代碼清單3
@Named("LookupActor")
@Scope("prototype")
public class LookupActor extends UntypedActor {
private static Logger logger = LoggerFactory.getLogger(LookupActor.class);
private final String path;
private ActorRef calculator = null;
public LookupActor(String path) {
this.path = path;
sendIdentifyRequest();
}
private void sendIdentifyRequest() {
getContext().actorSelection(path).tell(new Identify(path), getSelf());
getContext().system().scheduler().scheduleOnce(Duration.create(3, SECONDS), getSelf(),
ReceiveTimeout.getInstance(), getContext().dispatcher(), getSelf());
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof ActorIdentity) {
calculator = ((ActorIdentity) message).getRef();
if (calculator == null) {
logger.info("Remote actor not available: " + path);
} else {
getContext().watch(calculator);
getContext().become(active, true);
}
} else if (message instanceof ReceiveTimeout) {
sendIdentifyRequest();
} else {
logger.info("Not ready yet");
}
}
Procedure<Object> active = new Procedure<Object>() {
@Override
public void apply(Object message) {
if (message instanceof Op.MathOp) {
// send message to server actor
calculator.tell(message, getSelf());
} else if (message instanceof Op.AddResult) {
Op.AddResult result = (Op.AddResult) message;
logger.info(String.format("Add result: %d + %d = %d\n", result.getN1(), result.getN2(), result.getResult()));
ActorRef sender = getSender();
logger.info("Sender is: " + sender);
} else if (message instanceof Op.SubtractResult) {
Op.SubtractResult result = (Op.SubtractResult) message;
logger.info(String.format("Sub result: %d - %d = %d\n", result.getN1(), result.getN2(), result.getResult()));
ActorRef sender = getSender();
logger.info("Sender is: " + sender);
} else if (message instanceof Terminated) {
logger.info("Calculator terminated");
sendIdentifyRequest();
getContext().unbecome();
} else if (message instanceof ReceiveTimeout) {
// ignore
} else {
unhandled(message);
}
}
};
}
LookupActor的構造器需要傳遞遠端CalculatorActor的路徑,並且調用了sendIdentifyRequest方法,sendIdentifyRequest的作用有兩個:
- 通過向ActorSelection向遠端的Akka System發送Identify消息,並獲取遠程CalculatorActor的ActorRef;
- 啟動定時調度,3秒后向CalculatorActor的執行上下文發送ReceiveTimeout消息,而LookupActor處理ReceiveTimeout消息時,再次調用了sendIdentifyRequest方法。
- 如果收到MathOp的消息,說明是加減乘除的消息,則將消息進一步告知遠端的CalculatorActor並由其進行處理;
- 如果收到AddResult或者SubtractResult,這說明CalculatorActor已經處理完了加或者減的處理,並回復了處理結果,因此對計算結果進行使用(本例只是簡單的打印);
- 如果收到了Terminated消息,說明遠端的CalculatorActor停止或者重啟了,因此需要重新調用sendIdentifyRequest獲取最新的CalculatorActor的ActorRef。最后還需要取消active,恢復為默認接收消息的狀態;
logger.info("start lookup");
final String path = "akka.tcp://metadataAkkaSystem@127.0.0.1:2552/user/calculator";
final ActorRef lookup = actorSystem.actorOf(springExt.props("LookupActor", path), "lookup");
final Random r = new Random();
actorSystem.scheduler().schedule(Duration.create(1, SECONDS), Duration.create(1, SECONDS), new Runnable() {
@Override
public void run() {
if (r.nextInt(100) % 2 == 0) {
lookup.tell(new Op.Add(r.nextInt(100), r.nextInt(100)), null);
} else {
lookup.tell(new Op.Subtract(r.nextInt(100), r.nextInt(100)), null);
}
}
}, actorSystem.dispatcher());
Actor遠端調用模型
運行結果
我的客戶端和服務端都運行於本地,客戶端tcp監聽端口是2553,服務端監聽端口是2552,由於本例子的代碼較為健壯,所以客戶端、服務端可以以任意順序啟動。客戶端運行后的日志如下圖所示:

服務端的運行日志如下圖所示:

總結
Akka的遠端調用是大家在使用時最常用的特性之一,掌握起來不是什么難事,如何實現處理多種消息,並考慮其穩定性、健壯性是需要詳細考慮的。
