視頻教程地址 DT課堂(原名顏群)
整體思路
RPC(Remote Procedure Call),即遠程過程調用。使用RPC,可以像使用本地的程序一樣使用遠程計算機上的程序。RPC使得開發分布式程序更加容易。下面是一個基於java的簡單的RPC實例,有助於學習dubbo或grpc等框架的原理。
原理分析
RPC采用客戶機/服務器模式。請求程序就是客戶端,而服務提供程序就是服務端。也就是說需要兩個角色,服務端和客戶端。首先,客戶端調用進程發送一個調用信息(調用的接口,方法名,方法傳入參數等)給服務端,然后等待應答信息。在服務器端,當一個調用信息到達,服務器獲得調用信息並解析執行調用的接口和方法,然后發送調用的方法返回值,然后等待下一個調用信息,最后,客戶端接收到服務端發送回來的方法返回信息。
以下是代碼
服務端
首先需要業務類,然后需要一個注冊中心,注冊中心可以把被調用的業務類注冊到一個map集合中,然后根據客戶端發送過來的調用信息執行相應的業務類對象的方法,並返回方法的返回值
創建需要發布的業務類接口和具體實現類
package org.rpc.service; public interface HelloService { Object sayHello(String name); }
package org.rpc.service; public class HelloServiceImpl implements HelloService { public Object sayHello(String name) { // TODO Auto-generated method stub return "hello,"+name; } }
然后是服務端的主體類,就是注冊中心。定義三個方法start()初始化方法,stop()停止服務方法,register()注冊中心
package org.rpc.service; public interface Server { void start(); void stop(); void register(Class service,Class serviceImpl); }
具體實現類,首先聲明一個map集合來來存放業務類,key是業務類的接口名,value是接口對應的具體實現類class對象
package org.rpc.service; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.HashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ServerCenter implements Server { private static HashMap<String, Class> serviceRegister=new HashMap<>(); private static int PORT=0; //根據本地計算機性能生成對應容量的線程池 private static ExecutorService servicePool= Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); public ServerCenter() { } public ServerCenter(int port) { this.PORT=port; } @Override public void start() { ServerSocket server=null; try { server=new ServerSocket(); server.bind(new InetSocketAddress(PORT)); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } while(true) { System.out.println("等待客戶端連接..."); Socket socket = null; try { //服務器等待連接,每當有客戶端連接就開啟線程執行調用信息處理類 socket = server.accept(); servicePool.execute(new ServiceTask(socket)); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } @Override public void stop() { servicePool.shutdown(); } @Override public void register(Class service, Class serviceImpl) { serviceRegister.put(service.getName(), serviceImpl); } //具體調用信息處理類,解析客戶端發來的調用信息並執行對應的業務方法並相應方法的返回值 private class ServiceTask implements Runnable{ private Socket socket=null; public ServiceTask() { } public ServiceTask(Socket socket) { this.socket = socket; } @Override public void run() { ObjectInputStream ois=null; ObjectOutputStream oos=null; try { System.out.println("客戶端已連接"); ois=new ObjectInputStream(socket.getInputStream()); //獲取客戶端發來的接口名 String className=ois.readUTF(); //獲取客戶端發來的方法 String methodName=ois.readUTF(); //獲取客戶端發來的方法參數類型 Class[] methodTypes=(Class[]) ois.readObject(); //獲取客戶端發來的方法參數值 Object[] args =(Object[]) ois.readObject(); //從map中找到需要的接口並執行客戶端調用的方法 Class service=serviceRegister.get(className); Method method = service.getMethod(methodName,methodTypes); Object returns = method.invoke(service.newInstance(), args); oos=new ObjectOutputStream(socket.getOutputStream()); //返回方法執行的結果 oos.writeObject(returns); }catch (Exception e) { e.printStackTrace(); }finally { try { //關閉資源 if(oos!=null)oos.close(); if(ois!=null)ois.close(); if(socket!=null)socket.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
客戶端
客戶端使用動態代理來接受服務端的業務類返回值
package org.rpc.client; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.Socket; public class Client { @SuppressWarnings("unchecked") public static <T> T getRemoteProxyObj(Class serviceInterface,InetSocketAddress addr) { return (T)Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[] {serviceInterface}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args){ Socket socket =null; ObjectInputStream ois=null; ObjectOutputStream oos=null; Object result=null; try { socket=new Socket(); socket.connect(addr); oos=new ObjectOutputStream(socket.getOutputStream()); //發送需要的接口名 oos.writeUTF(serviceInterface.getName()); //發送需要的方法名 oos.writeUTF(method.getName()); //方法參數類型 oos.writeObject(method.getParameterTypes()); //方法參數 oos.writeObject(args); ois=new ObjectInputStream(socket.getInputStream()); result=ois.readObject(); }catch (Exception e) { e.printStackTrace(); } finally { try { if(oos!=null)oos.close(); if(ois!=null)ois.close(); if(socket!=null)socket.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return result; } }); } }
測試
服務端使用register()方法對HelloService類進行注冊並開啟服務等待客戶端連接
package org.rpc.test; import org.rpc.service.HelloService; import org.rpc.service.HelloServiceImpl; import org.rpc.service.Server; import org.rpc.service.ServerCenter; public class ServerTest { public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { Server server = new ServerCenter(9999); server.register(HelloService.class, HelloServiceImpl.class); server.start(); } }).start(); } }
客戶端直接聲明需要調用的業務類的接口接受動態代理對象並執行需要的方法
package org.rpc.test; import java.net.InetSocketAddress; import org.rpc.client.Client; import org.rpc.service.HelloService; public class ClientTest { public static void main(String[] args) throws ClassNotFoundException { HelloService hs=Client.getRemoteProxyObj(Class.forName("org.rpc.service.HelloService"), new InetSocketAddress("127.0.0.1", 9999)); System.out.println(hs.sayHello("world")); } }
視頻教程地址http://aibd.ke.qq.com