使用Akka的遠程調用


概述

  正如其它RPC或者RMI框架那樣,Akka也提供了遠程調用的能力。服務端在監聽的端口上接收客戶端的調用。本文將在《Spring與Akka的集成》一文的基礎上介紹Akka的remote調用,本文很多代碼和例子來源於Akka官網的代碼示例,也包含了一些適用於Spring集成的改造,本文旨在介紹Akka的遠程調用的開發過程。

服務端開發

配置

  Akka的默認配置文件為application.conf,如果不特別指明,Akka System都會默認加載此配置。如果你想自定義符合你習慣的名字,可以使用如下代碼:

final ActorSystem system = ActorSystem.create("YourSystem", ConfigFactory.load("yourconf"));  

上述代碼中的yourconf不包含文件后綴名,在你的資源路徑下實際是yourconf.conf。

  我不太想自定義加載的配置文件,而是繼續使用application.conf,這里先列出其配置:

include "common"  
  
akka {  
  # LISTEN on tcp port 2552  
  remote.netty.tcp.port = 2552  
}  

這里的remote.netty.tcp.port配置屬性表示使用Netty框架在TCP層的監聽端口是2552。include與java里的import或者jsp頁面中的include標簽的作用類似,表示引用其他配置文件中的配置。由於common.conf中包含了Akka的一些公共配置,所以可以這樣引用,common.conf的配置如下:

akka {  
  
  actor {  
    provider = "akka.remote.RemoteActorRefProvider"  
  }  
  
  remote {  
    netty.tcp {  
      hostname = "127.0.0.1"  
    }  
  }  
  
}  

common配置中的provider屬性表示Actor的引用提供者是akka.remote.RemoteActorRefProvider,即遠程ActorRef的提供者。這里的hostname屬性表示服務器的主機名。從common配置我們還可以看出Akka的配置有點類似於json,也是一種嵌套結構。此外,Akka還可以采用一種扁平的配置方式,例如:

akka.actor.provider = "..."  
akka.remote.netty.tcp.hostname = "127.0.0.1"  

它們所代表的作用是一樣的。至於選擇扁平還是嵌套的,一方面依據你的個人習慣,一方面依據配置的多寡——隨着配置項的增多,你會發現嵌套會讓你的配置文件更加清晰。

服務端

  由於官網的例子比較簡潔並能說明問題,所以本文對Akka官網的例子進行了一些改造來介紹服務端與客戶端之間的遠程調用。服務端的配置已在上一小節列出,本小節着重介紹服務端的實現。

我們的服務端是一個簡單的提供基本的加、減、乘、除的服務的CalculatorActor,這些運算都直接封裝在CalculatorActor的實現中(在實際的業務場景中,Actor應該只接收、回復及調用具體的業務接口,這里的加減乘除運算應當由指定的Service接口實現,特別是在J2EE或者與Spring集成后),CalculatorActor的實現見代碼清單1。

代碼清單1

@Named("CalculatorActor")  
@Scope("prototype")  
public class CalculatorActor extends UntypedActor {  
      
    private static Logger logger = LoggerFactory.getLogger(CalculatorActor.class);  
  
    @Override  
    public void onReceive(Object message) {  
  
        if (message instanceof Op.Add) {  
            Op.Add add = (Op.Add) message;  
            logger.info("Calculating " + add.getN1() + " + " + add.getN2());  
            Op.AddResult result = new Op.AddResult(add.getN1(), add.getN2(), add.getN1() + add.getN2());  
            getSender().tell(result, getSelf());  
  
        } else if (message instanceof Op.Subtract) {  
            Op.Subtract subtract = (Op.Subtract) message;  
            logger.info("Calculating " + subtract.getN1() + " - " + subtract.getN2());  
            Op.SubtractResult result = new Op.SubtractResult(subtract.getN1(), subtract.getN2(),  
                    subtract.getN1() - subtract.getN2());  
            getSender().tell(result, getSelf());  
  
        } else if (message instanceof Op.Multiply) {  
            Op.Multiply multiply = (Op.Multiply) message;  
            logger.info("Calculating " + multiply.getN1() + " * " + multiply.getN2());  
            Op.MultiplicationResult result = new Op.MultiplicationResult(multiply.getN1(), multiply.getN2(),  
                    multiply.getN1() * multiply.getN2());  
            getSender().tell(result, getSelf());  
  
        } else if (message instanceof Op.Divide) {  
            Op.Divide divide = (Op.Divide) message;  
            logger.info("Calculating " + divide.getN1() + " / " + divide.getN2());  
            Op.DivisionResult result = new Op.DivisionResult(divide.getN1(), divide.getN2(),  
                    divide.getN1() / divide.getN2());  
            getSender().tell(result, getSelf());  
  
        } else {  
            unhandled(message);  
        }  
    }  
}  

Add、Subtract、Multiply、Divide都繼承自MathOp,這里只列出MathOp和Add的實現,見代碼清單2所示。

代碼清單2

public interface MathOp extends Serializable {  
}  
  
public static class Add implements MathOp {  
    private static final long serialVersionUID = 1L;  
    private final int n1;  
    private final int n2;  
  
    public Add(int n1, int n2) {  
        this.n1 = n1;  
        this.n2 = n2;  
    }  
  
    public int getN1() {  
        return n1;  
    }  
  
    public int getN2() {  
        return n2;  
    }  
} 

服務端應當啟動CalculatorActor實例,以提供服務,代碼如下:

logger.info("Start calculator");  
final ActorRef calculator = actorSystem.actorOf(springExt.props("CalculatorActor"), "calculator");  
actorMap.put("calculator", calculator);  
logger.info("Started calculator"); 

客戶端

  客戶端調用遠程CalculatorActor提供的服務后,還要接收其回復信息,因此也需要監聽端口。客戶端和服務端如果在同一台機器節點上,那么客戶端的監聽端口不能與服務端沖突,我給出的配置示例如下:

include "common"  
  
akka {  
  remote.netty.tcp.port = 2553  
}  

客戶端通過遠程Actor的路徑獲得ActorSelection,然后向遠程的Akka System獲取遠程CalculatorActor的ActorRef,進而通過此引用使用遠端CalculatorActor提供的服務。在詳細的說明實現細節之前,先來看看LookupActor的實現,見代碼清單3所示。

代碼清單3

@Named("LookupActor")  
@Scope("prototype")  
public class LookupActor extends UntypedActor {  
      
    private static Logger logger = LoggerFactory.getLogger(LookupActor.class);  
  
    private final String path;  
    private ActorRef calculator = null;  
  
    public LookupActor(String path) {  
        this.path = path;  
        sendIdentifyRequest();  
    }  
  
    private void sendIdentifyRequest() {  
        getContext().actorSelection(path).tell(new Identify(path), getSelf());  
        getContext().system().scheduler().scheduleOnce(Duration.create(3, SECONDS), getSelf(),  
                ReceiveTimeout.getInstance(), getContext().dispatcher(), getSelf());  
    }  
  
    @Override  
    public void onReceive(Object message) throws Exception {  
        if (message instanceof ActorIdentity) {  
            calculator = ((ActorIdentity) message).getRef();  
            if (calculator == null) {  
                logger.info("Remote actor not available: " + path);  
            } else {  
                getContext().watch(calculator);  
                getContext().become(active, true);  
            }  
  
        } else if (message instanceof ReceiveTimeout) {  
            sendIdentifyRequest();  
  
        } else {  
            logger.info("Not ready yet");  
  
        }  
    }  
  
    Procedure<Object> active = new Procedure<Object>() {  
        @Override  
        public void apply(Object message) {  
            if (message instanceof Op.MathOp) {  
                // send message to server actor  
                calculator.tell(message, getSelf());  
  
            } else if (message instanceof Op.AddResult) {  
                Op.AddResult result = (Op.AddResult) message;  
                logger.info(String.format("Add result: %d + %d = %d\n", result.getN1(), result.getN2(), result.getResult()));  
                ActorRef sender = getSender();  
                logger.info("Sender is: " + sender);  
  
            } else if (message instanceof Op.SubtractResult) {  
                Op.SubtractResult result = (Op.SubtractResult) message;  
                logger.info(String.format("Sub result: %d - %d = %d\n", result.getN1(), result.getN2(), result.getResult()));  
                ActorRef sender = getSender();  
                logger.info("Sender is: " + sender);  
                  
            } else if (message instanceof Terminated) {  
                logger.info("Calculator terminated");  
                sendIdentifyRequest();  
                getContext().unbecome();  
  
            } else if (message instanceof ReceiveTimeout) {  
                // ignore  
  
            } else {  
                unhandled(message);  
            }  
  
        }  
    };  
}

LookupActor的構造器需要傳遞遠端CalculatorActor的路徑,並且調用了sendIdentifyRequest方法,sendIdentifyRequest的作用有兩個:

  1. 通過向ActorSelection向遠端的Akka System發送Identify消息,並獲取遠程CalculatorActor的ActorRef;
  2. 啟動定時調度,3秒后向CalculatorActor的執行上下文發送ReceiveTimeout消息,而LookupActor處理ReceiveTimeout消息時,再次調用了sendIdentifyRequest方法。
為何要循環調用sendIdentifyRequest方法呢?由於遠端服務有可能因為進程奔潰、系統重啟等原因導致已經獲得的ActorRef過期或失效,因此需要一個監測機制。sendIdentifyRequest的循環調用就是一個簡單的檢測機制。
遠端的Akka System在接收到Identify消息后,會給LookupActor回復ActorIdentity消息,LookupActor收到ActorIdentity消息后便可以解析出消息中載有的CalculatorActor的ActorRef,LookupActor然后調用getContext().watch(calculator)實現對子Actor的監管,一旦CalculatorActor重啟或終止,LookupActor便可以接收到Terminated消息(有關Actor的監管機制,可以閱讀官方文檔)。
由於LookupActor的onReceive無法處理加、減、乘、除及Terminated消息,所以這里用到了一個Akka Actor的狀態轉換,通過使用getContext().become(active, true)。這里的active是一個內部類,其繼承了Procedure並重寫了apply方法,其中封裝了對於對於加減乘除的計算以及結果、Terminated消息的處理。通過getContext().become(active, true),使得active接替onReceive方法處理接收到的消息。正如Akka官網所述——Actor的這一特性非常適合於開發實現FSM(有限狀態自動機)。
active的功能主要分為三類:
  • 如果收到MathOp的消息,說明是加減乘除的消息,則將消息進一步告知遠端的CalculatorActor並由其進行處理;
  • 如果收到AddResult或者SubtractResult,這說明CalculatorActor已經處理完了加或者減的處理,並回復了處理結果,因此對計算結果進行使用(本例只是簡單的打印);
  • 如果收到了Terminated消息,說明遠端的CalculatorActor停止或者重啟了,因此需要重新調用sendIdentifyRequest獲取最新的CalculatorActor的ActorRef。最后還需要取消active,恢復為默認接收消息的狀態;
啟動客戶端的代碼示例如下:
logger.info("start lookup");  
final String path = "akka.tcp://metadataAkkaSystem@127.0.0.1:2552/user/calculator";  
final ActorRef lookup = actorSystem.actorOf(springExt.props("LookupActor", path), "lookup");  
final Random r = new Random();  
actorSystem.scheduler().schedule(Duration.create(1, SECONDS), Duration.create(1, SECONDS), new Runnable() {  
  
    @Override  
    public void run() {  
        if (r.nextInt(100) % 2 == 0) {  
            lookup.tell(new Op.Add(r.nextInt(100), r.nextInt(100)), null);  
        } else {  
            lookup.tell(new Op.Subtract(r.nextInt(100), r.nextInt(100)), null);  
        }  
  
    }  
}, actorSystem.dispatcher());   
這里的客戶端示例以1秒的周期,向LookupActor隨機發送Add或者Subtract的消息。

Actor遠端調用模型

  無論是本地Actor還是遠端Actor,Actor之所以能夠接收消息,是因為每個Actor都有其自身的郵箱,你可以定制自己的郵箱(可以用java中的各種隊列)。本地應用如果想要調用遠端的Actor服務並接收返回信息也就必須擁有自己的郵箱,否則郵遞員投遞信件時由於無法找到你家的郵箱,可能會打回郵件、放在你家的門縫下甚至丟棄。因此Actor的調用無論是本地的還是遠端的都最好遵守Actor的編程模型,就像下圖所示。
 

運行結果

  我的客戶端和服務端都運行於本地,客戶端tcp監聽端口是2553,服務端監聽端口是2552,由於本例子的代碼較為健壯,所以客戶端、服務端可以以任意順序啟動。客戶端運行后的日志如下圖所示:

 

服務端的運行日志如下圖所示:

 

總結

  Akka的遠端調用是大家在使用時最常用的特性之一,掌握起來不是什么難事,如何實現處理多種消息,並考慮其穩定性、健壯性是需要詳細考慮的。

后記:經過近一年的准備,《Spark內核設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖:
 
售賣鏈接如下:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM