dubbo泛化分析
1.RPC的本質
dubbo是個rpc框架,本質就是網絡通信,把客戶端(consumer)信息編碼為二進制流,通過網絡發送,服務端(provider)接收到二進制流,進行解碼,獲取到所需的業務數據,進而進行業務處理。和socket通信、http調用的本質是一樣。dubbo默認使用的dubbo協議進行通信,所謂協議,就是個雙方約定的特定數據結構,客戶端按照此協議結構進行編碼,服務端按照此協議結構進行解碼。協議通常有定長、邊長、特殊字符結尾等各種類型協議,dubbo協議屬於邊長協議,dubbo協議的結構如下圖(來源官方)
我們只關注上圖的最后一行,協議內容部分,這部分也是實際的業務數據,內容部分包含:服務名、version、方法名、參數、參數類型、attachments(隱式參數)等。那么我們在只需要知道服務名、version、方法名、參數、參數類型的情況下,就可以進行調用服務端了,可以不需要對服務端接口進行代理,但是,為什么使用dubbo還要指定服務方的接口呢?原因是我們要在協議內設置接口方法名、參數、參數類型,沒有接口,不知道要設置什么。因此對於調用端來說,服務接口的作用僅用於創建代理對象,基於面向接口開發而已。
通常開發中,我們針對每個接口dubbo給我們實現了一個對應的代理,那么如果想在一個調用端,根據傳入的接口和方法進行調用服務呢(比如最常見的網關),基於這種情況,Dubbo 定義了一個統一的接口 GenericSerive ,調用端在使用該接口的代理時指定相關調用信息即可。
2.泛化接口GenericSerive
2.1.泛化使用例子
泛化通常不在服務端使用,基本在consumer端使用,比如最常見的網關,下面舉例說明下泛化的使用
//spring service方法
@Override
public Result<Map> getProductGeneric(ProductDTO dto) {
ReferenceConfigCache referenceCache = ReferenceConfigCache.getCache();//使用ReferenceConfigCache緩存ReferenceConfig,否則每此請求都會創建一個ReferenceConfig,並在zk注冊節點,最終可能導致zk節點過多影響性能次,最終可能導致zk宕機
ReferenceConfig<GenericService> reference = new ReferenceConfig<GenericService>();
ApplicationConfig application = new ApplicationConfig();
application.setName("pangu-client-consumer-generic");
// 連接注冊中心配置
RegistryConfig registry = new RegistryConfig();
registry.setAddress("zookeeper://127.0.0.1:2181");
// 服務消費者缺省值配置
ConsumerConfig consumer = new ConsumerConfig();
consumer.setTimeout(10000);
consumer.setRetries(0);
reference.setApplication(application);
reference.setRegistry(registry);
reference.setConsumer(consumer);
reference.setInterface(org.pangu.api.ProductService.class); // 弱類型接口名 實際網關中接口是傳入的
// reference.setVersion("");
// reference.setGroup("");
reference.setGeneric(true); // 聲明為泛化接口
GenericService svc = referenceCache.get(reference);//referenceCache.get方法中會緩存 Reference對象,並且調用ReferenceConfig.get方法啟動ReferenceConfig
Object target = svc.$invoke("findProduct", new String[]{ProductDTO.class.getName()}, new Object[]{dto});//實際網關中,方法名、參數類型、參數是作為參數傳入
return Result.success((Map)target);
}
上述代碼很簡單,請求服務端的org.pangu.api.ProductService#findProduct方法,使用的泛型,這里只是特例,實際接口、方法、方法參數等都是動態由方法傳入的。注意:這里要使用ReferenceConfigCache或進行緩存,否則每次請求執行一次getProductGeneric方法,就會在zk上創建一個consumer節點,最終會導致zk節點過多導致zk性能問題,嚴重會導致zk宕機,之前遇到過有別的團隊使用泛化不進行緩存reference導致zk節點過多導致zk故障問題。
在代碼referenceCache.get(reference);
緩存並獲取reference,reference不存在,則創建reference,創建流程和dubbo啟動過程中創建是一樣的,也是在zk創建consumers節點,並注冊監聽目標服務org.pangu.api.ProductService
的configurators、providers、routers節點,這些節點變化,也會通知refreshInvoker操作。
說明: 泛化引用雖然會將服務接口類型設置為 GenericService ,但是並不影響服務發現。
2.2.泛化實現原理
consumer泛化,會在filter chain增加GenericImplFilter,泛化的具體實現原理是由consumer端的GenericImplFilter和provider端GenericFilter共同實現。
GenericImplFilter
@Activate(group = Constants.CONSUMER, value = Constants.GENERIC_KEY, order = 20000)
public class GenericImplFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class);
private static final Class<?>[] GENERIC_PARAMETER_TYPES = new Class<?>[]{String.class, String[].class, Object[].class};
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY);//獲取url上的generic值,泛化調用是true
if (ProtocolUtils.isGeneric(generic)
&& !Constants.$INVOKE.equals(invocation.getMethodName())//調用的方法名非$invoke,返回調用不匹配,因此該分支返回調用不執行
&& invocation instanceof RpcInvocation) {
//省略本分支的其它邏輯,因為泛化調用不執行
}
if (invocation.getMethodName().equals(Constants.$INVOKE)//請求方法名是$invoke
&& invocation.getArguments() != null
&& invocation.getArguments().length == 3//返回調用只有三個參數。參數1:方法名,參數2:參數類型,參數3:參數值
&& ProtocolUtils.isGeneric(generic)) {
Object[] args = (Object[]) invocation.getArguments()[2];//獲取參數值
//其它邏輯忽略
((RpcInvocation) invocation).setAttachment(
Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY));//把泛化標識放到RpcInvocationd的attachment內,上送給provider端
}
return invoker.invoke(invocation);//invoker調用
}
//其它忽略
}
看邏輯很簡單,只是給RpcInvocationd的attachment內存放了返回標識而已,並沒有其它多余邏輯處理
接着看服務端的泛化處理
GenericFilter
@Activate(group = Constants.PROVIDER, order = -20000)
public class GenericFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !invoker.getInterface().equals(GenericService.class)) {
String name = ((String) inv.getArguments()[0]).trim();//目標服務的方法名
String[] types = (String[]) inv.getArguments()[1];//目標服務的方法的參數類型
Object[] args = (Object[]) inv.getArguments()[2];//目標服務的方法參數
try {
Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);//反射調用看目標服務是否有此方法,沒有拋出NoSuchMethodException
Class<?>[] params = method.getParameterTypes();//獲取目標方法的參數類型
if (args == null) {
args = new Object[params.length];
}
String generic = inv.getAttachment(Constants.GENERIC_KEY);//這里獲取的就是GenericImplFilter放置到RpcInvocationd的attachment的泛化標識
if (StringUtils.isBlank(generic)) {
generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
}
if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)) {//泛化方法
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());//代碼@1 realize操作是把map參數轉換為POJO對象
} else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
//忽略不需要的分支邏輯
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
//忽略不需要的分支邏輯
}
Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));//代碼@2 重寫創建RpcInvocation的原因是,舊的inv方法是$invoke,而此時需要去調用目標服務方法了,因此需要使用目標服務的方法和參數重建RpcInvocation,用於找到目標服務執行
if (result.hasException()
&& !(result.getException() instanceof GenericException)) {
return new RpcResult(new GenericException(result.getException()));
}
RpcResult rpcResult;
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
//忽略不需要的分支邏輯
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
//忽略不需要的分支邏輯
} else {
rpcResult = new RpcResult(PojoUtils.generalize(result.getValue()));//代碼@3 PojoUtils.generalize把目標方法執行結果轉換為HashMap,因此consumer端泛化調用獲取的結果都是HashMap
}
rpcResult.setAttachments(result.getAttachments());
return rpcResult;
} catch (NoSuchMethodException e) {
throw new RpcException(e.getMessage(), e);
} catch (ClassNotFoundException e) {
throw new RpcException(e.getMessage(), e);
}
}
return invoker.invoke(inv);//非泛化調用
}
}
GenericFilter的邏輯就是:先判斷目標服務是否存在調用的目標方法,接着使用PojoUtils.realize把方法參數由HashMap轉換為POJO(如果參數是Map類型情況下就轉換),接着以目標方法重寫創建RpcInvocation,進行invoker調用,invoker調用結束,使用PojoUtils.generalize把調用結果轉換為HashMap,把調用方的隱式參數保存到調用結果,最后返回。
其中重要的步驟就是:
代碼@1:PojoUtils.realize是把參數由Map轉換為POJO,如果參數類型是POJO,則不進行轉換。這樣做的原因是防止請求的參數是map。
代碼@2:這里為什么需要重建RpcInvocation呢?因為inv里面的方法是$invoke,並非調用的目標方法,因此需要重建。那么RpcInvocation是個什么呢?它就是個方法名、方法類型、方法參數的封裝而已。invoker的調用,參數定為需要RpcInvocation,即需要知道使用的參數類型和參數調用的方法。
代碼@3:result.getValue()是調用結果,PojoUtils.generalize是把調用結果由POJO轉換為HashMap,因此consumer端泛化調用獲取的結果都是HashMap。
這里比較有用的工具類PojoUtils。
2.3.總結
Dubbo 泛化調用實際是在 Filter 過濾鏈上執行的序列化(PojoUtils.realize provider端處理完畢后,序列化為hashmap)和反序列化(PojoUtils.generalize consumer接收后反序列化為對象)操作,服務端通過調用端傳遞的調用信息反射獲取對應的服務方法,進而進行服務調用。
Dubbo 泛化本質就是提供了個用於穿透的架子,讓各種未知服務都可以套用。具體實現就是消費端GenericImplFilter設置泛化標識(其實沒啥具體作用,不會進行PojoUtils序列化),服務提供端GenericFilter重新設置RpcInvocation,包裝了目標服務和方法名,用於查找到目標服務進行調用;目標服務調用完畢后,把調用結果使用PojoUtils進行序列化為HashMap,這樣消費端收到的結果都是HashMap類型。 網上所說的在GenericImplFilter進行序列化為HashMap,服務提供端收到后GenericFilter反序列為目標對象參數,這個說法是不對的。
從功能上來看,泛化調用提供了在沒有接口依賴情況下進行的解決方案,豐富框架的使用場景。
從設計上來看,泛化調用的功能還是通過擴展的方式實現的,侵入性不強,值得學習借鑒。
3.隱式參數
在分析dubbo泛化時候,GenericImplFilter內把泛化標識放到RpcInvocationd的attachment內,這樣在provider端的GenericFilter通過inv.getAttachment(Constants.GENERIC_KEY)獲取泛化標識。這個是如何傳送過來的呢?
還是先看dubbo協議,dubbo協議的業務數據內,除了接口、方法、方法參數類型、方法參數,還有attachments,attachments在dubbo中成為隱式參數,即不在接口方法參數內,但是,可以隨着請求上送和響應。原理還是放在了協議里面。那么dubbo具體是如何實現的呢?
3.1.consumer端請求設置attachments
隱式參數設置有兩種
-
業務代碼中 RpcContext.getContext().setAttachment("xxx", "xxx");
-
RpcInvocation.setAttachment(String, String) 或者 RpcInvocation.addAttachments(Map<String, String>) 保存到RpcInvocation.attachments
對於第一種,是把隱式參數設置到了RpcContext.attachments上,接着在FailoverClusterInvoker#invoke操作中會把RpcContext.attachments保存到RpcInvocation.attachments,代碼截圖如下
對於第二種,隱式參數直接設置到RpcInvocation.attachments
隱式參數設置了,那么肯定是在編碼過程中設置到dubbo協議報文上,具體在DubboCodec.encodeRequestData(Channel, ObjectOutput, Object, String),如下圖
至此明白了consumer設置隱式參數,都會保存到RpcInvocation,繼而在編碼階段對隱式參數編碼寫入到輸出流。
那么provider端如何處理的呢?
首先肯定是要先解碼了,dubbo中請求被封裝為Request,響應是Response,其中Request.mData就是RpcInvocation,因此provider端通過解碼后就獲取到了RpcInvocation,當然也就獲取到了consumer設置的attachments。
然后在ContextFilter內把consumer端上送的隱式參數RpcInvocation.attachments保存到RpcContext.getContext(),provider端獲取隱式參數通過RpcContext.getContext().getAttachment("xxx"); 獲取
3.2.provider端響應設置attachments
provider在目標對象調用執行完畢后,也可以在RpcResult設置attachments回傳給consumer端,設置方式有兩種
-
RpcContext.getServerContext().setAttachment("xxx", "xxx"); 在ContextFilter內invoker調用后,會把RpcContext.getServerContext()合並到RpcResult
-
RpcResult.setAttachments(Map<String, String>)
consumer端獲取provider回傳的隱式參數方法:RpcContext.getServerContext().getAttachment("xxx");
dubbo的響應是Response,其屬性mResult是RpcResult,RpcResult有三個屬性result-執行結果,exception-異常,attachments-provider端回傳的隱式參數。那么在哪里把RpcResult.attachments保存到RpcContext.getServerContext()
的呢? 答案是ConsumerContextFilter
,在invoker調用完畢后,執行RpcContext.getServerContext().setAttachments(result.getAttachments());
,注意RpcContext.getServerContext()沒有clear的原因是內部先進行了clear操作。
3.3.隱式參數總結
隱式參數consumer->priovider:consumer端通過RpcContext.getContext().setAttachment("xxx", "xxx"); provider端RpcContext.getContext().getAttachment("xxx"); 獲取
隱式參數priovider->consumer:provider端通過RpcContext.getServerContext().setAttachment("xxx", "xxx"); consumer端RpcContext.getServerContext().getAttachment("xxx"); 獲取