手写一个高性能的rpc框架
模拟服务端,运行main函数,相当于启动服务器
public class ServerApplication { public static void main(String[] args) throws Exception { //开启服务端,然后等待客户端发出请求,然后给客户端响应数据,但如果这些操作都写在,会使代码不好维护,因此,将其抽象化,抽象出 //一个工厂类,专门来做这样的操作。 //因此需要一个专门来处理提供者类的工厂类,此类需要提供一个初始化方法、一个开始/停止服务的方法、(如果要加注册中心的话还需要一个添加服务的方法) } }
提供者工厂类
public class XxlRpcProviderFactory { //初始化方法(相当于一个构造器)里面的参数,就是开启服务需要的参数 /** * * @param netType 网络通信方式 * @param serializer 序列化方式 * @param ip ip地址 * @param port 端口号 * @param accessToken 接收token(如果有) * @param serviceRegistryClass 注册中心类 (可先不要) * @param serviceRegistryParam 注册中心参数 (可先不要) */ public void init(){ } public void start(){ //这里使用回调的方式来完成开始和停止的操作 //首先需要一个Server类来具体管理服务端的数据通信,而使用回调函数,可以设置一个类似事件一样的功能 //当Server开启时需要执行注册的操作,当关闭的时候需要移除注册的操作 } public void stop(){ //在这里将Server停掉 } }
Server抽象类,延迟实现类(功能扩展)
public abstract class Server { //Server应该是一个抽象类,因为具体的通信可能不止一个。 //该类定义了XxlRpcProviderFactory需要的两个方法,开始、停止,但没有具体实现,具体实现延时到实现类中,比如NettyServer类 public abstract void start(); public abstract void st(); }
回调功能的实现
回调是一种思想,首先提供一个回调的抽象类,提供一个执行的抽象方法,但不去实现。然后设置一个条件,当满足了这个条件之后就执行某项操作,这就是回调。
需求:当Server执行了start方法的时候(条件),就要去注册服务;当Server执行了stop方法的时候(条件),就移除注册服务。
实现思路:
从需求上看,当Server的实现类执行start方法后,需要执行一个方法,这个方法可以去注册服务,但具体的注册服务代码不能写在实现类里面,实现类中只提供一个判断的功能,如果需要去回调就执行回调,不需要回调就不去回调。但这个显然和实现类负责具体的通信代码功能不相符合,因此,判断条件的方法可以写在Server抽象类中,并继承给每一个实现类。
具体判断逻辑就是看回调类是否实现。回调类应该是一个接口,只提供一个运行的方法,当回调类被实现了,那么回调函数一定需要去执行。从这个逻辑上看,真正执行回调函数的类就是实现了回调接口的实现类
BaseCallback
public abstract class BaseCallback { public abstract void run(); }
Server类中定义判断条件方法

public abstract class Server { //Server应该是一个抽象类,因为具体的通信可能不止一个。 //该类定义了XxlRpcProviderFactory需要的两个方法,开始、停止,但没有具体实现,具体实现延时到实现类中,比如NettyServer类 public abstract void start(); public abstract void stop(); //需要一个传来的BaseCallback类,来判断该类是否被实例化 private BaseCallback startedCallback; private BaseCallback stopedCallback; //需要从外部将该参数传进来,而且根据逻辑。需要连个传值的方法。 public void setStartedCallback(BaseCallback baseCallback){ this.startedCallback=baseCallback; } public void setStopdCallback(BaseCallback baseCallback){ this.stopedCallback=baseCallback; } //以下方法就是条件判断的方法,判断条件就是baseCallback是否在外面被实例化了 public void onStart(){ if(startedCallback!=null){ //说明start方法需要去注册服务了 startedCallback.run(); } } public void onStop(){ if(stopedCallback!=null){ //说明start方法需要去注册服务了 stopedCallback.run(); } } }
在工厂类中,去实现回调run方法

public class XxlRpcProviderFactory { //初始化方法(相当于一个构造器)里面的参数,就是开启服务需要的参数 /** * * @param netType 网络通信方式 * @param serializer 序列化方式 * @param ip ip地址 * @param port 端口号 * @param accessToken 接收token(如果有) * @param serviceRegistryClass 注册中心类 (可先不要) * @param serviceRegistryParam 注册中心参数 (可先不要) */ public void init(){ } //先提供一个Server类,然后使用该类的各种方法,但该类是一个抽象类,具体的实现类需要使用newstance创建 private Server server; public void start(){ //这里使用回调的方式来完成开始和停止的操作 //首先需要一个Server类来具体管理服务端的数据通信,而使用回调函数,可以设置一个类似事件一样的功能 //当Server开启时需要执行注册的操作,当关闭的时候需要移除注册的操作 //先将回调函数实例化,当后面执行的时候就会按条件执行了 server.setStartedCallback(new BaseCallback() { @Override public void run() { //执行注册的功能 } }); server.setStopdCallback(new BaseCallback() { @Override public void run() { //执行移除注册的功能 } }); //开启服务端的通信 server.start(); } public void stop(){ //在这里将Server停掉 server.stop(); } }
以上就是服务端的总体框架,接下来,只要提供了服务端的底层通信实现,服务端就搭建完成了。
下面开始实现Server类,使用netty通信模块,采用NIO模型,传输数据是异步的,当然,在服务端几乎不用考虑异步同步问题,当接收到请求之后,就返回所请求的对象。
为了更好的管理客户端传到服务器上的请求,将处理请求的操作启动不同的线程来管理,这需要一个线程池。
ThreadPoolExecutor类的获取(说明都在代码里)

public class ThreadPoolUtil { //该类的作用就是返回一个线程池的实现类 ThreadPoolExecutor,包括规定池子的一些参数以及特性 public static ThreadPoolExecutor makeServerThreadPool(final String serverType){ //serverHandlerPool服务端处理线程池,将所有请求的处理线程都放到池子里面去 //一个线程池的实例化过程就是这样,规定了池子的大小,存活时间,工作队列,线程创建工厂给线程命名,池子满了之后的处理 ThreadPoolExecutor serverHandlerPool =new ThreadPoolExecutor( 60, 300, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-rpc, " + serverType + "-serverHandlerPool-" + r.hashCode()); } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { throw new RuntimeException("xxl-rpc "+serverType+" Thread pool is EXHAUSTED!"); } } ); return serverHandlerPool; } }
有了上面的工具类,就可以管理每次请求处理的线程了。
现在工厂类中写一个将request转换为response的方法

public class XxlRpcProviderFactory { private NetEnum netType; private Serializer serializer; private String ip; // for registry private int port; // default port private String accessToken; private Class<? extends ServiceRegistry> serviceRegistryClass; private Map<String, String> serviceRegistryParam; //初始化方法(相当于一个构造器)里面的参数,就是开启服务需要的参数 /** * * @param netType 网络通信方式 * @param serializer 序列化方式 * @param ip ip地址 * @param port 端口号 * @param accessToken 接收token(如果有) * @param serviceRegistryClass 注册中心类 (可先不要) * @param serviceRegistryParam 注册中心参数 (可先不要) */ public void init(NetEnum netType, Serializer serializer, String ip, int port, String accessToken, Class<? extends ServiceRegistry> serviceRegistryClass, Map<String, String> serviceRegistryParam){ } //先提供一个Server类,然后使用该类的各种方法,但该类是一个抽象类,具体的实现类需要使用newstance创建 private Server server; public void start(){ //这里使用回调的方式来完成开始和停止的操作 //首先需要一个Server类来具体管理服务端的数据通信,而使用回调函数,可以设置一个类似事件一样的功能 //当Server开启时需要执行注册的操作,当关闭的时候需要移除注册的操作 //先将回调函数实例化,当后面执行的时候就会按条件执行了 server.setStartedCallback(new BaseCallback() { @Override public void run() { //执行注册的功能 } }); server.setStopdCallback(new BaseCallback() { @Override public void run() { //执行移除注册的功能 } }); //开启服务端的通信 server.start(); } public void stop(){ //在这里将Server停掉 server.stop(); } //存放bean对象的map集合 private Map<String,Object> serviceData=new HashMap<>(); public Map<String, Object> getServiceData() { return serviceData; } //这里是将servicebean添加到map中的方法,在application中被调用,并传给一个实例化好的类 public void addService(String iface, String version, Object serviceBean){ String serviceKey = makeServiceKey(iface, version); serviceData.put(serviceKey, serviceBean); } //建立一个制作key值的方法,根据接口名和版本号就可以得到key值 形式为iface#version public static String makeServiceKey(String iface, String version){ String serviceKey = iface; if (version!=null && version.trim().length()>0) { serviceKey += "#".concat(version); } return serviceKey; } //将handle类中的转换方法放在这里实现 public XxlRpcResponse invokeService(XxlRpcRequest request){ XxlRpcResponse response=new XxlRpcResponse(); //设置response对象 response.setRequestId(request.getRequestId()); //这里将之前请求过的bean放在一个map集合中,使用key值来取,可以避免重复的进行反射的操作 //因此,在上面需要定义一个集合来存放得到的bean对象 //根据上面定义的方法和map集合,先从map中取,如果取不到,说明请求的接口有问题 String key = makeServiceKey(request.getClassName(), request.getVersion()); Object serviceBean = serviceData.get(key); //对得到的bean以及时间和token进行判断,来抛出响应的错误 //没有发现这个bean if(serviceBean==null){ response.setErrorMsg("The serviceKey["+ key +"] not found."); return response; } //判断是否超时 if (System.currentTimeMillis() - request.getCreateMillisTime() > 3*60*1000) { response.setErrorMsg("The timestamp difference between admin and executor exceeds the limit."); return response; } //判断请求的token和响应的token是否一致 if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(request.getAccessToken())) { response.setErrorMsg("The access token[" + request.getAccessToken() + "] is wrong."); return response; } //接下来,就将请求的方法执行得到的结果返回,利用反射 Class<?> serviceBeanClass = serviceBean.getClass(); String methodName = request.getMethodName(); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); //反射得到类的方法 try { Method method = serviceBeanClass.getMethod(methodName, parameterTypes); method.setAccessible(true); Object result = method.invoke(serviceBean, parameters); response.setResult(result); } catch (Throwable t) { // catch error response.setErrorMsg(""); } return response; } }
然后就可以在handle类中调用了

public class NettyServerHandler extends SimpleChannelInboundHandler<XxlRpcRequest>{ //请求传来,首先传到这个类中,然后进行处理 //XxlRpcRequest类是封装了请求的客户端请求的类,这里在NettyServer中添加了编解码就可以直接传输这个类了。 //向Handle类中传入线程池类,然后执行 private ThreadPoolExecutor serverHandlerPool; //需要传入一个工厂类,来调用转换代码 private XxlRpcProviderFactory providerFactory; //提供构造方法传值 public NettyServerHandler(final XxlRpcProviderFactory providerFactory,final ThreadPoolExecutor serverHandlerPool){ this.providerFactory=providerFactory; this.serverHandlerPool=serverHandlerPool; } //netty通信,从客户端传来的值会直接到这里,request @Override protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final XxlRpcRequest request) throws Exception { serverHandlerPool.execute(new Runnable() { @Override public void run() { //在线程池的某个线程中执行调用函数,将请求转换为响应。这样做是为了在逻辑上分开,netty的线程是处理netty的IO操作, // 而这个线程就专门处理请求转换。 //这个调用代码直接在这里实现,使代码有点繁琐,将其放到工厂类中调用。 XxlRpcResponse response=providerFactory.invokeService(request); channelHandlerContext.writeAndFlush(response); } }); } //异常处理,就是关掉上下文 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } }
然后将handle类加到具体nettyserver类的pipeline里面,就可以传输了

public class NettyServer extends Server{ //Server的实现类,底层使用的是Netty通信 //开启通信的方法,这里专门开辟一个线程开实现开启功能 private Thread thread; @Override public void start(final XxlRpcProviderFactory factory) { thread=new Thread(new Runnable() { @Override public void run() { //这里就是具体的netty通信的服务端代码 //首先创建一个线程池对象,要传递给处理类 final ThreadPoolExecutor pool = ThreadPoolUtil.makeServerThreadPool(NettyServer.class.getSimpleName()); //创建两个线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); // start server ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast(new NettyDecoder(XxlRpcRequest.class, factory.getSerializer())) .addLast(new NettyEncoder(XxlRpcResponse.class, factory.getSerializer())) .addLast(new NettyServerHandler(factory, pool)); } }) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true); // bind try { ChannelFuture future = bootstrap.bind(factory.getPort()).sync(); //这里是回调函数的执行体 onStart(); //等待停止 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //关闭池子 pool.shutdown(); workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }); //将该线程设置为守护线程 thread.setDaemon(true); thread.start(); } @Override public void stop() { // destroy server thread //如果线程没有关掉,但需要关掉,则执行线程的中端 if (thread != null && thread.isAlive()) { thread.interrupt(); } // on stop onStop(); } }
以上就实现了一个rpc的服务端。
下面来实现客户端,客户端相比服务端要复杂很多,服务端仅仅是开启一个线程,然后静静等待,当有请求来了,从线程池中取一个线程来执行请求逻辑,返回一个响应对象。
客户端比较复杂,其中发送请求和得到响应的过程就有四种调用方式,这四种调用方式也是最难理解的部分
首先,客户端需要一个模拟的客户端来完成服务的请求
ClientApplication类,模拟客户端

public class ClientApplication { //1.测试同步发送请求 @Test public void syncTest(){ //首先提供一个代理类,通过动态代理,得到我们所需的接口方法的返回值(从服务端的转换方法可以知道,得到了方法的名称和参数后,返回的是方法的返回值), //动态代理的作用就是在客户端使用哪个方法时,动态的请求哪个方法的返回值。 //这个动态代理封装在一个geObject方法里,至于如何得到,有四种调用方式,请求的时候就传入。 //创建一个参考的bean类来实现这个方法。 } //2.测试异步发送请求 @Test public void futureTest() throws ExecutionException, InterruptedException { } //3.测试回调发送请求 @Test public void callbackTest() throws InterruptedException { } //4,测试单一长连接发送请求 @Test public void onewayTest() throws InterruptedException { } }
因为有四种调用方式,我们在请求服务之初就将四种调用方式,通过枚举定义在一个类中
public enum CallType { //在判断的时候,可以按照其名字,去选择不同的调用方式 SYNC, FUTURE, CALLBACK, ONEWAY; }
定义参考Bean,创建getObject方法
步骤1:得到所需的必须要参数
//该类就封装了动态代理的getObject方法 //首先需要传入必要的参数 private NetEnum netType; private Serializer serializer; private CallType callType; private LoadBalance loadBalance; private Class<?> iface; private String version; private long timeout = 1000; private String address; private String accessToken; private XxlRpcInvokeCallback invokeCallback; private XxlRpcInvokerFactory invokerFactory; /** * * @param netType 使用的底层通信模块 * @param serializer 序列化方式 * @param callType 调用方式,有四种 * @param loadBalance 负载均衡的方式 * @param iface 接口名 * @param version 版本 * @param timeout 超时时间 * @param address 请求地址 * @param accessToken 接入token * @param invokeCallback 回调的调用(可无) * @param invokerFactory 调用工厂(可无) */ public XxlRpcReferenceBean(NetEnum netType, Serializer serializer, CallType callType, LoadBalance loadBalance, Class<?> iface, String version, long timeout, String address, String accessToken, XxlRpcInvokeCallback invokeCallback, XxlRpcInvokerFactory invokerFactory ){ this.netType = netType; this.serializer = serializer; this.callType = callType; this.loadBalance = loadBalance; this.iface = iface; this.version = version; this.timeout = timeout; this.address = address; this.accessToken = accessToken; this.invokeCallback = invokeCallback; this.invokerFactory = invokerFactory; // valid if (this.netType==null) { throw new XxlRpcException("xxl-rpc reference netType missing."); } if (this.serializer==null) { throw new XxlRpcException("xxl-rpc reference serializer missing."); } if (this.callType==null) { throw new XxlRpcException("xxl-rpc reference callType missing."); } if (this.loadBalance==null) { throw new XxlRpcException("xxl-rpc reference loadBalance missing."); } if (this.iface==null) { throw new XxlRpcException("xxl-rpc reference iface missing."); } if (this.timeout < 0) { this.timeout = 0; } if (this.invokerFactory == null) { this.invokerFactory = XxlRpcInvokerFactory.getInstance(); } // init Client // initClient(); }
步骤2:动态代理的准备函数
//利用动态代理,得到请求方法返回的参数 public Object getOnject(){ //直接返回动态代理的结果 return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), //这里使用的类加载器是当前线程的类加载器 new Class[]{iface}, //指明需要代理的类的接口名称 new InvocationHandler() { //建立一个内部类,相当于继承了InvocationHandler接口,这里写的需要代理的内容 @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { } }
步骤3:得到resquest所需要的参数
//先得到要请求的类和方法的全部信息,通过这些信息封装成request发送给服务端 String className = method.getDeclaringClass().getName();//类名,包括包名 String varsion_ = version;//版本信息 String methodName = method.getName();//方法名称 Class<?>[] parameterTypes = method.getParameterTypes();//方法参数的类型 Object[] parameters = args;//方法的参数 //得到请求地址, //可能参数中已经传递了请求地址,那么请求就按传递的地址请求,也有可能没有传递请求地址, // 但注册中心可能已经为该请求的类准备了很多的请求地址,那么就需要用到负载均衡 String finalAddress=address; if(finalAddress==null ||finalAddress.trim().length()==0){ //没有传递过来地址,则需要到注册中心去找 //这里需要创建一个工厂类,提供一些需要的方法,包括生成key值,获取地址等函数 if(invokeCallback!=null || invokerFactory.getServiceRegistry()!=null){ //如果这个工厂类不为空,而且通过工厂类能得到注册中心,那么就可以取地址了 //得到存储的key值 String key = XxlRpcProviderFactory.makeServiceKey(className, varsion_); //得到的地址是一个set集合 TreeSet<String> addressSet = invokerFactory.getServiceRegistry().discovery(key); if (addressSet==null || addressSet.size()==0) { //没有得到请求地址 // pass } else if (addressSet.size()==1) { //得到一个请求地址,那么就是这一个 finalAddress = addressSet.first(); } else { //得到很多请求地址,则需要负载均衡 负载均衡的代码之后再说 finalAddress = loadBalance.xxlRpcInvokerRouter.route(key, addressSet); } } } //如果最终还是没有得到请求地址,则抛出异常 if (finalAddress==null || finalAddress.trim().length()==0) { throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty"); } //现在就可以封装request了 XxlRpcRequest xxlRpcRequest = new XxlRpcRequest(); xxlRpcRequest.setRequestId(UUID.randomUUID().toString());//随机的request的id,根据这个唯一的id可以在同步调用时候拿到正确的值 xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());//创建时间,传到服务端与那里的时间相比,如果大于超时时间则抛出异常 xxlRpcRequest.setAccessToken(accessToken);//token值如果有,会与服务端的对比 xxlRpcRequest.setClassName(className);//请求类名 xxlRpcRequest.setMethodName(methodName);//方法名 xxlRpcRequest.setParameterTypes(parameterTypes); xxlRpcRequest.setParameters(parameters);
步骤3:同步调用的实现
//如果是使用同步调用,在netty底层的调用是异步的,异步的意思就是,如果发起多个请求,那么会无法判断返回的响应是否是请求所得
// 因此,需要在请求发送过去之后,阻塞发送请求,直到请求得到响应之后,才能继续发送请求。要实现这样的功能,要用到
//future的get方法。
netty底层的nio模型,客户端只管发,而服务端只管收,然后响应。但具体请求和响应之间不是同步的,因此会出现如法获取到真实的响应的情况。
比如:以下就是异步返回的值:
null null UserDTO{name='liu1', word='hihihi'} UserDTO{name='liu3', word='hihihi'} UserDTO{name='liu4', word='hihihi'} UserDTO{name='liu4', word='hihihi'} UserDTO{name='liu4', word='hihihi'} UserDTO{name='liu5', word='hihihi'} UserDTO{name='liu6', word='hihihi'} UserDTO{name='liu7', word='hihihi'} UserDTO{name='liu7', word='hihihi'} UserDTO{name='liu9', word='hihihi'} UserDTO{name='liu11', word='hihihi'} UserDTO{name='liu12', word='hihihi'} UserDTO{name='liu13', word='hihihi'} UserDTO{name='liu15', word='hihihi'} UserDTO{name='liu16', word='hihihi'} UserDTO{name='liu16', word='hihihi'} UserDTO{name='liu17', word='hihihi'} UserDTO{name='liu18', word='hihihi'} UserDTO{name='liu19', word='hihihi'} UserDTO{name='liu19', word='hihihi'} UserDTO{name='liu21', word='hihihi'} UserDTO{name='liu22', word='hihihi'} UserDTO{name='liu23', word='hihihi'} UserDTO{name='liu24', word='hihihi'}
可以看出,刚开始甚至返回了null,而且每次返回的值都可能重复。因此实现同步很有必要
同步实现的逻辑为:需要在请求发送过去之后,阻塞发送请求,直到请求得到响应之后,才能继续发送请求
这里利用了Future实现类的get方法来对响应返回进行阻塞。
对future类的理解:
Future的核心思想是:一个方法f,计算过程可能非常耗时,等待f返回,显然不明智。可以在调用f的时候,立马返回一个Future,可以通过Future这个数据结构去控制方法f的计算过程。
这里的控制包括:
get方法:获取计算结果(如果还没计算完,也是必须等待的)
cancel方法:还没计算完,可以取消计算过程
isDone方法:判断是否计算完
isCancelled方法:判断计算是否被取消
因此,我们可以创建一个response的future实现类,在这个方法中利用get方法获取response,如果没有获取到,get方法会阻塞掉,线程在这里阻塞,因此不会继续发送请求了。当get方法完成了任务(即得到了resopnse),就可以把阻塞掉的线程唤醒,那么就可以继续发送请求了。
synchronized关键字的使用:可以让其修饰代码块、方法和静态方法,被修饰的语句,只能允许一个线程执行,在该线程被执行完之前,其他线程都无法执行该代码块。它是一个互斥锁。需要线程去申请synchronized括号里的对象锁,只有申请到了才能执行。
接下来我们就编写Future的实现类FutureResponse类,使得其在获取response的时候,先将获取response的线程阻塞,同时释放锁,当resopnse被设置了值之后,再去唤醒锁,让他去参与竞争,去拿到resopnse对象返回。

public class XxlRpcFutureResponse implements Future<XxlRpcResponse> { //这里实现了一个future类,并让其监听resopnse对象,如果response存在,才能返回response对象 //可以看到,在继承了该接口后,需要实现这些方法。 //分析了那些需要实现的方法,可以总结出实现的逻辑 //首先需要知道什么时候response对象已经存在了,需要定义一个设置response对象的方法,如果该方法执行了,代表该方法已经执行了 //1.设置一个标志位,二设置一个锁,get线程只有拿到锁对象才能去返回锁,在response存在之前,get线程被阻塞。这就是get的逻辑 // future lock private boolean done = false; private Object lock = new Object(); private XxlRpcResponse response; public void setResponse(XxlRpcResponse response){ this.response=response; //方法执行到这里,说明response已经存在了,需要告诉get方法, synchronized (lock){ done=true; lock.notifyAll(); } } //还没计算完成,可以取消计算结果 @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } //判断是否计算被取消,若cancel返回了true就取消了 @Override public boolean isCancelled() { return false; } //判断是否计算完 @Override public boolean isDone() { return done; } //获取计算结果,如果没有计算完成,则在这里等到,并阻塞 @Override public XxlRpcResponse get() throws InterruptedException, ExecutionException { //具体get逻辑在下面的get方法中 try { return get(-1,TimeUnit.MILLISECONDS); } catch (TimeoutException e) { e.printStackTrace(); } } //获取计算结果,如果没有计算完,去看是否超时,如果超时了就不等了 @Override public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if(!done){ //get //获取response对象的线程运行到这里,会去获取lock的锁对象,如果此时lock的锁对象正被set方法的线程拿着,那么get需要等待,当锁释放以后,get方法会拿到这个锁 //并且去根据超时数判断需要阻塞的时长,当get的线程阻塞了之后,会释放锁资源,让set方法拿到。如果set方法执行了,done会变true ,那么直接返回response即可 synchronized (lock){ if(timeout<0){//若超时数为-1,则一则阻塞下去,等待唤醒 lock.wait(); }else { long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);//计算需要阻塞的时间 lock.wait(); } } } //如果超时时间已经过去,还没有获取到对象,那么就抛出异常 if(!done){ throw new XxlRpcException("xxl-rpc, request timeout at:"+ System.currentTimeMillis() ); } return response; }
设置response的方法,需要在哪里执行呢,当handle类的read方法,拿到响应后,可以设置response,因此,设置response的线程是netty的io线程执行的。这里将set方法放到工厂里去执行。

package com.xxl.rpc.remoting.invoker; import com.xxl.rpc.registry.ServiceRegistry; import com.xxl.rpc.remoting.net.params.XxlRpcFutureResponse; import com.xxl.rpc.remoting.net.params.XxlRpcResponse; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; public class XxlRpcInvokerFactory { private static volatile XxlRpcInvokerFactory instance = new XxlRpcInvokerFactory(); public static XxlRpcInvokerFactory getInstance() { return instance; } private ServiceRegistry serviceRegistry; public ServiceRegistry getServiceRegistry() { return serviceRegistry; } //要想设置response,需要给一个futureresponse的类,这里设置一个map集合,将futureresponse类存在这里,从这里取,防止每次请求都创建 private ConcurrentMap<String, XxlRpcFutureResponse> futureResponsePool = new ConcurrentHashMap<String, XxlRpcFutureResponse>(); //设置一个初始化函数,在初始化的时候,将XxlRpcFutureResponse类放到池子里 public void setInvokerFuture(String requestId, XxlRpcFutureResponse futureResponse){ futureResponsePool.put(requestId, futureResponse); } public void removeInvokerFuture(String requestId){ futureResponsePool.remove(requestId); } public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){ //先得到XxlRpcFutureResponse的类对象 final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId); //如果没有就不执行 if (futureResponse == null) { return; } //给futureResponse设置response对象 futureResponse.setResponse(xxlRpcResponse); } }
上面的方法被放在handle类的read方法里面,由nio线程调用。
这样,就完成了一个同步的方法。

package com.xxl.rpc.remoting.invoker.reference; import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory; import com.xxl.rpc.remoting.invoker.call.CallType; import com.xxl.rpc.remoting.invoker.route.LoadBalance; import com.xxl.rpc.remoting.net.NetEnum; import com.xxl.rpc.remoting.net.params.XxlRpcFutureResponse; import com.xxl.rpc.remoting.net.params.XxlRpcRequest; import com.xxl.rpc.remoting.net.params.XxlRpcResponse; import com.xxl.rpc.remoting.provider.XxlRpcProviderFactory; import com.xxl.rpc.serialize.Serializer; import com.xxl.rpc.util.XxlRpcException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.TimeUnit; public class XxlRpcReferenceBean { //该类就封装了动态代理的getObject方法 //首先需要传入必要的参数 private NetEnum netType; private Serializer serializer; private CallType callType; private LoadBalance loadBalance; private Class<?> iface; private String version; private long timeout = 1000; private String address; private String accessToken; private XxlRpcInvokerFactory invokerFactory; /** * * @param netType 使用的底层通信模块 * @param serializer 序列化方式 * @param callType 调用方式,有四种 * @param loadBalance 负载均衡的方式 * @param iface 接口名 * @param version 版本 * @param timeout 超时时间 * @param address 请求地址 * @param accessToken 接入token // * @param invokeCallback 回调的调用(可无) * @param invokerFactory 调用工厂(可无) */ public XxlRpcReferenceBean(NetEnum netType, Serializer serializer, CallType callType, LoadBalance loadBalance, Class<?> iface, String version, long timeout, String address, String accessToken, // XxlRpcInvokeCallback invokeCallback, XxlRpcInvokerFactory invokerFactory ){ this.netType = netType; this.serializer = serializer; this.callType = callType; this.loadBalance = loadBalance; this.iface = iface; this.version = version; this.timeout = timeout; this.address = address; this.accessToken = accessToken; // this.invokeCallback = invokeCallback; this.invokerFactory = invokerFactory; // valid if (this.netType==null) { throw new XxlRpcException("xxl-rpc reference netType missing."); } if (this.serializer==null) { throw new XxlRpcException("xxl-rpc reference serializer missing."); } if (this.callType==null) { throw new XxlRpcException("xxl-rpc reference callType missing."); } if (this.loadBalance==null) { throw new XxlRpcException("xxl-rpc reference loadBalance missing."); } if (this.iface==null) { throw new XxlRpcException("xxl-rpc reference iface missing."); } if (this.timeout < 0) { this.timeout = 0; } if (this.invokerFactory == null) { this.invokerFactory = XxlRpcInvokerFactory.getInstance(); } // init Client // initClient(); } //利用动态代理,得到请求方法返回的参数 public Object getOnject() { //直接返回动态代理的结果 return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), //这里使用的类加载器是当前线程的类加载器 new Class[]{iface}, //指明需要代理的类的接口名称 new InvocationHandler() { //建立一个内部类,相当于继承了InvocationHandler接口,这里写的需要代理的内容 @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //下面是动态代理的调用的具体逻辑 //先得到要请求的类和方法的全部信息,通过这些信息封装成request发送给服务端 String className = method.getDeclaringClass().getName();//类名,包括包名 String varsion_ = version;//版本信息 String methodName = method.getName();//方法名称 Class<?>[] parameterTypes = method.getParameterTypes();//方法参数的类型 Object[] parameters = args;//方法的参数 //得到请求地址, //可能参数中已经传递了请求地址,那么请求就按传递的地址请求,也有可能没有传递请求地址, // 但注册中心可能已经为该请求的类准备了很多的请求地址,那么就需要用到负载均衡 String finalAddress = address; if (finalAddress == null || finalAddress.trim().length() == 0) { //没有传递过来地址,则需要到注册中心去找 //这里需要创建一个工厂类,提供一些需要的方法,包括生成key值,获取地址等函数 if (invokerFactory != null || invokerFactory.getServiceRegistry() != null) { //如果这个工厂类不为空,而且通过工厂类能得到注册中心,那么就可以取地址了 //得到存储的key值 String key = XxlRpcProviderFactory.makeServiceKey(className, varsion_); //得到的地址是一个set集合 TreeSet<String> addressSet = invokerFactory.getServiceRegistry().discovery(key); if (addressSet == null || addressSet.size() == 0) { //没有得到请求地址 // pass } else if (addressSet.size() == 1) { //得到一个请求地址,那么就是这一个 finalAddress = addressSet.first(); } else { //得到很多请求地址,则需要负载均衡 负载均衡的代码之后再说 finalAddress = loadBalance.xxlRpcInvokerRouter.route(key, addressSet); } } } //如果最终还是没有得到请求地址,则抛出异常 if (finalAddress == null || finalAddress.trim().length() == 0) { throw new XxlRpcException("xxl-rpc reference bean[" + className + "] address empty"); } //现在就可以封装request了 XxlRpcRequest xxlRpcRequest = new XxlRpcRequest(); xxlRpcRequest.setRequestId(UUID.randomUUID().toString());//随机的request的id,根据这个唯一的id可以在同步调用时候拿到正确的值 xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());//创建时间,传到服务端与那里的时间相比,如果大于超时时间则抛出异常 xxlRpcRequest.setAccessToken(accessToken);//token值如果有,会与服务端的对比 xxlRpcRequest.setClassName(className);//请求类名 xxlRpcRequest.setMethodName(methodName);//方法名 xxlRpcRequest.setParameterTypes(parameterTypes); xxlRpcRequest.setParameters(parameters); //接下来就是调用的逻辑了,四种调用方式用if->else来选择 if (CallType.SYNC == callType) { //如果是使用同步调用,在netty底层的调用是异步的,异步的意思就是,如果发起多个请求,那么会无法判断返回的响应是否是请求所得, // 因此,需要在请求发送过去之后,阻塞发送请求,直到请求得到响应之后,才能继续发送请求。要实现这样的功能,要用到 //future的get方法。 //这里先创建future对象,并初始化 XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest); //然后发送请求 //执行send方法,这里还没有实现 //然后接收响应 //通过get方法的阻塞来同步响应 XxlRpcResponse response = futureResponse.get(timeout, TimeUnit.MILLISECONDS); if (response.getErrorMsg() != null) { throw new XxlRpcException(response.getErrorMsg()); } return response.getResult(); //以上就是同步调用的逻辑 } return null; } }); } }
步骤4:Future异步调用的实现:、
在上面同步的代码中,使用get方法阻塞实现了线程的同步,那么如果用户想自己规定get resopnse的时机,而不是直接返回,就需要future模式,再创建一个调用的future,然后设置一个ThreadLocal,将值存起来,等需要的时候直接获取就可以了。
具体实现,是在上面同步的基础上,定义了一个调用Future,并通过get方法来得到futureresponse对象,这里存的是response,如何取到response的逻辑和上面一致。但不同的是,设置了一个ThreadLocal集合
ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。
在ThreadLocal类中有一个Map,用于存储每一个线程的变量副本,Map中元素的键为ThreadLocal对象,而值对应线程的变量副本。这个map是thread类的一个私有属性,所以可以通过线程获取该map的值。
XxlRpcInvokeFuture类

public class XxlRpcInvokeFuture implements Future { //首先需要将futureResponse引进来,要获取response对象,需要从futureResponse中get。 private XxlRpcFutureResponse futureResponse; public XxlRpcInvokeFuture(XxlRpcFutureResponse futureResponse) { this.futureResponse = futureResponse; } public void stop(){ // remove-InvokerFuture } //实现的方法都从futureResponse调用 @Override public boolean cancel(boolean mayInterruptIfRunning) { return futureResponse.cancel(mayInterruptIfRunning); } @Override public boolean isCancelled() { return futureResponse.isCancelled(); } @Override public boolean isDone() { return futureResponse.isDone(); } @Override public Object get() throws InterruptedException, ExecutionException { try { return get(-1, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { throw new XxlRpcException(e); } } @Override public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { try { // future get XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, unit); if (xxlRpcResponse.getErrorMsg() != null) { throw new XxlRpcException(xxlRpcResponse.getErrorMsg()); } return xxlRpcResponse.getResult(); } finally { stop(); } } //创建一个ThreadLocal线程,来绑定invokerfuture的线程 private static ThreadLocal<XxlRpcInvokeFuture> threadInvokerFuture = new ThreadLocal<XxlRpcInvokeFuture>(); //将future保存到线程中去,相当于一个线程副本 public static void setFuture(XxlRpcInvokeFuture future) { threadInvokerFuture.set(future); } public static void removeFuture() { threadInvokerFuture.remove(); } //获取Future对象,然后将线程移除掉,如果不移除,会导致内存泄露 public static <T> Future<T> getFuture(Class<T> type) { Future<T> future = (Future<T>) threadInvokerFuture.get(); threadInvokerFuture.remove(); return future; } }
else if(CallType.FUTURE==callType){ //使用get方法阻塞实现了线程的同步,那么如果用户想自己规定get resopnse的时机, // 而不是直接返回,就需要future模式,再创建一个调用的future,然后设置一个ThreadLocal, // 将值存起来,等需要的时候直接获取就可以了。 //因此,我们需要创建一个调用future类,并将该类的线程交个threadlocal去执行 //先创建futureResponse对象 XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest); //然后将futureResponse对象加到调用future类中,保存到线程里面去 XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse); XxlRpcInvokeFuture.setFuture(invokeFuture); //发送请求 //发送完之后,不用返回,而是等什么时候用,什么时候调用getFuture得到调用future,然后再执行get方法,获取response对象 return null; }
对于每一次请求都会set一个invokerFuture对象,并将其存入ThreadLocal中,它是一个map,因此,依照顺序存入有得到。依照队列的形式存取
@Test public void futureTest() throws ExecutionException, InterruptedException { XxlRpcReferenceBean bean = new XxlRpcReferenceBean(NetEnum.NETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.FUTURE, LoadBalance.ROUND, IDemo.class, null, 500, "127.0.0.1:7080", null, null, null); //得到返回对象 IDemo demo =(IDemo) bean.getObject(); demo.sayHi("liu:"); Future<UserDTO> future = XxlRpcInvokeFuture.getFuture(UserDTO.class); demo.sayHi("xing"); Future<UserDTO> future1 = XxlRpcInvokeFuture.getFuture(UserDTO.class); UserDTO s1 = future.get(); UserDTO s2 = future1.get(); System.out.println(s2); System.out.println(s1); }
测试结果如下:
UserDTO{name='xing', word='hihihi'}
UserDTO{name='liu:', word='hihihi'}
步骤:5:callback方式的调用
callback调用方式使用了回调的理念,即当事件发生时会触发调用函数,并执行。那么就需要一个回调的类,
与future的调用方法相似,创建一个回调的类,并在调用的时候,将该类加到ThreadLocal里面,然后,回调函数如果判断到回调类存在了(即运行了set方法),就执行回调的操作,因此回调函数的执行是在获取到response之后,即放在工厂类的notify里面。

public abstract class XxlRpcInvokeCallback<T> { //设置一个抽象的回调类,定义了回调函数执行的方法名,具体实现在满足条件的时候再调用 //两个抽象方法,规定了在成功和失败时调用的方法 public abstract void onSuccess(T result); public abstract void onFailure(Throwable exception); //与InvokerFuture类似,设置一个ThreadLocal,在调用的时候设置这个回调类,然后去取 private static ThreadLocal<XxlRpcInvokeCallback> threadInvokerFuture = new ThreadLocal<XxlRpcInvokeCallback>(); public static XxlRpcInvokeCallback getCallback() { XxlRpcInvokeCallback invokeCallback = threadInvokerFuture.get(); threadInvokerFuture.remove(); return invokeCallback; } public static void setCallback(XxlRpcInvokeCallback invokeCallback) { threadInvokerFuture.set(invokeCallback); } public static void removeCallback() { threadInvokerFuture.remove(); } }
在工厂类中执行回调函数,是执行的io的线程,因此,需要给他设置一个线程池,来防止将回调逻辑都放到io线程里面去。
这里继续利用线程池额执行类
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){ //先得到XxlRpcFutureResponse的类对象 final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId); //如果没有就不执行 if (futureResponse == null) { return; } //如果回调类不为空,即得到了回调类,那么就开启线程池来执行操作 if(futureResponse.getInvokeCallback()!=null){ //执行线程池的操作 executeResponseCallback(new Runnable() { @Override public void run() { if (xxlRpcResponse.getErrorMsg() != null) { futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg())); } else { System.out.println(Thread.currentThread()); futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult()); } } }); } //给futureResponse设置response对象 futureResponse.setResponse(xxlRpcResponse); } //使用线程池来执行 private ThreadPoolExecutor responseCallbackThreadPool = null; //设置一个线程池 //配置线程池 public void executeResponseCallback(Runnable runnable){ if(responseCallbackThreadPool==null){ synchronized (this){ //这里加一个锁,执行这些操作的时候,不能操作其他的线程 if (responseCallbackThreadPool == null) { responseCallbackThreadPool = new ThreadPoolExecutor( 10, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-rpc, XxlRpcInvokerFactory-responseCallbackThreadPool-" + r.hashCode()); } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { throw new XxlRpcException("xxl-rpc Invoke Callback Thread pool is EXHAUSTED!"); } }); // default maxThreads 300, minThreads 60 } } } }

else if(CallType.CALLBACK==callType){ //有了上面的分析,回调的实现很简单了,将callback类设置到ThreadLocal里面去,然后在回调函数里面去取,取出来执行。 //不同点在于,执行的函数一般不能放在io的线程里面,因此,需要使用一个线程池来维护。 // get callback XxlRpcInvokeCallback finalInvokeCallback=null; XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback(); // System.out.println(threadInvokeCallback); if (threadInvokeCallback != null) { finalInvokeCallback = threadInvokeCallback; } if (finalInvokeCallback == null) { throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null."); } // future-response set XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback); //执行发送代码 // client.asyncSend(finalAddress, xxlRpcRequest); return null; } return null; }
步骤6:单一长连接
因为NIO是非阻塞的,因此,每个客户端只需要连接一次就可以了,这就是单一长连接,它的实现是一个连接池。将一个客户端(adress不变)作为一个连接来维护,将该客户端存到一个ConcurrentHashMap来保存,在进行连接的时候,去看这个ConcurrentHashMap里有没有需要的连接,如果没有,就需要创建,这里的过程是同步的,创建连接需要加锁,
代码如下:

public abstract class ConnectClient { //实现单一长连接的基础 //提供的接口方法 //初始化方法 public abstract void init(String address, final Serializer serializer, final XxlRpcInvokerFactory xxlRpcInvokerFactory) throws Exception; //关闭 public abstract void close(); //验证 public abstract boolean isValidate(); //发送 public abstract void send(XxlRpcRequest xxlRpcRequest) throws Exception ; //异步发送方法 public static void asyncSend(XxlRpcRequest xxlRpcRequest, String address, Class<? extends ConnectClient> connectClientImpl, final XxlRpcReferenceBean xxlRpcReferenceBean) throws Exception { //这里提供一个从池子中取连接的方法 ConnectClient clientPool = ConnectClient.getPool(address, connectClientImpl, xxlRpcReferenceBean); try { // do invoke clientPool.send(xxlRpcRequest); } catch (Exception e) { throw e; } } //使用ConcurrentHashMap来做一个池子,使用ConcurrentHashMap的原因是它实现了一个分段锁机制,保证不同线程在put的时候不会阻塞 private static volatile ConcurrentHashMap<String, ConnectClient> connectClientMap; //每个地址都会有一个锁,来控制连接的创建,如果是相同的地址,要创建连接需要加锁,否则会出现同一地址创建多个连接的情况。 private static volatile ConcurrentHashMap<String, Object> connectClientLockMap = new ConcurrentHashMap<>(); //获取连接的代码 private static ConnectClient getPool(String address, Class<? extends ConnectClient> connectClientImpl, final XxlRpcReferenceBean xxlRpcReferenceBean) throws Exception { //先判断这个池子是否为空,如果为空,则需要创建(这里为空的意思是null) //创建map需要用到双检锁,保证创建的是一个单例 if (connectClientMap == null) { synchronized (ConnectClient.class) { if (connectClientMap == null) { // init connectClientMap = new ConcurrentHashMap<String, ConnectClient>(); // stop callback //这里不管他 } } } //去验证是不是存在客户端,存在了就直接返回 ConnectClient connectClient = connectClientMap.get(address); if (connectClient!=null && connectClient.isValidate()) { return connectClient; } //如果没有客户端连接,就需要去创建连接,但在此之前,需要构建一个锁 Object clientLock = connectClientLockMap.get(address); if (clientLock == null) { connectClientLockMap.putIfAbsent(address, new Object());//putIfAbsent保证了使用原来的锁 clientLock = connectClientLockMap.get(address); } //加锁 synchronized (clientLock) { //需要判断是不是这个连接存活,存活就返回 connectClient = connectClientMap.get(address); if (connectClient!=null && connectClient.isValidate()) { return connectClient; } // remove old if (connectClient != null) { connectClient.close(); connectClientMap.remove(address); } // 创建连接,并加到池子里面,然后返回 ConnectClient connectClient_new = connectClientImpl.newInstance(); connectClientMap.put(address, connectClient_new); return connectClient_new; } }
单一长连接是以上三种调用的基础,将它作为一种调用其实不能符合逻辑。
以上,我们的调用就完成了,客户端通信的连接代码和服务端类似,不需要重复,该项目的精华部分上面已经全部搞定。
下面来对注册中进行研究。