Spring Batch源碼閱讀-初始化(三)


例子

/**
 * Demo entry point: starts the Spring application with Spring Batch enabled.
 */
@SpringBootApplication
// modular = true: per the @EnableBatchProcessing Javadoc this indicates that the configuration is
// going to be modularized into multiple application contexts (the attribute defaults to false).
// BatchConfigurationSelector then imports ModularBatchConfiguration instead of SimpleBatchConfiguration.
@EnableBatchProcessing(modular = true)
public class SpringBatchDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBatchDemoApplication.class, args);
    }
}

我們使用 Spring Batch 時，會在啟動類上加上 @EnableBatchProcessing 注解。

@EnableBatchProcessing作用

我們打開源碼可以發現它使用了 @Import 注解。@Import 注解的用法可以查看：https://www.cnblogs.com/LQBlog/p/15410425.html

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(BatchConfigurationSelector.class)//<1> public @interface EnableBatchProcessing {

    /**
     * Indicate whether the configuration is going to be modularized into multiple application contexts. If true then
     * you should not create any &#64;Bean Job definitions in this context, but rather supply them in separate (child)
     * contexts through an {@link ApplicationContextFactory}.
     *
     * @return boolean indicating whether the configuration is going to be
     * modularized into multiple application contexts.  Defaults to false.
     */
    boolean modular() default false;

}

<1>

 public class BatchConfigurationSelector implements ImportSelector {
            @Override
            public String[] selectImports(AnnotationMetadata importingClassMetadata) {
                Class<?> annotationType = EnableBatchProcessing.class;
                //獲取注解元數據
                AnnotationAttributes attributes = AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(
                        annotationType.getName(), false));
                Assert.notNull(attributes, String.format("@%s is not present on importing class '%s' as expected",
                        annotationType.getSimpleName(), importingClassMetadata.getClassName()));
                String[] imports;
                //是實時初始化 延時延遲初始化 
                if (attributes.containsKey("modular") && attributes.getBoolean("modular")) {
                    //<2> 導入ModularBatchConfiguration類
                    imports = new String[] { ModularBatchConfiguration.class.getName() };
                }
                else {
                    //是延遲初始化 點進去看代碼可以看到get相關對象都是創建代理
                    imports = new String[] { SimpleBatchConfiguration.class.getName() };
                }

                return imports;
            }

<2>

/**
 * Batch infrastructure configuration that exposes the job repository, launcher, explorer,
 * transaction manager and an {@link AutomaticJobRegistrar} as beans. Every infrastructure bean
 * is obtained from the {@link BatchConfigurer} resolved by {@code getConfigurer(configurers)}.
 */
// proxyBeanMethods = false: the configuration class itself is not proxied, so calls between
// @Bean methods are plain Java calls (see the Spring @Configuration documentation).
@Configuration(proxyBeanMethods = false)
public class ModularBatchConfiguration extends AbstractBatchConfiguration {

    @Autowired
    private ApplicationContext context;

    // Optional injection: BatchConfigurer beans found in the context, if any.
    // getConfigurer(...) decides which one is used or falls back to a default.
    @Autowired(required = false)
    private Collection<BatchConfigurer> configurers;

    private AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();

    /**
     * Exposes the {@link JobRepository} supplied by the resolved {@link BatchConfigurer}.
     * Per the article author, the repository manages the metadata recorded while jobs execute;
     * see {@code org.springframework.batch.core.repository.support.SimpleJobRepository}.
     *
     * @return the configurer's job repository
     * @throws Exception if the configurer cannot be resolved or fails to provide the repository
     */
    @Override
    @Bean
    public JobRepository jobRepository() throws Exception {
        BatchConfigurer resolved = getConfigurer(configurers);
        return resolved.getJobRepository();
    }

    /**
     * Exposes the {@link JobLauncher} (the component that starts jobs) supplied by the
     * resolved {@link BatchConfigurer}.
     *
     * @return the configurer's job launcher
     * @throws Exception if the configurer cannot be resolved or fails to provide the launcher
     */
    @Override
    @Bean
    public JobLauncher jobLauncher() throws Exception {
        // <3> The configurer is resolved from the container-injected "configurers" collection.
        BatchConfigurer resolved = getConfigurer(configurers);
        return resolved.getJobLauncher();
    }

    /**
     * Exposes the transaction manager supplied by the resolved {@link BatchConfigurer}.
     * Per the article author, it is the one used to commit and roll back during job execution.
     *
     * @return the configurer's transaction manager
     * @throws Exception if the configurer cannot be resolved or fails to provide the manager
     */
    @Override
    @Bean
    public PlatformTransactionManager transactionManager() throws Exception {
        // <3> The configurer is resolved from the container-injected "configurers" collection.
        BatchConfigurer resolved = getConfigurer(configurers);
        return resolved.getTransactionManager();
    }

    /**
     * Exposes the {@link JobExplorer} supplied by the resolved {@link BatchConfigurer}.
     * The article author notes it is related to the job repository and leaves its exact role
     * for a later reading of the source.
     *
     * @return the configurer's job explorer
     * @throws Exception if the configurer cannot be resolved or fails to provide the explorer
     */
    @Override
    @Bean
    public JobExplorer jobExplorer() throws Exception {
        // <3> The configurer is resolved from the container-injected "configurers" collection.
        BatchConfigurer resolved = getConfigurer(configurers);
        return resolved.getJobExplorer();
    }

    /**
     * Hands every {@link ApplicationContextFactory} bean in the context to the shared
     * {@link AutomaticJobRegistrar} and exposes that registrar as a bean.
     * Per the article author, the registrar hooks into the Spring lifecycle and registers the
     * jobs into the job registry once the container has started -- TODO confirm which lifecycle
     * interface it actually implements.
     * NOTE(review): the upstream Spring Batch method appears to also set a JobLoader on the
     * registrar before returning; this excerpt omits it -- confirm against the library source.
     *
     * @return the registrar populated with all context factories
     * @throws Exception declared by the original signature
     */
    @Bean
    public AutomaticJobRegistrar jobRegistrar() throws Exception {
        context.getBeansOfType(ApplicationContextFactory.class)
                .values()
                .forEach(registrar::addApplicationContextFactory);
        return registrar;
    }
}

<3>

org.springframework.batch.core.configuration.annotation.AbstractBatchConfiguration#getConfigurer

 /**
  * Resolves the {@link BatchConfigurer} to use and caches it in {@code this.configurer}.
  *
  * @param configurers the candidate configurers (may be {@code null} or empty)
  * @return the cached configurer if one was already resolved; otherwise a freshly initialized
  *         {@code DefaultBatchConfigurer} when no candidate is given, or the single candidate
  * @throws IllegalStateException if more than one candidate is supplied
  * @throws Exception declared by the original signature
  */
 protected BatchConfigurer getConfigurer(Collection<BatchConfigurer> configurers) throws Exception {
        // A previously resolved configurer always wins.
        if (this.configurer != null) {
            return this.configurer;
        }

        // No candidate supplied: fall back to DefaultBatchConfigurer, built with the data source
        // when one is set.
        boolean noCandidates = (configurers == null) || configurers.isEmpty();
        if (noCandidates) {
            DefaultBatchConfigurer fallback = (dataSource == null)
                    ? new DefaultBatchConfigurer()
                    : new DefaultBatchConfigurer(dataSource);
            fallback.initialize();
            this.configurer = fallback;
            return fallback;
        }

        // A custom configurer must be unambiguous.
        int candidateCount = configurers.size();
        if (candidateCount > 1) {
            throw new IllegalStateException(
                    "To use a custom BatchConfigurer the context must contain precisely one, found "
                            + candidateCount);
        }

        BatchConfigurer only = configurers.iterator().next();
        this.configurer = only;
        return only;
    }

BatchConfigurer接口

通過此接口我們可以自定義 JobRepository、JobLauncher、JobExplorer 等

定義

/**
 * Supplies the core batch infrastructure objects: job repository, transaction manager,
 * job launcher and job explorer.
 */
public interface BatchConfigurer {

    /**
     * @return the job repository
     * @throws Exception if the repository cannot be provided
     */
    JobRepository getJobRepository() throws Exception;

    /**
     * @return the transaction manager
     * @throws Exception if the transaction manager cannot be provided
     */
    PlatformTransactionManager getTransactionManager() throws Exception;

    /**
     * @return the job launcher
     * @throws Exception if the launcher cannot be provided
     */
    JobLauncher getJobLauncher() throws Exception;
    
    /**
     * @return the job explorer
     * @throws Exception if the explorer cannot be provided
     */
    JobExplorer getJobExplorer() throws Exception;
}

類圖

 

BatchConfigurer初始化

Spring Boot 項目可參考自動化配置類：org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration；非 Spring Boot 項目參考這個類自行配置即可。

/**
 * Auto-configuration for Spring Batch: registers a job-launching application runner, an exit
 * code generator, a {@link SimpleJobOperator} and (when a data source is present) the batch
 * data source initializer.
 */
@Configuration(proxyBeanMethods = false)
// Active only when JobLauncher and DataSource are on the classpath.
@ConditionalOnClass({JobLauncher.class, DataSource.class})
@AutoConfigureAfter({HibernateJpaAutoConfiguration.class})
// Active only when a JobLauncher bean exists in the context.
@ConditionalOnBean({JobLauncher.class})
// Enables binding of BatchProperties.
@EnableConfigurationProperties({BatchProperties.class})
// <4> BatchConfigurerConfiguration provides the auto-configured BatchConfigurer.
@Import({BatchConfigurerConfiguration.class, DatabaseInitializationDependencyConfigurer.class})
public class BatchAutoConfiguration {

    /**
     * Registers the {@link JobLauncherApplicationRunner} unless one is already defined.
     * Active when {@code spring.batch.job.enabled} is {@code true} or not set at all.
     * Per the article author, the runner implements {@code ApplicationRunner} and launches the
     * jobs automatically once the application has started.
     *
     * @param jobLauncher launcher handed to the runner
     * @param jobExplorer explorer handed to the runner
     * @param jobRepository repository handed to the runner
     * @param properties batch properties; a non-blank job-names value is passed on to the runner
     * @return the configured runner
     */
    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "spring.batch.job", name = "enabled", havingValue = "true", matchIfMissing = true)
    public JobLauncherApplicationRunner jobLauncherApplicationRunner(JobLauncher jobLauncher,
            JobExplorer jobExplorer, JobRepository jobRepository, BatchProperties properties) {
        JobLauncherApplicationRunner applicationRunner =
                new JobLauncherApplicationRunner(jobLauncher, jobExplorer, jobRepository);
        String configuredNames = properties.getJob().getNames();
        if (StringUtils.hasText(configuredNames)) {
            applicationRunner.setJobNames(configuredNames);
        }
        return applicationRunner;
    }

    /**
     * Registers a {@link JobExecutionExitCodeGenerator} unless an {@code ExitCodeGenerator}
     * bean already exists.
     *
     * @return a new exit code generator
     */
    @Bean
    @ConditionalOnMissingBean({ExitCodeGenerator.class})
    public JobExecutionExitCodeGenerator jobExecutionExitCodeGenerator() {
        return new JobExecutionExitCodeGenerator();
    }

    /**
     * Registers a {@link SimpleJobOperator} unless a {@code JobOperator} bean already exists.
     * Per the article author, the operator offers richer operations than the launcher, such as
     * stopping a job.
     *
     * @param jobParametersConverter optional converter, applied only when available
     * @param jobExplorer explorer set on the operator
     * @param jobLauncher launcher set on the operator
     * @param jobRegistry job locator set on the operator as its registry
     * @param jobRepository repository set on the operator
     * @return the wired operator
     * @throws Exception declared by the original signature
     */
    @Bean
    @ConditionalOnMissingBean({JobOperator.class})
    public SimpleJobOperator jobOperator(ObjectProvider<JobParametersConverter> jobParametersConverter,
            JobExplorer jobExplorer, JobLauncher jobLauncher, ListableJobLocator jobRegistry,
            JobRepository jobRepository) throws Exception {
        SimpleJobOperator operator = new SimpleJobOperator();
        operator.setJobExplorer(jobExplorer);
        operator.setJobLauncher(jobLauncher);
        operator.setJobRegistry(jobRegistry);
        operator.setJobRepository(jobRepository);
        jobParametersConverter.ifAvailable(operator::setJobParametersConverter);
        return operator;
    }

    /**
     * Registers the batch data source initializer when a {@code DataSource} bean exists and
     * {@code DatabasePopulator} is on the classpath.
     */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnBean({DataSource.class})
    @ConditionalOnClass({DatabasePopulator.class})
    static class DataSourceInitializerConfiguration {

        /**
         * Creates the initializer, preferring the {@code @BatchDataSource}-qualified data source
         * and falling back to the regular one when none is available.
         */
        @Bean
        @ConditionalOnMissingBean
        BatchDataSourceInitializer batchDataSourceInitializer(DataSource dataSource,
                @BatchDataSource ObjectProvider<DataSource> batchDataSource, ResourceLoader resourceLoader,
                BatchProperties properties) {
            DataSource effectiveDataSource = batchDataSource.getIfAvailable(() -> dataSource);
            return new BatchDataSourceInitializer(effectiveDataSource, resourceLoader, properties);
        }
    }
}

 <4>

/**
 * Provides the auto-configured {@link BatchConfigurer}. Active only when
 * {@code PlatformTransactionManager} is on the classpath, a {@code DataSource} bean exists and
 * no {@code BatchConfigurer} bean has been defined yet.
 */
@ConditionalOnClass({PlatformTransactionManager.class})
@ConditionalOnBean({DataSource.class})
@ConditionalOnMissingBean({BatchConfigurer.class})
@Configuration(proxyBeanMethods = false)
class BatchConfigurerConfiguration {

    /**
     * Used when a bean named {@code entityManagerFactory} exists (i.e. JPA is in use):
     * registers a {@link JpaBatchConfigurer}.
     */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass({EntityManagerFactory.class})
    @ConditionalOnBean(name = "entityManagerFactory")
    static class JpaBatchConfiguration {

        /**
         * Builds the JPA configurer, preferring the {@code @BatchDataSource}-qualified data
         * source over the regular one and passing the customizers when available.
         */
        @Bean
        JpaBatchConfigurer batchConfigurer(BatchProperties properties, DataSource dataSource,
                @BatchDataSource ObjectProvider<DataSource> batchDataSource,
                ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers,
                EntityManagerFactory entityManagerFactory) {
            DataSource effectiveDataSource = batchDataSource.getIfAvailable(() -> dataSource);
            TransactionManagerCustomizers customizers = transactionManagerCustomizers.getIfAvailable();
            return new JpaBatchConfigurer(properties, effectiveDataSource, customizers, entityManagerFactory);
        }
    }

    /**
     * Used when no bean named {@code entityManagerFactory} exists (JPA not in use):
     * registers a {@link BasicBatchConfigurer}.
     */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnMissingBean(name = "entityManagerFactory")
    static class JdbcBatchConfiguration {

        /**
         * Builds the basic configurer, preferring the {@code @BatchDataSource}-qualified data
         * source over the regular one and passing the customizers when available.
         */
        @Bean
        BasicBatchConfigurer batchConfigurer(BatchProperties properties, DataSource dataSource,
                @BatchDataSource ObjectProvider<DataSource> batchDataSource,
                ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
            DataSource effectiveDataSource = batchDataSource.getIfAvailable(() -> dataSource);
            TransactionManagerCustomizers customizers = transactionManagerCustomizers.getIfAvailable();
            return new BasicBatchConfigurer(properties, effectiveDataSource, customizers);
        }
    }
}

 

BasicBatchConfigurer

/**
 * {@link BatchConfigurer} that builds the transaction manager, job repository, job launcher and
 * job explorer from a data source and the batch properties. This is an abridged excerpt:
 * fields, constructor and getters are not shown here.
 */
public class BasicBatchConfigurer implements BatchConfigurer, InitializingBean {

    /** {@code InitializingBean} callback: triggers {@link #initialize()}. */
    public void afterPropertiesSet() {
        initialize();
    }

    /**
     * Creates all four infrastructure objects and stores them on this instance.
     *
     * @throws IllegalStateException wrapping any exception raised while building them
     */
    public void initialize() {
        try {
            // Keep this order: createJobRepository() reads getTransactionManager() and
            // createJobLauncher() reads getJobRepository() -- presumably the values assigned
            // just above (the getters are not part of this excerpt).
            this.transactionManager = buildTransactionManager();
            this.jobRepository = createJobRepository();
            this.jobLauncher = createJobLauncher();
            this.jobExplorer = createJobExplorer();
        } catch (Exception ex) {
            throw new IllegalStateException("Unable to initialize Spring Batch", ex);
        }
    }

    /**
     * Builds the job explorer through a {@link JobExplorerFactoryBean} bound to the data source;
     * the JDBC table prefix is applied only when it has text.
     */
    protected JobExplorer createJobExplorer() throws Exception {
        PropertyMapper mapper = PropertyMapper.get();
        JobExplorerFactoryBean explorerFactory = new JobExplorerFactoryBean();
        explorerFactory.setDataSource(this.dataSource);
        mapper.from(this.properties.getJdbc()::getTablePrefix).whenHasText().to(explorerFactory::setTablePrefix);
        explorerFactory.afterPropertiesSet();
        return explorerFactory.getObject();
    }

    /** Builds the launcher: a {@link SimpleJobLauncher} using this configurer's job repository. */
    protected JobLauncher createJobLauncher() throws Exception {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(getJobRepository());
        launcher.afterPropertiesSet();
        return launcher;
    }

    /**
     * Builds the job repository through a {@link JobRepositoryFactoryBean}, mapping the data
     * source, the isolation level (only when non-null), the table prefix (only when it has
     * text) and the transaction manager onto the factory.
     */
    protected JobRepository createJobRepository() throws Exception {
        JobRepositoryFactoryBean repositoryFactory = new JobRepositoryFactoryBean();
        PropertyMapper mapper = PropertyMapper.get();
        mapper.from(this.dataSource).to(repositoryFactory::setDataSource);
        mapper.from(this::determineIsolationLevel).whenNonNull().to(repositoryFactory::setIsolationLevelForCreate);
        mapper.from(this.properties.getJdbc()::getTablePrefix).whenHasText().to(repositoryFactory::setTablePrefix);
        mapper.from(this::getTransactionManager).to(repositoryFactory::setTransactionManager);
        repositoryFactory.afterPropertiesSet();
        return repositoryFactory.getObject();
    }

    /**
     * Extension point for the isolation level used by the repository factory.
     *
     * @return {@code null} here, so no isolation level is set on the factory
     */
    protected String determineIsolationLevel() {
        return null;
    }

    /** Creates the transaction manager and applies the customizers when they are present. */
    private PlatformTransactionManager buildTransactionManager() {
        PlatformTransactionManager created = createTransactionManager();
        if (this.transactionManagerCustomizers != null) {
            this.transactionManagerCustomizers.customize(created);
        }
        return created;
    }

    /** Creates a {@link DataSourceTransactionManager} on this configurer's data source. */
    protected PlatformTransactionManager createTransactionManager() {
        return new DataSourceTransactionManager(this.dataSource);
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM