現在我們從源碼來分析(一)中所涉及的東西
前言
這個functionBindingRegistrar其實是channel的注冊
提問
問一下自己想從源碼中知道什么, 帶着目的去看源碼才容易搞懂.
從下述的代碼中發現定義了一個Function的Bean和在yaml中定義了definition, 那么這兩個定義的作用是什么呢? Function是怎么樣去綁定、注冊的呢?
帶着問題我們就可以去找對應的實現.
@Bean
public Function<Flux<Message<String>>, Mono<Void>> consumerEvent() {
return flux -> flux.map(message -> {
System.out.println(message.getPayload());
return message;
}).then();
}
spring:
cloud:
stream:
function:
definition: consumerEvent
怎么找springboot項目的啟動
首先我們看META-INF/spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfigura:\
(...省略一部分)
org.springframework.cloud.stream.function.FunctionConfiguration
發現自動裝載了一個FunctionConfiguration的類
進到這個類里面看, 發現他注冊了一個functionBindingRegistrar的Bean.
看英文---(functionBindingRegistrar) 方法綁定注冊, 這好像是我們想知道的東西.
那么接着往下看
functionBindingRegistrar
看傳入的參數發現這個Bean是根據StreamFunctionProperties注冊的一個的Bean.// FunctionConfiguration#functionBindingRegistrar
@Bean
public InitializingBean functionBindingRegistrar(Environment environment, FunctionCatalog functionCatalog, StreamFunctionProperties streamFunctionProperties) {
return new FunctionConfiguration.FunctionBindingRegistrar(functionCatalog, streamFunctionProperties);
}
因為這個Bean是InitializingBean所以直接看afterPropertiesSet這個方法
看源碼最重要就是抓住主線, 從源碼中發現有這樣的一段代碼.
// 注冊了一個Bean定義, functionBindableProxyDefinition.
registry.registerBeanDefinition(name, functionBindableProxyDefinition);
咦? 那functionBindableProxyDefinition是一個什么東西呢??
往上找這個的賦值.
// 看到這行, 他是一個 BindableFunctionProxyFactory
functionBindableProxyDefinition = new RootBeanDefinition(BindableFunctionProxyFactory.class);
// 為構造參數進行了賦值
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(functionDefinition);
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.inputCount);
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.outputCount);
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.streamFunctionProperties);
初始化了一個RootBeanDefinition, 並對構造函數進行呢相對應的賦值, 那參數從哪來的呢, 繼續看源碼.
// 這些參數怎么來的呢
// 看到下面這些代碼, 對於Function/Supplier/Consumer怎么區分的, 是不是有的清晰的認知
FunctionInvocationWrapper function = (FunctionInvocationWrapper)this.functionCatalog.lookup(functionDefinition);
Type functionType = function.getFunctionType();
if (function.isSupplier()) {
this.inputCount = 0;
this.outputCount = this.getOutputCount(functionType, true);
} else if (function.isConsumer()) {
this.inputCount = FunctionTypeUtils.getInputCount(functionType);
this.outputCount = 0;
} else {
this.inputCount = FunctionTypeUtils.getInputCount(functionType);
this.outputCount = this.getOutputCount(functionType, false);
}
看到上面這一段代碼inputCount/outCount是計算出來的, 對於怎么區分Function/Supplier/Consumer是Input還是Output也有了一定的了解
那functionDefinition是什么呢
functionDefinition = var3[var5];
我們發現他是這樣賦值的, 再往上看
sourceNames = this.filterEligibleFunctionDefinitions();
var3 = sourceNames;
var4 = sourceNames.length;
通過閱讀發現filterEligibleFunctionDefinitions這個方法里對我們配置文件中Definition進行了解析處理, 並返回了合格的sourcesNames.
哦! 原來Definition的意義在這.
到此這個bean的注冊就已經完成了,那就來看看BindableFunctionProxyFactory,
發現他又是一個InitializingBean, 所以上訴代碼設置完初始化參數后, 在spring實例化Bean之后會調用afterPropertiesSet方法
public void afterPropertiesSet() {
Assert.notEmpty(this.bindingTargetFactories, "'bindingTargetFactories' cannot be empty");
int i;
if (this.inputCount > 0) {
for(i = 0; i < this.inputCount; ++i) {
this.createInput(this.buildInputNameForIndex(i));
}
}
if (this.outputCount > 0) {
for(i = 0; i < this.outputCount; ++i) {
this.createOutput(this.buildOutputNameForIndex(i));
}
}
}
然后看到這兩個函數. 發現綁定名稱原來是這樣的.
private String buildInputNameForIndex(int index) {
return this.functionDefinition.replace(",", "|").replace("|", "") + "-" + "in" + "-" + index;
}
private String buildOutputNameForIndex(int index) {
return this.functionDefinition.replace(",", "|").replace("|", "") + "-" + "out" + "-" + index;
}
以Input為例子, 我們看看createInput的方法.
先從簡單的來看, 先不看pollable
this.inputHolders.put(name, new BoundTargetHolder(this.getBindingTargetFactory(SubscribableChannel.class).createInput(name), true));
SubscribableChannel的createInput, 我們找到SubscribableChannelBindingTargetFactory#createInput的
public SubscribableChannel createInput(String name) {
DirectWithAttributesChannel subscribableChannel = new DirectWithAttributesChannel();
subscribableChannel.setComponentName(name);
subscribableChannel.setAttribute("type", "input");
this.messageChannelConfigurer.configureInputChannel(subscribableChannel, name);
if (this.context != null && !this.context.containsBean(name)) {
this.context.registerBean(name, DirectWithAttributesChannel.class, () -> {
return subscribableChannel;
}, new BeanDefinitionCustomizer[0]);
}
return subscribableChannel;
}
發現返回了DirectWithAttributesChannel的一個類, 並且把他注冊成為了Bean.
后面把這個類封裝在BoundTargetHolder中並放入inputHolders中就結束了Function注冊的過程
總結
- 啟動之后會注冊一個FunctionBindingRegistrar的Bean, 在這個Bean中會讀取配置文件找到對應的FunctionBean, 處理這個FunctionBean生成注冊需要的參數並把這些內容構成一個functionBindableProxyDefinition的Bean.
- functionBindableProxyDefinition的Bean處理上述構造函數傳入的參數並生成對應的Input/Output的Bean.
至此, funciton的注冊就完成了嗎. (不!). 其實還沒有完成, 細心的朋友會發現 還有functionInitializer的Bean. 下一節我們來看看這個.