dubbo泛化分析
1.RPC的本质
dubbo是个rpc框架,本质就是网络通信,把客户端(consumer)信息编码为二进制流,通过网络发送,服务端(provider)接收到二进制流,进行解码,获取到所需的业务数据,进而进行业务处理。和socket通信、http调用的本质是一样。dubbo默认使用的dubbo协议进行通信,所谓协议,就是个双方约定的特定数据结构,客户端按照此协议结构进行编码,服务端按照此协议结构进行解码。协议通常有定长、边长、特殊字符结尾等各种类型协议,dubbo协议属于边长协议,dubbo协议的结构如下图(来源官方)
我们只关注上图的最后一行,协议内容部分,这部分也是实际的业务数据,内容部分包含:服务名、version、方法名、参数、参数类型、attachments(隐式参数)等。那么我们在只需要知道服务名、version、方法名、参数、参数类型的情况下,就可以进行调用服务端了,可以不需要对服务端接口进行代理,但是,为什么使用dubbo还要指定服务方的接口呢?原因是我们要在协议内设置接口方法名、参数、参数类型,没有接口,不知道要设置什么。因此对于调用端来说,服务接口的作用仅用于创建代理对象,基于面向接口开发而已。
通常开发中,我们针对每个接口dubbo给我们实现了一个对应的代理,那么如果想在一个调用端,根据传入的接口和方法进行调用服务呢(比如最常见的网关),基于这种情况,Dubbo 定义了一个统一的接口 GenericSerive ,调用端在使用该接口的代理时指定相关调用信息即可。
2.泛化接口GenericSerive
2.1.泛化使用例子
泛化通常不在服务端使用,基本在consumer端使用,比如最常见的网关,下面举例说明下泛化的使用
//spring service方法
@Override
public Result<Map> getProductGeneric(ProductDTO dto) {
ReferenceConfigCache referenceCache = ReferenceConfigCache.getCache();//使用ReferenceConfigCache缓存ReferenceConfig,否则每此请求都会创建一个ReferenceConfig,并在zk注册节点,最终可能导致zk节点过多影响性能次,最终可能导致zk宕机
ReferenceConfig<GenericService> reference = new ReferenceConfig<GenericService>();
ApplicationConfig application = new ApplicationConfig();
application.setName("pangu-client-consumer-generic");
// 连接注册中心配置
RegistryConfig registry = new RegistryConfig();
registry.setAddress("zookeeper://127.0.0.1:2181");
// 服务消费者缺省值配置
ConsumerConfig consumer = new ConsumerConfig();
consumer.setTimeout(10000);
consumer.setRetries(0);
reference.setApplication(application);
reference.setRegistry(registry);
reference.setConsumer(consumer);
reference.setInterface(org.pangu.api.ProductService.class); // 弱类型接口名 实际网关中接口是传入的
// reference.setVersion("");
// reference.setGroup("");
reference.setGeneric(true); // 声明为泛化接口
GenericService svc = referenceCache.get(reference);//referenceCache.get方法中会缓存 Reference对象,并且调用ReferenceConfig.get方法启动ReferenceConfig
Object target = svc.$invoke("findProduct", new String[]{ProductDTO.class.getName()}, new Object[]{dto});//实际网关中,方法名、参数类型、参数是作为参数传入
return Result.success((Map)target);
}
上述代码很简单,请求服务端的org.pangu.api.ProductService#findProduct方法,使用的泛型,这里只是特例,实际接口、方法、方法参数等都是动态由方法传入的。注意:这里要使用ReferenceConfigCache或进行缓存,否则每次请求执行一次getProductGeneric方法,就会在zk上创建一个consumer节点,最终会导致zk节点过多导致zk性能问题,严重会导致zk宕机,之前遇到过有别的团队使用泛化不进行缓存reference导致zk节点过多导致zk故障问题。
在代码referenceCache.get(reference);
缓存并获取reference,reference不存在,则创建reference,创建流程和dubbo启动过程中创建是一样的,也是在zk创建consumers节点,并注册监听目标服务org.pangu.api.ProductService
的configurators、providers、routers节点,这些节点变化,也会通知refreshInvoker操作。
说明: 泛化引用虽然会将服务接口类型设置为 GenericService ,但是并不影响服务发现。
2.2.泛化实现原理
consumer泛化,会在filter chain增加GenericImplFilter,泛化的具体实现原理是由consumer端的GenericImplFilter和provider端GenericFilter共同实现。
GenericImplFilter
@Activate(group = Constants.CONSUMER, value = Constants.GENERIC_KEY, order = 20000)
public class GenericImplFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class);
private static final Class<?>[] GENERIC_PARAMETER_TYPES = new Class<?>[]{String.class, String[].class, Object[].class};
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY);//获取url上的generic值,泛化调用是true
if (ProtocolUtils.isGeneric(generic)
&& !Constants.$INVOKE.equals(invocation.getMethodName())//调用的方法名非$invoke,返回调用不匹配,因此该分支返回调用不执行
&& invocation instanceof RpcInvocation) {
//省略本分支的其它逻辑,因为泛化调用不执行
}
if (invocation.getMethodName().equals(Constants.$INVOKE)//请求方法名是$invoke
&& invocation.getArguments() != null
&& invocation.getArguments().length == 3//返回调用只有三个参数。参数1:方法名,参数2:参数类型,参数3:参数值
&& ProtocolUtils.isGeneric(generic)) {
Object[] args = (Object[]) invocation.getArguments()[2];//获取参数值
//其它逻辑忽略
((RpcInvocation) invocation).setAttachment(
Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY));//把泛化标识放到RpcInvocationd的attachment内,上送给provider端
}
return invoker.invoke(invocation);//invoker调用
}
//其它忽略
}
看逻辑很简单,只是给RpcInvocationd的attachment内存放了返回标识而已,并没有其它多余逻辑处理
接着看服务端的泛化处理
GenericFilter
@Activate(group = Constants.PROVIDER, order = -20000)
public class GenericFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !invoker.getInterface().equals(GenericService.class)) {
String name = ((String) inv.getArguments()[0]).trim();//目标服务的方法名
String[] types = (String[]) inv.getArguments()[1];//目标服务的方法的参数类型
Object[] args = (Object[]) inv.getArguments()[2];//目标服务的方法参数
try {
Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);//反射调用看目标服务是否有此方法,没有抛出NoSuchMethodException
Class<?>[] params = method.getParameterTypes();//获取目标方法的参数类型
if (args == null) {
args = new Object[params.length];
}
String generic = inv.getAttachment(Constants.GENERIC_KEY);//这里获取的就是GenericImplFilter放置到RpcInvocationd的attachment的泛化标识
if (StringUtils.isBlank(generic)) {
generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
}
if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)) {//泛化方法
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());//代码@1 realize操作是把map参数转换为POJO对象
} else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
//忽略不需要的分支逻辑
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
//忽略不需要的分支逻辑
}
Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));//代码@2 重写创建RpcInvocation的原因是,旧的inv方法是$invoke,而此时需要去调用目标服务方法了,因此需要使用目标服务的方法和参数重建RpcInvocation,用于找到目标服务执行
if (result.hasException()
&& !(result.getException() instanceof GenericException)) {
return new RpcResult(new GenericException(result.getException()));
}
RpcResult rpcResult;
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
//忽略不需要的分支逻辑
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
//忽略不需要的分支逻辑
} else {
rpcResult = new RpcResult(PojoUtils.generalize(result.getValue()));//代码@3 PojoUtils.generalize把目标方法执行结果转换为HashMap,因此consumer端泛化调用获取的结果都是HashMap
}
rpcResult.setAttachments(result.getAttachments());
return rpcResult;
} catch (NoSuchMethodException e) {
throw new RpcException(e.getMessage(), e);
} catch (ClassNotFoundException e) {
throw new RpcException(e.getMessage(), e);
}
}
return invoker.invoke(inv);//非泛化调用
}
}
GenericFilter的逻辑就是:先判断目标服务是否存在调用的目标方法,接着使用PojoUtils.realize把方法参数由HashMap转换为POJO(如果参数是Map类型情况下就转换),接着以目标方法重写创建RpcInvocation,进行invoker调用,invoker调用结束,使用PojoUtils.generalize把调用结果转换为HashMap,把调用方的隐式参数保存到调用结果,最后返回。
其中重要的步骤就是:
代码@1:PojoUtils.realize是把参数由Map转换为POJO,如果参数类型是POJO,则不进行转换。这样做的原因是防止请求的参数是map。
代码@2:这里为什么需要重建RpcInvocation呢?因为inv里面的方法是$invoke,并非调用的目标方法,因此需要重建。那么RpcInvocation是个什么呢?它就是个方法名、方法类型、方法参数的封装而已。invoker的调用,参数定为需要RpcInvocation,即需要知道使用的参数类型和参数调用的方法。
代码@3:result.getValue()是调用结果,PojoUtils.generalize是把调用结果由POJO转换为HashMap,因此consumer端泛化调用获取的结果都是HashMap。
这里比较有用的工具类PojoUtils。
2.3.总结
Dubbo 泛化调用实际是在 Filter 过滤链上执行的序列化(PojoUtils.realize provider端处理完毕后,序列化为hashmap)和反序列化(PojoUtils.generalize consumer接收后反序列化为对象)操作,服务端通过调用端传递的调用信息反射获取对应的服务方法,进而进行服务调用。
Dubbo 泛化本质就是提供了个用于穿透的架子,让各种未知服务都可以套用。具体实现就是消费端GenericImplFilter设置泛化标识(其实没啥具体作用,不会进行PojoUtils序列化),服务提供端GenericFilter重新设置RpcInvocation,包装了目标服务和方法名,用于查找到目标服务进行调用;目标服务调用完毕后,把调用结果使用PojoUtils进行序列化为HashMap,这样消费端收到的结果都是HashMap类型。 网上所说的在GenericImplFilter进行序列化为HashMap,服务提供端收到后GenericFilter反序列为目标对象参数,这个说法是不对的。
从功能上来看,泛化调用提供了在没有接口依赖情况下进行的解决方案,丰富框架的使用场景。
从设计上来看,泛化调用的功能还是通过扩展的方式实现的,侵入性不强,值得学习借鉴。
3.隐式参数
在分析dubbo泛化时候,GenericImplFilter内把泛化标识放到RpcInvocationd的attachment内,这样在provider端的GenericFilter通过inv.getAttachment(Constants.GENERIC_KEY)获取泛化标识。这个是如何传送过来的呢?
还是先看dubbo协议,dubbo协议的业务数据内,除了接口、方法、方法参数类型、方法参数,还有attachments,attachments在dubbo中成为隐式参数,即不在接口方法参数内,但是,可以随着请求上送和响应。原理还是放在了协议里面。那么dubbo具体是如何实现的呢?
3.1.consumer端请求设置attachments
隐式参数设置有两种
-
业务代码中 RpcContext.getContext().setAttachment("xxx", "xxx");
-
RpcInvocation.setAttachment(String, String) 或者 RpcInvocation.addAttachments(Map<String, String>) 保存到RpcInvocation.attachments
对于第一种,是把隐式参数设置到了RpcContext.attachments上,接着在FailoverClusterInvoker#invoke操作中会把RpcContext.attachments保存到RpcInvocation.attachments,代码截图如下
对于第二种,隐式参数直接设置到RpcInvocation.attachments
隐式参数设置了,那么肯定是在编码过程中设置到dubbo协议报文上,具体在DubboCodec.encodeRequestData(Channel, ObjectOutput, Object, String),如下图
至此明白了consumer设置隐式参数,都会保存到RpcInvocation,继而在编码阶段对隐式参数编码写入到输出流。
那么provider端如何处理的呢?
首先肯定是要先解码了,dubbo中请求被封装为Request,响应是Response,其中Request.mData就是RpcInvocation,因此provider端通过解码后就获取到了RpcInvocation,当然也就获取到了consumer设置的attachments。
然后在ContextFilter内把consumer端上送的隐式参数RpcInvocation.attachments保存到RpcContext.getContext(),provider端获取隐式参数通过RpcContext.getContext().getAttachment("xxx"); 获取
3.2.provider端响应设置attachments
provider在目标对象调用执行完毕后,也可以在RpcResult设置attachments回传给consumer端,设置方式有两种
-
RpcContext.getServerContext().setAttachment("xxx", "xxx"); 在ContextFilter内invoker调用后,会把RpcContext.getServerContext()合并到RpcResult
-
RpcResult.setAttachments(Map<String, String>)
consumer端获取provider回传的隐式参数方法:RpcContext.getServerContext().getAttachment("xxx");
dubbo的响应是Response,其属性mResult是RpcResult,RpcResult有三个属性result-执行结果,exception-异常,attachments-provider端回传的隐式参数。那么在哪里把RpcResult.attachments保存到RpcContext.getServerContext()
的呢? 答案是ConsumerContextFilter
,在invoker调用完毕后,执行RpcContext.getServerContext().setAttachments(result.getAttachments());
,注意RpcContext.getServerContext()没有clear的原因是内部先进行了clear操作。
3.3.隐式参数总结
隐式参数consumer->priovider:consumer端通过RpcContext.getContext().setAttachment("xxx", "xxx"); provider端RpcContext.getContext().getAttachment("xxx"); 获取
隐式参数priovider->consumer:provider端通过RpcContext.getServerContext().setAttachment("xxx", "xxx"); consumer端RpcContext.getServerContext().getAttachment("xxx"); 获取