spring cloud stream 3.1.2 源碼搭配rocketmq學習 (二)


現在我們從源碼來分析(一)中所涉及的東西


前言

這個functionBindingRegistrar其實是channel的注冊

提問

問一下自己想從源碼中知道什么, 帶着目的去看源碼才容易搞懂.

從下述的代碼中發現定義了一個Function的Bean和在yaml中定義了definition, 那么這兩個定義的作用是什么呢? Function是怎么樣去綁定、注冊的呢?

帶着問題我們就可以去找對應的實現.

@Bean
public Function<Flux<Message<String>>, Mono<Void>> consumerEvent() {
    return flux -> flux.map(message -> {
        System.out.println(message.getPayload());
        return message;
    }).then();
}
spring:
 cloud:
   stream:
     function:
       definition: consumerEvent

怎么找springboot項目的啟動

首先我們看META-INF/spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfigura:\
(...省略一部分)
org.springframework.cloud.stream.function.FunctionConfiguration

發現自動裝載了一個FunctionConfiguration的類

進到這個類里面看, 發現他注冊了一個functionBindingRegistrar的Bean.

看英文---(functionBindingRegistrar) 方法綁定注冊, 這好像是我們想知道的東西.

那么接着往下看


functionBindingRegistrar

看傳入的參數發現這個Bean是根據StreamFunctionProperties注冊的一個的Bean.
// FunctionConfiguration#functionBindingRegistrar
@Bean
public InitializingBean functionBindingRegistrar(Environment environment, FunctionCatalog functionCatalog, StreamFunctionProperties streamFunctionProperties) {
    return new FunctionConfiguration.FunctionBindingRegistrar(functionCatalog, streamFunctionProperties);
}

因為這個Bean是InitializingBean所以直接看afterPropertiesSet這個方法

看源碼最重要就是抓住主線, 從源碼中發現有這樣的一段代碼.

// 注冊了一個Bean定義, functionBindableProxyDefinition.
registry.registerBeanDefinition(name, functionBindableProxyDefinition);

咦? 那functionBindableProxyDefinition是一個什么東西呢??

往上找這個的賦值.

// 看到這行, 他是一個 BindableFunctionProxyFactory
functionBindableProxyDefinition = new RootBeanDefinition(BindableFunctionProxyFactory.class);

// 為構造參數進行了賦值
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(functionDefinition);
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.inputCount);
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.outputCount);
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.streamFunctionProperties);

初始化了一個RootBeanDefinition, 並對構造函數進行呢相對應的賦值, 那參數從哪來的呢, 繼續看源碼.

//  這些參數怎么來的呢
//  看到下面這些代碼, 對於Function/Supplier/Consumer怎么區分的, 是不是有的清晰的認知
FunctionInvocationWrapper function = (FunctionInvocationWrapper)this.functionCatalog.lookup(functionDefinition);

Type functionType = function.getFunctionType();
if (function.isSupplier()) {
    this.inputCount = 0;
    this.outputCount = this.getOutputCount(functionType, true);
} else if (function.isConsumer()) {
    this.inputCount = FunctionTypeUtils.getInputCount(functionType);
    this.outputCount = 0;
} else {
    this.inputCount = FunctionTypeUtils.getInputCount(functionType);
    this.outputCount = this.getOutputCount(functionType, false);
}

看到上面這一段代碼inputCount/outCount是計算出來的, 對於怎么區分Function/Supplier/Consumer是Input還是Output也有了一定的了解

那functionDefinition是什么呢

functionDefinition = var3[var5];

我們發現他是這樣賦值的, 再往上看

sourceNames = this.filterEligibleFunctionDefinitions();
var3 = sourceNames;
var4 = sourceNames.length;

通過閱讀發現filterEligibleFunctionDefinitions這個方法里對我們配置文件中Definition進行了解析處理, 並返回了合格的sourcesNames.

哦! 原來Definition的意義在這.

到此這個bean的注冊就已經完成了,那就來看看BindableFunctionProxyFactory,
發現他又是一個InitializingBean, 所以上訴代碼設置完初始化參數后, 在spring實例化Bean之后會調用afterPropertiesSet方法

public void afterPropertiesSet() {
    Assert.notEmpty(this.bindingTargetFactories, "'bindingTargetFactories' cannot be empty");
    int i;
    if (this.inputCount > 0) {
        for(i = 0; i < this.inputCount; ++i) {
            this.createInput(this.buildInputNameForIndex(i));
        }
    }

    if (this.outputCount > 0) {
        for(i = 0; i < this.outputCount; ++i) {
            this.createOutput(this.buildOutputNameForIndex(i));
        }
    }

}

然后看到這兩個函數. 發現綁定名稱原來是這樣的.

private String buildInputNameForIndex(int index) {
    return this.functionDefinition.replace(",", "|").replace("|", "") + "-" + "in" + "-" + index;
}

private String buildOutputNameForIndex(int index) {
    return this.functionDefinition.replace(",", "|").replace("|", "") + "-" + "out" + "-" + index;
}

以Input為例子, 我們看看createInput的方法.

先從簡單的來看, 先不看pollable

this.inputHolders.put(name, new BoundTargetHolder(this.getBindingTargetFactory(SubscribableChannel.class).createInput(name), true));

SubscribableChannel的createInput, 我們找到SubscribableChannelBindingTargetFactory#createInput的

public SubscribableChannel createInput(String name) {
    DirectWithAttributesChannel subscribableChannel = new DirectWithAttributesChannel();
    subscribableChannel.setComponentName(name);
    subscribableChannel.setAttribute("type", "input");
    this.messageChannelConfigurer.configureInputChannel(subscribableChannel, name);
    if (this.context != null && !this.context.containsBean(name)) {
        this.context.registerBean(name, DirectWithAttributesChannel.class, () -> {
            return subscribableChannel;
        }, new BeanDefinitionCustomizer[0]);
    }

    return subscribableChannel;
}

發現返回了DirectWithAttributesChannel的一個類, 並且把他注冊成為了Bean.

后面把這個類封裝在BoundTargetHolder中並放入inputHolders中就結束了Function注冊的過程

總結

  1. 啟動之后會注冊一個FunctionBindingRegistrar的Bean, 在這個Bean中會讀取配置文件找到對應的FunctionBean, 處理這個FunctionBean生成注冊需要的參數並把這些內容構成一個functionBindableProxyDefinition的Bean.
  2. functionBindableProxyDefinition的Bean處理上述構造函數傳入的參數並生成對應的Input/Output的Bean.

至此, funciton的注冊就完成了嗎. (不!). 其實還沒有完成, 細心的朋友會發現 還有functionInitializer的Bean. 下一節我們來看看這個.

Wish.
Do.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM