一、RMI簡介:
說到RMI就不得不說RPC了。
RPC:(Remote Procedure Call),遠程過程調用。
RMI(Remote Method Invocation),遠程方法調用。
RPC和RMI是有區別的,RPC中是通過網絡服務協議向遠程主機發送請求,RPC遠程主機就去搜索與之相匹配的類和方法,找到后就執行方法並把結果編碼,通過網絡協議發回。
而RMI是通過客戶端的對象作為遠程接口進行遠程方法的調用。RMI只適用於Java語言。
二、RMI的運行機理:
涉及兩個網絡端。其核心思想是,一個端可以通過調用另一個端的方法,實現相關功能。
一個端“執行”一個方法,而這個方法的實際執行是在另一端進行的!
當然,兩個端都應該有相同的類,自然會擁有相同的方法。
一個端所謂的執行這個方法,其實是通過調用這個類的代理對象的方法,在其中攔截這個方法,在這個方法中
實際上是將執行這個方法的參數和類名稱、方法名稱,通過網絡通訊傳輸給另一端;另一端根據得到的方法名稱、
類名稱和參數,實際執行那個方法,再將方法執行結果回傳給對端。
要注意的問題:
1、實際執行方法的一端,我們可以認為是RMI服務器端,偽執行一端,自然是RMI客戶端;
2、偽執行端不應該自己完成參數、方法名稱和類名稱的傳遞工作;也就是說,對於RMI客戶端用戶而言,他只面對一個類的一個方法,
直接執行就好;
3、RMI服務器端可能接收多個RMI客戶端有關這個方法的執行請求,每個客戶端的執行當然應該是獨立的,應該用線程實現;
4、RMI服務器端在執行了相關方法,並回傳方法執行結果后,應該斷開與RMI客戶端的連接。
下面是我要實現它的一個思路。
上圖由於截圖原因,給點補充說明:RpcClientExecutor的作用是建立和服務器的連接,並接受消息和發送消息,具體是發送方法的序列號和參數類型。
1.首先:應該是RpcBeanDefinition:
1 package com.xupt.rpc.core; 2 3 import java.lang.reflect.Method; 4 5 public class RpcBeanDefination { //將類,類方法,類對象封裝在Definition中。 6 7 private Class<?> klass; 8 private Method method; 9 private Object object; 10 11 RpcBeanDefination() { 12 }
給該類所有的成員都有getter和setter方法就不需要說了,這個類,將執行的哪個類的哪個方法和類的對象封裝起來,以后這個類將形成Map
的值,下面來介紹的RpcBeanFactory會重點介紹。
2.RpcBeanFactory
1 package com.xupt.rpc.core; 2 3 import java.util.HashMap; 4 import java.util.Map; 5 6 public class RpcBeanFactory { 7 8 private final Map<String, RpcBeanDefination> beanMap; 9 10 RpcBeanFactory() { 11 beanMap = new HashMap<>(); 12 } 13 14 void rpcBeanRegistry(String beanId,RpcBeanDefination defination) { 15 RpcBeanDefination rbd = beanMap.get(beanId); 16 if(rbd != null) { 17 return; 18 } 19 beanMap.put(beanId, defination); 20 } 21 22 RpcBeanDefination getBean(String beanId) { 23 return beanMap.get(beanId); 24 } 25 }
此類是將序列號作為Map中的鍵,RpcBeanDeifintion作為值放入Map中,用BeanId來找對應客戶端那邊序列號相同的方法。
3.下來是RpcBeanRegistry:
1 package com.xupt.rpc.core; 2 3 import java.lang.reflect.Method; 4 5 public class RpcBeanRegistry { 6 7 RpcBeanRegistry() { 8 } 9 10 //給客戶端提供 11 static void registryInterface(RpcBeanFactory rpcBeanFactory,Class<?> interfaces) { 12 doregistry(rpcBeanFactory,interfaces,null); 13 } 14 15 //內部使用,注冊 16 private static void doregistry(RpcBeanFactory rpcBeanFactory , Class<?> interfaces ,Object object) { 17 //得到接口中的所有的方法,行程方法的數組 18 Method[] methods = interfaces.getDeclaredMethods(); 19 for(Method method : methods) { //遍歷這些方法 20 String beanId = String.valueOf(method.toString().hashCode());//將方法序列化。 21 RpcBeanDefination rpcBeanDefination = new RpcBeanDefination(); 22 23 //將得到的實現接口的那個類和它的方法以及對象放進RpcBeanDefination()中。 24 rpcBeanDefination.setKlass(interfaces); 25 rpcBeanDefination.setMethod(method); 26 rpcBeanDefination.setObject(object); 27 28 rpcBeanFactory.rpcBeanRegistry(beanId, rpcBeanDefination); 29 } 30 } 31 32 //服務端使用,知道實現類的對象。 33 static void registryInterface(RpcBeanFactory rpcBeanFactory,Class<?> interfaces,Object object) { 34 //判斷此類是否實現了這個接口。 35 if(!interfaces.isAssignableFrom(object.getClass())){ 36 return; 37 } 38 doregistry(rpcBeanFactory,interfaces,object); 39 } 40 41 //服務器端使用,知道類,創建一個對象。 42 static void registryInterface(RpcBeanFactory rpcBeanFactory,Class<?> interfaces,Class<?> klass) { 43 //判斷該類是否實現了接口。 44 if(!interfaces.isAssignableFrom(klass)){ 45 return; 46 } 47 try { 48 doregistry(rpcBeanFactory, interfaces, klass.newInstance()); 49 } catch (Exception e) { 50 e.printStackTrace(); 51 } 52 } 53 }
這個類是同時給客戶端和服務器使用的,所以有一個私有方法來完成類方法的獲得和序列號的產生(method.toString().hashcode())。然后將類、方法、對象放進RpcBeanDefinition中,將得到的序列號作為鍵和rpcBeanDeifinyion作為值放進Map中,形成鍵值對,方便客戶端和服務器的調用。
4.下來是RpcServer:
1 package com.xupt.rpc.core;
2 3 import java.io.IOException; 4 import java.net.ServerSocket; 5 import java.net.Socket; 6 7 public class RpcServer implements Runnable { 8 9 private ServerSocket server; 10 private int port; 11 private boolean goon; 12 private final RpcBeanFactory rpcBeanFactory; 13 private static long executorId; 14 15 public RpcServer() { 16 rpcBeanFactory = new RpcBeanFactory(); 17 this.goon = false; 18 } 19 20 public void setPort(int port) { 21 this.port = port; 22 } 23 24 public void startRpcServer() throws Exception { 25 if(this.port == 0) { 26 return; 27 } 28 server = new ServerSocket(port); 29 this.goon = true; 30 new Thread(this,"Rpc_Server").start();//啟動線程 31 } 32 33 public void stopRpcServer() { //關閉服務器 34 if (this.server != null && !this.server.isClosed()) { 35 try { 36 this.server.close(); 37 } catch (IOException e) { 38 e.printStackTrace(); 39 } finally { 40 this.server = null; 41 } 42 } 43 } 44 45 RpcBeanFactory getRpcBeanFactory() { 46 return rpcBeanFactory; 47 } 48 49 //用注冊的方法知道類實現的接口,類的對象,通過對象執行類的方法。 50 public void rpcRegistry(Class<?> interfaces,Object object) { 51 RpcBeanRegistry.registryInterface(rpcBeanFactory, interfaces, object); 52 } 53 54 public void rpcRegistry(Class<?> interfaces, Class<?> imClass) { 55 RpcBeanRegistry.registryInterface(rpcBeanFactory, interfaces,imClass); 56 } 57 58 @Override 59 public void run() { 60 while(goon) { 61 try { 62 Socket socket = server.accept();//不斷的偵聽 63 64 new RpcServerExecutor(socket, this,++executorId); 65 66 } catch (Exception e) { 67 goon = false; 68 e.printStackTrace(); 69 } 70 } 71 stopRpcServer(); 72 } 73 }
上述服務器端的基本任務已經完成了。
下來我們來看看客戶端:
首先還是:RpcClientExecutor,它是建立與服務器端的連接,以及接收,發送消息,具體發送的是方法的序列號和參數類型。
package com.xupt.rpc.core; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.Socket; public class RpcClientExecutor { private String rpcServerIp; //ip地址 private int rpcServerPort; //端口號 RpcClientExecutor() { } RpcClientExecutor(String rpcServerIp, int rpcServerPort) { this.rpcServerIp = rpcServerIp; this.rpcServerPort = rpcServerPort; } String getRpcServerIp() { return rpcServerIp; } void setRpcServerIp(String rpcServerIp) { this.rpcServerIp = rpcServerIp; } int getRpcServerPort() { return rpcServerPort; } void setRpcServerPort(int rpcServerPort) { this.rpcServerPort = rpcServerPort; } //關閉客戶端 private void closeSocket(ObjectInputStream ois, ObjectOutputStream oos, Socket socket) { try { if (ois != null) { ois.close(); } } catch (IOException e) { e.printStackTrace(); } finally { ois = null; } try { if (oos != null) { oos.close(); } } catch (IOException e) { e.printStackTrace(); } finally { oos = null; } try { if (socket != null && !socket.isClosed()) { socket.close(); } } catch (IOException e) { e.printStackTrace(); } finally { socket = null; } } @SuppressWarnings("unchecked") <T> T rpExecutor(String beanId,Object[] params) throws Exception { //連接服務器端 Socket socket = new Socket(rpcServerIp, rpcServerPort); //發送方法的序列號和參數類型。 ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream()); oos.writeUTF(beanId); oos.writeObject(params); //必須將這句放在前面三句的后面。 ObjectInputStream ois = new ObjectInputStream(socket.getInputStream()); //接收服務器端返回的結果。 Object result = ois.readObject(); closeSocket(ois,oos,socket); return (T) result; } }
下來是:RpcClient:這個類是產生代理,用代理對象偽執行相關的方法,然后在真正的來連接服務器。
1 package com.xupt.rpc.core; 2 3 import java.lang.reflect.InvocationHandler; 4 import java.lang.reflect.Method; 5 import java.lang.reflect.Proxy; 6 7 public class RpcClient { 8 9 private RpcClientExecutor rpcClientExecutor; 10 11 public RpcClient(String rpcServerIp,int rpcServerport) { 12 this.rpcClientExecutor = new RpcClientExecutor(rpcServerIp, rpcServerport); 13 } 14 15 @SuppressWarnings("unchecked") 16 public <T> T getProxy(Class<?> klass) { 17 return (T) Proxy.newProxyInstance( 18 klass.getClassLoader(), 19 new Class[] {klass}, 20 new InvocationHandler() { 21 22 @Override 23 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 24 String BeanId = String.valueOf(method.toString().hashCode()); 25 Object result = rpcClientExecutor.rpExecutor(BeanId, args); 26 27 return result; 28 } 29 } 30 ); 31 } 32 }
上述方法產生代理就不多做解釋,不明白請看上一篇的代理機制之初見吧。。。。
以上就是我的RMI的簡單實現,入如果有錯誤,請指正!也希望它對你有所幫助。