實現自己的RPC框架如果不需要自定義協議的話那就要基於Socket+序列化。
ProcessorHandler:
主要是用來處理客戶端的請求。
package dgb.nospring.myrpc; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.Socket; /** * 任務處理類 * * @author Dongguabai * @date 2018/11/1 16:10 */ public class ProcessorHandler implements Runnable { private Socket socket; /** * 服務端發布的服務 */ private Object service; public ProcessorHandler(Socket socket, Object service) { this.socket = socket; this.service = service; } //處理請求 @Override public void run() { ObjectInputStream objectInputStream = null; try { objectInputStream = new ObjectInputStream(socket.getInputStream()); //反序列化 RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject(); Object result = invoke(rpcRequest); //將結果返回給客戶端 ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(result); objectOutputStream.flush(); objectInputStream.close(); } catch (Exception e) { e.printStackTrace(); } finally { if (objectInputStream != null) { try { objectInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 反射調用 * * @param rpcRequest */ private Object invoke(RpcRequest rpcRequest) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { System.out.println("服務端開始調用------"); Object[] parameters = rpcRequest.getParameters(); Class[] parameterTypes = new Class[parameters.length]; for (int i = 0, length = parameters.length; i < length; i++) { parameterTypes[i] = parameters[i].getClass(); } Method method = service.getClass().getMethod(rpcRequest.getMethodName(), parameterTypes); return method.invoke(service, parameters); } }
RemoteInvocationHandler:
動態代理InvocationHandler。
package dgb.nospring.myrpc; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; /** * @author Dongguabai * @date 2018/11/1 16:20 */ public class RemoteInvocationHandler implements InvocationHandler{ private String host; private int port; /** *發起客戶端和服務端的遠程調用。調用客戶端的信息進行傳輸 */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setClassName(method.getDeclaringClass().getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setParameters(args); TcpTransport tcpTransport = new TcpTransport(host,port); return tcpTransport.send(rpcRequest); } public RemoteInvocationHandler(String host, int port) { this.host = host; this.port = port; } } RpcClientProxy: 客戶端獲取代理對象。 package dgb.nospring.myrpc; import java.lang.reflect.Proxy; /** * 客戶端代理 * @author Dongguabai * @date 2018/11/1 16:18 */ public class RpcClientProxy { public <T> T clientProxy(final Class<T> interfaceClass,final String host,final int port){ return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class[]{interfaceClass},new RemoteInvocationHandler(host, port)); } }
RpcRequest:
封裝的一個傳輸對象。
package dgb.nospring.myrpc; import java.io.Serializable; /** * 統一傳輸對象(讓服務端知道當前要做什么) * * @author Dongguabai * @date 2018/11/1 16:16 */ public class RpcRequest implements Serializable { private String className; private String methodName; private Object[] parameters; public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Object[] getParameters() { return parameters; } public void setParameters(Object[] parameters) { this.parameters = parameters; } }
RpcServer:
服務端發布服務。
package dgb.nospring.myrpc; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author Dongguabai * @date 2018/11/1 15:53 */ public class RpcServer { //不建議通過Executors創建線程池,這里為了方便 private static final ExecutorService executor = Executors.newCachedThreadPool(); public void publisher(final Object service, int port) { //啟動一個服務監聽 try (ServerSocket serverSocket = new ServerSocket(port)) { while (true){ //通過ServerSocket獲取請求 Socket socket = serverSocket.accept(); executor.execute(new ProcessorHandler(socket,service)); } } catch (IOException e) { e.printStackTrace(); } } }
TcpTransport:
處理Socket傳輸。
package dgb.nospring.myrpc; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.Socket; /** * socket傳輸 * * @author Dongguabai * @date 2018/11/1 16:25 */ public class TcpTransport { private String host; private int port; public TcpTransport(String host, int port) { this.host = host; this.port = port; } private Socket newSocket() { System.out.println("准備創建Socket連接,host:" + host + ",port:" + port); try { Socket socket = new Socket(host, port); return socket; } catch (IOException e) { throw new RuntimeException("Socket連接創建失敗!host:" + host + ",port:" + port); } } public Object send(RpcRequest rpcRequest) { Socket socket = null; try { socket = newSocket(); try { ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream()); outputStream.writeObject(rpcRequest); outputStream.flush(); ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream()); Object result = inputStream.readObject(); inputStream.close(); outputStream.close(); return result; } catch (IOException | ClassNotFoundException e) { throw new RuntimeException("發起遠程調用異常!",e); } } finally { if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
測試Demo
接口:
package dgb.nospring.myrpc.demo; /** * @author Dongguabai * @date 2018/11/1 15:50 */ public interface IHelloService { String sayHello(String name); } 實現類: package dgb.nospring.myrpc.demo; /** * @author Dongguabai * @date 2018/11/1 15:51 */ public class HelloServiceImpl implements IHelloService { @Override public String sayHello(String name) { return "你好," + name; } }
客戶端:
package dgb.nospring.myrpc.demo; import dgb.nospring.myrpc.RpcClientProxy; /** * @author Dongguabai * @date 2018/11/1 18:10 */ public class ClientDemo { public static void main(String[] args) { RpcClientProxy proxy = new RpcClientProxy(); IHelloService helloService = proxy.clientProxy(IHelloService.class, "127.0.0.1", 12345); String name = helloService.sayHello("張三"); System.out.println(name); } }
服務端:
package dgb.nospring.myrpc.demo; import dgb.nospring.myrpc.RpcServer; /** * @author Dongguabai * @date 2018/11/1 18:07 */ public class ServerDemo { public static void main(String[] args) { RpcServer rpcServer = new RpcServer(); rpcServer.publisher(new HelloServiceImpl(),12345); } }
目前大部分遠程調用框架都是基於netty去實現的,畢竟Socket的性能實在不行。
作者:Dongguabai
來源:CSDN
原文:https://blog.csdn.net/Dongguabai/article/details/83624822
------------------------------------------------------------------------------------------------------------------------
基於以上完成的RPC框架進行改造,增加基於Curator實現的ZK注冊中心。
項目源碼地址:https://gitee.com/white_melon_white/rpcDemo
可能這個圖不太准確,但是大體意思就是服務端在注冊中心中注冊服務,客戶端在注冊中心獲取服務地址進行調用,中間可能還會有一些LB等:
定義一個注冊服務的頂層接口IRegistryCenter:
package dgb.nospring.myrpc.registry; /** * 注冊中心頂層接口 * @author Dongguabai * @date 2018/11/1 19:05 */ public interface IRegistryCenter { /** * 注冊服務 * @param serviceName 服務名稱 * @param serviceAddress 服務地址 */ void register(String serviceName,String serviceAddress); }
實現類RegistryCenterImpl:
package dgb.nospring.myrpc.registry; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; /** * 注冊中心實現 * * @author Dongguabai * @date 2018/11/1 19:10 */ @Slf4j public class RegistryCenterImpl implements IRegistryCenter { private CuratorFramework curatorFramework; { curatorFramework = CuratorFrameworkFactory.builder() .connectString(RegistryCenterConfig.CONNECTING_STR) .sessionTimeoutMs(RegistryCenterConfig.SESSION_TIMEOUT) .retryPolicy(new ExponentialBackoffRetry(1000, 10)).build(); curatorFramework.start(); } //注冊相應服務 @Override public void register(String serviceName, String serviceAddress) { String serviceNodePath = RegistryCenterConfig.NAMESPACE + "/" + serviceName; try { //如果serviceNodePath(/rpcNode/userService)不存在就創建 if (curatorFramework.checkExists().forPath(serviceNodePath)==null){ //持久化節點 curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(serviceNodePath,RegistryCenterConfig.DEFAULT_VALUE); } //注冊的服務的節點路徑 String addressPath = serviceNodePath+"/"+serviceAddress; //臨時節點 String rsNode = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(addressPath, RegistryCenterConfig.DEFAULT_VALUE); log.info("服務注冊成功:{}",rsNode); } catch (Exception e) { throw new RuntimeException("注冊服務出現異常!",e); } } }
注冊中心的一些配置參數RegistryCenterConfig:
package dgb.nospring.myrpc.registry; /** * @author Dongguabai * @date 2018/11/1 19:13 */ public interface RegistryCenterConfig { /** * ZK地址int */ String CONNECTING_STR = "192.168.220.136,192.168.220.137"; int SESSION_TIMEOUT = 4000; /** * 注冊中心namespace */ String NAMESPACE = "/rpcNode"; /** * value一般來說作用不大;一般主要是利用節點特性搞點事情 */ byte[] DEFAULT_VALUE = "0".getBytes(); }
為了方便,增加了一個服務發布的注解RpcAnnotation,在接口的實現類上標明這個注解表示對外發布這個接口:
package dgb.nospring.myrpc.registry; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * @author Dongguabai * @date 2018/11/2 8:54 */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface RpcAnnotation { /** * 對外發布服務的接口 * * @return */ Class<?> value(); /** * 版本,用來區分不同版本 * @return */ String version() default ""; }
這個版本號的作用在本次Demo中沒有體現出來,不過其實使用也是很簡單的,可以將版本號與ZK node地址中的serviceName拼接或者綁定起來,然后根據版本號+serviceName獲取相應的服務調用地址。那么客戶端在發現服務的時候也要傳入相應的版本進去。
首先改造服務端,服務端要將服務發布到注冊中心,RpcServer:
package dgb.nospring.myrpc; import dgb.nospring.myrpc.registry.IRegistryCenter; import dgb.nospring.myrpc.registry.RpcAnnotation; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author Dongguabai * @date 2018/11/1 15:53 */ @Slf4j public class RpcServer { /** * 注冊中心 */ private IRegistryCenter registryCenter; /** * 服務的發布地址 */ private String addressService; /** * 服務名稱和服務對象之間的關系 */ private static final Map<String,Object> HANDLER_MAPPING = new HashMap<>(); //不建議通過Executors創建線程池,這里為了方便 private static final ExecutorService executor = Executors.newCachedThreadPool(); /*public void publisher(final Object service, int port) { //啟動一個服務監聽 try (ServerSocket serverSocket = new ServerSocket(port)) { while (true){ //通過ServerSocket獲取請求 Socket socket = serverSocket.accept(); executor.execute(new ProcessorHandler(socket,service)); } } catch (IOException e) { e.printStackTrace(); } }*/ /** * 改造后的發布服務的方法 */ public void publisher() { //啟動一個服務監聽 //獲取端口 int port = Integer.parseInt(addressService.split(":")[1]); try (ServerSocket serverSocket = new ServerSocket(port)) { //循環獲取所有的接口Name HANDLER_MAPPING.keySet().forEach(interfaceName->{ registryCenter.register(interfaceName,addressService); log.info("注冊服務成功:【serviceName:{},address:{}】",interfaceName,addressService); }); while (true){ //通過ServerSocket獲取請求 Socket socket = serverSocket.accept(); executor.execute(new ProcessorHandler(socket,HANDLER_MAPPING)); } } catch (IOException e) { e.printStackTrace(); } } /** * 綁定服務名稱和服務對象 * @param services */ public void bind(Object... services){ for (Object service : services) { //獲取發布的服務接口 RpcAnnotation rpcAnnotation = service.getClass().getAnnotation(RpcAnnotation.class); if (rpcAnnotation==null){ continue; } //發布接口的class String serviceName = rpcAnnotation.value().getName(); //將serviceName和service進行綁定 HANDLER_MAPPING.put(serviceName,service); } } public RpcServer(IRegistryCenter registryCenter, String addressService) { this.registryCenter = registryCenter; this.addressService = addressService; } }
改造任務處理類ProcessorHandler:
package dgb.nospring.myrpc; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.Socket; import java.util.Map; /** * 任務處理類 * * @author Dongguabai * @date 2018/11/1 16:10 */ public class ProcessorHandler implements Runnable { private Socket socket; /** * 服務端發布的服務 */ private Map<String,Object> handlerMap; /** * 通過構造傳入Map * @param socket * @param handlerMap */ public ProcessorHandler(Socket socket, Map<String, Object> handlerMap) { this.socket = socket; this.handlerMap = handlerMap; } //處理請求 @Override public void run() { ObjectInputStream objectInputStream = null; try { objectInputStream = new ObjectInputStream(socket.getInputStream()); //反序列化 RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject(); Object result = invoke(rpcRequest); //將結果返回給客戶端 ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(result); objectOutputStream.flush(); objectInputStream.close(); } catch (Exception e) { e.printStackTrace(); } finally { if (objectInputStream != null) { try { objectInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 反射調用 * * @param rpcRequest */ /*private Object invoke(RpcRequest rpcRequest) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { System.out.println("服務端開始調用------"); Object[] parameters = rpcRequest.getParameters(); Class[] parameterTypes = new Class[parameters.length]; for (int i = 0, length = parameters.length; i < length; i++) { parameterTypes[i] = parameters[i].getClass(); } Method method = service.getClass().getMethod(rpcRequest.getMethodName(), parameterTypes); return method.invoke(service, parameters); }*/ /** * 反射調用(之前通過Service進行反射調用,現在通過Map獲取service) * * @param rpcRequest */ private Object invoke(RpcRequest rpcRequest) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { System.out.println("服務端開始調用------"); Object[] parameters = rpcRequest.getParameters(); Class[] parameterTypes = new Class[parameters.length]; for (int i = 0, length = parameters.length; i < length; i++) { parameterTypes[i] = parameters[i].getClass(); } //從Map中獲得Service(根據客戶端請求的ServiceName,獲得相應的服務),依舊是通過反射發起調用 Object service = handlerMap.get(rpcRequest.getClassName()); Method method = service.getClass().getMethod(rpcRequest.getMethodName(), parameterTypes); return method.invoke(service, parameters); } }
測試服務發布Demo:
package dgb.nospring.myrpc.demo; import dgb.nospring.myrpc.RpcServer; import dgb.nospring.myrpc.registry.IRegistryCenter; import dgb.nospring.myrpc.registry.RegistryCenterImpl; /** * @author Dongguabai * @date 2018/11/1 18:07 */ public class ServerDemo { public static void main(String[] args) { //之前發布服務 /* RpcServer rpcServer = new RpcServer(); rpcServer.publisher(new HelloServiceImpl(),12345); */ //改造后 IRegistryCenter registryCenter = new RegistryCenterImpl(); //這里為了方便,獲取ip地址就直接寫了 RpcServer rpcServer = new RpcServer(registryCenter,"127.0.0.1:12345"); //綁定服務 rpcServer.bind(new HelloServiceImpl()); rpcServer.publisher(); } }
運行結果:
在ZK客戶端:
服務客戶發布后,現在要解決的就是服務發現的問題。
定義一個頂層服務發現接口IServiceDiscovery:
package dgb.nospring.myrpc.registry; /** * @author Dongguabai * @date 2018/11/2 9:55 */ public interface IServiceDiscovery { /** * 根據接口名稱發現服務調用地址 * @param serviceName * @return */ String discover(String serviceName); }
實現類:
package dgb.nospring.myrpc.registry; import dgb.nospring.myrpc.registry.loadbalance.LoadBalance; import dgb.nospring.myrpc.registry.loadbalance.RandomLoadBanalce; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.List; /** * 服務發現實現類 * @author Dongguabai * @date 2018/11/2 9:56 */ @Slf4j public class ServiceDiscoveryImpl implements IServiceDiscovery { /** * /rpcNode/dgb.nospring.myrpc.demo.IHelloService * 當前服務下所有的協議地址 */ private List<String> repos; /** * ZK地址 */ private String zkAddress; private CuratorFramework curatorFramework; @Override public String discover(String serviceName) { //獲取/rpcNode/dgb.nospring.myrpc.demo.IHelloService下所有協議地址 String nodePath = RegistryCenterConfig.NAMESPACE+"/"+serviceName; try { repos = curatorFramework.getChildren().forPath(nodePath); } catch (Exception e) { throw new RuntimeException("服務發現獲取子節點異常!",e); } //動態發現服務節點變化,需要注冊監聽 registerWatcher(nodePath); //這里為了方便,直接使用隨機負載 LoadBalance loadBalance = new RandomLoadBanalce(); return loadBalance.selectHost(repos); } /** * 監聽節點變化,給repos重新賦值 * @param path */ private void registerWatcher(String path){ PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework,path,true); PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { repos = curatorFramework.getChildren().forPath(path); } }; pathChildrenCache.getListenable().addListener(pathChildrenCacheListener); try { pathChildrenCache.start(); } catch (Exception e) { throw new RuntimeException("監聽節點變化異常!",e); } } public ServiceDiscoveryImpl(String zkAddress) { this.zkAddress = zkAddress; curatorFramework = CuratorFrameworkFactory.builder() .connectString(RegistryCenterConfig.CONNECTING_STR) .sessionTimeoutMs(RegistryCenterConfig.SESSION_TIMEOUT) .retryPolicy(new ExponentialBackoffRetry(1000, 10)).build(); curatorFramework.start(); } }
還有一套負載算法(這里簡單實現了一個隨機負載):
package dgb.nospring.myrpc.registry.loadbalance; import java.util.List; /** * 負載頂層接口 * @author Dongguabai * @date 2018/11/2 10:11 */ public interface LoadBalance { String selectHost(List<String> repos); } package dgb.nospring.myrpc.registry.loadbalance; import org.apache.commons.collections.CollectionUtils; import java.util.List; /** * @author Dongguabai * @date 2018/11/2 10:15 */ public abstract class AbstractLoadBanance implements LoadBalance{ /** * 通過模板方法,做一些牽制操作 * @param repos * @return */ @Override public String selectHost(List<String> repos) { if(CollectionUtils.isEmpty(repos)){ return null; } if(repos.size()==1){ return repos.get(0); } return doSelect(repos); } /** * 實現具體的實現負載算法 * @param repos * @return */ protected abstract String doSelect(List<String> repos); } package dgb.nospring.myrpc.registry.loadbalance; import java.util.List; import java.util.Random; /** * 隨機負載算法 * @author Dongguabai * @date 2018/11/2 10:17 */ public class RandomLoadBanalce extends AbstractLoadBanance{ @Override protected String doSelect(List<String> repos) { return repos.get(new Random().nextInt(repos.size())); } } 還有獲取服務的RpcClientProxy需要進行改造,其實就是改了一個參數傳遞而已: package dgb.nospring.myrpc; import dgb.nospring.myrpc.registry.IServiceDiscovery; import java.lang.reflect.Proxy; /** * 客戶端代理 * @author Dongguabai * @date 2018/11/1 16:18 */ public class RpcClientProxy { private IServiceDiscovery serviceDiscovery; /* public <T> T clientProxy(final Class<T> interfaceClass,final String host,final int port){ return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class[]{interfaceClass},new RemoteInvocationHandler(host, port)); }*/ public <T> T clientProxy(final Class<T> interfaceClass){ return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class[]{interfaceClass},new RemoteInvocationHandler(serviceDiscovery)); } public RpcClientProxy(IServiceDiscovery serviceDiscovery) { this.serviceDiscovery = serviceDiscovery; } }
同樣的,動態代理的InvocationHandler也要修改:
package dgb.nospring.myrpc; import dgb.nospring.myrpc.registry.IServiceDiscovery; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; /** * @author Dongguabai * @date 2018/11/1 16:20 */ public class RemoteInvocationHandler implements InvocationHandler{ private IServiceDiscovery serviceDiscovery; /** *發起客戶端和服務端的遠程調用。調用客戶端的信息進行傳輸 */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setClassName(method.getDeclaringClass().getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setParameters(args); //從ZK中獲取地址 127.0.0.1:12345 String discover = serviceDiscovery.discover(rpcRequest.getClassName()); TcpTransport tcpTransport = new TcpTransport(discover); return tcpTransport.send(rpcRequest); } public RemoteInvocationHandler(IServiceDiscovery serviceDiscovery) { this.serviceDiscovery = serviceDiscovery; } }
同樣的。TCPTransport也要進行改造:
package dgb.nospring.myrpc; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.Socket; /** * socket傳輸 * * @author Dongguabai * @date 2018/11/1 16:25 */ public class TcpTransport { private String serviceAddress; private Socket newSocket() { System.out.println("准備創建Socket連接,"+serviceAddress); String[] split = serviceAddress.split(":"); try { Socket socket = new Socket(split[0], Integer.parseInt(split[1])); return socket; } catch (IOException e) { throw new RuntimeException("Socket連接創建失敗!" + serviceAddress); } } public TcpTransport(String serviceAddress) { this.serviceAddress = serviceAddress; } public Object send(RpcRequest rpcRequest) { Socket socket = null; try { socket = newSocket(); try { ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream()); outputStream.writeObject(rpcRequest); outputStream.flush(); ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream()); Object result = inputStream.readObject(); inputStream.close(); outputStream.close(); return result; } catch (IOException | ClassNotFoundException e) { throw new RuntimeException("發起遠程調用異常!",e); } } finally { if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
客戶端Demo:
package dgb.nospring.myrpc.demo; import dgb.nospring.myrpc.RpcClientProxy; import dgb.nospring.myrpc.registry.IServiceDiscovery; import dgb.nospring.myrpc.registry.RegistryCenterConfig; import dgb.nospring.myrpc.registry.ServiceDiscoveryImpl; /** * @author Dongguabai * @date 2018/11/1 18:10 */ public class ClientDemo { public static void main(String[] args) { /*RpcClientProxy proxy = new RpcClientProxy(); IHelloService helloService = proxy.clientProxy(IHelloService.class, "127.0.0.1", 12345); String name = helloService.sayHello("張三"); System.out.println(name);*/ IServiceDiscovery serviceDiscovery = new ServiceDiscoveryImpl(RegistryCenterConfig.CONNECTING_STR); RpcClientProxy proxy = new RpcClientProxy(serviceDiscovery); IHelloService service = proxy.clientProxy(IHelloService.class); System.out.println(service.sayHello("張三")); } }
控制台輸出:
如果需要驗證集群環境下,我們可以創建兩個ServerDemo:
兩個服務均注冊到注冊中心:
客戶端調用還是不變:
連續調用兩次客戶端:
---------------------
作者:Dongguabai
來源:CSDN
原文:https://blog.csdn.net/dongguabai/article/details/83625362