簡單實現Java的RMI——遠程方法調用


一、RMI簡介:

說到RMI就不得不說RPC了。

RPC:(Remote Procedure Call),遠程過程調用。

RMI(Remote Method Invocation),遠程方法調用。

RPC和RMI是有區別的,RPC中是通過網絡服務協議向遠程主機發送請求,RPC遠程主機就去搜索與之相匹配的類和方法,找到后就執行方法並把結果編碼,通過網絡協議發回。

而RMI是通過客戶端的對象作為遠程接口進行遠程方法的調用。RMI只適用於Java語言。

二、RMI的運行機理:

涉及兩個網絡端。其核心思想是,一個端可以通過調用另一個端的方法,實現相關功能。
一個端“執行”一個方法,而這個方法的實際執行是在另一端進行的!

當然,兩個端都應該有相同的類,自然會擁有相同的方法。
一個端所謂的執行這個方法,其實是通過調用這個類的代理對象的方法,在其中攔截這個方法,在這個方法中
實際上是將執行這個方法的參數和類名稱、方法名稱,通過網絡通訊傳輸給另一端;另一端根據得到的方法名稱、
類名稱和參數,實際執行那個方法,再將方法執行結果回傳給對端。
要注意的問題:
1、實際執行方法的一端,我們可以認為是RMI服務器端,偽執行一端,自然是RMI客戶端;
2、偽執行端不應該自己完成參數、方法名稱和類名稱的傳遞工作;也就是說,對於RMI客戶端用戶而言,他只面對一個類的一個方法,
    直接執行就好;
3、RMI服務器端可能接收多個RMI客戶端有關這個方法的執行請求,每個客戶端的執行當然應該是獨立的,應該用線程實現;
4、RMI服務器端在執行了相關方法,並回傳方法執行結果后,應該斷開與RMI客戶端的連接。

下面是我要實現它的一個思路。

 

上圖由於截圖原因,給點補充說明:RpcClientExecutor的作用是建立和服務器的連接,並接受消息和發送消息,具體是發送方法的序列號和參數類型。

1.首先:應該是RpcBeanDefinition:

 1 package com.xupt.rpc.core;
 2 
 3 import java.lang.reflect.Method;
 4 
 5 public class RpcBeanDefination {   //將類,類方法,類對象封裝在Definition中。
 6 
 7     private Class<?> klass;
 8     private Method method;
 9     private Object object;
10     
11     RpcBeanDefination() {
12     }

給該類所有的成員都有getter和setter方法就不需要說了,這個類,將執行的哪個類的哪個方法和類的對象封裝起來,以后這個類將形成Map

的值,下面來介紹的RpcBeanFactory會重點介紹。

2.RpcBeanFactory

 

 1 package com.xupt.rpc.core;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 
 6 public class RpcBeanFactory {
 7 
 8     private final Map<String, RpcBeanDefination> beanMap;
 9     
10     RpcBeanFactory() {
11         beanMap = new HashMap<>();
12     }
13     
14     void rpcBeanRegistry(String beanId,RpcBeanDefination defination) {
15         RpcBeanDefination rbd = beanMap.get(beanId);
16         if(rbd != null) {
17             return;
18         }
19         beanMap.put(beanId, defination);
20     }
21     
22     RpcBeanDefination getBean(String beanId) {
23         return beanMap.get(beanId);
24     }
25 }

 

此類是將序列號作為Map中的鍵,RpcBeanDeifintion作為值放入Map中,用BeanId來找對應客戶端那邊序列號相同的方法。

3.下來是RpcBeanRegistry:

 1 package com.xupt.rpc.core;
 2 
 3 import java.lang.reflect.Method;
 4 
 5 public class RpcBeanRegistry {
 6     
 7     RpcBeanRegistry() {
 8     }
 9     
10     //給客戶端提供
11      static void registryInterface(RpcBeanFactory rpcBeanFactory,Class<?> interfaces) {
12         doregistry(rpcBeanFactory,interfaces,null);
13     }
14      
15     //內部使用,注冊
16      private static void doregistry(RpcBeanFactory rpcBeanFactory , Class<?> interfaces ,Object object) {
17           //得到接口中的所有的方法,行程方法的數組
18          Method[] methods = interfaces.getDeclaredMethods();
19             for(Method method : methods) {  //遍歷這些方法
20                 String beanId = String.valueOf(method.toString().hashCode());//將方法序列化。
21                 RpcBeanDefination rpcBeanDefination = new RpcBeanDefination();
22                 
23                 //將得到的實現接口的那個類和它的方法以及對象放進RpcBeanDefination()中。
24                 rpcBeanDefination.setKlass(interfaces);
25                 rpcBeanDefination.setMethod(method);
26                 rpcBeanDefination.setObject(object);
27                 
28                 rpcBeanFactory.rpcBeanRegistry(beanId, rpcBeanDefination);
29             }
30     }
31     
32      //服務端使用,知道實現類的對象。
33      static void registryInterface(RpcBeanFactory rpcBeanFactory,Class<?> interfaces,Object object) {
34          //判斷此類是否實現了這個接口。
35         if(!interfaces.isAssignableFrom(object.getClass())){
36             return;
37         }
38         doregistry(rpcBeanFactory,interfaces,object);
39     }
40     
41      //服務器端使用,知道類,創建一個對象。
42     static void registryInterface(RpcBeanFactory rpcBeanFactory,Class<?> interfaces,Class<?> klass) {
43         //判斷該類是否實現了接口。
44         if(!interfaces.isAssignableFrom(klass)){
45             return;
46         }
47         try {
48             doregistry(rpcBeanFactory, interfaces, klass.newInstance());
49         } catch (Exception e) {
50             e.printStackTrace();
51         }
52     }
53 }

這個類是同時給客戶端和服務器使用的,所以有一個私有方法來完成類方法的獲得和序列號的產生(method.toString().hashcode())。然后將類、方法、對象放進RpcBeanDefinition中,將得到的序列號作為鍵和rpcBeanDeifinyion作為值放進Map中,形成鍵值對,方便客戶端和服務器的調用。

4.下來是RpcServer:

1 package com.xupt.rpc.core;

 2 
 3 import java.io.IOException;
 4 import java.net.ServerSocket;
 5 import java.net.Socket;
 6 
 7 public class RpcServer implements Runnable {
 8     
 9     private ServerSocket server;
10     private int port;
11     private boolean goon;
12     private final RpcBeanFactory rpcBeanFactory;
13     private static long executorId;
14     
15     public RpcServer() {
16         rpcBeanFactory = new RpcBeanFactory();
17         this.goon = false;
18     }
19 
20     public void setPort(int port) {
21         this.port = port;
22     }
23     
24     public void startRpcServer() throws Exception {
25         if(this.port == 0) {
26             return;
27         }
28         server = new ServerSocket(port);
29         this.goon = true;
30         new Thread(this,"Rpc_Server").start();//啟動線程
31     }
32     
33     public void stopRpcServer() {   //關閉服務器
34         if (this.server != null && !this.server.isClosed()) {
35             try {
36                 this.server.close();
37             } catch (IOException e) {
38                 e.printStackTrace();
39             } finally {
40                 this.server = null;
41             }
42         }
43     }
44     
45     RpcBeanFactory getRpcBeanFactory() {
46         return rpcBeanFactory;
47     }
48     
49     //用注冊的方法知道類實現的接口,類的對象,通過對象執行類的方法。
50     public void rpcRegistry(Class<?> interfaces,Object object) {
51         RpcBeanRegistry.registryInterface(rpcBeanFactory, interfaces, object);
52     }
53     
54     public void rpcRegistry(Class<?> interfaces, Class<?> imClass) {
55         RpcBeanRegistry.registryInterface(rpcBeanFactory, interfaces,imClass);
56     }
57     
58     @Override
59     public void run() {
60         while(goon) {
61             try {
62                 Socket socket = server.accept();//不斷的偵聽
63                 
64                 new RpcServerExecutor(socket, this,++executorId);
65                 
66             } catch (Exception e) {
67                 goon = false;
68                 e.printStackTrace();
69             }
70         }
71         stopRpcServer();
72     }
73 }

上述服務器端的基本任務已經完成了。

下來我們來看看客戶端:

首先還是:RpcClientExecutor,它是建立與服務器端的連接,以及接收,發送消息,具體發送的是方法的序列號和參數類型。

package com.xupt.rpc.core;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

public class RpcClientExecutor {
	
	private String rpcServerIp;  //ip地址
	private int rpcServerPort;	//端口號
	
	RpcClientExecutor() {
	}

	RpcClientExecutor(String rpcServerIp, int rpcServerPort) {
		this.rpcServerIp = rpcServerIp;
		this.rpcServerPort = rpcServerPort;
	}

	String getRpcServerIp() {
		return rpcServerIp;
	}

	void setRpcServerIp(String rpcServerIp) {
		this.rpcServerIp = rpcServerIp;
	}

	int getRpcServerPort() {
		return rpcServerPort;
	}

	void setRpcServerPort(int rpcServerPort) {
		this.rpcServerPort = rpcServerPort;
	}
	
	//關閉客戶端
	private void closeSocket(ObjectInputStream ois, ObjectOutputStream oos, Socket socket) {
		try {
			if (ois != null) {
				ois.close();
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			ois = null;
		}
		try {
			if (oos != null) {
				oos.close();
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			oos = null;
		}
		try {
			if (socket != null && !socket.isClosed()) {
				socket.close();
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			socket = null;
		}
	}
	
	@SuppressWarnings("unchecked")
	<T> T rpExecutor(String beanId,Object[] params) throws Exception {
		//連接服務器端
		Socket socket = new Socket(rpcServerIp, rpcServerPort);
		//發送方法的序列號和參數類型。
		ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
		oos.writeUTF(beanId);
		oos.writeObject(params);
		
		//必須將這句放在前面三句的后面。
		ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
		//接收服務器端返回的結果。
		Object result = ois.readObject();
		
		closeSocket(ois,oos,socket);
		
		return (T) result;
		
		
	}
}

 下來是:RpcClient:這個類是產生代理,用代理對象偽執行相關的方法,然后在真正的來連接服務器。

 1 package com.xupt.rpc.core;
 2 
 3 import java.lang.reflect.InvocationHandler;
 4 import java.lang.reflect.Method;
 5 import java.lang.reflect.Proxy;
 6 
 7 public class RpcClient {
 8     
 9     private RpcClientExecutor rpcClientExecutor;
10     
11     public RpcClient(String rpcServerIp,int rpcServerport) {
12         this.rpcClientExecutor = new RpcClientExecutor(rpcServerIp, rpcServerport);
13     }
14     
15     @SuppressWarnings("unchecked")
16     public <T> T getProxy(Class<?> klass) {
17         return (T) Proxy.newProxyInstance(
18                 klass.getClassLoader(),
19                 new Class[] {klass},
20                 new InvocationHandler() {
21                     
22                     @Override
23                     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
24                         String BeanId = String.valueOf(method.toString().hashCode());
25                         Object result = rpcClientExecutor.rpExecutor(BeanId, args);
26                         
27                         return result;
28                     }
29                 }
30             );
31     }
32 }

上述方法產生代理就不多做解釋,不明白請看上一篇的代理機制之初見吧。。。。

以上就是我的RMI的簡單實現,入如果有錯誤,請指正!也希望它對你有所幫助。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM