rpc中的高並發


手寫一個高性能的rpc框架

模擬服務端,運行main函數,相當於啟動服務器

public class ServerApplication {
    public static void main(String[] args) throws Exception {

        //開啟服務端,然后等待客戶端發出請求,然后給客戶端響應數據,但如果這些操作都寫在,會使代碼不好維護,因此,將其抽象化,抽象出
        //一個工廠類,專門來做這樣的操作。
        //因此需要一個專門來處理提供者類的工廠類,此類需要提供一個初始化方法、一個開始/停止服務的方法、(如果要加注冊中心的話還需要一個添加服務的方法)
        
    }
}

提供者工廠類

public class XxlRpcProviderFactory {
    //初始化方法(相當於一個構造器)里面的參數,就是開啟服務需要的參數

    /**
     *
     * @param netType      網絡通信方式
     * @param serializer   序列化方式
     * @param ip            ip地址
     * @param port          端口號
     * @param accessToken  接收token(如果有)
     * @param serviceRegistryClass 注冊中心類  (可先不要)
     * @param serviceRegistryParam 注冊中心參數  (可先不要)
     */
    public void init(){

    }

    public void start(){
        //這里使用回調的方式來完成開始和停止的操作
        //首先需要一個Server類來具體管理服務端的數據通信,而使用回調函數,可以設置一個類似事件一樣的功能
        //當Server開啟時需要執行注冊的操作,當關閉的時候需要移除注冊的操作
    }

    public void stop(){
        //在這里將Server停掉
    }

}

 Server抽象類,延遲實現類(功能擴展)

public abstract class Server {

    //Server應該是一個抽象類,因為具體的通信可能不止一個。
    //該類定義了XxlRpcProviderFactory需要的兩個方法,開始、停止,但沒有具體實現,具體實現延時到實現類中,比如NettyServer類
    public abstract void start();

    public abstract void st();
}

 

回調功能的實現

回調是一種思想,首先提供一個回調的抽象類,提供一個執行的抽象方法,但不去實現。然后設置一個條件,當滿足了這個條件之后就執行某項操作,這就是回調。

需求:當Server執行了start方法的時候(條件),就要去注冊服務;當Server執行了stop方法的時候(條件),就移除注冊服務。

實現思路:

從需求上看,當Server的實現類執行start方法后,需要執行一個方法,這個方法可以去注冊服務,但具體的注冊服務代碼不能寫在實現類里面,實現類中只提供一個判斷的功能,如果需要去回調就執行回調,不需要回調就不去回調。但這個顯然和實現類負責具體的通信代碼功能不相符合,因此,判斷條件的方法可以寫在Server抽象類中,並繼承給每一個實現類。

具體判斷邏輯就是看回調類是否實現。回調類應該是一個接口,只提供一個運行的方法,當回調類被實現了,那么回調函數一定需要去執行。從這個邏輯上看,真正執行回調函數的類就是實現了回調接口的實現類

 

BaseCallback

public abstract class BaseCallback {
    public abstract void run();
}

 

Server類中定義判斷條件方法

public abstract class Server {

    //Server應該是一個抽象類,因為具體的通信可能不止一個。
    //該類定義了XxlRpcProviderFactory需要的兩個方法,開始、停止,但沒有具體實現,具體實現延時到實現類中,比如NettyServer類
    public abstract void start();

    public abstract void stop();

    //需要一個傳來的BaseCallback類,來判斷該類是否被實例化
    private BaseCallback startedCallback;
    private BaseCallback stopedCallback;

    //需要從外部將該參數傳進來,而且根據邏輯。需要連個傳值的方法。
    public void setStartedCallback(BaseCallback baseCallback){
        this.startedCallback=baseCallback;
    }
    public void setStopdCallback(BaseCallback baseCallback){
        this.stopedCallback=baseCallback;
    }

    //以下方法就是條件判斷的方法,判斷條件就是baseCallback是否在外面被實例化了
    public void onStart(){
        if(startedCallback!=null){
            //說明start方法需要去注冊服務了
            startedCallback.run();
        }
    }

    public void onStop(){
        if(stopedCallback!=null){
            //說明start方法需要去注冊服務了
            stopedCallback.run();
        }
    }
}
Server

 

在工廠類中,去實現回調run方法

public class XxlRpcProviderFactory {
    //初始化方法(相當於一個構造器)里面的參數,就是開啟服務需要的參數

    /**
     *
     * @param netType      網絡通信方式
     * @param serializer   序列化方式
     * @param ip            ip地址
     * @param port          端口號
     * @param accessToken  接收token(如果有)
     * @param serviceRegistryClass 注冊中心類  (可先不要)
     * @param serviceRegistryParam 注冊中心參數  (可先不要)
     */
    public void init(){

    }

    //先提供一個Server類,然后使用該類的各種方法,但該類是一個抽象類,具體的實現類需要使用newstance創建
    private Server server;

    public void start(){
        //這里使用回調的方式來完成開始和停止的操作
        //首先需要一個Server類來具體管理服務端的數據通信,而使用回調函數,可以設置一個類似事件一樣的功能
        //當Server開啟時需要執行注冊的操作,當關閉的時候需要移除注冊的操作

        //先將回調函數實例化,當后面執行的時候就會按條件執行了
        server.setStartedCallback(new BaseCallback() {
            @Override
            public void run() {
                //執行注冊的功能
            }
        });
        server.setStopdCallback(new BaseCallback() {
            @Override
            public void run() {
                //執行移除注冊的功能
            }
        });
        //開啟服務端的通信
        server.start();
    }

    public void stop(){
        //在這里將Server停掉
        server.stop();
    }

}
XxlRpcProviderFactory

 

 


 

以上就是服務端的總體框架,接下來,只要提供了服務端的底層通信實現,服務端就搭建完成了。

下面開始實現Server類,使用netty通信模塊,采用NIO模型,傳輸數據是異步的,當然,在服務端幾乎不用考慮異步同步問題,當接收到請求之后,就返回所請求的對象。

為了更好的管理客戶端傳到服務器上的請求,將處理請求的操作啟動不同的線程來管理,這需要一個線程池。

ThreadPoolExecutor類的獲取(說明都在代碼里)

public class ThreadPoolUtil {

    //該類的作用就是返回一個線程池的實現類 ThreadPoolExecutor,包括規定池子的一些參數以及特性
    public static ThreadPoolExecutor makeServerThreadPool(final String serverType){

        //serverHandlerPool服務端處理線程池,將所有請求的處理線程都放到池子里面去
        //一個線程池的實例化過程就是這樣,規定了池子的大小,存活時間,工作隊列,線程創建工廠給線程命名,池子滿了之后的處理
        ThreadPoolExecutor serverHandlerPool =new ThreadPoolExecutor(
                60,
                300,
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-rpc, " + serverType + "-serverHandlerPool-" + r.hashCode());
                    }
                },
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        throw new RuntimeException("xxl-rpc "+serverType+" Thread pool is EXHAUSTED!");
                    }
                }
        );
        return serverHandlerPool;

    }
}
ThreadPoolUtil

 

有了上面的工具類,就可以管理每次請求處理的線程了。

現在工廠類中寫一個將request轉換為response的方法

public class XxlRpcProviderFactory {

    private NetEnum netType;
    private Serializer serializer;

    private String ip;                    // for registry
    private int port;                    // default port
    private String accessToken;

    private Class<? extends ServiceRegistry> serviceRegistryClass;
    private Map<String, String> serviceRegistryParam;

    //初始化方法(相當於一個構造器)里面的參數,就是開啟服務需要的參數

    /**
     *
     * @param netType      網絡通信方式
     * @param serializer   序列化方式
     * @param ip            ip地址
     * @param port          端口號
     * @param accessToken  接收token(如果有)
     * @param serviceRegistryClass 注冊中心類  (可先不要)
     * @param serviceRegistryParam 注冊中心參數  (可先不要)
     */
    public void init(NetEnum netType,
                     Serializer serializer,
                     String ip,
                     int port,
                     String accessToken,
                     Class<? extends ServiceRegistry> serviceRegistryClass,
                     Map<String, String> serviceRegistryParam){

    }

    //先提供一個Server類,然后使用該類的各種方法,但該類是一個抽象類,具體的實現類需要使用newstance創建
    private Server server;

    public void start(){
        //這里使用回調的方式來完成開始和停止的操作
        //首先需要一個Server類來具體管理服務端的數據通信,而使用回調函數,可以設置一個類似事件一樣的功能
        //當Server開啟時需要執行注冊的操作,當關閉的時候需要移除注冊的操作

        //先將回調函數實例化,當后面執行的時候就會按條件執行了
        server.setStartedCallback(new BaseCallback() {
            @Override
            public void run() {
                //執行注冊的功能
            }
        });
        server.setStopdCallback(new BaseCallback() {
            @Override
            public void run() {
                //執行移除注冊的功能
            }
        });
        //開啟服務端的通信
        server.start();
    }

    public void stop(){
        //在這里將Server停掉
        server.stop();
    }

    //存放bean對象的map集合
    private Map<String,Object> serviceData=new HashMap<>();

    public Map<String, Object> getServiceData() {
        return serviceData;
    }
    //這里是將servicebean添加到map中的方法,在application中被調用,並傳給一個實例化好的類
    public void addService(String iface, String version, Object serviceBean){
        String serviceKey = makeServiceKey(iface, version);
        serviceData.put(serviceKey, serviceBean);

    }

    //建立一個制作key值的方法,根據接口名和版本號就可以得到key值 形式為iface#version
    public static String makeServiceKey(String iface, String version){
        String serviceKey = iface;
        if (version!=null && version.trim().length()>0) {
            serviceKey += "#".concat(version);
        }
        return serviceKey;
    }

    //將handle類中的轉換方法放在這里實現
    public XxlRpcResponse invokeService(XxlRpcRequest request){

        XxlRpcResponse response=new XxlRpcResponse();
        //設置response對象
        response.setRequestId(request.getRequestId());

        //這里將之前請求過的bean放在一個map集合中,使用key值來取,可以避免重復的進行反射的操作
        //因此,在上面需要定義一個集合來存放得到的bean對象
        //根據上面定義的方法和map集合,先從map中取,如果取不到,說明請求的接口有問題
        String key = makeServiceKey(request.getClassName(), request.getVersion());
        Object serviceBean = serviceData.get(key);

        //對得到的bean以及時間和token進行判斷,來拋出響應的錯誤
        //沒有發現這個bean
        if(serviceBean==null){
            response.setErrorMsg("The serviceKey["+ key +"] not found.");
            return response;
        }
        //判斷是否超時
        if (System.currentTimeMillis() - request.getCreateMillisTime() > 3*60*1000) {
            response.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
            return response;
        }
        //判斷請求的token和響應的token是否一致
        if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(request.getAccessToken())) {
            response.setErrorMsg("The access token[" + request.getAccessToken() + "] is wrong.");
            return response;
        }
        //接下來,就將請求的方法執行得到的結果返回,利用反射
        Class<?> serviceBeanClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParameters();

        //反射得到類的方法
        try {
            Method method = serviceBeanClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            Object result = method.invoke(serviceBean, parameters);

            response.setResult(result);

        } catch (Throwable t) {
            // catch error
            response.setErrorMsg("");
        }
        return response;

    }

}
XxlRpcProviderFactory

 

然后就可以在handle類中調用了

public class NettyServerHandler extends SimpleChannelInboundHandler<XxlRpcRequest>{

    //請求傳來,首先傳到這個類中,然后進行處理
    //XxlRpcRequest類是封裝了請求的客戶端請求的類,這里在NettyServer中添加了編解碼就可以直接傳輸這個類了。

    //向Handle類中傳入線程池類,然后執行
    private ThreadPoolExecutor serverHandlerPool;
    //需要傳入一個工廠類,來調用轉換代碼
    private XxlRpcProviderFactory providerFactory;

    //提供構造方法傳值
    public NettyServerHandler(final XxlRpcProviderFactory providerFactory,final ThreadPoolExecutor serverHandlerPool){
        this.providerFactory=providerFactory;
        this.serverHandlerPool=serverHandlerPool;
    }

    //netty通信,從客戶端傳來的值會直接到這里,request
    @Override
    protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final XxlRpcRequest request) throws Exception {

        serverHandlerPool.execute(new Runnable() {
            @Override
            public void run() {
                //在線程池的某個線程中執行調用函數,將請求轉換為響應。這樣做是為了在邏輯上分開,netty的線程是處理netty的IO操作,
                // 而這個線程就專門處理請求轉換。
                //這個調用代碼直接在這里實現,使代碼有點繁瑣,將其放到工廠類中調用。
                XxlRpcResponse response=providerFactory.invokeService(request);
                channelHandlerContext.writeAndFlush(response);
            }
        });
    }

    //異常處理,就是關掉上下文
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
}
NettyServerHandler

 

然后將handle類加到具體nettyserver類的pipeline里面,就可以傳輸了

public class NettyServer extends Server{

    //Server的實現類,底層使用的是Netty通信

    //開啟通信的方法,這里專門開辟一個線程開實現開啟功能
    private Thread thread;
    @Override
    public void start(final XxlRpcProviderFactory factory) {
        thread=new Thread(new Runnable() {
            @Override
            public void run() {
                //這里就是具體的netty通信的服務端代碼
                //首先創建一個線程池對象,要傳遞給處理類
                final ThreadPoolExecutor pool = ThreadPoolUtil.makeServerThreadPool(NettyServer.class.getSimpleName());
                //創建兩個線程組
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup();

                // start server
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new NettyDecoder(XxlRpcRequest.class, factory.getSerializer()))
                                        .addLast(new NettyEncoder(XxlRpcResponse.class, factory.getSerializer()))
                                        .addLast(new NettyServerHandler(factory, pool));
                            }
                        })
                        .childOption(ChannelOption.TCP_NODELAY, true)
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // bind
                try {
                    ChannelFuture future = bootstrap.bind(factory.getPort()).sync();

                    //這里是回調函數的執行體
                    onStart();

                    //等待停止
                    future.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                finally {
                    //關閉池子
                    pool.shutdown();
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                }
            }
        });
        //將該線程設置為守護線程
        thread.setDaemon(true);
        thread.start();

    }

    @Override
    public void stop() {
// destroy server thread
        //如果線程沒有關掉,但需要關掉,則執行線程的中端
        if (thread != null && thread.isAlive()) {
            thread.interrupt();
        }  // on stop
        onStop();
    }
}
NettyServer

 

 

以上就實現了一個rpc的服務端。


 

 

下面來實現客戶端,客戶端相比服務端要復雜很多,服務端僅僅是開啟一個線程,然后靜靜等待,當有請求來了,從線程池中取一個線程來執行請求邏輯,返回一個響應對象。

客戶端比較復雜,其中發送請求和得到響應的過程就有四種調用方式,這四種調用方式也是最難理解的部分

首先,客戶端需要一個模擬的客戶端來完成服務的請求

ClientApplication類,模擬客戶端

public class ClientApplication {

   //1.測試同步發送請求
    @Test
    public void syncTest(){
     //首先提供一個代理類,通過動態代理,得到我們所需的接口方法的返回值(從服務端的轉換方法可以知道,得到了方法的名稱和參數后,返回的是方法的返回值),
     //動態代理的作用就是在客戶端使用哪個方法時,動態的請求哪個方法的返回值。
     //這個動態代理封裝在一個geObject方法里,至於如何得到,有四種調用方式,請求的時候就傳入。
     //創建一個參考的bean類來實現這個方法。


    }

    //2.測試異步發送請求
    @Test
    public void futureTest() throws ExecutionException, InterruptedException {

    }

    //3.測試回調發送請求
    @Test
    public void callbackTest() throws InterruptedException {
    }

    //4,測試單一長連接發送請求
    @Test
    public void onewayTest() throws InterruptedException {

    }
}
ClientApplication

 

因為有四種調用方式,我們在請求服務之初就將四種調用方式,通過枚舉定義在一個類中

public enum CallType {

    //在判斷的時候,可以按照其名字,去選擇不同的調用方式

    SYNC,

    FUTURE,

    CALLBACK,

    ONEWAY;
}

 

定義參考Bean,創建getObject方法

步驟1:得到所需的必須要參數

 //該類就封裝了動態代理的getObject方法
    //首先需要傳入必要的參數

    private NetEnum netType;
    private Serializer serializer;
    private CallType callType;
    private LoadBalance loadBalance;

    private Class<?> iface;
    private String version;

    private long timeout = 1000;

    private String address;
    private String accessToken;

    private XxlRpcInvokeCallback invokeCallback;

    private XxlRpcInvokerFactory invokerFactory;


    /**
     *
     * @param netType       使用的底層通信模塊
     * @param serializer    序列化方式
     * @param callType      調用方式,有四種
     * @param loadBalance   負載均衡的方式
     * @param iface          接口名
     * @param version        版本
     * @param timeout        超時時間
     * @param address        請求地址
     * @param accessToken    接入token
     * @param invokeCallback    回調的調用(可無)
     * @param invokerFactory    調用工廠(可無)
     */
    public XxlRpcReferenceBean(NetEnum netType,
                               Serializer serializer,
                               CallType callType,
                               LoadBalance loadBalance,
                               Class<?> iface,
                               String version,
                               long timeout,
                               String address,
                               String accessToken,
                               XxlRpcInvokeCallback invokeCallback,
                               XxlRpcInvokerFactory invokerFactory
    ){
        this.netType = netType;
        this.serializer = serializer;
        this.callType = callType;
        this.loadBalance = loadBalance;
        this.iface = iface;
        this.version = version;
        this.timeout = timeout;
        this.address = address;
        this.accessToken = accessToken;
        this.invokeCallback = invokeCallback;
        this.invokerFactory = invokerFactory;

        // valid
        if (this.netType==null) {
            throw new XxlRpcException("xxl-rpc reference netType missing.");
        }
        if (this.serializer==null) {
            throw new XxlRpcException("xxl-rpc reference serializer missing.");
        }
        if (this.callType==null) {
            throw new XxlRpcException("xxl-rpc reference callType missing.");
        }
        if (this.loadBalance==null) {
            throw new XxlRpcException("xxl-rpc reference loadBalance missing.");
        }
        if (this.iface==null) {
            throw new XxlRpcException("xxl-rpc reference iface missing.");
        }
        if (this.timeout < 0) {
            this.timeout = 0;
        }
        if (this.invokerFactory == null) {
            this.invokerFactory = XxlRpcInvokerFactory.getInstance();
        }

        // init Client
//        initClient();
    }

 

步驟2:動態代理的准備函數

//利用動態代理,得到請求方法返回的參數
    public Object getOnject(){

        //直接返回動態代理的結果
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),    //這里使用的類加載器是當前線程的類加載器
                new Class[]{iface},                  //指明需要代理的類的接口名稱
                new InvocationHandler() {             //建立一個內部類,相當於繼承了InvocationHandler接口,這里寫的需要代理的內容
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
}
}

 

步驟3:得到resquest所需要的參數

//先得到要請求的類和方法的全部信息,通過這些信息封裝成request發送給服務端
                        String className = method.getDeclaringClass().getName();//類名,包括包名
                        String varsion_ = version;//版本信息
                        String methodName = method.getName();//方法名稱
                        Class<?>[] parameterTypes = method.getParameterTypes();//方法參數的類型
                        Object[] parameters = args;//方法的參數

                        //得到請求地址,
                        //可能參數中已經傳遞了請求地址,那么請求就按傳遞的地址請求,也有可能沒有傳遞請求地址,
                        // 但注冊中心可能已經為該請求的類准備了很多的請求地址,那么就需要用到負載均衡

                        String finalAddress=address;
                        if(finalAddress==null ||finalAddress.trim().length()==0){
                            //沒有傳遞過來地址,則需要到注冊中心去找
                            //這里需要創建一個工廠類,提供一些需要的方法,包括生成key值,獲取地址等函數
                            if(invokeCallback!=null || invokerFactory.getServiceRegistry()!=null){
                                //如果這個工廠類不為空,而且通過工廠類能得到注冊中心,那么就可以取地址了
                                //得到存儲的key值
                                String key = XxlRpcProviderFactory.makeServiceKey(className, varsion_);
                                //得到的地址是一個set集合
                                TreeSet<String> addressSet = invokerFactory.getServiceRegistry().discovery(key);

                                if (addressSet==null || addressSet.size()==0) {   //沒有得到請求地址
                                    // pass
                                } else if (addressSet.size()==1) {  //得到一個請求地址,那么就是這一個
                                    finalAddress = addressSet.first();
                                } else {   //得到很多請求地址,則需要負載均衡  負載均衡的代碼之后再說
                                    finalAddress = loadBalance.xxlRpcInvokerRouter.route(key, addressSet);
                                }
                            }
                        }
                        //如果最終還是沒有得到請求地址,則拋出異常
                        if (finalAddress==null || finalAddress.trim().length()==0) {
                            throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
                        }

                        //現在就可以封裝request了
                        XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
                        xxlRpcRequest.setRequestId(UUID.randomUUID().toString());//隨機的request的id,根據這個唯一的id可以在同步調用時候拿到正確的值
                        xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());//創建時間,傳到服務端與那里的時間相比,如果大於超時時間則拋出異常
                        xxlRpcRequest.setAccessToken(accessToken);//token值如果有,會與服務端的對比
                        xxlRpcRequest.setClassName(className);//請求類名
                        xxlRpcRequest.setMethodName(methodName);//方法名
                        xxlRpcRequest.setParameterTypes(parameterTypes);
                        xxlRpcRequest.setParameters(parameters);

 

步驟3:同步調用的實現

//如果是使用同步調用,在netty底層的調用是異步的,異步的意思就是,如果發起多個請求,那么會無法判斷返回的響應是否是請求所得
// 因此,需要在請求發送過去之后,阻塞發送請求,直到請求得到響應之后,才能繼續發送請求。要實現這樣的功能,要用到

//future的get方法。

netty底層的nio模型,客戶端只管發,而服務端只管收,然后響應。但具體請求和響應之間不是同步的,因此會出現如法獲取到真實的響應的情況。

比如:以下就是異步返回的值:

null
null
UserDTO{name='liu1', word='hihihi'}
UserDTO{name='liu3', word='hihihi'}
UserDTO{name='liu4', word='hihihi'}
UserDTO{name='liu4', word='hihihi'}
UserDTO{name='liu4', word='hihihi'}
UserDTO{name='liu5', word='hihihi'}
UserDTO{name='liu6', word='hihihi'}
UserDTO{name='liu7', word='hihihi'}
UserDTO{name='liu7', word='hihihi'}
UserDTO{name='liu9', word='hihihi'}
UserDTO{name='liu11', word='hihihi'}
UserDTO{name='liu12', word='hihihi'}
UserDTO{name='liu13', word='hihihi'}
UserDTO{name='liu15', word='hihihi'}
UserDTO{name='liu16', word='hihihi'}
UserDTO{name='liu16', word='hihihi'}
UserDTO{name='liu17', word='hihihi'}
UserDTO{name='liu18', word='hihihi'}
UserDTO{name='liu19', word='hihihi'}
UserDTO{name='liu19', word='hihihi'}
UserDTO{name='liu21', word='hihihi'}
UserDTO{name='liu22', word='hihihi'}
UserDTO{name='liu23', word='hihihi'}
UserDTO{name='liu24', word='hihihi'}

 

可以看出,剛開始甚至返回了null,而且每次返回的值都可能重復。因此實現同步很有必要

同步實現的邏輯為:需要在請求發送過去之后,阻塞發送請求,直到請求得到響應之后,才能繼續發送請求

這里利用了Future實現類的get方法來對響應返回進行阻塞。

 

對future類的理解:

Future的核心思想是:一個方法f,計算過程可能非常耗時,等待f返回,顯然不明智。可以在調用f的時候,立馬返回一個Future,可以通過Future這個數據結構去控制方法f的計算過程。

這里的控制包括:

get方法:獲取計算結果(如果還沒計算完,也是必須等待的)

cancel方法:還沒計算完,可以取消計算過程

isDone方法:判斷是否計算完

isCancelled方法:判斷計算是否被取消 

因此,我們可以創建一個response的future實現類,在這個方法中利用get方法獲取response,如果沒有獲取到,get方法會阻塞掉,線程在這里阻塞,因此不會繼續發送請求了。當get方法完成了任務(即得到了resopnse),就可以把阻塞掉的線程喚醒,那么就可以繼續發送請求了。

synchronized關鍵字的使用:可以讓其修飾代碼塊、方法和靜態方法,被修飾的語句,只能允許一個線程執行,在該線程被執行完之前,其他線程都無法執行該代碼塊。它是一個互斥鎖。需要線程去申請synchronized括號里的對象鎖,只有申請到了才能執行。

 

接下來我們就編寫Future的實現類FutureResponse類,使得其在獲取response的時候,先將獲取response的線程阻塞,同時釋放鎖,當resopnse被設置了值之后,再去喚醒鎖,讓他去參與競爭,去拿到resopnse對象返回。

public class XxlRpcFutureResponse implements Future<XxlRpcResponse> {


    //這里實現了一個future類,並讓其監聽resopnse對象,如果response存在,才能返回response對象
    //可以看到,在繼承了該接口后,需要實現這些方法。
    //分析了那些需要實現的方法,可以總結出實現的邏輯

    //首先需要知道什么時候response對象已經存在了,需要定義一個設置response對象的方法,如果該方法執行了,代表該方法已經執行了
    //1.設置一個標志位,二設置一個鎖,get線程只有拿到鎖對象才能去返回鎖,在response存在之前,get線程被阻塞。這就是get的邏輯
    // future lock
    private boolean done = false;
    private Object lock = new Object();

    private XxlRpcResponse response;
    public void setResponse(XxlRpcResponse response){
        this.response=response;
        //方法執行到這里,說明response已經存在了,需要告訴get方法,
        synchronized (lock){
            done=true;
            lock.notifyAll();
        }
    }
    //還沒計算完成,可以取消計算結果
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }
    //判斷是否計算被取消,若cancel返回了true就取消了
    @Override
    public boolean isCancelled() {
        return false;
    }
    //判斷是否計算完
    @Override
    public boolean isDone() {
        return done;
    }
    //獲取計算結果,如果沒有計算完成,則在這里等到,並阻塞
    @Override
    public XxlRpcResponse get() throws InterruptedException, ExecutionException {
        //具體get邏輯在下面的get方法中
        try {
            return get(-1,TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
    //獲取計算結果,如果沒有計算完,去看是否超時,如果超時了就不等了
    @Override
    public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if(!done){
            //get
            //獲取response對象的線程運行到這里,會去獲取lock的鎖對象,如果此時lock的鎖對象正被set方法的線程拿着,那么get需要等待,當鎖釋放以后,get方法會拿到這個鎖
            //並且去根據超時數判斷需要阻塞的時長,當get的線程阻塞了之后,會釋放鎖資源,讓set方法拿到。如果set方法執行了,done會變true ,那么直接返回response即可
            synchronized (lock){
                if(timeout<0){//若超時數為-1,則一則阻塞下去,等待喚醒
                    lock.wait();
                }else {
                    long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);//計算需要阻塞的時間
                    lock.wait();
                }
            }
        }
        //如果超時時間已經過去,還沒有獲取到對象,那么就拋出異常
        if(!done){
            throw new XxlRpcException("xxl-rpc, request timeout at:"+ System.currentTimeMillis() );
        }

        return response;
    }
XxlRpcFutureResponse

 設置response的方法,需要在哪里執行呢,當handle類的read方法,拿到響應后,可以設置response,因此,設置response的線程是netty的io線程執行的。這里將set方法放到工廠里去執行。

package com.xxl.rpc.remoting.invoker;

import com.xxl.rpc.registry.ServiceRegistry;
import com.xxl.rpc.remoting.net.params.XxlRpcFutureResponse;
import com.xxl.rpc.remoting.net.params.XxlRpcResponse;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class XxlRpcInvokerFactory {

    private static volatile XxlRpcInvokerFactory instance = new XxlRpcInvokerFactory();
    public static XxlRpcInvokerFactory getInstance() {
        return instance;
    }

    private ServiceRegistry serviceRegistry;
    public ServiceRegistry getServiceRegistry() {
        return serviceRegistry;
    }

    //要想設置response,需要給一個futureresponse的類,這里設置一個map集合,將futureresponse類存在這里,從這里取,防止每次請求都創建
    private ConcurrentMap<String, XxlRpcFutureResponse> futureResponsePool = new ConcurrentHashMap<String, XxlRpcFutureResponse>();

    //設置一個初始化函數,在初始化的時候,將XxlRpcFutureResponse類放到池子里
    public void setInvokerFuture(String requestId, XxlRpcFutureResponse futureResponse){
        futureResponsePool.put(requestId, futureResponse);
    }
    public void removeInvokerFuture(String requestId){
        futureResponsePool.remove(requestId);
    }

    public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){
        //先得到XxlRpcFutureResponse的類對象
        final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
        //如果沒有就不執行
        if (futureResponse == null) {
            return;
        }
        //給futureResponse設置response對象
        futureResponse.setResponse(xxlRpcResponse);
    }
}
notifyInvokerFuture

 

上面的方法被放在handle類的read方法里面,由nio線程調用。

這樣,就完成了一個同步的方法。

package com.xxl.rpc.remoting.invoker.reference;

import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.xxl.rpc.remoting.invoker.call.CallType;
import com.xxl.rpc.remoting.invoker.route.LoadBalance;
import com.xxl.rpc.remoting.net.NetEnum;
import com.xxl.rpc.remoting.net.params.XxlRpcFutureResponse;
import com.xxl.rpc.remoting.net.params.XxlRpcRequest;
import com.xxl.rpc.remoting.net.params.XxlRpcResponse;
import com.xxl.rpc.remoting.provider.XxlRpcProviderFactory;
import com.xxl.rpc.serialize.Serializer;
import com.xxl.rpc.util.XxlRpcException;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class XxlRpcReferenceBean {


    //該類就封裝了動態代理的getObject方法
    //首先需要傳入必要的參數

    private NetEnum netType;
    private Serializer serializer;
    private CallType callType;
    private LoadBalance loadBalance;

    private Class<?> iface;
    private String version;

    private long timeout = 1000;

    private String address;
    private String accessToken;


    private XxlRpcInvokerFactory invokerFactory;


    /**
     *
     * @param netType       使用的底層通信模塊
     * @param serializer    序列化方式
     * @param callType      調用方式,有四種
     * @param loadBalance   負載均衡的方式
     * @param iface          接口名
     * @param version        版本
     * @param timeout        超時時間
     * @param address        請求地址
     * @param accessToken    接入token
//     * @param invokeCallback    回調的調用(可無)
     * @param invokerFactory    調用工廠(可無)
     */
    public XxlRpcReferenceBean(NetEnum netType,
                               Serializer serializer,
                               CallType callType,
                               LoadBalance loadBalance,
                               Class<?> iface,
                               String version,
                               long timeout,
                               String address,
                               String accessToken,
//                               XxlRpcInvokeCallback invokeCallback,
                               XxlRpcInvokerFactory invokerFactory
    ){
        this.netType = netType;
        this.serializer = serializer;
        this.callType = callType;
        this.loadBalance = loadBalance;
        this.iface = iface;
        this.version = version;
        this.timeout = timeout;
        this.address = address;
        this.accessToken = accessToken;
//        this.invokeCallback = invokeCallback;
        this.invokerFactory = invokerFactory;

        // valid
        if (this.netType==null) {
            throw new XxlRpcException("xxl-rpc reference netType missing.");
        }
        if (this.serializer==null) {
            throw new XxlRpcException("xxl-rpc reference serializer missing.");
        }
        if (this.callType==null) {
            throw new XxlRpcException("xxl-rpc reference callType missing.");
        }
        if (this.loadBalance==null) {
            throw new XxlRpcException("xxl-rpc reference loadBalance missing.");
        }
        if (this.iface==null) {
            throw new XxlRpcException("xxl-rpc reference iface missing.");
        }
        if (this.timeout < 0) {
            this.timeout = 0;
        }
        if (this.invokerFactory == null) {
            this.invokerFactory = XxlRpcInvokerFactory.getInstance();
        }

        // init Client
//        initClient();
    }

    //利用動態代理,得到請求方法返回的參數
    public Object getOnject() {

        //直接返回動態代理的結果
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),    //這里使用的類加載器是當前線程的類加載器
                new Class[]{iface},                  //指明需要代理的類的接口名稱
                new InvocationHandler() {             //建立一個內部類,相當於繼承了InvocationHandler接口,這里寫的需要代理的內容
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        //下面是動態代理的調用的具體邏輯

                        //先得到要請求的類和方法的全部信息,通過這些信息封裝成request發送給服務端
                        String className = method.getDeclaringClass().getName();//類名,包括包名
                        String varsion_ = version;//版本信息
                        String methodName = method.getName();//方法名稱
                        Class<?>[] parameterTypes = method.getParameterTypes();//方法參數的類型
                        Object[] parameters = args;//方法的參數

                        //得到請求地址,
                        //可能參數中已經傳遞了請求地址,那么請求就按傳遞的地址請求,也有可能沒有傳遞請求地址,
                        // 但注冊中心可能已經為該請求的類准備了很多的請求地址,那么就需要用到負載均衡

                        String finalAddress = address;
                        if (finalAddress == null || finalAddress.trim().length() == 0) {
                            //沒有傳遞過來地址,則需要到注冊中心去找
                            //這里需要創建一個工廠類,提供一些需要的方法,包括生成key值,獲取地址等函數
                            if (invokerFactory != null || invokerFactory.getServiceRegistry() != null) {
                                //如果這個工廠類不為空,而且通過工廠類能得到注冊中心,那么就可以取地址了
                                //得到存儲的key值
                                String key = XxlRpcProviderFactory.makeServiceKey(className, varsion_);
                                //得到的地址是一個set集合
                                TreeSet<String> addressSet = invokerFactory.getServiceRegistry().discovery(key);

                                if (addressSet == null || addressSet.size() == 0) {   //沒有得到請求地址
                                    // pass
                                } else if (addressSet.size() == 1) {  //得到一個請求地址,那么就是這一個
                                    finalAddress = addressSet.first();
                                } else {   //得到很多請求地址,則需要負載均衡  負載均衡的代碼之后再說
                                    finalAddress = loadBalance.xxlRpcInvokerRouter.route(key, addressSet);
                                }
                            }
                        }
                        //如果最終還是沒有得到請求地址,則拋出異常
                        if (finalAddress == null || finalAddress.trim().length() == 0) {
                            throw new XxlRpcException("xxl-rpc reference bean[" + className + "] address empty");
                        }

                        //現在就可以封裝request了
                        XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
                        xxlRpcRequest.setRequestId(UUID.randomUUID().toString());//隨機的request的id,根據這個唯一的id可以在同步調用時候拿到正確的值
                        xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());//創建時間,傳到服務端與那里的時間相比,如果大於超時時間則拋出異常
                        xxlRpcRequest.setAccessToken(accessToken);//token值如果有,會與服務端的對比
                        xxlRpcRequest.setClassName(className);//請求類名
                        xxlRpcRequest.setMethodName(methodName);//方法名
                        xxlRpcRequest.setParameterTypes(parameterTypes);
                        xxlRpcRequest.setParameters(parameters);

                        //接下來就是調用的邏輯了,四種調用方式用if->else來選擇
                        if (CallType.SYNC == callType) {
                            //如果是使用同步調用,在netty底層的調用是異步的,異步的意思就是,如果發起多個請求,那么會無法判斷返回的響應是否是請求所得,
                            // 因此,需要在請求發送過去之后,阻塞發送請求,直到請求得到響應之后,才能繼續發送請求。要實現這樣的功能,要用到
                            //future的get方法。
                            //這里先創建future對象,並初始化
                            XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest);

                            //然后發送請求
                            //執行send方法,這里還沒有實現
                            //然后接收響應
                            //通過get方法的阻塞來同步響應
                            XxlRpcResponse response = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
                            if (response.getErrorMsg() != null) {
                                throw new XxlRpcException(response.getErrorMsg());
                            }
                            return response.getResult();
                            //以上就是同步調用的邏輯
                        }
                        return null;
                    }
                });

    }




}
getOnject

 

 

步驟4:Future異步調用的實現:、

在上面同步的代碼中,使用get方法阻塞實現了線程的同步,那么如果用戶想自己規定get resopnse的時機,而不是直接返回,就需要future模式,再創建一個調用的future,然后設置一個ThreadLocal,將值存起來,等需要的時候直接獲取就可以了。

具體實現,是在上面同步的基礎上,定義了一個調用Future,並通過get方法來得到futureresponse對象,這里存的是response,如何取到response的邏輯和上面一致。但不同的是,設置了一個ThreadLocal集合

ThreadLocal為每個使用該變量的線程提供獨立的變量副本,所以每一個線程都可以獨立地改變自己的副本,而不會影響其它線程所對應的副本。

在ThreadLocal類中有一個Map,用於存儲每一個線程的變量副本,Map中元素的鍵為ThreadLocal對象,而值對應線程的變量副本。這個map是thread類的一個私有屬性,所以可以通過線程獲取該map的值。

XxlRpcInvokeFuture類

public class XxlRpcInvokeFuture implements Future {

    //首先需要將futureResponse引進來,要獲取response對象,需要從futureResponse中get。
    private XxlRpcFutureResponse futureResponse;

    public XxlRpcInvokeFuture(XxlRpcFutureResponse futureResponse) {
        this.futureResponse = futureResponse;
    }
    public void stop(){
        // remove-InvokerFuture
    }
    //實現的方法都從futureResponse調用
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return futureResponse.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
        return futureResponse.isCancelled();
    }

    @Override
    public boolean isDone() {
        return futureResponse.isDone();
    }

    @Override
    public Object get() throws InterruptedException, ExecutionException {
        try {
            return get(-1, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new XxlRpcException(e);
        }
    }

    @Override
    public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        try {
            // future get
            XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, unit);
            if (xxlRpcResponse.getErrorMsg() != null) {
                throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
            }
            return xxlRpcResponse.getResult();
        } finally {
            stop();
        }
    }

    //創建一個ThreadLocal線程,來綁定invokerfuture的線程
    private static ThreadLocal<XxlRpcInvokeFuture> threadInvokerFuture = new ThreadLocal<XxlRpcInvokeFuture>();

    //將future保存到線程中去,相當於一個線程副本
    public static void setFuture(XxlRpcInvokeFuture future) {
        threadInvokerFuture.set(future);
    }

    public static void removeFuture() {
        threadInvokerFuture.remove();
    }

    //獲取Future對象,然后將線程移除掉,如果不移除,會導致內存泄露
    public static <T> Future<T> getFuture(Class<T> type) {
        Future<T> future = (Future<T>) threadInvokerFuture.get();
        threadInvokerFuture.remove();
        return future;
    }




}
XxlRpcInvokeFuture

 

else if(CallType.FUTURE==callType){
                            //使用get方法阻塞實現了線程的同步,那么如果用戶想自己規定get resopnse的時機,
                            // 而不是直接返回,就需要future模式,再創建一個調用的future,然后設置一個ThreadLocal,
                            // 將值存起來,等需要的時候直接獲取就可以了。
                            //因此,我們需要創建一個調用future類,並將該類的線程交個threadlocal去執行

                            //先創建futureResponse對象
                            XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest);
                            //然后將futureResponse對象加到調用future類中,保存到線程里面去
                            XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
                            XxlRpcInvokeFuture.setFuture(invokeFuture);

                            //發送請求
                            //發送完之后,不用返回,而是等什么時候用,什么時候調用getFuture得到調用future,然后再執行get方法,獲取response對象
                            return null;
                        }

 

對於每一次請求都會set一個invokerFuture對象,並將其存入ThreadLocal中,它是一個map,因此,依照順序存入有得到。依照隊列的形式存取

 @Test
    public void futureTest() throws ExecutionException, InterruptedException {

        XxlRpcReferenceBean bean = new XxlRpcReferenceBean(NetEnum.NETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(),
                CallType.FUTURE, LoadBalance.ROUND, IDemo.class, null, 500, "127.0.0.1:7080",
                null, null, null);
        //得到返回對象
        IDemo demo =(IDemo) bean.getObject();

        demo.sayHi("liu:");
        Future<UserDTO> future = XxlRpcInvokeFuture.getFuture(UserDTO.class);
        demo.sayHi("xing");
        Future<UserDTO> future1 = XxlRpcInvokeFuture.getFuture(UserDTO.class);

        UserDTO s1 = future.get();
        UserDTO s2 = future1.get();
        System.out.println(s2);
        System.out.println(s1);

    }

 

測試結果如下:

UserDTO{name='xing', word='hihihi'}
UserDTO{name='liu:', word='hihihi'}

 

 

步驟:5:callback方式的調用

callback調用方式使用了回調的理念,即當事件發生時會觸發調用函數,並執行。那么就需要一個回調的類,

與future的調用方法相似,創建一個回調的類,並在調用的時候,將該類加到ThreadLocal里面,然后,回調函數如果判斷到回調類存在了(即運行了set方法),就執行回調的操作,因此回調函數的執行是在獲取到response之后,即放在工廠類的notify里面。

public abstract class XxlRpcInvokeCallback<T> {

    //設置一個抽象的回調類,定義了回調函數執行的方法名,具體實現在滿足條件的時候再調用
    //兩個抽象方法,規定了在成功和失敗時調用的方法
    public abstract void onSuccess(T result);

    public abstract void onFailure(Throwable exception);

    //與InvokerFuture類似,設置一個ThreadLocal,在調用的時候設置這個回調類,然后去取
    private static ThreadLocal<XxlRpcInvokeCallback> threadInvokerFuture = new ThreadLocal<XxlRpcInvokeCallback>();

    public static XxlRpcInvokeCallback getCallback() {
        XxlRpcInvokeCallback invokeCallback = threadInvokerFuture.get();
        threadInvokerFuture.remove();
        return invokeCallback;
    }
    public static void setCallback(XxlRpcInvokeCallback invokeCallback) {
        threadInvokerFuture.set(invokeCallback);
    }
    public static void removeCallback() {
        threadInvokerFuture.remove();
    }

}
XxlRpcInvokeCallback

 

在工廠類中執行回調函數,是執行的io的線程,因此,需要給他設置一個線程池,來防止將回調邏輯都放到io線程里面去。

這里繼續利用線程池額執行類

   public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){
        //先得到XxlRpcFutureResponse的類對象
        final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
        //如果沒有就不執行
        if (futureResponse == null) {
            return;
        }
        //如果回調類不為空,即得到了回調類,那么就開啟線程池來執行操作
        if(futureResponse.getInvokeCallback()!=null){

            //執行線程池的操作
            executeResponseCallback(new Runnable() {
                @Override
                public void run() {
                    if (xxlRpcResponse.getErrorMsg() != null) {
                        futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                    } else {
                        System.out.println(Thread.currentThread());
                        futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                    }
                }
            });
        }
        //給futureResponse設置response對象
        futureResponse.setResponse(xxlRpcResponse);
    }

    //使用線程池來執行
    private ThreadPoolExecutor responseCallbackThreadPool = null;  //設置一個線程池
    //配置線程池
    public void executeResponseCallback(Runnable runnable){
        if(responseCallbackThreadPool==null){
            synchronized (this){  //這里加一個鎖,執行這些操作的時候,不能操作其他的線程
                if (responseCallbackThreadPool == null) {
                    responseCallbackThreadPool = new ThreadPoolExecutor(
                            10,
                            100,
                            60L,
                            TimeUnit.SECONDS,
                            new LinkedBlockingQueue<Runnable>(1000),
                            new ThreadFactory() {
                                @Override
                                public Thread newThread(Runnable r) {
                                    return new Thread(r, "xxl-rpc, XxlRpcInvokerFactory-responseCallbackThreadPool-" + r.hashCode());
                                }
                            },
                            new RejectedExecutionHandler() {
                                @Override
                                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                    throw new XxlRpcException("xxl-rpc Invoke Callback Thread pool is EXHAUSTED!");
                                }
                            });        // default maxThreads 300, minThreads 60
                }
            }
        }
    }
else if(CallType.CALLBACK==callType){
                            //有了上面的分析,回調的實現很簡單了,將callback類設置到ThreadLocal里面去,然后在回調函數里面去取,取出來執行。
                            //不同點在於,執行的函數一般不能放在io的線程里面,因此,需要使用一個線程池來維護。
                            // get callback
                            XxlRpcInvokeCallback finalInvokeCallback=null;
                            XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
//                            System.out.println(threadInvokeCallback);
                            if (threadInvokeCallback != null) {
                                finalInvokeCallback = threadInvokeCallback;
                            }
                            if (finalInvokeCallback == null) {
                                throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null.");
                            }

                            // future-response set
                            XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);
                            //執行發送代碼
//                         client.asyncSend(finalAddress, xxlRpcRequest);

                            return null;
                        }
                        return null;
                    }
CALLBACK

 

 

步驟6:單一長連接

 因為NIO是非阻塞的,因此,每個客戶端只需要連接一次就可以了,這就是單一長連接,它的實現是一個連接池。將一個客戶端(adress不變)作為一個連接來維護,將該客戶端存到一個ConcurrentHashMap來保存,在進行連接的時候,去看這個ConcurrentHashMap里有沒有需要的連接,如果沒有,就需要創建,這里的過程是同步的,創建連接需要加鎖,

代碼如下:

public abstract class ConnectClient {

    //實現單一長連接的基礎
    //提供的接口方法

    //初始化方法
    public abstract void init(String address, final Serializer serializer, final XxlRpcInvokerFactory xxlRpcInvokerFactory) throws Exception;
    //關閉
    public abstract void close();
    //驗證
    public abstract boolean isValidate();
    //發送
    public abstract void send(XxlRpcRequest xxlRpcRequest) throws Exception ;


    //異步發送方法
    public static void asyncSend(XxlRpcRequest xxlRpcRequest, String address,
                                 Class<? extends ConnectClient> connectClientImpl,
                                 final XxlRpcReferenceBean xxlRpcReferenceBean) throws Exception {

        //這里提供一個從池子中取連接的方法
        ConnectClient clientPool = ConnectClient.getPool(address, connectClientImpl, xxlRpcReferenceBean);

        try {
            // do invoke
            clientPool.send(xxlRpcRequest);
        } catch (Exception e) {
            throw e;
        }
    }
    //使用ConcurrentHashMap來做一個池子,使用ConcurrentHashMap的原因是它實現了一個分段鎖機制,保證不同線程在put的時候不會阻塞
    private static volatile ConcurrentHashMap<String, ConnectClient> connectClientMap;
    //每個地址都會有一個鎖,來控制連接的創建,如果是相同的地址,要創建連接需要加鎖,否則會出現同一地址創建多個連接的情況。
    private static volatile ConcurrentHashMap<String, Object> connectClientLockMap = new ConcurrentHashMap<>();

    //獲取連接的代碼
    private static ConnectClient getPool(String address, Class<? extends ConnectClient> connectClientImpl,
                                         final XxlRpcReferenceBean xxlRpcReferenceBean) throws Exception {

        //先判斷這個池子是否為空,如果為空,則需要創建(這里為空的意思是null)
        //創建map需要用到雙檢鎖,保證創建的是一個單例
        if (connectClientMap == null) {
            synchronized (ConnectClient.class) {
                if (connectClientMap == null) {
                    // init
                    connectClientMap = new ConcurrentHashMap<String, ConnectClient>();
                    // stop callback
                    //這里不管他
                }
            }
        }

        //去驗證是不是存在客戶端,存在了就直接返回
        ConnectClient connectClient = connectClientMap.get(address);
        if (connectClient!=null && connectClient.isValidate()) {
            return connectClient;
        }

        //如果沒有客戶端連接,就需要去創建連接,但在此之前,需要構建一個鎖
        Object clientLock = connectClientLockMap.get(address);
        if (clientLock == null) {
            connectClientLockMap.putIfAbsent(address, new Object());//putIfAbsent保證了使用原來的鎖
            clientLock = connectClientLockMap.get(address);
        }

        //加鎖
        synchronized (clientLock) {

            //需要判斷是不是這個連接存活,存活就返回
            connectClient = connectClientMap.get(address);
            if (connectClient!=null && connectClient.isValidate()) {
                return connectClient;
            }

            // remove old
            if (connectClient != null) {
                connectClient.close();
                connectClientMap.remove(address);
            }

            // 創建連接,並加到池子里面,然后返回
            ConnectClient connectClient_new = connectClientImpl.newInstance();
            connectClientMap.put(address, connectClient_new);

            return connectClient_new;

        }

    }
ConnectClient

 

單一長連接是以上三種調用的基礎,將它作為一種調用其實不能符合邏輯。

 


 

以上,我們的調用就完成了,客戶端通信的連接代碼和服務端類似,不需要重復,該項目的精華部分上面已經全部搞定。

下面來對注冊中進行研究。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM