手寫一個高性能的rpc框架
模擬服務端,運行main函數,相當於啟動服務器
public class ServerApplication { public static void main(String[] args) throws Exception { //開啟服務端,然后等待客戶端發出請求,然后給客戶端響應數據,但如果這些操作都寫在,會使代碼不好維護,因此,將其抽象化,抽象出 //一個工廠類,專門來做這樣的操作。 //因此需要一個專門來處理提供者類的工廠類,此類需要提供一個初始化方法、一個開始/停止服務的方法、(如果要加注冊中心的話還需要一個添加服務的方法) } }
提供者工廠類
public class XxlRpcProviderFactory { //初始化方法(相當於一個構造器)里面的參數,就是開啟服務需要的參數 /** * * @param netType 網絡通信方式 * @param serializer 序列化方式 * @param ip ip地址 * @param port 端口號 * @param accessToken 接收token(如果有) * @param serviceRegistryClass 注冊中心類 (可先不要) * @param serviceRegistryParam 注冊中心參數 (可先不要) */ public void init(){ } public void start(){ //這里使用回調的方式來完成開始和停止的操作 //首先需要一個Server類來具體管理服務端的數據通信,而使用回調函數,可以設置一個類似事件一樣的功能 //當Server開啟時需要執行注冊的操作,當關閉的時候需要移除注冊的操作 } public void stop(){ //在這里將Server停掉 } }
Server抽象類,延遲實現類(功能擴展)
public abstract class Server { //Server應該是一個抽象類,因為具體的通信可能不止一個。 //該類定義了XxlRpcProviderFactory需要的兩個方法,開始、停止,但沒有具體實現,具體實現延時到實現類中,比如NettyServer類 public abstract void start(); public abstract void st(); }
回調功能的實現
回調是一種思想,首先提供一個回調的抽象類,提供一個執行的抽象方法,但不去實現。然后設置一個條件,當滿足了這個條件之后就執行某項操作,這就是回調。
需求:當Server執行了start方法的時候(條件),就要去注冊服務;當Server執行了stop方法的時候(條件),就移除注冊服務。
實現思路:
從需求上看,當Server的實現類執行start方法后,需要執行一個方法,這個方法可以去注冊服務,但具體的注冊服務代碼不能寫在實現類里面,實現類中只提供一個判斷的功能,如果需要去回調就執行回調,不需要回調就不去回調。但這個顯然和實現類負責具體的通信代碼功能不相符合,因此,判斷條件的方法可以寫在Server抽象類中,並繼承給每一個實現類。
具體判斷邏輯就是看回調類是否實現。回調類應該是一個接口,只提供一個運行的方法,當回調類被實現了,那么回調函數一定需要去執行。從這個邏輯上看,真正執行回調函數的類就是實現了回調接口的實現類
BaseCallback
public abstract class BaseCallback { public abstract void run(); }
Server類中定義判斷條件方法

public abstract class Server { //Server應該是一個抽象類,因為具體的通信可能不止一個。 //該類定義了XxlRpcProviderFactory需要的兩個方法,開始、停止,但沒有具體實現,具體實現延時到實現類中,比如NettyServer類 public abstract void start(); public abstract void stop(); //需要一個傳來的BaseCallback類,來判斷該類是否被實例化 private BaseCallback startedCallback; private BaseCallback stopedCallback; //需要從外部將該參數傳進來,而且根據邏輯。需要連個傳值的方法。 public void setStartedCallback(BaseCallback baseCallback){ this.startedCallback=baseCallback; } public void setStopdCallback(BaseCallback baseCallback){ this.stopedCallback=baseCallback; } //以下方法就是條件判斷的方法,判斷條件就是baseCallback是否在外面被實例化了 public void onStart(){ if(startedCallback!=null){ //說明start方法需要去注冊服務了 startedCallback.run(); } } public void onStop(){ if(stopedCallback!=null){ //說明start方法需要去注冊服務了 stopedCallback.run(); } } }
在工廠類中,去實現回調run方法

public class XxlRpcProviderFactory { //初始化方法(相當於一個構造器)里面的參數,就是開啟服務需要的參數 /** * * @param netType 網絡通信方式 * @param serializer 序列化方式 * @param ip ip地址 * @param port 端口號 * @param accessToken 接收token(如果有) * @param serviceRegistryClass 注冊中心類 (可先不要) * @param serviceRegistryParam 注冊中心參數 (可先不要) */ public void init(){ } //先提供一個Server類,然后使用該類的各種方法,但該類是一個抽象類,具體的實現類需要使用newstance創建 private Server server; public void start(){ //這里使用回調的方式來完成開始和停止的操作 //首先需要一個Server類來具體管理服務端的數據通信,而使用回調函數,可以設置一個類似事件一樣的功能 //當Server開啟時需要執行注冊的操作,當關閉的時候需要移除注冊的操作 //先將回調函數實例化,當后面執行的時候就會按條件執行了 server.setStartedCallback(new BaseCallback() { @Override public void run() { //執行注冊的功能 } }); server.setStopdCallback(new BaseCallback() { @Override public void run() { //執行移除注冊的功能 } }); //開啟服務端的通信 server.start(); } public void stop(){ //在這里將Server停掉 server.stop(); } }
以上就是服務端的總體框架,接下來,只要提供了服務端的底層通信實現,服務端就搭建完成了。
下面開始實現Server類,使用netty通信模塊,采用NIO模型,傳輸數據是異步的,當然,在服務端幾乎不用考慮異步同步問題,當接收到請求之后,就返回所請求的對象。
為了更好的管理客戶端傳到服務器上的請求,將處理請求的操作啟動不同的線程來管理,這需要一個線程池。
ThreadPoolExecutor類的獲取(說明都在代碼里)

public class ThreadPoolUtil { //該類的作用就是返回一個線程池的實現類 ThreadPoolExecutor,包括規定池子的一些參數以及特性 public static ThreadPoolExecutor makeServerThreadPool(final String serverType){ //serverHandlerPool服務端處理線程池,將所有請求的處理線程都放到池子里面去 //一個線程池的實例化過程就是這樣,規定了池子的大小,存活時間,工作隊列,線程創建工廠給線程命名,池子滿了之后的處理 ThreadPoolExecutor serverHandlerPool =new ThreadPoolExecutor( 60, 300, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-rpc, " + serverType + "-serverHandlerPool-" + r.hashCode()); } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { throw new RuntimeException("xxl-rpc "+serverType+" Thread pool is EXHAUSTED!"); } } ); return serverHandlerPool; } }
有了上面的工具類,就可以管理每次請求處理的線程了。
現在工廠類中寫一個將request轉換為response的方法

public class XxlRpcProviderFactory { private NetEnum netType; private Serializer serializer; private String ip; // for registry private int port; // default port private String accessToken; private Class<? extends ServiceRegistry> serviceRegistryClass; private Map<String, String> serviceRegistryParam; //初始化方法(相當於一個構造器)里面的參數,就是開啟服務需要的參數 /** * * @param netType 網絡通信方式 * @param serializer 序列化方式 * @param ip ip地址 * @param port 端口號 * @param accessToken 接收token(如果有) * @param serviceRegistryClass 注冊中心類 (可先不要) * @param serviceRegistryParam 注冊中心參數 (可先不要) */ public void init(NetEnum netType, Serializer serializer, String ip, int port, String accessToken, Class<? extends ServiceRegistry> serviceRegistryClass, Map<String, String> serviceRegistryParam){ } //先提供一個Server類,然后使用該類的各種方法,但該類是一個抽象類,具體的實現類需要使用newstance創建 private Server server; public void start(){ //這里使用回調的方式來完成開始和停止的操作 //首先需要一個Server類來具體管理服務端的數據通信,而使用回調函數,可以設置一個類似事件一樣的功能 //當Server開啟時需要執行注冊的操作,當關閉的時候需要移除注冊的操作 //先將回調函數實例化,當后面執行的時候就會按條件執行了 server.setStartedCallback(new BaseCallback() { @Override public void run() { //執行注冊的功能 } }); server.setStopdCallback(new BaseCallback() { @Override public void run() { //執行移除注冊的功能 } }); //開啟服務端的通信 server.start(); } public void stop(){ //在這里將Server停掉 server.stop(); } //存放bean對象的map集合 private Map<String,Object> serviceData=new HashMap<>(); public Map<String, Object> getServiceData() { return serviceData; } //這里是將servicebean添加到map中的方法,在application中被調用,並傳給一個實例化好的類 public void addService(String iface, String version, Object serviceBean){ String serviceKey = makeServiceKey(iface, version); serviceData.put(serviceKey, serviceBean); } //建立一個制作key值的方法,根據接口名和版本號就可以得到key值 形式為iface#version public static String makeServiceKey(String iface, String version){ String serviceKey = iface; if (version!=null && version.trim().length()>0) { serviceKey += "#".concat(version); } return serviceKey; } //將handle類中的轉換方法放在這里實現 public XxlRpcResponse invokeService(XxlRpcRequest request){ XxlRpcResponse response=new XxlRpcResponse(); //設置response對象 response.setRequestId(request.getRequestId()); //這里將之前請求過的bean放在一個map集合中,使用key值來取,可以避免重復的進行反射的操作 //因此,在上面需要定義一個集合來存放得到的bean對象 //根據上面定義的方法和map集合,先從map中取,如果取不到,說明請求的接口有問題 String key = makeServiceKey(request.getClassName(), request.getVersion()); Object serviceBean = serviceData.get(key); //對得到的bean以及時間和token進行判斷,來拋出響應的錯誤 //沒有發現這個bean if(serviceBean==null){ response.setErrorMsg("The serviceKey["+ key +"] not found."); return response; } //判斷是否超時 if (System.currentTimeMillis() - request.getCreateMillisTime() > 3*60*1000) { response.setErrorMsg("The timestamp difference between admin and executor exceeds the limit."); return response; } //判斷請求的token和響應的token是否一致 if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(request.getAccessToken())) { response.setErrorMsg("The access token[" + request.getAccessToken() + "] is wrong."); return response; } //接下來,就將請求的方法執行得到的結果返回,利用反射 Class<?> serviceBeanClass = serviceBean.getClass(); String methodName = request.getMethodName(); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); //反射得到類的方法 try { Method method = serviceBeanClass.getMethod(methodName, parameterTypes); method.setAccessible(true); Object result = method.invoke(serviceBean, parameters); response.setResult(result); } catch (Throwable t) { // catch error response.setErrorMsg(""); } return response; } }
然后就可以在handle類中調用了

public class NettyServerHandler extends SimpleChannelInboundHandler<XxlRpcRequest>{ //請求傳來,首先傳到這個類中,然后進行處理 //XxlRpcRequest類是封裝了請求的客戶端請求的類,這里在NettyServer中添加了編解碼就可以直接傳輸這個類了。 //向Handle類中傳入線程池類,然后執行 private ThreadPoolExecutor serverHandlerPool; //需要傳入一個工廠類,來調用轉換代碼 private XxlRpcProviderFactory providerFactory; //提供構造方法傳值 public NettyServerHandler(final XxlRpcProviderFactory providerFactory,final ThreadPoolExecutor serverHandlerPool){ this.providerFactory=providerFactory; this.serverHandlerPool=serverHandlerPool; } //netty通信,從客戶端傳來的值會直接到這里,request @Override protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final XxlRpcRequest request) throws Exception { serverHandlerPool.execute(new Runnable() { @Override public void run() { //在線程池的某個線程中執行調用函數,將請求轉換為響應。這樣做是為了在邏輯上分開,netty的線程是處理netty的IO操作, // 而這個線程就專門處理請求轉換。 //這個調用代碼直接在這里實現,使代碼有點繁瑣,將其放到工廠類中調用。 XxlRpcResponse response=providerFactory.invokeService(request); channelHandlerContext.writeAndFlush(response); } }); } //異常處理,就是關掉上下文 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } }
然后將handle類加到具體nettyserver類的pipeline里面,就可以傳輸了

public class NettyServer extends Server{ //Server的實現類,底層使用的是Netty通信 //開啟通信的方法,這里專門開辟一個線程開實現開啟功能 private Thread thread; @Override public void start(final XxlRpcProviderFactory factory) { thread=new Thread(new Runnable() { @Override public void run() { //這里就是具體的netty通信的服務端代碼 //首先創建一個線程池對象,要傳遞給處理類 final ThreadPoolExecutor pool = ThreadPoolUtil.makeServerThreadPool(NettyServer.class.getSimpleName()); //創建兩個線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); // start server ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast(new NettyDecoder(XxlRpcRequest.class, factory.getSerializer())) .addLast(new NettyEncoder(XxlRpcResponse.class, factory.getSerializer())) .addLast(new NettyServerHandler(factory, pool)); } }) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true); // bind try { ChannelFuture future = bootstrap.bind(factory.getPort()).sync(); //這里是回調函數的執行體 onStart(); //等待停止 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //關閉池子 pool.shutdown(); workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }); //將該線程設置為守護線程 thread.setDaemon(true); thread.start(); } @Override public void stop() { // destroy server thread //如果線程沒有關掉,但需要關掉,則執行線程的中端 if (thread != null && thread.isAlive()) { thread.interrupt(); } // on stop onStop(); } }
以上就實現了一個rpc的服務端。
下面來實現客戶端,客戶端相比服務端要復雜很多,服務端僅僅是開啟一個線程,然后靜靜等待,當有請求來了,從線程池中取一個線程來執行請求邏輯,返回一個響應對象。
客戶端比較復雜,其中發送請求和得到響應的過程就有四種調用方式,這四種調用方式也是最難理解的部分
首先,客戶端需要一個模擬的客戶端來完成服務的請求
ClientApplication類,模擬客戶端

public class ClientApplication { //1.測試同步發送請求 @Test public void syncTest(){ //首先提供一個代理類,通過動態代理,得到我們所需的接口方法的返回值(從服務端的轉換方法可以知道,得到了方法的名稱和參數后,返回的是方法的返回值), //動態代理的作用就是在客戶端使用哪個方法時,動態的請求哪個方法的返回值。 //這個動態代理封裝在一個geObject方法里,至於如何得到,有四種調用方式,請求的時候就傳入。 //創建一個參考的bean類來實現這個方法。 } //2.測試異步發送請求 @Test public void futureTest() throws ExecutionException, InterruptedException { } //3.測試回調發送請求 @Test public void callbackTest() throws InterruptedException { } //4,測試單一長連接發送請求 @Test public void onewayTest() throws InterruptedException { } }
因為有四種調用方式,我們在請求服務之初就將四種調用方式,通過枚舉定義在一個類中
public enum CallType { //在判斷的時候,可以按照其名字,去選擇不同的調用方式 SYNC, FUTURE, CALLBACK, ONEWAY; }
定義參考Bean,創建getObject方法
步驟1:得到所需的必須要參數
//該類就封裝了動態代理的getObject方法 //首先需要傳入必要的參數 private NetEnum netType; private Serializer serializer; private CallType callType; private LoadBalance loadBalance; private Class<?> iface; private String version; private long timeout = 1000; private String address; private String accessToken; private XxlRpcInvokeCallback invokeCallback; private XxlRpcInvokerFactory invokerFactory; /** * * @param netType 使用的底層通信模塊 * @param serializer 序列化方式 * @param callType 調用方式,有四種 * @param loadBalance 負載均衡的方式 * @param iface 接口名 * @param version 版本 * @param timeout 超時時間 * @param address 請求地址 * @param accessToken 接入token * @param invokeCallback 回調的調用(可無) * @param invokerFactory 調用工廠(可無) */ public XxlRpcReferenceBean(NetEnum netType, Serializer serializer, CallType callType, LoadBalance loadBalance, Class<?> iface, String version, long timeout, String address, String accessToken, XxlRpcInvokeCallback invokeCallback, XxlRpcInvokerFactory invokerFactory ){ this.netType = netType; this.serializer = serializer; this.callType = callType; this.loadBalance = loadBalance; this.iface = iface; this.version = version; this.timeout = timeout; this.address = address; this.accessToken = accessToken; this.invokeCallback = invokeCallback; this.invokerFactory = invokerFactory; // valid if (this.netType==null) { throw new XxlRpcException("xxl-rpc reference netType missing."); } if (this.serializer==null) { throw new XxlRpcException("xxl-rpc reference serializer missing."); } if (this.callType==null) { throw new XxlRpcException("xxl-rpc reference callType missing."); } if (this.loadBalance==null) { throw new XxlRpcException("xxl-rpc reference loadBalance missing."); } if (this.iface==null) { throw new XxlRpcException("xxl-rpc reference iface missing."); } if (this.timeout < 0) { this.timeout = 0; } if (this.invokerFactory == null) { this.invokerFactory = XxlRpcInvokerFactory.getInstance(); } // init Client // initClient(); }
步驟2:動態代理的准備函數
//利用動態代理,得到請求方法返回的參數 public Object getOnject(){ //直接返回動態代理的結果 return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), //這里使用的類加載器是當前線程的類加載器 new Class[]{iface}, //指明需要代理的類的接口名稱 new InvocationHandler() { //建立一個內部類,相當於繼承了InvocationHandler接口,這里寫的需要代理的內容 @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { } }
步驟3:得到resquest所需要的參數
//先得到要請求的類和方法的全部信息,通過這些信息封裝成request發送給服務端 String className = method.getDeclaringClass().getName();//類名,包括包名 String varsion_ = version;//版本信息 String methodName = method.getName();//方法名稱 Class<?>[] parameterTypes = method.getParameterTypes();//方法參數的類型 Object[] parameters = args;//方法的參數 //得到請求地址, //可能參數中已經傳遞了請求地址,那么請求就按傳遞的地址請求,也有可能沒有傳遞請求地址, // 但注冊中心可能已經為該請求的類准備了很多的請求地址,那么就需要用到負載均衡 String finalAddress=address; if(finalAddress==null ||finalAddress.trim().length()==0){ //沒有傳遞過來地址,則需要到注冊中心去找 //這里需要創建一個工廠類,提供一些需要的方法,包括生成key值,獲取地址等函數 if(invokeCallback!=null || invokerFactory.getServiceRegistry()!=null){ //如果這個工廠類不為空,而且通過工廠類能得到注冊中心,那么就可以取地址了 //得到存儲的key值 String key = XxlRpcProviderFactory.makeServiceKey(className, varsion_); //得到的地址是一個set集合 TreeSet<String> addressSet = invokerFactory.getServiceRegistry().discovery(key); if (addressSet==null || addressSet.size()==0) { //沒有得到請求地址 // pass } else if (addressSet.size()==1) { //得到一個請求地址,那么就是這一個 finalAddress = addressSet.first(); } else { //得到很多請求地址,則需要負載均衡 負載均衡的代碼之后再說 finalAddress = loadBalance.xxlRpcInvokerRouter.route(key, addressSet); } } } //如果最終還是沒有得到請求地址,則拋出異常 if (finalAddress==null || finalAddress.trim().length()==0) { throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty"); } //現在就可以封裝request了 XxlRpcRequest xxlRpcRequest = new XxlRpcRequest(); xxlRpcRequest.setRequestId(UUID.randomUUID().toString());//隨機的request的id,根據這個唯一的id可以在同步調用時候拿到正確的值 xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());//創建時間,傳到服務端與那里的時間相比,如果大於超時時間則拋出異常 xxlRpcRequest.setAccessToken(accessToken);//token值如果有,會與服務端的對比 xxlRpcRequest.setClassName(className);//請求類名 xxlRpcRequest.setMethodName(methodName);//方法名 xxlRpcRequest.setParameterTypes(parameterTypes); xxlRpcRequest.setParameters(parameters);
步驟3:同步調用的實現
//如果是使用同步調用,在netty底層的調用是異步的,異步的意思就是,如果發起多個請求,那么會無法判斷返回的響應是否是請求所得
// 因此,需要在請求發送過去之后,阻塞發送請求,直到請求得到響應之后,才能繼續發送請求。要實現這樣的功能,要用到
//future的get方法。
netty底層的nio模型,客戶端只管發,而服務端只管收,然后響應。但具體請求和響應之間不是同步的,因此會出現如法獲取到真實的響應的情況。
比如:以下就是異步返回的值:
null null UserDTO{name='liu1', word='hihihi'} UserDTO{name='liu3', word='hihihi'} UserDTO{name='liu4', word='hihihi'} UserDTO{name='liu4', word='hihihi'} UserDTO{name='liu4', word='hihihi'} UserDTO{name='liu5', word='hihihi'} UserDTO{name='liu6', word='hihihi'} UserDTO{name='liu7', word='hihihi'} UserDTO{name='liu7', word='hihihi'} UserDTO{name='liu9', word='hihihi'} UserDTO{name='liu11', word='hihihi'} UserDTO{name='liu12', word='hihihi'} UserDTO{name='liu13', word='hihihi'} UserDTO{name='liu15', word='hihihi'} UserDTO{name='liu16', word='hihihi'} UserDTO{name='liu16', word='hihihi'} UserDTO{name='liu17', word='hihihi'} UserDTO{name='liu18', word='hihihi'} UserDTO{name='liu19', word='hihihi'} UserDTO{name='liu19', word='hihihi'} UserDTO{name='liu21', word='hihihi'} UserDTO{name='liu22', word='hihihi'} UserDTO{name='liu23', word='hihihi'} UserDTO{name='liu24', word='hihihi'}
可以看出,剛開始甚至返回了null,而且每次返回的值都可能重復。因此實現同步很有必要
同步實現的邏輯為:需要在請求發送過去之后,阻塞發送請求,直到請求得到響應之后,才能繼續發送請求
這里利用了Future實現類的get方法來對響應返回進行阻塞。
對future類的理解:
Future的核心思想是:一個方法f,計算過程可能非常耗時,等待f返回,顯然不明智。可以在調用f的時候,立馬返回一個Future,可以通過Future這個數據結構去控制方法f的計算過程。
這里的控制包括:
get方法:獲取計算結果(如果還沒計算完,也是必須等待的)
cancel方法:還沒計算完,可以取消計算過程
isDone方法:判斷是否計算完
isCancelled方法:判斷計算是否被取消
因此,我們可以創建一個response的future實現類,在這個方法中利用get方法獲取response,如果沒有獲取到,get方法會阻塞掉,線程在這里阻塞,因此不會繼續發送請求了。當get方法完成了任務(即得到了resopnse),就可以把阻塞掉的線程喚醒,那么就可以繼續發送請求了。
synchronized關鍵字的使用:可以讓其修飾代碼塊、方法和靜態方法,被修飾的語句,只能允許一個線程執行,在該線程被執行完之前,其他線程都無法執行該代碼塊。它是一個互斥鎖。需要線程去申請synchronized括號里的對象鎖,只有申請到了才能執行。
接下來我們就編寫Future的實現類FutureResponse類,使得其在獲取response的時候,先將獲取response的線程阻塞,同時釋放鎖,當resopnse被設置了值之后,再去喚醒鎖,讓他去參與競爭,去拿到resopnse對象返回。

public class XxlRpcFutureResponse implements Future<XxlRpcResponse> { //這里實現了一個future類,並讓其監聽resopnse對象,如果response存在,才能返回response對象 //可以看到,在繼承了該接口后,需要實現這些方法。 //分析了那些需要實現的方法,可以總結出實現的邏輯 //首先需要知道什么時候response對象已經存在了,需要定義一個設置response對象的方法,如果該方法執行了,代表該方法已經執行了 //1.設置一個標志位,二設置一個鎖,get線程只有拿到鎖對象才能去返回鎖,在response存在之前,get線程被阻塞。這就是get的邏輯 // future lock private boolean done = false; private Object lock = new Object(); private XxlRpcResponse response; public void setResponse(XxlRpcResponse response){ this.response=response; //方法執行到這里,說明response已經存在了,需要告訴get方法, synchronized (lock){ done=true; lock.notifyAll(); } } //還沒計算完成,可以取消計算結果 @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } //判斷是否計算被取消,若cancel返回了true就取消了 @Override public boolean isCancelled() { return false; } //判斷是否計算完 @Override public boolean isDone() { return done; } //獲取計算結果,如果沒有計算完成,則在這里等到,並阻塞 @Override public XxlRpcResponse get() throws InterruptedException, ExecutionException { //具體get邏輯在下面的get方法中 try { return get(-1,TimeUnit.MILLISECONDS); } catch (TimeoutException e) { e.printStackTrace(); } } //獲取計算結果,如果沒有計算完,去看是否超時,如果超時了就不等了 @Override public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if(!done){ //get //獲取response對象的線程運行到這里,會去獲取lock的鎖對象,如果此時lock的鎖對象正被set方法的線程拿着,那么get需要等待,當鎖釋放以后,get方法會拿到這個鎖 //並且去根據超時數判斷需要阻塞的時長,當get的線程阻塞了之后,會釋放鎖資源,讓set方法拿到。如果set方法執行了,done會變true ,那么直接返回response即可 synchronized (lock){ if(timeout<0){//若超時數為-1,則一則阻塞下去,等待喚醒 lock.wait(); }else { long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);//計算需要阻塞的時間 lock.wait(); } } } //如果超時時間已經過去,還沒有獲取到對象,那么就拋出異常 if(!done){ throw new XxlRpcException("xxl-rpc, request timeout at:"+ System.currentTimeMillis() ); } return response; }
設置response的方法,需要在哪里執行呢,當handle類的read方法,拿到響應后,可以設置response,因此,設置response的線程是netty的io線程執行的。這里將set方法放到工廠里去執行。

package com.xxl.rpc.remoting.invoker; import com.xxl.rpc.registry.ServiceRegistry; import com.xxl.rpc.remoting.net.params.XxlRpcFutureResponse; import com.xxl.rpc.remoting.net.params.XxlRpcResponse; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; public class XxlRpcInvokerFactory { private static volatile XxlRpcInvokerFactory instance = new XxlRpcInvokerFactory(); public static XxlRpcInvokerFactory getInstance() { return instance; } private ServiceRegistry serviceRegistry; public ServiceRegistry getServiceRegistry() { return serviceRegistry; } //要想設置response,需要給一個futureresponse的類,這里設置一個map集合,將futureresponse類存在這里,從這里取,防止每次請求都創建 private ConcurrentMap<String, XxlRpcFutureResponse> futureResponsePool = new ConcurrentHashMap<String, XxlRpcFutureResponse>(); //設置一個初始化函數,在初始化的時候,將XxlRpcFutureResponse類放到池子里 public void setInvokerFuture(String requestId, XxlRpcFutureResponse futureResponse){ futureResponsePool.put(requestId, futureResponse); } public void removeInvokerFuture(String requestId){ futureResponsePool.remove(requestId); } public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){ //先得到XxlRpcFutureResponse的類對象 final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId); //如果沒有就不執行 if (futureResponse == null) { return; } //給futureResponse設置response對象 futureResponse.setResponse(xxlRpcResponse); } }
上面的方法被放在handle類的read方法里面,由nio線程調用。
這樣,就完成了一個同步的方法。

package com.xxl.rpc.remoting.invoker.reference; import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory; import com.xxl.rpc.remoting.invoker.call.CallType; import com.xxl.rpc.remoting.invoker.route.LoadBalance; import com.xxl.rpc.remoting.net.NetEnum; import com.xxl.rpc.remoting.net.params.XxlRpcFutureResponse; import com.xxl.rpc.remoting.net.params.XxlRpcRequest; import com.xxl.rpc.remoting.net.params.XxlRpcResponse; import com.xxl.rpc.remoting.provider.XxlRpcProviderFactory; import com.xxl.rpc.serialize.Serializer; import com.xxl.rpc.util.XxlRpcException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.TimeUnit; public class XxlRpcReferenceBean { //該類就封裝了動態代理的getObject方法 //首先需要傳入必要的參數 private NetEnum netType; private Serializer serializer; private CallType callType; private LoadBalance loadBalance; private Class<?> iface; private String version; private long timeout = 1000; private String address; private String accessToken; private XxlRpcInvokerFactory invokerFactory; /** * * @param netType 使用的底層通信模塊 * @param serializer 序列化方式 * @param callType 調用方式,有四種 * @param loadBalance 負載均衡的方式 * @param iface 接口名 * @param version 版本 * @param timeout 超時時間 * @param address 請求地址 * @param accessToken 接入token // * @param invokeCallback 回調的調用(可無) * @param invokerFactory 調用工廠(可無) */ public XxlRpcReferenceBean(NetEnum netType, Serializer serializer, CallType callType, LoadBalance loadBalance, Class<?> iface, String version, long timeout, String address, String accessToken, // XxlRpcInvokeCallback invokeCallback, XxlRpcInvokerFactory invokerFactory ){ this.netType = netType; this.serializer = serializer; this.callType = callType; this.loadBalance = loadBalance; this.iface = iface; this.version = version; this.timeout = timeout; this.address = address; this.accessToken = accessToken; // this.invokeCallback = invokeCallback; this.invokerFactory = invokerFactory; // valid if (this.netType==null) { throw new XxlRpcException("xxl-rpc reference netType missing."); } if (this.serializer==null) { throw new XxlRpcException("xxl-rpc reference serializer missing."); } if (this.callType==null) { throw new XxlRpcException("xxl-rpc reference callType missing."); } if (this.loadBalance==null) { throw new XxlRpcException("xxl-rpc reference loadBalance missing."); } if (this.iface==null) { throw new XxlRpcException("xxl-rpc reference iface missing."); } if (this.timeout < 0) { this.timeout = 0; } if (this.invokerFactory == null) { this.invokerFactory = XxlRpcInvokerFactory.getInstance(); } // init Client // initClient(); } //利用動態代理,得到請求方法返回的參數 public Object getOnject() { //直接返回動態代理的結果 return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), //這里使用的類加載器是當前線程的類加載器 new Class[]{iface}, //指明需要代理的類的接口名稱 new InvocationHandler() { //建立一個內部類,相當於繼承了InvocationHandler接口,這里寫的需要代理的內容 @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //下面是動態代理的調用的具體邏輯 //先得到要請求的類和方法的全部信息,通過這些信息封裝成request發送給服務端 String className = method.getDeclaringClass().getName();//類名,包括包名 String varsion_ = version;//版本信息 String methodName = method.getName();//方法名稱 Class<?>[] parameterTypes = method.getParameterTypes();//方法參數的類型 Object[] parameters = args;//方法的參數 //得到請求地址, //可能參數中已經傳遞了請求地址,那么請求就按傳遞的地址請求,也有可能沒有傳遞請求地址, // 但注冊中心可能已經為該請求的類准備了很多的請求地址,那么就需要用到負載均衡 String finalAddress = address; if (finalAddress == null || finalAddress.trim().length() == 0) { //沒有傳遞過來地址,則需要到注冊中心去找 //這里需要創建一個工廠類,提供一些需要的方法,包括生成key值,獲取地址等函數 if (invokerFactory != null || invokerFactory.getServiceRegistry() != null) { //如果這個工廠類不為空,而且通過工廠類能得到注冊中心,那么就可以取地址了 //得到存儲的key值 String key = XxlRpcProviderFactory.makeServiceKey(className, varsion_); //得到的地址是一個set集合 TreeSet<String> addressSet = invokerFactory.getServiceRegistry().discovery(key); if (addressSet == null || addressSet.size() == 0) { //沒有得到請求地址 // pass } else if (addressSet.size() == 1) { //得到一個請求地址,那么就是這一個 finalAddress = addressSet.first(); } else { //得到很多請求地址,則需要負載均衡 負載均衡的代碼之后再說 finalAddress = loadBalance.xxlRpcInvokerRouter.route(key, addressSet); } } } //如果最終還是沒有得到請求地址,則拋出異常 if (finalAddress == null || finalAddress.trim().length() == 0) { throw new XxlRpcException("xxl-rpc reference bean[" + className + "] address empty"); } //現在就可以封裝request了 XxlRpcRequest xxlRpcRequest = new XxlRpcRequest(); xxlRpcRequest.setRequestId(UUID.randomUUID().toString());//隨機的request的id,根據這個唯一的id可以在同步調用時候拿到正確的值 xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());//創建時間,傳到服務端與那里的時間相比,如果大於超時時間則拋出異常 xxlRpcRequest.setAccessToken(accessToken);//token值如果有,會與服務端的對比 xxlRpcRequest.setClassName(className);//請求類名 xxlRpcRequest.setMethodName(methodName);//方法名 xxlRpcRequest.setParameterTypes(parameterTypes); xxlRpcRequest.setParameters(parameters); //接下來就是調用的邏輯了,四種調用方式用if->else來選擇 if (CallType.SYNC == callType) { //如果是使用同步調用,在netty底層的調用是異步的,異步的意思就是,如果發起多個請求,那么會無法判斷返回的響應是否是請求所得, // 因此,需要在請求發送過去之后,阻塞發送請求,直到請求得到響應之后,才能繼續發送請求。要實現這樣的功能,要用到 //future的get方法。 //這里先創建future對象,並初始化 XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest); //然后發送請求 //執行send方法,這里還沒有實現 //然后接收響應 //通過get方法的阻塞來同步響應 XxlRpcResponse response = futureResponse.get(timeout, TimeUnit.MILLISECONDS); if (response.getErrorMsg() != null) { throw new XxlRpcException(response.getErrorMsg()); } return response.getResult(); //以上就是同步調用的邏輯 } return null; } }); } }
步驟4:Future異步調用的實現:、
在上面同步的代碼中,使用get方法阻塞實現了線程的同步,那么如果用戶想自己規定get resopnse的時機,而不是直接返回,就需要future模式,再創建一個調用的future,然后設置一個ThreadLocal,將值存起來,等需要的時候直接獲取就可以了。
具體實現,是在上面同步的基礎上,定義了一個調用Future,並通過get方法來得到futureresponse對象,這里存的是response,如何取到response的邏輯和上面一致。但不同的是,設置了一個ThreadLocal集合
ThreadLocal為每個使用該變量的線程提供獨立的變量副本,所以每一個線程都可以獨立地改變自己的副本,而不會影響其它線程所對應的副本。
在ThreadLocal類中有一個Map,用於存儲每一個線程的變量副本,Map中元素的鍵為ThreadLocal對象,而值對應線程的變量副本。這個map是thread類的一個私有屬性,所以可以通過線程獲取該map的值。
XxlRpcInvokeFuture類

public class XxlRpcInvokeFuture implements Future { //首先需要將futureResponse引進來,要獲取response對象,需要從futureResponse中get。 private XxlRpcFutureResponse futureResponse; public XxlRpcInvokeFuture(XxlRpcFutureResponse futureResponse) { this.futureResponse = futureResponse; } public void stop(){ // remove-InvokerFuture } //實現的方法都從futureResponse調用 @Override public boolean cancel(boolean mayInterruptIfRunning) { return futureResponse.cancel(mayInterruptIfRunning); } @Override public boolean isCancelled() { return futureResponse.isCancelled(); } @Override public boolean isDone() { return futureResponse.isDone(); } @Override public Object get() throws InterruptedException, ExecutionException { try { return get(-1, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { throw new XxlRpcException(e); } } @Override public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { try { // future get XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, unit); if (xxlRpcResponse.getErrorMsg() != null) { throw new XxlRpcException(xxlRpcResponse.getErrorMsg()); } return xxlRpcResponse.getResult(); } finally { stop(); } } //創建一個ThreadLocal線程,來綁定invokerfuture的線程 private static ThreadLocal<XxlRpcInvokeFuture> threadInvokerFuture = new ThreadLocal<XxlRpcInvokeFuture>(); //將future保存到線程中去,相當於一個線程副本 public static void setFuture(XxlRpcInvokeFuture future) { threadInvokerFuture.set(future); } public static void removeFuture() { threadInvokerFuture.remove(); } //獲取Future對象,然后將線程移除掉,如果不移除,會導致內存泄露 public static <T> Future<T> getFuture(Class<T> type) { Future<T> future = (Future<T>) threadInvokerFuture.get(); threadInvokerFuture.remove(); return future; } }
else if(CallType.FUTURE==callType){ //使用get方法阻塞實現了線程的同步,那么如果用戶想自己規定get resopnse的時機, // 而不是直接返回,就需要future模式,再創建一個調用的future,然后設置一個ThreadLocal, // 將值存起來,等需要的時候直接獲取就可以了。 //因此,我們需要創建一個調用future類,並將該類的線程交個threadlocal去執行 //先創建futureResponse對象 XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest); //然后將futureResponse對象加到調用future類中,保存到線程里面去 XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse); XxlRpcInvokeFuture.setFuture(invokeFuture); //發送請求 //發送完之后,不用返回,而是等什么時候用,什么時候調用getFuture得到調用future,然后再執行get方法,獲取response對象 return null; }
對於每一次請求都會set一個invokerFuture對象,並將其存入ThreadLocal中,它是一個map,因此,依照順序存入有得到。依照隊列的形式存取
@Test public void futureTest() throws ExecutionException, InterruptedException { XxlRpcReferenceBean bean = new XxlRpcReferenceBean(NetEnum.NETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.FUTURE, LoadBalance.ROUND, IDemo.class, null, 500, "127.0.0.1:7080", null, null, null); //得到返回對象 IDemo demo =(IDemo) bean.getObject(); demo.sayHi("liu:"); Future<UserDTO> future = XxlRpcInvokeFuture.getFuture(UserDTO.class); demo.sayHi("xing"); Future<UserDTO> future1 = XxlRpcInvokeFuture.getFuture(UserDTO.class); UserDTO s1 = future.get(); UserDTO s2 = future1.get(); System.out.println(s2); System.out.println(s1); }
測試結果如下:
UserDTO{name='xing', word='hihihi'}
UserDTO{name='liu:', word='hihihi'}
步驟:5:callback方式的調用
callback調用方式使用了回調的理念,即當事件發生時會觸發調用函數,並執行。那么就需要一個回調的類,
與future的調用方法相似,創建一個回調的類,並在調用的時候,將該類加到ThreadLocal里面,然后,回調函數如果判斷到回調類存在了(即運行了set方法),就執行回調的操作,因此回調函數的執行是在獲取到response之后,即放在工廠類的notify里面。

public abstract class XxlRpcInvokeCallback<T> { //設置一個抽象的回調類,定義了回調函數執行的方法名,具體實現在滿足條件的時候再調用 //兩個抽象方法,規定了在成功和失敗時調用的方法 public abstract void onSuccess(T result); public abstract void onFailure(Throwable exception); //與InvokerFuture類似,設置一個ThreadLocal,在調用的時候設置這個回調類,然后去取 private static ThreadLocal<XxlRpcInvokeCallback> threadInvokerFuture = new ThreadLocal<XxlRpcInvokeCallback>(); public static XxlRpcInvokeCallback getCallback() { XxlRpcInvokeCallback invokeCallback = threadInvokerFuture.get(); threadInvokerFuture.remove(); return invokeCallback; } public static void setCallback(XxlRpcInvokeCallback invokeCallback) { threadInvokerFuture.set(invokeCallback); } public static void removeCallback() { threadInvokerFuture.remove(); } }
在工廠類中執行回調函數,是執行的io的線程,因此,需要給他設置一個線程池,來防止將回調邏輯都放到io線程里面去。
這里繼續利用線程池額執行類
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){ //先得到XxlRpcFutureResponse的類對象 final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId); //如果沒有就不執行 if (futureResponse == null) { return; } //如果回調類不為空,即得到了回調類,那么就開啟線程池來執行操作 if(futureResponse.getInvokeCallback()!=null){ //執行線程池的操作 executeResponseCallback(new Runnable() { @Override public void run() { if (xxlRpcResponse.getErrorMsg() != null) { futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg())); } else { System.out.println(Thread.currentThread()); futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult()); } } }); } //給futureResponse設置response對象 futureResponse.setResponse(xxlRpcResponse); } //使用線程池來執行 private ThreadPoolExecutor responseCallbackThreadPool = null; //設置一個線程池 //配置線程池 public void executeResponseCallback(Runnable runnable){ if(responseCallbackThreadPool==null){ synchronized (this){ //這里加一個鎖,執行這些操作的時候,不能操作其他的線程 if (responseCallbackThreadPool == null) { responseCallbackThreadPool = new ThreadPoolExecutor( 10, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-rpc, XxlRpcInvokerFactory-responseCallbackThreadPool-" + r.hashCode()); } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { throw new XxlRpcException("xxl-rpc Invoke Callback Thread pool is EXHAUSTED!"); } }); // default maxThreads 300, minThreads 60 } } } }

else if(CallType.CALLBACK==callType){ //有了上面的分析,回調的實現很簡單了,將callback類設置到ThreadLocal里面去,然后在回調函數里面去取,取出來執行。 //不同點在於,執行的函數一般不能放在io的線程里面,因此,需要使用一個線程池來維護。 // get callback XxlRpcInvokeCallback finalInvokeCallback=null; XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback(); // System.out.println(threadInvokeCallback); if (threadInvokeCallback != null) { finalInvokeCallback = threadInvokeCallback; } if (finalInvokeCallback == null) { throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null."); } // future-response set XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback); //執行發送代碼 // client.asyncSend(finalAddress, xxlRpcRequest); return null; } return null; }
步驟6:單一長連接
因為NIO是非阻塞的,因此,每個客戶端只需要連接一次就可以了,這就是單一長連接,它的實現是一個連接池。將一個客戶端(adress不變)作為一個連接來維護,將該客戶端存到一個ConcurrentHashMap來保存,在進行連接的時候,去看這個ConcurrentHashMap里有沒有需要的連接,如果沒有,就需要創建,這里的過程是同步的,創建連接需要加鎖,
代碼如下:

public abstract class ConnectClient { //實現單一長連接的基礎 //提供的接口方法 //初始化方法 public abstract void init(String address, final Serializer serializer, final XxlRpcInvokerFactory xxlRpcInvokerFactory) throws Exception; //關閉 public abstract void close(); //驗證 public abstract boolean isValidate(); //發送 public abstract void send(XxlRpcRequest xxlRpcRequest) throws Exception ; //異步發送方法 public static void asyncSend(XxlRpcRequest xxlRpcRequest, String address, Class<? extends ConnectClient> connectClientImpl, final XxlRpcReferenceBean xxlRpcReferenceBean) throws Exception { //這里提供一個從池子中取連接的方法 ConnectClient clientPool = ConnectClient.getPool(address, connectClientImpl, xxlRpcReferenceBean); try { // do invoke clientPool.send(xxlRpcRequest); } catch (Exception e) { throw e; } } //使用ConcurrentHashMap來做一個池子,使用ConcurrentHashMap的原因是它實現了一個分段鎖機制,保證不同線程在put的時候不會阻塞 private static volatile ConcurrentHashMap<String, ConnectClient> connectClientMap; //每個地址都會有一個鎖,來控制連接的創建,如果是相同的地址,要創建連接需要加鎖,否則會出現同一地址創建多個連接的情況。 private static volatile ConcurrentHashMap<String, Object> connectClientLockMap = new ConcurrentHashMap<>(); //獲取連接的代碼 private static ConnectClient getPool(String address, Class<? extends ConnectClient> connectClientImpl, final XxlRpcReferenceBean xxlRpcReferenceBean) throws Exception { //先判斷這個池子是否為空,如果為空,則需要創建(這里為空的意思是null) //創建map需要用到雙檢鎖,保證創建的是一個單例 if (connectClientMap == null) { synchronized (ConnectClient.class) { if (connectClientMap == null) { // init connectClientMap = new ConcurrentHashMap<String, ConnectClient>(); // stop callback //這里不管他 } } } //去驗證是不是存在客戶端,存在了就直接返回 ConnectClient connectClient = connectClientMap.get(address); if (connectClient!=null && connectClient.isValidate()) { return connectClient; } //如果沒有客戶端連接,就需要去創建連接,但在此之前,需要構建一個鎖 Object clientLock = connectClientLockMap.get(address); if (clientLock == null) { connectClientLockMap.putIfAbsent(address, new Object());//putIfAbsent保證了使用原來的鎖 clientLock = connectClientLockMap.get(address); } //加鎖 synchronized (clientLock) { //需要判斷是不是這個連接存活,存活就返回 connectClient = connectClientMap.get(address); if (connectClient!=null && connectClient.isValidate()) { return connectClient; } // remove old if (connectClient != null) { connectClient.close(); connectClientMap.remove(address); } // 創建連接,並加到池子里面,然后返回 ConnectClient connectClient_new = connectClientImpl.newInstance(); connectClientMap.put(address, connectClient_new); return connectClient_new; } }
單一長連接是以上三種調用的基礎,將它作為一種調用其實不能符合邏輯。
以上,我們的調用就完成了,客戶端通信的連接代碼和服務端類似,不需要重復,該項目的精華部分上面已經全部搞定。
下面來對注冊中進行研究。