手寫實現RPC框架(不帶注冊中心和帶注冊中心兩種)


實現自己的RPC框架如果不需要自定義協議的話那就要基於Socket+序列化。

 

 

ProcessorHandler:
主要是用來處理客戶端的請求。

package dgb.nospring.myrpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;

/**
* 任務處理類
*
* @author Dongguabai
* @date 2018/11/1 16:10
*/
public class ProcessorHandler implements Runnable {

private Socket socket;
/**
* 服務端發布的服務
*/
private Object service;

public ProcessorHandler(Socket socket, Object service) {
this.socket = socket;
this.service = service;
}

//處理請求
@Override
public void run() {
ObjectInputStream objectInputStream = null;
try {
objectInputStream = new ObjectInputStream(socket.getInputStream());
//反序列化
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Object result = invoke(rpcRequest);
//將結果返回給客戶端
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(result);
objectOutputStream.flush();
objectInputStream.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (objectInputStream != null) {
try {
objectInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

/**
* 反射調用
*
* @param rpcRequest
*/
private Object invoke(RpcRequest rpcRequest) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
System.out.println("服務端開始調用------");
Object[] parameters = rpcRequest.getParameters();
Class[] parameterTypes = new Class[parameters.length];
for (int i = 0, length = parameters.length; i < length; i++) {
parameterTypes[i] = parameters[i].getClass();
}
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), parameterTypes);
return method.invoke(service, parameters);
}

}

 


RemoteInvocationHandler:
動態代理InvocationHandler。

package dgb.nospring.myrpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
* @author Dongguabai
* @date 2018/11/1 16:20
*/
public class RemoteInvocationHandler implements InvocationHandler{

private String host;
private int port;

/**
*發起客戶端和服務端的遠程調用。調用客戶端的信息進行傳輸
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameters(args);
TcpTransport tcpTransport = new TcpTransport(host,port);
return tcpTransport.send(rpcRequest);
}

public RemoteInvocationHandler(String host, int port) {
this.host = host;
this.port = port;
}
}
RpcClientProxy:
客戶端獲取代理對象。

package dgb.nospring.myrpc;

import java.lang.reflect.Proxy;

/**
* 客戶端代理
* @author Dongguabai
* @date 2018/11/1 16:18
*/
public class RpcClientProxy {

public <T> T clientProxy(final Class<T> interfaceClass,final String host,final int port){
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class[]{interfaceClass},new RemoteInvocationHandler(host, port));
}
}

 

RpcRequest:
封裝的一個傳輸對象。

package dgb.nospring.myrpc;

import java.io.Serializable;

/**
* 統一傳輸對象(讓服務端知道當前要做什么)
*
* @author Dongguabai
* @date 2018/11/1 16:16
*/
public class RpcRequest implements Serializable {

private String className;
private String methodName;
private Object[] parameters;

public String getClassName() {
return className;
}

public void setClassName(String className) {
this.className = className;
}

public String getMethodName() {
return methodName;
}

public void setMethodName(String methodName) {
this.methodName = methodName;
}

public Object[] getParameters() {
return parameters;
}

public void setParameters(Object[] parameters) {
this.parameters = parameters;
}
}

 


RpcServer:
服務端發布服務。

package dgb.nospring.myrpc;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author Dongguabai
* @date 2018/11/1 15:53
*/
public class RpcServer {
//不建議通過Executors創建線程池,這里為了方便
private static final ExecutorService executor = Executors.newCachedThreadPool();

public void publisher(final Object service, int port) {
//啟動一個服務監聽
try (ServerSocket serverSocket = new ServerSocket(port)) {
while (true){
//通過ServerSocket獲取請求
Socket socket = serverSocket.accept();
executor.execute(new ProcessorHandler(socket,service));
}
} catch (IOException e) {
e.printStackTrace();
}
}

}

 


TcpTransport:
處理Socket傳輸。

package dgb.nospring.myrpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

/**
* socket傳輸
*
* @author Dongguabai
* @date 2018/11/1 16:25
*/
public class TcpTransport {

private String host;

private int port;

public TcpTransport(String host, int port) {
this.host = host;
this.port = port;
}

private Socket newSocket() {
System.out.println("准備創建Socket連接,host:" + host + ",port:" + port);
try {
Socket socket = new Socket(host, port);
return socket;
} catch (IOException e) {
throw new RuntimeException("Socket連接創建失敗!host:" + host + ",port:" + port);
}
}

public Object send(RpcRequest rpcRequest) {
Socket socket = null;
try {
socket = newSocket();
try {
ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(rpcRequest);
outputStream.flush();
ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
Object result = inputStream.readObject();
inputStream.close();
outputStream.close();
return result;
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("發起遠程調用異常!",e);
}
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

 


測試Demo
接口:

package dgb.nospring.myrpc.demo;

/**
* @author Dongguabai
* @date 2018/11/1 15:50
*/
public interface IHelloService {

String sayHello(String name);
}
實現類:

package dgb.nospring.myrpc.demo;

/**
* @author Dongguabai
* @date 2018/11/1 15:51
*/
public class HelloServiceImpl implements IHelloService {

@Override
public String sayHello(String name) {
return "你好," + name;
}
}

 

客戶端:

package dgb.nospring.myrpc.demo;

import dgb.nospring.myrpc.RpcClientProxy;

/**
* @author Dongguabai
* @date 2018/11/1 18:10
*/
public class ClientDemo {

public static void main(String[] args) {
RpcClientProxy proxy = new RpcClientProxy();
IHelloService helloService = proxy.clientProxy(IHelloService.class, "127.0.0.1", 12345);
String name = helloService.sayHello("張三");
System.out.println(name);
}
}

 


服務端:

package dgb.nospring.myrpc.demo;

import dgb.nospring.myrpc.RpcServer;

/**
* @author Dongguabai
* @date 2018/11/1 18:07
*/
public class ServerDemo {

public static void main(String[] args) {
RpcServer rpcServer = new RpcServer();
rpcServer.publisher(new HelloServiceImpl(),12345);
}
}

目前大部分遠程調用框架都是基於netty去實現的,畢竟Socket的性能實在不行。


作者:Dongguabai
來源:CSDN
原文:https://blog.csdn.net/Dongguabai/article/details/83624822

------------------------------------------------------------------------------------------------------------------------

基於以上完成的RPC框架進行改造,增加基於Curator實現的ZK注冊中心。

項目源碼地址:https://gitee.com/white_melon_white/rpcDemo

可能這個圖不太准確,但是大體意思就是服務端在注冊中心中注冊服務,客戶端在注冊中心獲取服務地址進行調用,中間可能還會有一些LB等:

 

 

定義一個注冊服務的頂層接口IRegistryCenter:

package dgb.nospring.myrpc.registry;

/**
* 注冊中心頂層接口
* @author Dongguabai
* @date 2018/11/1 19:05
*/
public interface IRegistryCenter {

/**
* 注冊服務
* @param serviceName 服務名稱
* @param serviceAddress 服務地址
*/
void register(String serviceName,String serviceAddress);
}

 


實現類RegistryCenterImpl:

package dgb.nospring.myrpc.registry;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

/**
* 注冊中心實現
*
* @author Dongguabai
* @date 2018/11/1 19:10
*/
@Slf4j
public class RegistryCenterImpl implements IRegistryCenter {

private CuratorFramework curatorFramework;

{
curatorFramework = CuratorFrameworkFactory.builder()
.connectString(RegistryCenterConfig.CONNECTING_STR)
.sessionTimeoutMs(RegistryCenterConfig.SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();
curatorFramework.start();
}

//注冊相應服務
@Override
public void register(String serviceName, String serviceAddress) {
String serviceNodePath = RegistryCenterConfig.NAMESPACE + "/" + serviceName;
try {
//如果serviceNodePath(/rpcNode/userService)不存在就創建
if (curatorFramework.checkExists().forPath(serviceNodePath)==null){
//持久化節點
curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(serviceNodePath,RegistryCenterConfig.DEFAULT_VALUE);
}
//注冊的服務的節點路徑
String addressPath = serviceNodePath+"/"+serviceAddress;
//臨時節點
String rsNode = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(addressPath, RegistryCenterConfig.DEFAULT_VALUE);
log.info("服務注冊成功:{}",rsNode);

} catch (Exception e) {
throw new RuntimeException("注冊服務出現異常!",e);
}
}

}

 


注冊中心的一些配置參數RegistryCenterConfig:

package dgb.nospring.myrpc.registry;

/**
* @author Dongguabai
* @date 2018/11/1 19:13
*/
public interface RegistryCenterConfig {

/**
* ZK地址int
*/
String CONNECTING_STR = "192.168.220.136,192.168.220.137";

int SESSION_TIMEOUT = 4000;

/**
* 注冊中心namespace
*/
String NAMESPACE = "/rpcNode";

/**
* value一般來說作用不大;一般主要是利用節點特性搞點事情
*/
byte[] DEFAULT_VALUE = "0".getBytes();
}

 


為了方便,增加了一個服務發布的注解RpcAnnotation,在接口的實現類上標明這個注解表示對外發布這個接口:

package dgb.nospring.myrpc.registry;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* @author Dongguabai
* @date 2018/11/2 8:54
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcAnnotation {

/**
* 對外發布服務的接口
*
* @return
*/
Class<?> value();

/**
* 版本,用來區分不同版本
* @return
*/
String version() default "";
}

 


這個版本號的作用在本次Demo中沒有體現出來,不過其實使用也是很簡單的,可以將版本號與ZK node地址中的serviceName拼接或者綁定起來,然后根據版本號+serviceName獲取相應的服務調用地址。那么客戶端在發現服務的時候也要傳入相應的版本進去。

首先改造服務端,服務端要將服務發布到注冊中心,RpcServer:

package dgb.nospring.myrpc;

import dgb.nospring.myrpc.registry.IRegistryCenter;
import dgb.nospring.myrpc.registry.RpcAnnotation;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author Dongguabai
* @date 2018/11/1 15:53
*/
@Slf4j
public class RpcServer {

/**
* 注冊中心
*/
private IRegistryCenter registryCenter;

/**
* 服務的發布地址
*/
private String addressService;

/**
* 服務名稱和服務對象之間的關系
*/
private static final Map<String,Object> HANDLER_MAPPING = new HashMap<>();

//不建議通過Executors創建線程池,這里為了方便
private static final ExecutorService executor = Executors.newCachedThreadPool();

/*public void publisher(final Object service, int port) {
//啟動一個服務監聽
try (ServerSocket serverSocket = new ServerSocket(port)) {
while (true){
//通過ServerSocket獲取請求
Socket socket = serverSocket.accept();
executor.execute(new ProcessorHandler(socket,service));
}
} catch (IOException e) {
e.printStackTrace();
}
}*/

/**
* 改造后的發布服務的方法
*/
public void publisher() {
//啟動一個服務監聽
//獲取端口
int port = Integer.parseInt(addressService.split(":")[1]);
try (ServerSocket serverSocket = new ServerSocket(port)) {
//循環獲取所有的接口Name
HANDLER_MAPPING.keySet().forEach(interfaceName->{
registryCenter.register(interfaceName,addressService);
log.info("注冊服務成功:【serviceName:{},address:{}】",interfaceName,addressService);
});
while (true){
//通過ServerSocket獲取請求
Socket socket = serverSocket.accept();
executor.execute(new ProcessorHandler(socket,HANDLER_MAPPING));
}
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 綁定服務名稱和服務對象
* @param services
*/
public void bind(Object... services){
for (Object service : services) {
//獲取發布的服務接口
RpcAnnotation rpcAnnotation = service.getClass().getAnnotation(RpcAnnotation.class);
if (rpcAnnotation==null){
continue;
}
//發布接口的class
String serviceName = rpcAnnotation.value().getName();
//將serviceName和service進行綁定
HANDLER_MAPPING.put(serviceName,service);
}
}

public RpcServer(IRegistryCenter registryCenter, String addressService) {
this.registryCenter = registryCenter;
this.addressService = addressService;
}
}

 


改造任務處理類ProcessorHandler:

package dgb.nospring.myrpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
import java.util.Map;

/**
* 任務處理類
*
* @author Dongguabai
* @date 2018/11/1 16:10
*/
public class ProcessorHandler implements Runnable {

private Socket socket;
/**
* 服務端發布的服務
*/
private Map<String,Object> handlerMap;

/**
* 通過構造傳入Map
* @param socket
* @param handlerMap
*/
public ProcessorHandler(Socket socket, Map<String, Object> handlerMap) {
this.socket = socket;
this.handlerMap = handlerMap;
}

//處理請求
@Override
public void run() {
ObjectInputStream objectInputStream = null;
try {
objectInputStream = new ObjectInputStream(socket.getInputStream());
//反序列化
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Object result = invoke(rpcRequest);
//將結果返回給客戶端
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(result);
objectOutputStream.flush();
objectInputStream.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (objectInputStream != null) {
try {
objectInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

/**
* 反射調用
*
* @param rpcRequest
*/
/*private Object invoke(RpcRequest rpcRequest) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
System.out.println("服務端開始調用------");
Object[] parameters = rpcRequest.getParameters();
Class[] parameterTypes = new Class[parameters.length];
for (int i = 0, length = parameters.length; i < length; i++) {
parameterTypes[i] = parameters[i].getClass();
}
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), parameterTypes);
return method.invoke(service, parameters);
}*/

/**
* 反射調用(之前通過Service進行反射調用,現在通過Map獲取service)
*
* @param rpcRequest
*/
private Object invoke(RpcRequest rpcRequest) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
System.out.println("服務端開始調用------");
Object[] parameters = rpcRequest.getParameters();
Class[] parameterTypes = new Class[parameters.length];
for (int i = 0, length = parameters.length; i < length; i++) {
parameterTypes[i] = parameters[i].getClass();
}
//從Map中獲得Service(根據客戶端請求的ServiceName,獲得相應的服務),依舊是通過反射發起調用
Object service = handlerMap.get(rpcRequest.getClassName());
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), parameterTypes);
return method.invoke(service, parameters);
}



}

 


測試服務發布Demo:

package dgb.nospring.myrpc.demo;

import dgb.nospring.myrpc.RpcServer;
import dgb.nospring.myrpc.registry.IRegistryCenter;
import dgb.nospring.myrpc.registry.RegistryCenterImpl;

/**
* @author Dongguabai
* @date 2018/11/1 18:07
*/
public class ServerDemo {

public static void main(String[] args) {
//之前發布服務
/*
RpcServer rpcServer = new RpcServer();
rpcServer.publisher(new HelloServiceImpl(),12345);
*/
//改造后
IRegistryCenter registryCenter = new RegistryCenterImpl();
//這里為了方便,獲取ip地址就直接寫了
RpcServer rpcServer = new RpcServer(registryCenter,"127.0.0.1:12345");
//綁定服務
rpcServer.bind(new HelloServiceImpl());
rpcServer.publisher();
}
}

 


運行結果:

 

 

在ZK客戶端:

 

 

服務客戶發布后,現在要解決的就是服務發現的問題。

定義一個頂層服務發現接口IServiceDiscovery:

package dgb.nospring.myrpc.registry;

/**
* @author Dongguabai
* @date 2018/11/2 9:55
*/
public interface IServiceDiscovery {

/**
* 根據接口名稱發現服務調用地址
* @param serviceName
* @return
*/
String discover(String serviceName);
}

 


實現類:

package dgb.nospring.myrpc.registry;

import dgb.nospring.myrpc.registry.loadbalance.LoadBalance;
import dgb.nospring.myrpc.registry.loadbalance.RandomLoadBanalce;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.List;

/**
* 服務發現實現類
* @author Dongguabai
* @date 2018/11/2 9:56
*/
@Slf4j
public class ServiceDiscoveryImpl implements IServiceDiscovery {

/**
* /rpcNode/dgb.nospring.myrpc.demo.IHelloService
* 當前服務下所有的協議地址
*/
private List<String> repos;

/**
* ZK地址
*/
private String zkAddress;

private CuratorFramework curatorFramework;

@Override
public String discover(String serviceName) {
//獲取/rpcNode/dgb.nospring.myrpc.demo.IHelloService下所有協議地址
String nodePath = RegistryCenterConfig.NAMESPACE+"/"+serviceName;
try {
repos = curatorFramework.getChildren().forPath(nodePath);
} catch (Exception e) {
throw new RuntimeException("服務發現獲取子節點異常!",e);
}
//動態發現服務節點變化,需要注冊監聽
registerWatcher(nodePath);

//這里為了方便,直接使用隨機負載
LoadBalance loadBalance = new RandomLoadBanalce();
return loadBalance.selectHost(repos);
}

/**
* 監聽節點變化,給repos重新賦值
* @param path
*/
private void registerWatcher(String path){
PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework,path,true);
PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
repos = curatorFramework.getChildren().forPath(path);
}
};
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
try {
pathChildrenCache.start();
} catch (Exception e) {
throw new RuntimeException("監聽節點變化異常!",e);
}
}

public ServiceDiscoveryImpl(String zkAddress) {
this.zkAddress = zkAddress;
curatorFramework = CuratorFrameworkFactory.builder()
.connectString(RegistryCenterConfig.CONNECTING_STR)
.sessionTimeoutMs(RegistryCenterConfig.SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();
curatorFramework.start();
}
}

 


還有一套負載算法(這里簡單實現了一個隨機負載):

package dgb.nospring.myrpc.registry.loadbalance;

import java.util.List;

/**
* 負載頂層接口
* @author Dongguabai
* @date 2018/11/2 10:11
*/
public interface LoadBalance {

String selectHost(List<String> repos);
}
package dgb.nospring.myrpc.registry.loadbalance;

import org.apache.commons.collections.CollectionUtils;

import java.util.List;

/**
* @author Dongguabai
* @date 2018/11/2 10:15
*/
public abstract class AbstractLoadBanance implements LoadBalance{

/**
* 通過模板方法,做一些牽制操作
* @param repos
* @return
*/
@Override
public String selectHost(List<String> repos) {
if(CollectionUtils.isEmpty(repos)){
return null;
}
if(repos.size()==1){
return repos.get(0);
}
return doSelect(repos);
}

/**
* 實現具體的實現負載算法
* @param repos
* @return
*/
protected abstract String doSelect(List<String> repos);

}
package dgb.nospring.myrpc.registry.loadbalance;

import java.util.List;
import java.util.Random;

/**
* 隨機負載算法
* @author Dongguabai
* @date 2018/11/2 10:17
*/
public class RandomLoadBanalce extends AbstractLoadBanance{

@Override
protected String doSelect(List<String> repos) {
return repos.get(new Random().nextInt(repos.size()));
}
}

還有獲取服務的RpcClientProxy需要進行改造,其實就是改了一個參數傳遞而已:

package dgb.nospring.myrpc;

import dgb.nospring.myrpc.registry.IServiceDiscovery;

import java.lang.reflect.Proxy;

/**
* 客戶端代理
* @author Dongguabai
* @date 2018/11/1 16:18
*/
public class RpcClientProxy {

private IServiceDiscovery serviceDiscovery;

/* public <T> T clientProxy(final Class<T> interfaceClass,final String host,final int port){
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class[]{interfaceClass},new RemoteInvocationHandler(host, port));
}*/
public <T> T clientProxy(final Class<T> interfaceClass){
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class[]{interfaceClass},new RemoteInvocationHandler(serviceDiscovery));
}

public RpcClientProxy(IServiceDiscovery serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
}
}

 

同樣的,動態代理的InvocationHandler也要修改:

package dgb.nospring.myrpc;

import dgb.nospring.myrpc.registry.IServiceDiscovery;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
* @author Dongguabai
* @date 2018/11/1 16:20
*/
public class RemoteInvocationHandler implements InvocationHandler{

private IServiceDiscovery serviceDiscovery;

/**
*發起客戶端和服務端的遠程調用。調用客戶端的信息進行傳輸
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameters(args);
//從ZK中獲取地址 127.0.0.1:12345
String discover = serviceDiscovery.discover(rpcRequest.getClassName());
TcpTransport tcpTransport = new TcpTransport(discover);
return tcpTransport.send(rpcRequest);
}

public RemoteInvocationHandler(IServiceDiscovery serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
}
}

 


同樣的。TCPTransport也要進行改造:

package dgb.nospring.myrpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

/**
* socket傳輸
*
* @author Dongguabai
* @date 2018/11/1 16:25
*/
public class TcpTransport {

private String serviceAddress;

private Socket newSocket() {
System.out.println("准備創建Socket連接,"+serviceAddress);
String[] split = serviceAddress.split(":");
try {
Socket socket = new Socket(split[0], Integer.parseInt(split[1]));
return socket;
} catch (IOException e) {
throw new RuntimeException("Socket連接創建失敗!" + serviceAddress);
}
}

public TcpTransport(String serviceAddress) {
this.serviceAddress = serviceAddress;
}

public Object send(RpcRequest rpcRequest) {
Socket socket = null;
try {
socket = newSocket();
try {
ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(rpcRequest);
outputStream.flush();
ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
Object result = inputStream.readObject();
inputStream.close();
outputStream.close();
return result;
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("發起遠程調用異常!",e);
}
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

 


客戶端Demo:

package dgb.nospring.myrpc.demo;

import dgb.nospring.myrpc.RpcClientProxy;
import dgb.nospring.myrpc.registry.IServiceDiscovery;
import dgb.nospring.myrpc.registry.RegistryCenterConfig;
import dgb.nospring.myrpc.registry.ServiceDiscoveryImpl;

/**
* @author Dongguabai
* @date 2018/11/1 18:10
*/
public class ClientDemo {

public static void main(String[] args) {
/*RpcClientProxy proxy = new RpcClientProxy();
IHelloService helloService = proxy.clientProxy(IHelloService.class, "127.0.0.1", 12345);
String name = helloService.sayHello("張三");
System.out.println(name);*/

IServiceDiscovery serviceDiscovery = new ServiceDiscoveryImpl(RegistryCenterConfig.CONNECTING_STR);
RpcClientProxy proxy = new RpcClientProxy(serviceDiscovery);
IHelloService service = proxy.clientProxy(IHelloService.class);
System.out.println(service.sayHello("張三"));

}
}

 


控制台輸出:

 

 

如果需要驗證集群環境下,我們可以創建兩個ServerDemo:

 

 

 

 

兩個服務均注冊到注冊中心:

 

 

 

 

客戶端調用還是不變:

 

 

連續調用兩次客戶端:

 

 

 

 

 
---------------------
作者:Dongguabai
來源:CSDN
原文:https://blog.csdn.net/dongguabai/article/details/83625362


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM