RMI 系列(02)源碼分析


RMI 系列(02)源碼分析

1. 架構

RMI 中有三個重要的角色:注冊中心(Registry)、客戶端(Client)、服務端(Server)。

圖1 RMI 架構圖

在 RMI 中也要先進行服務注冊,客戶端從注冊中心獲取服務。為了屏蔽網絡通信的復雜性,RMI 提出了 Stub(客戶端存根)和 Skeleton(服務端骨架)兩個概念,客戶端和服務端的網絡信都通過 Stub 和 Skeleton 進行。

圖2 RMI 整體調用時序圖

總結: 整體還是可以分為三部分,服務注冊、服務發現、服務調用。

  1. 服務注冊(1 ~ 2)
    • 第一步:創建遠程對象包括兩部分。一是創建 ServiceImpl 遠程對象;二是發布 ServiceImpl 服務。ServiceImpl 繼承自 UnicastRemoteObject,在創建時默認會隨機綁定一個端口,監聽客戶端的請求。所以即使可以不注冊,直接請求這個端口也可以進行通信。
    • 第二步:向注冊中心注冊該服務。注意:和其它的注冊中心不同,Registry 只能注冊本地的服務。
  2. 服務發現(3 ~ 4)
    • 向注冊中心查找本地存根,返回給客戶端。需要注意的是,Dubbo 先從注冊中心獲服務的 ip、port 等配置信息,然后在客戶端生成 Stub 代理,而 RMI 不一樣,已經在服務端保存了 Stub 代理對象,直接通過網絡傳輸直接將 Stub 對象進行序列化與反序列化。
  3. 服務調用(5 ~ 9)
    • 客戶端存根和服務器骨架通信,返回結果。

2. 服務注冊

首先回顧一下 RMI 服務發布的使用方法:

@Test
public void server() {
	// 1. 服務創建及發布。注意:HelloServiceImpl extends UnicastRemoteObject 
    HelloService service = new HelloServiceImpl();
    // 2. 創建注冊中心:創建本機 1099 端口上的 RMI 注冊表
    Registry registry = LocateRegistry.createRegistry(1099);
    // 3. 服務注冊:將服務綁定到注冊表中
    registry.bind(name, service);
}

總結: RMI 服務發布有三個流程:

  1. 服務創建及發布:HelloServiceImpl 需要繼承自 UnicastRemoteObject,當初始化時會自動將 HelloServiceImpl 任務一個服務發布,綁定一個隨機端口。
  2. 創建注冊中心:注冊中心實際和普通的服務一樣,也會將自己作為一個服務發布。
  3. 服務注冊:將 service 注冊到注冊中心。

服務創建及發布和創建注冊中心流程完全相同,至於服務注冊則是將 service 注冊到一個 map 中,非常簡單。所以服務的注冊主要圍繞服務創建及發布展開。

2.1 服務發布整體流程

圖3 RMI 服務發布時序圖

服務的發布的關鍵點有以下幾個:

  1. 創建本地存根,用於客戶端訪問。
  2. 啟動 socket。
  3. 服務注冊與查找。

無論是 HelloServiceImpl 還是 Registry 都是 Remote 的子類,准確的說是 RemoteObject 的子類。RemoteObject 最重要的屬性是 RemoteRef ref,RemoteRef 的實現類 UnicastRef,UnicastRef 包含屬性 LiveRef ref。LiveRef 類中的 Endpoint、Channel 封裝了與網絡通信相關的方法。類結構如下:

圖4 Remote 和 RemoteRef 類結構

2.2 服務暴露入口 exportObject

HelloServiceImpl 的構造器中調用父類 UnicastRemoteObject,最終調用 exportObject((Remote) this, port)

protected UnicastRemoteObject(int port) throws RemoteException {
    this.port = port;
    exportObject((Remote) this, port);
}
private static Remote exportObject(Remote obj, UnicastServerRef sref)
    throws RemoteException {
    if (obj instanceof UnicastRemoteObject) {
        ((UnicastRemoteObject) obj).ref = sref;
    }
    return sref.exportObject(obj, null, false);
}

Registry createRegistry(int port) 創建注冊中心時也會調用 exportObject 方法。

public RegistryImpl(int port) throws RemoteException
    LiveRef lref = new LiveRef(id, port);
	setup(new UnicastServerRef(lref, RegistryImpl::registryFilter));
}
private void setup(UnicastServerRef uref) throws RemoteException {
    ref = uref;
    uref.exportObject(this, null, true);
}

總結: Registry 和 HelloServiceImpl 最終都調用 exportObject 方法,那 exportObject 到底是干什么的呢?從字面上看 exportObject 暴露對象,事實上正如其名,exportObject 打開了一個 ServerSocket,監聽客戶端的請求。

public Remote exportObject(Remote impl, Object data, boolean permanent)
        throws RemoteException {
    // 1. 創建本地存根,封裝網絡通信
    Class<?> implClass = impl.getClass();
    Remote stub = Util.createProxy(implClass, getClientRef(), forceStubUse);
    ...
    // 2. 服務暴露,this 是指 RemoteObject 對象
    Target target = new Target(impl, this, stub, ref.getObjID(), permanent);
    ref.exportObject(target);
    return stub;
}

總結: exportObject 核心的方法有兩個:一是生成本地存根的代理對象;二是調用 ref.exportObject(target) 啟動 socket 服務。

注意:exportObject 時會先將 impl 和 stub 等信息封裝到 Target 對象中,最終注冊到 ObjectTable。

2.3 生成本地存根

在 Util.createProxy() 方法中創建代理對象。

public static Remote createProxy(Class<?> implClass, RemoteRef clientRef,
		boolean forceStubUse) throws StubNotFoundException {
    Class<?> remoteClass = getRemoteClass(implClass);
   
    // 1. 是否存在以 _Stub 結尾的類。remoteClass + "_Stub"
    //    forceStubUse 表示當不存在時是否拋出異常
    if (forceStubUse ||
        !(ignoreStubClasses || !stubClassExists(remoteClass))) {
        return createStub(remoteClass, clientRef);
    }

    // 2. jdk 動態代理
    final ClassLoader loader = implClass.getClassLoader();
    final Class<?>[] interfaces = getRemoteInterfaces(implClass);
    final InvocationHandler handler = new RemoteObjectInvocationHandler(clientRef);
    return (Remote) Proxy.newProxyInstance(loader, interfaces, handler);
}

總結: 創建代理對象有兩種情況:

  1. 存在以 _Stub 結尾的類(eg: RegistryImpl_Stub)則直接返回,當 forceStubUse=true 時不存在則拋出異常。
  2. JDK 動態代理。RemoteObjectInvocationHandler#invoke 方法實際上直接委托給了 RemoteRef#invoke 方法進行網絡通信,具體代碼見 UnicastRef#invoke(Remote, Method, Object[], long)

2.4 服務監聽

跟蹤 LiveRef#exportObject 方法,最終調用 TCPTransport#exportObject 方法。

public void exportObject(Target target) throws RemoteException {
    // 1. 啟動網絡監聽,默認 port=0,即隨機啟動一個端口
    synchronized (this) {
        listen();
        exportCount++;
    }
    // 2. 將 Target 注冊到 ObjectTable
    super.exportObject(target);
}

總結: 最終服務暴露時做了兩件事,一是如果 socket 沒有啟動,啟動 socket 監聽;二是將 Target 實例注冊到 ObjectTable 對象中。

 private void listen() throws RemoteException {
     TCPEndpoint ep = getEndpoint();
     int port = ep.getPort();

     if (server == null) {
             server = ep.newServerSocket();
             Thread t = new NewThreadAction(new AcceptLoop(server), 
                            "TCP Accept-" + port, true));
             t.start();
         } catch (IOException e) {
             throw new ExportException("Listen failed on port: " + port, e);
         }
     } 
 }

2.5 ObjectTable 注冊與查找

ObjectTable 用來管理所有發布的服務實例 Target,ObjectTable 提供了根據 ObjectEndpoint 和 Remote 實例兩種方式查找 Target 的方法。先看注冊:

private static final Map<ObjectEndpoint,Target> objTable = new HashMap<>();
private static final Map<WeakRef,Target> implTable = new HashMap<>();

// Target 注冊
static void putTarget(Target target) throws ExportException {
    ObjectEndpoint oe = target.getObjectEndpoint();
    WeakRef weakImpl = target.getWeakImpl();

    synchronized (tableLock) {
        if (target.getImpl() != null) {
            ...
            objTable.put(oe, target);
            implTable.put(weakImpl, target);
        }
    }
}

那實例查找也就很簡單了,之后就可以根據 Target 對象獲取本地存根 stub。

static Target getTarget(ObjectEndpoint oe) {
    synchronized (tableLock) {
        return objTable.get(oe);
    }
}
public static Target getTarget(Remote impl) {
    synchronized (tableLock) {
        return implTable.get(new WeakRef(impl));
    }
}

2.6 服務綁定

當服務 HelloService 和 Registry 均已創建並發布后,之后需要將服務綁定到注冊中心。這一步就很簡單了,代碼 registry.bind(name, service)

// 服務名稱 -> 實例 impl
private Hashtable<String, Remote> bindings = new Hashtable<>(101);

public void bind(String name, Remote obj)
    throws RemoteException, AlreadyBoundException, AccessException {
    checkAccess("Registry.bind");
    synchronized (bindings) {
        Remote curr = bindings.get(name);
        if (curr != null)
            throw new AlreadyBoundException(name);
        bindings.put(name, obj);
    }
}

總結: service 綁定到注冊中心實際就很簡單了,將服務名稱和實例保存到 map 中即可。查找時可以通過 name 查找到 impl,再通過 impl 在 ObjectTable 中查找到對應的 Target。

2.7 總結

服務暴露主要完成兩件事:一是服務端生成本地存根 stub,並包裝成 Target 對象,最終注冊到 ObjectTable 中;二是啟動 ServerSocket 綁定端口,監聽客戶端的請求。 又可以分為普通服務暴露和注冊中心暴露,兩種服務暴露過程完全相同。

  1. 普通服務暴露(HelloService):默認綁定隨機端口。使用 HelloServicempl 實例,根據動態代理生成本地存儲 stub,RemoteObjectInvocationHandler#invoke 最終調用 UnicastRef#invoke(Remote, Method, Object[], long) 方法。
  2. 注冊中心暴露(Registry):LocateRegistry.createRegistry(port) 需要指定綁定端口,默認 1099。使用 RegistryImpl 實例,本地存根使用 RegistryImpl_Stub。

3. 服務發現

@Test
public void client() {
    String name = HelloService.class.getName();
    // 獲取注冊表
    Registry registry = LocateRegistry.getRegistry("localhost", 1099);
    // 查找對應的服務
    HelloService service = (HelloService) registry.lookup(name);
}

總結: RMI 服務發現核心步驟兩步:一是獲取注冊中心 registry;二是根據注冊中心獲取服務的代理類 service。registry 和 service 都是通過 Util.createProxy() 方法生成的代理類,不過這兩個代理類的生成時機完全不同,registry 是在客戶端生成的代理類,service 是在服務端生成的代理類。

3.1 注冊中心 Stub

public static Registry getRegistry(String host, int port, RMIClientSocketFactory csf) {
    LiveRef liveRef = new LiveRef(new ObjID(ObjID.REGISTRY_ID),
                    new TCPEndpoint(host, port, csf, null), false);
    RemoteRef ref = (csf == null) ? new UnicastRef(liveRef) : new UnicastRef2(liveRef);
    return (Registry) Util.createProxy(RegistryImpl.class, ref, false);
}

由於默認存在 RegistryImpl_Stub,所以直接返回 RegistryImpl_Stub 的實例。

public Remote lookup(String var1) throws AccessException, NotBoundException, RemoteException {
    RemoteCall var2 = super.ref.newCall(this, operations, 2, 4905912898345647071L);
    ObjectOutput var3 = var2.getOutputStream();
    var3.writeObject(var1);

    super.ref.invoke(var2);
    ObjectInput var6 = var2.getInputStream();
    Remote var23 = (Remote)var6.readObject();       
    super.ref.done(var2);       
    return var23;
}

總結: LocateRegistry.getRegistry 獲取注冊中心時,在客戶端直接生成代理對象 RegistryImpl_Stub,RegistryImpl_Stub 實際調用 RemoteRef 的 invoke 方法進行網絡通信。

3.2 普通服務 Stub

和 RegistryImpl_Stub 不同,普通服務是在服務端生成本地存根 Stub。在服務注冊的階段,我們提到服務暴露時會將服務實例及其生成的 Stub 包裝成 Target,並最終注冊到 ObjectTable 上。那客戶端 registry.lookup(name) 是如何最終查找到對應服務的 Stub 中的呢?

首先客戶端調用 registry.lookup(name) 時,會通過網絡通信最終調用到 RegistryImpl#lookup 方法,查找到對應的 Remote 實例,之后將這個實例返回給客戶端。

但是這個 Socket 輸出流是被 MarshalOutputStream 包裝過的,在輸出對應時會將 Remote 替換為 Stub 對象。也就是說客戶端直接可以拿到代理后的對象,反序列后進行網絡通信,不需要在客戶端生成代理對象。代碼如下:

protected final Object replaceObject(Object obj) throws IOException {
    if ((obj instanceof Remote) && !(obj instanceof RemoteStub)) {
        Target target = ObjectTable.getTarget((Remote) obj);
        if (target != null) {
            return target.getStub();
        }
    }
    return obj;
}

總結: registry.lookup(name) 獲取服務端生成的代理對象 stub。這個 stub 代理對象調用 UnicastRef#invoke(Remote, Method, Object[], long) 方法進行網絡通信。

注意: 如果該服務沒有暴露,則 target=null,也就是直接將服務端注冊的實例而不是存根 Stub 返回,所以在客戶端必須有該類的實現,否則反序列反時會拋出異常。不過,不暴露服務這種情況好像並沒有什么意義。

Exception in thread "main" java.rmi.UnmarshalException: error unmarshalling return; nested exception is: 
	java.lang.ClassNotFoundException: com.binarylei.rmi.helloword.service.HelloServiceImpl (no security manager: RMI class loader disabled)
	at sun.rmi.registry.RegistryImpl_Stub.lookup(Unknown Source)
	at com.binarylei.rmi.helloword.ClientTest.main(ClientTest.java:21)
Caused by: java.lang.ClassNotFoundException: com.binarylei.rmi.helloword.service.HelloServiceImpl (no security manager: RMI class loader disabled)
	at sun.rmi.server.LoaderHandler.loadClass(LoaderHandler.java:396)
	at sun.rmi.server.LoaderHandler.loadClass(LoaderHandler.java:186)
	at java.rmi.server.RMIClassLoader$2.loadClass(RMIClassLoader.java:637)
	at java.rmi.server.RMIClassLoader.loadClass(RMIClassLoader.java:264)
	at sun.rmi.server.MarshalInputStream.resolveClass(MarshalInputStream.java:219)
	... 2 more

4. 服務調用

RMI 中網絡通信相關的邏輯都是由 RemoteRef 完成的,客戶端的實現是 UnicastRef,而服務端則是 UnicastServerRef。

4.1 客戶端調用過程

圖5 RMI 整體調用時序圖

客戶端生成的代理對象調用 UnicastRef.invoke 進行網絡傳輸,至少要告訴服務端以下信息:

  1. 接口名
  2. 方法名稱
  3. 參數類型
  4. 參數

通過 接口 + 方法名稱 + 參數類型 三個坐標就可以確定調用的具體方法。RMI 中通過傳遞 opnum 參數標記是這個類的第幾個方法來確定。

public Object invoke(Remote obj, Method method, Object[] params, long opnum)
    throws Exception {
   
    // 1. create call context
    Connection conn = ref.getChannel().newConnection();
    RemoteCall call = new StreamRemoteCall(conn, ref.getObjID(), -1, opnum);

    // 2. marshal parameters
    try {
        ObjectOutput out = call.getOutputStream();
        marshalCustomCallData(out);
        Class<?>[] types = method.getParameterTypes();
        for (int i = 0; i < types.length; i++) {
            marshalValue(types[i], params[i], out);
        }
    } catch (IOException e) {
        throw new MarshalException("error marshalling arguments", e);
    }

    // 3. unmarshal return
    call.executeCall();

    try {
        Class<?> rtype = method.getReturnType();
        if (rtype == void.class)
            return null;
        ObjectInput in = call.getInputStream();

        Object returnValue = unmarshalValue(rtype, in);

        ref.getChannel().free(conn, true);
        return returnValue;
    } finally {
       	// 關閉連接等
        call.done();
    }
}

總結: UnicastRef.invoke 方法過程很清晰,先發送 opnum,再發送參數,最后處理返回結果。

4.2 服務端處理過程

圖6 RMI 服務端調用時序圖

總結:

  1. 服務暴露時,通過 listen 方法啟動 socket,創建 AcceptLoop 線程接收客戶端的連接,每一個 socket 連接創建一個 ConnectionHandler 處理,這也是 BIO 處理客戶端連接的基本套路。

  2. 然后從 ObjectTarget.getTarget 中獲取服務端保存的 Target 對象,可以獲取 impl 和 dispatcher 對象。dispatcher 的實現類是 UnicastServerRef。

  3. 服務端獲取 opnum 確定具體的方法 method,再接收方法的參數,調用 method.invoke(obj, params) 后將結果返回給客戶端。

下面就看一下 UnicastServerRef#dispatch 具體做了些什么。

public void dispatch(Remote obj, RemoteCall call) throws IOException {
    // positive operation number in 1.1 stubs;
    // negative version number in 1.2 stubs and beyond...
    int num;
    long op;
    
    // 1. 處理 num 和 op 參數,jdk1.1 oldDispatch
	try {
        ObjectInput in;
        try {
            in = call.getInputStream();
            num = in.readInt();
            if (num >= 0) {
                if (skel != null) {
                    oldDispatch(obj, call, num);
                    return;
                } else {
                    throw new UnmarshalException(
                        "skeleton class not found but required for client version");
                }
            }
            op = in.readLong();
        } catch (Exception readEx) {
            throw new UnmarshalException("error unmarshalling call header", readEx);
        }
        
        MarshalInputStream marshalStream = (MarshalInputStream) in;
        marshalStream.skipDefaultResolveClass();

        // 2. 根據 op 獲取 method
        Method method = hashToMethod_Map.get(op);

        // 3. unmarshal parameters
        Object[] params = null;
        try {
            unmarshalCustomCallData(in);
            params = unmarshalParameters(obj, method, marshalStream);
        } finally {
            call.releaseInputStream();
        }

        // 4. make upcall on remote object
        Object result;
        try {
            result = method.invoke(obj, params);
        } catch (InvocationTargetException e) {
            throw e.getTargetException();
        }

        // 5. marshal return value
        try {
            ObjectOutput out = call.getResultStream(true);
            Class<?> rtype = method.getReturnType();
            if (rtype != void.class) {
                marshalValue(rtype, result, out);
            }
        } catch (IOException ex) {
            throw new MarshalException("error marshalling return", ex);
        }
    } catch (Throwable e) {
        ObjectOutput out = call.getResultStream(false);
        out.writeObject(e);
    } finally {
        call.releaseInputStream(); // in case skeleton doesn't
        call.releaseOutputStream();
    }
}

總結: UnicastServerRef#dispatch 方法也很清晰,無非是根據 op 確定具體的方法 method,獲取參數,將反射的結果通過網絡返回給客戶端。


每天用心記錄一點點。內容也許不重要,但習慣很重要!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM