dubbo系列八、dubbo泛化和隐式参数记录


dubbo泛化分析

1.RPC的本质

dubbo是个rpc框架,本质就是网络通信,把客户端(consumer)信息编码为二进制流,通过网络发送,服务端(provider)接收到二进制流,进行解码,获取到所需的业务数据,进而进行业务处理。和socket通信、http调用的本质是一样。dubbo默认使用的dubbo协议进行通信,所谓协议,就是个双方约定的特定数据结构,客户端按照此协议结构进行编码,服务端按照此协议结构进行解码。协议通常有定长、边长、特殊字符结尾等各种类型协议,dubbo协议属于边长协议,dubbo协议的结构如下图(来源官方

dubbo_protocol_header

我们只关注上图的最后一行,协议内容部分,这部分也是实际的业务数据,内容部分包含:服务名、version、方法名、参数、参数类型、attachments(隐式参数)等。那么我们在只需要知道服务名、version、方法名、参数、参数类型的情况下,就可以进行调用服务端了,可以不需要对服务端接口进行代理,但是,为什么使用dubbo还要指定服务方的接口呢?原因是我们要在协议内设置接口方法名、参数、参数类型,没有接口,不知道要设置什么。因此对于调用端来说,服务接口的作用仅用于创建代理对象,基于面向接口开发而已。

通常开发中,我们针对每个接口dubbo给我们实现了一个对应的代理,那么如果想在一个调用端,根据传入的接口和方法进行调用服务呢(比如最常见的网关),基于这种情况,Dubbo 定义了一个统一的接口 GenericSerive ,调用端在使用该接口的代理时指定相关调用信息即可。

2.泛化接口GenericSerive

2.1.泛化使用例子

泛化通常不在服务端使用,基本在consumer端使用,比如最常见的网关,下面举例说明下泛化的使用

//spring service方法
@Override
public Result<Map> getProductGeneric(ProductDTO dto) {
    ReferenceConfigCache referenceCache = ReferenceConfigCache.getCache();//使用ReferenceConfigCache缓存ReferenceConfig,否则每此请求都会创建一个ReferenceConfig,并在zk注册节点,最终可能导致zk节点过多影响性能次,最终可能导致zk宕机

    ReferenceConfig<GenericService> reference = new ReferenceConfig<GenericService>();
    ApplicationConfig application = new ApplicationConfig();
    application.setName("pangu-client-consumer-generic");
    // 连接注册中心配置
    RegistryConfig registry = new RegistryConfig();
    registry.setAddress("zookeeper://127.0.0.1:2181");

    // 服务消费者缺省值配置
    ConsumerConfig consumer = new ConsumerConfig();
    consumer.setTimeout(10000);
    consumer.setRetries(0);

    reference.setApplication(application);
    reference.setRegistry(registry);
    reference.setConsumer(consumer);
    reference.setInterface(org.pangu.api.ProductService.class); // 弱类型接口名  实际网关中接口是传入的
    //        reference.setVersion("");
    //        reference.setGroup("");
    reference.setGeneric(true); // 声明为泛化接口
    GenericService svc = referenceCache.get(reference);//referenceCache.get方法中会缓存 Reference对象,并且调用ReferenceConfig.get方法启动ReferenceConfig

    Object target = svc.$invoke("findProduct", new String[]{ProductDTO.class.getName()}, new Object[]{dto});//实际网关中,方法名、参数类型、参数是作为参数传入

    return Result.success((Map)target);

}

上述代码很简单,请求服务端的org.pangu.api.ProductService#findProduct方法,使用的泛型,这里只是特例,实际接口、方法、方法参数等都是动态由方法传入的。注意:这里要使用ReferenceConfigCache或进行缓存,否则每次请求执行一次getProductGeneric方法,就会在zk上创建一个consumer节点,最终会导致zk节点过多导致zk性能问题,严重会导致zk宕机,之前遇到过有别的团队使用泛化不进行缓存reference导致zk节点过多导致zk故障问题。

在代码referenceCache.get(reference);缓存并获取reference,reference不存在,则创建reference,创建流程和dubbo启动过程中创建是一样的,也是在zk创建consumers节点,并注册监听目标服务org.pangu.api.ProductService的configurators、providers、routers节点,这些节点变化,也会通知refreshInvoker操作。

说明: 泛化引用虽然会将服务接口类型设置为 GenericService ,但是并不影响服务发现。

2.2.泛化实现原理

consumer泛化,会在filter chain增加GenericImplFilter,泛化的具体实现原理是由consumer端的GenericImplFilter和provider端GenericFilter共同实现。

GenericImplFilter

@Activate(group = Constants.CONSUMER, value = Constants.GENERIC_KEY, order = 20000)
public class GenericImplFilter implements Filter {

    private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class);

    private static final Class<?>[] GENERIC_PARAMETER_TYPES = new Class<?>[]{String.class, String[].class, Object[].class};

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY);//获取url上的generic值,泛化调用是true
        if (ProtocolUtils.isGeneric(generic)
                && !Constants.$INVOKE.equals(invocation.getMethodName())//调用的方法名非$invoke,返回调用不匹配,因此该分支返回调用不执行
                && invocation instanceof RpcInvocation) {
            //省略本分支的其它逻辑,因为泛化调用不执行
        }

        if (invocation.getMethodName().equals(Constants.$INVOKE)//请求方法名是$invoke
                && invocation.getArguments() != null
                && invocation.getArguments().length == 3//返回调用只有三个参数。参数1:方法名,参数2:参数类型,参数3:参数值
                && ProtocolUtils.isGeneric(generic)) {

            Object[] args = (Object[]) invocation.getArguments()[2];//获取参数值
            //其它逻辑忽略

            ((RpcInvocation) invocation).setAttachment(
                    Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY));//把泛化标识放到RpcInvocationd的attachment内,上送给provider端
        }
        return invoker.invoke(invocation);//invoker调用
    }

    //其它忽略

}

看逻辑很简单,只是给RpcInvocationd的attachment内存放了返回标识而已,并没有其它多余逻辑处理

接着看服务端的泛化处理

GenericFilter

@Activate(group = Constants.PROVIDER, order = -20000)
public class GenericFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
        if (inv.getMethodName().equals(Constants.$INVOKE)
                && inv.getArguments() != null
                && inv.getArguments().length == 3
                && !invoker.getInterface().equals(GenericService.class)) {
            String name = ((String) inv.getArguments()[0]).trim();//目标服务的方法名
            String[] types = (String[]) inv.getArguments()[1];//目标服务的方法的参数类型
            Object[] args = (Object[]) inv.getArguments()[2];//目标服务的方法参数
            try {
                Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);//反射调用看目标服务是否有此方法,没有抛出NoSuchMethodException
                Class<?>[] params = method.getParameterTypes();//获取目标方法的参数类型
                if (args == null) {
                    args = new Object[params.length];
                }
                String generic = inv.getAttachment(Constants.GENERIC_KEY);//这里获取的就是GenericImplFilter放置到RpcInvocationd的attachment的泛化标识

                if (StringUtils.isBlank(generic)) {
                    generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
                }

                if (StringUtils.isEmpty(generic)
                        || ProtocolUtils.isDefaultGenericSerialization(generic)) {//泛化方法
                    args = PojoUtils.realize(args, params, method.getGenericParameterTypes());//代码@1 realize操作是把map参数转换为POJO对象
                } else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                    //忽略不需要的分支逻辑
                } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                    //忽略不需要的分支逻辑
                }
                Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));//代码@2 重写创建RpcInvocation的原因是,旧的inv方法是$invoke,而此时需要去调用目标服务方法了,因此需要使用目标服务的方法和参数重建RpcInvocation,用于找到目标服务执行
                if (result.hasException()
                        && !(result.getException() instanceof GenericException)) {
                    return new RpcResult(new GenericException(result.getException()));
                }
                RpcResult rpcResult;
                if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                   //忽略不需要的分支逻辑
                } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                   //忽略不需要的分支逻辑
                } else {
                    rpcResult = new RpcResult(PojoUtils.generalize(result.getValue()));//代码@3 PojoUtils.generalize把目标方法执行结果转换为HashMap,因此consumer端泛化调用获取的结果都是HashMap
                }
                rpcResult.setAttachments(result.getAttachments());
                return rpcResult;
            } catch (NoSuchMethodException e) {
                throw new RpcException(e.getMessage(), e);
            } catch (ClassNotFoundException e) {
                throw new RpcException(e.getMessage(), e);
            }
        }
        return invoker.invoke(inv);//非泛化调用
    }
}

GenericFilter的逻辑就是:先判断目标服务是否存在调用的目标方法,接着使用PojoUtils.realize把方法参数由HashMap转换为POJO(如果参数是Map类型情况下就转换),接着以目标方法重写创建RpcInvocation,进行invoker调用,invoker调用结束,使用PojoUtils.generalize把调用结果转换为HashMap,把调用方的隐式参数保存到调用结果,最后返回。

其中重要的步骤就是:

代码@1:PojoUtils.realize是把参数由Map转换为POJO,如果参数类型是POJO,则不进行转换。这样做的原因是防止请求的参数是map。

代码@2:这里为什么需要重建RpcInvocation呢?因为inv里面的方法是$invoke,并非调用的目标方法,因此需要重建。那么RpcInvocation是个什么呢?它就是个方法名、方法类型、方法参数的封装而已。invoker的调用,参数定为需要RpcInvocation,即需要知道使用的参数类型和参数调用的方法。

代码@3:result.getValue()是调用结果,PojoUtils.generalize是把调用结果由POJO转换为HashMap,因此consumer端泛化调用获取的结果都是HashMap。

这里比较有用的工具类PojoUtils。

2.3.总结

Dubbo 泛化调用实际是在 Filter 过滤链上执行的序列化(PojoUtils.realize provider端处理完毕后,序列化为hashmap)和反序列化(PojoUtils.generalize consumer接收后反序列化为对象)操作,服务端通过调用端传递的调用信息反射获取对应的服务方法,进而进行服务调用。

Dubbo 泛化本质就是提供了个用于穿透的架子,让各种未知服务都可以套用。具体实现就是消费端GenericImplFilter设置泛化标识(其实没啥具体作用,不会进行PojoUtils序列化),服务提供端GenericFilter重新设置RpcInvocation,包装了目标服务和方法名,用于查找到目标服务进行调用;目标服务调用完毕后,把调用结果使用PojoUtils进行序列化为HashMap,这样消费端收到的结果都是HashMap类型。 网上所说的在GenericImplFilter进行序列化为HashMap,服务提供端收到后GenericFilter反序列为目标对象参数,这个说法是不对的。

从功能上来看,泛化调用提供了在没有接口依赖情况下进行的解决方案,丰富框架的使用场景。

从设计上来看,泛化调用的功能还是通过扩展的方式实现的,侵入性不强,值得学习借鉴。

3.隐式参数

在分析dubbo泛化时候,GenericImplFilter内把泛化标识放到RpcInvocationd的attachment内,这样在provider端的GenericFilter通过inv.getAttachment(Constants.GENERIC_KEY)获取泛化标识。这个是如何传送过来的呢?

还是先看dubbo协议,dubbo协议的业务数据内,除了接口、方法、方法参数类型、方法参数,还有attachments,attachments在dubbo中成为隐式参数,即不在接口方法参数内,但是,可以随着请求上送和响应。原理还是放在了协议里面。那么dubbo具体是如何实现的呢?

3.1.consumer端请求设置attachments

隐式参数设置有两种

  1. 业务代码中 RpcContext.getContext().setAttachment("xxx", "xxx");

  2. RpcInvocation.setAttachment(String, String) 或者 RpcInvocation.addAttachments(Map<String, String>) 保存到RpcInvocation.attachments

对于第一种,是把隐式参数设置到了RpcContext.attachments上,接着在FailoverClusterInvoker#invoke操作中会把RpcContext.attachments保存到RpcInvocation.attachments,代码截图如下

image-20210726235215136

对于第二种,隐式参数直接设置到RpcInvocation.attachments

隐式参数设置了,那么肯定是在编码过程中设置到dubbo协议报文上,具体在DubboCodec.encodeRequestData(Channel, ObjectOutput, Object, String),如下图

image-20210817215106261

至此明白了consumer设置隐式参数,都会保存到RpcInvocation,继而在编码阶段对隐式参数编码写入到输出流。

那么provider端如何处理的呢?

首先肯定是要先解码了,dubbo中请求被封装为Request,响应是Response,其中Request.mData就是RpcInvocation,因此provider端通过解码后就获取到了RpcInvocation,当然也就获取到了consumer设置的attachments。

然后在ContextFilter内把consumer端上送的隐式参数RpcInvocation.attachments保存到RpcContext.getContext(),provider端获取隐式参数通过RpcContext.getContext().getAttachment("xxx"); 获取

3.2.provider端响应设置attachments

provider在目标对象调用执行完毕后,也可以在RpcResult设置attachments回传给consumer端,设置方式有两种

  1. RpcContext.getServerContext().setAttachment("xxx", "xxx"); 在ContextFilter内invoker调用后,会把RpcContext.getServerContext()合并到RpcResult

  2. RpcResult.setAttachments(Map<String, String>)

consumer端获取provider回传的隐式参数方法:RpcContext.getServerContext().getAttachment("xxx");

dubbo的响应是Response,其属性mResult是RpcResult,RpcResult有三个属性result-执行结果,exception-异常,attachments-provider端回传的隐式参数。那么在哪里把RpcResult.attachments保存到RpcContext.getServerContext()的呢? 答案是ConsumerContextFilter,在invoker调用完毕后,执行RpcContext.getServerContext().setAttachments(result.getAttachments());,注意RpcContext.getServerContext()没有clear的原因是内部先进行了clear操作。

3.3.隐式参数总结

隐式参数consumer->priovider:consumer端通过RpcContext.getContext().setAttachment("xxx", "xxx"); provider端RpcContext.getContext().getAttachment("xxx"); 获取

隐式参数priovider->consumer:provider端通过RpcContext.getServerContext().setAttachment("xxx", "xxx"); consumer端RpcContext.getServerContext().getAttachment("xxx"); 获取


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM