Dubbo Framework Internals: Service Reference


        ApplicationConfig config = new ApplicationConfig("hello-worldp");

        RegistryConfig reg = new RegistryConfig("127.0.0.1:2181");
        reg.setProtocol("zookeeper");
        
        ReferenceConfig<DemoService> reference = new ReferenceConfig<DemoService>();
        reference.setApplication(config);
        reference.setRegistry(reg);
        reference.setInterface(DemoService.class);
        reference.setVersion("1.0");
        
        DemoService demo = reference.get();
        System.out.println(demo.sayHello("selrain"));
        
        Thread.sleep(Integer.MAX_VALUE);

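As with service publishing, the consumer code is written by hand instead of being configured through Spring. The get() method of ReferenceConfig returns the proxy object.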

public synchronized T get() {
        if (ref == null) {
            init();
        }
        return ref;
    }
    
private void init() {
        ...        
        if (isJvmRefer) {
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            if (url != null && url.length() > 0) { // User-specified URL: either a point-to-point direct address or a registry URL
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // Assemble the URL from the registry configuration
                List<URL> us = loadRegistries(false);
                if (us != null && us.size() > 0) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls == null || urls.size() == 0) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName  + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }

            if (urls.size() == 1) {
                // Register the consumer address with the registry
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // the last registry URL wins
                    }
                }
                if (registryURL != null) { // at least one URL uses the registry protocol
                    // For a cluster backed by registries, only AvailableCluster is used
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                }  else { // none of the URLs is a registry URL
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }
        ref = createProxy(map);
    }    
private T createProxy(Map<String, String> map) {
        ...
        // Create the service proxy
        return (T) proxyFactory.getProxy(invoker);
    }    
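
The user-specified URL branch of init() can be tried out without Dubbo on the classpath. What follows is a minimal sketch, not Dubbo's code: the class ReferUrlSketch, the method classify and the sample addresses are hypothetical, and the regular expression is only assumed to behave like Constants.SEMICOLON_SPLIT_PATTERN. It splits the address string on semicolons and separates registry URLs from direct provider URLs by protocol, which is the same decision init() makes before choosing between addParameterAndEncoded and ClusterUtils.mergeUrl.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical illustration; this class is not part of Dubbo.
final class ReferUrlSketch {
    // Assumption: behaves like Constants.SEMICOLON_SPLIT_PATTERN.
    private static final Pattern SEMICOLON_SPLIT = Pattern.compile("\\s*[;]+\\s*");
    private static final String REGISTRY_PROTOCOL = "registry";

    /** Splits a semicolon-separated address list and groups the entries by kind. */
    static Map<String, List<String>> classify(String urlList) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        result.put("registry", new ArrayList<>());
        result.put("direct", new ArrayList<>());
        for (String u : SEMICOLON_SPLIT.split(urlList.trim())) {
            if (u.isEmpty()) {
                continue; // skip empty fragments such as a leading ';'
            }
            // The URI scheme plays the role of URL.getProtocol() in Dubbo.
            String protocol = URI.create(u).getScheme();
            String kind = REGISTRY_PROTOCOL.equals(protocol) ? "registry" : "direct";
            result.get(kind).add(u);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> groups = classify(
                "registry://127.0.0.1:2181/RegistryService; dubbo://127.0.0.1:20880/DemoService");
        System.out.println(groups);
    }
}
```

With one entry of each kind, the result holds two URLs, which corresponds to the urls.size() > 1 branch where the invokers are joined into a cluster.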

 

proxyFactory is initialized through ExtensionLoader's getAdaptiveExtension method; the instance that ends up handling the call is StubProxyFactoryWrapper.

private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

Look at StubProxyFactoryWrapper's getProxy method: it calls getProxy on its inner proxyFactory. That inner proxyFactory is set when StubProxyFactoryWrapper itself is initialized, and its value is JavassistProxyFactory.

 public <T> T getProxy(Invoker<T> invoker) throws RpcException {
        T proxy = proxyFactory.getProxy(invoker);
        ...
        return proxy;
    }

Now look at JavassistProxyFactory's getProxy method:

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
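
The handler idea behind InvokerInvocationHandler can be shown with the JDK's own java.lang.reflect.Proxy. This is a sketch under stated assumptions: it swaps Dubbo's Javassist-based Proxy for the JDK dynamic proxy, and HandlerProxySketch, SimpleInvoker and the nested DemoService are hypothetical stand-ins, not Dubbo types. Every interface call is routed to one handler, which forwards it to the invoker.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical illustration using the JDK dynamic proxy instead of Dubbo's Proxy.
final class HandlerProxySketch {
    interface DemoService {
        String sayHello(String name);
    }

    // Hypothetical stand-in for Dubbo's Invoker: receives the method name and arguments.
    interface SimpleInvoker {
        Object invoke(String methodName, Object[] args);
    }

    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> iface, SimpleInvoker invoker) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods declared on Object (toString, hashCode, equals) stay local.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            // Everything else is forwarded, as a remote call would be.
            return invoker.invoke(method.getName(), args);
        };
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        DemoService demo = getProxy(DemoService.class, (m, a) -> "Hello " + a[0]);
        System.out.println(demo.sayHello("selrain")); // prints "Hello selrain"
    }
}
```

In this sketch the lambda passed to getProxy stands where a real invoker would serialize the call and send it to a provider.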

Proxy: creates the proxy class

public static Proxy getProxy(ClassLoader cl, Class<?>... ics)
    {
        if( ics.length > 65535 )
            throw new IllegalArgumentException("interface limit exceeded");
        
        StringBuilder sb = new StringBuilder();
        for(int i=0;i<ics.length;i++)
        {
            String itf = ics[i].getName();
            if( !ics[i].isInterface() )
                throw new RuntimeException(itf + " is not a interface.");

            Class<?> tmp = null;
            try
            {
                tmp = Class.forName(itf, false, cl);
            }
            catch(ClassNotFoundException e)
            {}

            if( tmp != ics[i] )
                throw new IllegalArgumentException(ics[i] + " is not visible from class loader");

            sb.append(itf).append(';');
        }

        // use interface class name list as key.
        String key = sb.toString();

        // get cache by class loader.
        Map<String, Object> cache;
        synchronized( ProxyCacheMap )
        {
            cache = ProxyCacheMap.get(cl);
            if( cache == null )
            {
                cache = new HashMap<String, Object>();
                ProxyCacheMap.put(cl, cache);
            }
        }

        Proxy proxy = null;
        synchronized( cache )
        {
            do
            {
                Object value = cache.get(key);
                if( value instanceof Reference<?> )
                {
                    proxy = (Proxy)((Reference<?>)value).get();
                    if( proxy != null )
                        return proxy;
                }

                if( value == PendingGenerationMarker )
                {
                    try{ cache.wait(); }catch(InterruptedException e){}
                }
                else
                {
                    cache.put(key, PendingGenerationMarker);
                    break;
                }
            }
            while( true );
        }

        long id = PROXY_CLASS_COUNTER.getAndIncrement();
        String pkg = null;
        ClassGenerator ccp = null, ccm = null;
        try
        {
            ccp = ClassGenerator.newInstance(cl);

            Set<String> worked = new HashSet<String>();
            List<Method> methods = new ArrayList<Method>();

            for(int i=0;i<ics.length;i++)
            {
                if( !Modifier.isPublic(ics[i].getModifiers()) )
                {
                    String npkg = ics[i].getPackage().getName();
                    if( pkg == null )
                    {
                        pkg = npkg;
                    }
                    else
                    {
                        if( !pkg.equals(npkg)  )
                            throw new IllegalArgumentException("non-public interfaces from different packages");
                    }
                }
                ccp.addInterface(ics[i]);

                for( Method method : ics[i].getMethods() )
                {
                    String desc = ReflectUtils.getDesc(method);
                    if( worked.contains(desc) )
                        continue;
                    worked.add(desc);

                    int ix = methods.size();
                    Class<?> rt = method.getReturnType();
                    Class<?>[] pts = method.getParameterTypes();
                    // Add a method to the proxy class. When we call sayHello(), the code generated here for sayHello runs, i.e. handler.invoke()
                    StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
                    for(int j=0;j<pts.length;j++)
                        code.append(" args[").append(j).append("] = ($w)$").append(j+1).append(";");
                    code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
                    if( !Void.TYPE.equals(rt) )
                        code.append(" return ").append(asArgument(rt, "ret")).append(";");

                    methods.add(method);        
                    ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
                }
            }

            if( pkg == null )
                pkg = PACKAGE_NAME;

            // create ProxyInstance class. 
            String pcn = pkg + ".proxy" + id;
            ccp.setClassName(pcn);
            ccp.addField("public static java.lang.reflect.Method[] methods;");
            ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
            ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{ InvocationHandler.class }, new Class<?>[0], "handler=$1;");
            ccp.addDefaultConstructor();
            Class<?> clazz = ccp.toClass();
            clazz.getField("methods").set(null, methods.toArray(new Method[0]));

            // create Proxy class. 
            String fcn = Proxy.class.getName() + id;
            ccm = ClassGenerator.newInstance(cl);
            ccm.setClassName(fcn);
            ccm.addDefaultConstructor();
            ccm.setSuperClass(Proxy.class);            
            ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
            Class<?> pc = ccm.toClass();
            proxy = (Proxy)pc.newInstance();
        }
        catch(RuntimeException e)
        {
            throw e;
        }
        catch(Exception e)
        {
            throw new RuntimeException(e.getMessage(), e);
        }
        ...
        return proxy;
    }
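
To make the string-built method bodies easier to picture, here is a hand-written approximation of the class that ccp would produce for an interface with a single sayHello(String) method. It is a sketch, not real generator output: GeneratedProxySketch, Proxy0, newInstance and the nested DemoService are hypothetical names, and the try/catch exists only because hand-written Java must deal with the Throwable declared by InvocationHandler.invoke, which the Javassist-generated body does not need to do.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

// Hypothetical illustration of what the generated "proxy" + id class roughly looks like.
final class GeneratedProxySketch {
    interface DemoService {
        String sayHello(String name);
    }

    static final class Proxy0 implements DemoService {
        public static Method[] methods;          // filled in after the class is created
        private final InvocationHandler handler; // corresponds to "handler=$1;"

        Proxy0(InvocationHandler handler) {
            this.handler = handler;
        }

        @Override
        public String sayHello(String name) {
            // Mirrors the generated body: pack arguments, delegate, cast the result.
            Object[] args = new Object[1];
            args[0] = name;
            try {
                Object ret = handler.invoke(this, methods[0], args);
                return (String) ret;
            } catch (RuntimeException | Error e) {
                throw e;
            } catch (Throwable t) {
                throw new IllegalStateException(t);
            }
        }
    }

    // Plays the role of the generated newInstance(InvocationHandler) factory method.
    static DemoService newInstance(InvocationHandler h) {
        try {
            Proxy0.methods = new Method[]{DemoService.class.getMethod("sayHello", String.class)};
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
        return new Proxy0(h);
    }

    public static void main(String[] a) {
        DemoService demo = newInstance((proxy, method, args) -> method.getName() + "(" + args[0] + ")");
        System.out.println(demo.sayHello("selrain")); // prints "sayHello(selrain)"
    }
}
```

The static methods array is why the generated code can refer to methods[ix] by index: the Method objects are looked up once and shared by all instances of the proxy class.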

In the end, DubboInvoker's doInvoke(Invocation) method is called to communicate with the server

protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
        
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout) ;
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
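
The three branches of doInvoke (one-way, asynchronous, synchronous) and the round-robin choice of client can be imitated with plain JDK classes. The sketch below is an assumption-laden illustration, not Dubbo's implementation: InvokeModesSketch, Mode and CONTEXT_FUTURE are hypothetical names, a Function stands in for ExchangeClient, CompletableFuture stands in for ResponseFuture, and a ThreadLocal stands in for the future slot of RpcContext. Unlike the original, it advances the counter even when there is only one client.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical illustration; none of these names exist in Dubbo.
final class InvokeModesSketch {
    enum Mode { ONEWAY, ASYNC, SYNC }

    // Stand-in for the per-thread future kept by RpcContext.
    static final ThreadLocal<CompletableFuture<Object>> CONTEXT_FUTURE = new ThreadLocal<>();

    private final List<Function<String, Object>> clients;
    private final AtomicInteger index = new AtomicInteger();

    InvokeModesSketch(List<Function<String, Object>> clients) {
        this.clients = clients;
    }

    Object doInvoke(String request, Mode mode, long timeoutMillis) {
        // Round-robin selection; floorMod keeps the index valid after int overflow.
        Function<String, Object> client =
                clients.get(Math.floorMod(index.getAndIncrement(), clients.size()));
        CompletableFuture<Object> future =
                CompletableFuture.supplyAsync(() -> client.apply(request));
        switch (mode) {
            case ONEWAY:
                CONTEXT_FUTURE.set(null); // fire and forget: nobody waits for the reply
                return null;
            case ASYNC:
                CONTEXT_FUTURE.set(future); // the caller fetches the result later
                return null;
            default:
                CONTEXT_FUTURE.set(null);
                try {
                    return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    throw new IllegalStateException("Invoke remote method timeout", e);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting", e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException("Failed to invoke remote method", e);
                }
        }
    }

    public static void main(String[] args) {
        InvokeModesSketch invoker = new InvokeModesSketch(
                Arrays.asList(r -> "A:" + r, r -> "B:" + r));
        System.out.println(invoker.doInvoke("hello", Mode.SYNC, 1000));
        invoker.doInvoke("hello", Mode.ASYNC, 1000);
        System.out.println(CONTEXT_FUTURE.get().join());
    }
}
```

The synchronous branch is the only one that blocks the caller, which is why it is also the only one that can surface a timeout directly.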


