簡單的Spring Batch示例


使用Spring Batch做為批處理框架,可以完成常規的數據量不是特別大的離線計算。

現在寫一個簡單的入門版示例。

這里默認大家已經掌握了Spring Batch的基本知識,示例只是為了快速上手實踐

目標1:程序隨機生成字符串,經過Spring Batch后,統一在字符串后加入“----PROCESSED”,並輸出

目標2:程序讀取txt文件,經過Spring Batch后,統一加入如上字段,並輸出

Spring Batch的流程

  1. 讀取數據----itemReader
  2. 處理數據----itemProcess
  3. 數據寫入----itemWrite

分析目標可知,兩個目標的輸入數據源不同,處理方式基本一致,數據完成后的寫入規則一致

由此可以分段完成代碼

itemReader

目標一

這里沒有使用Spring Batch自帶的集中reader,所以自定義了隨機生成字符串的reader

這里代碼並不完善,reader會無線循環生成隨機字符串,但不影響本次學習的目的

 
public class MyItemReader implements ItemReader<String> {
    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        return RandomStringUtils.randomAlphabetic(10);
    }
}

目標二

由於是讀取文件中的內容,所以不用自定義reader實現,可直接使用FlatFileItemReader,在Batch的config中配置即可

    @Bean
    public ItemReader<String> textReader(){
 
        FlatFileItemReader<String> reader=new FlatFileItemReader<>();
        File file = new File("D:\\FTP\\ttest.txt");
        reader.setResource(new FileSystemResource(file));
        reader.setLineMapper(new LineMapper<String>() {
            @Override
            public String mapLine(String line, int lineNumber) throws Exception {
                return line;
            }
        });
        return reader;
 
    }

itemProcess

這里采用同一種處理方式即可

public class MyItemProcessor implements ItemProcessor<String,String> {
 
    @Override
    public String process(String s) throws Exception {
        return s+"---------PROCESSED";
    }
}

itemWriter

也采用同一種即可

public class MyItemWriter implements ItemWriter<String> {
    @Override
    public void write(List<? extends String> items) throws Exception {
        for (String item : items) {
            System.out.println(item);
        }
    }
}

配置完成Batch Config

@Configuration
@EnableBatchProcessing
public class BatchConfiguration extends DefaultBatchConfigurer {
 
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    @Autowired
    public JobBuilderFactory jobBuilderFactory;
 
    @Bean
    public MyItemProcessor processor(){
        return new MyItemProcessor();
    }
 
    @Bean
    public ItemWriter<String> writer(){
        return new MyItemWriter();
    }
 
    @Bean
    public ItemReader<String> textReader(){
        FlatFileItemReader<String> reader=new FlatFileItemReader<>();
        File file = new File("D:\\FTP\\ttest.txt");
        reader.setResource(new FileSystemResource(file));
        reader.setLineMapper(new LineMapper<String>() {
            @Override
            public String mapLine(String line, int lineNumber) throws Exception {
                return line;
            }
        });
        return reader;
    }
 
    @Bean
    public ItemReader<String> stringReader(){
        return new MyItemReader();
    }
 
    @Override
    public void setDataSource(DataSource dataSource) {
        super.setDataSource(dataSource);
    }
 
    @Bean
    public Step myStep(){
        return stepBuilderFactory
                .get("step1")
                //這個chunk size是最后調用寫入的時候,一次性寫入多少條已處理的數據
                .<String,String>chunk(10)
//                .reader(textReader())
                .reader(stringReader())
                .processor(processor())
                .writer(writer())
                .build();
 
    }
 
    @Bean
    public Job MyJob(){
        return jobBuilderFactory
                .get("MyJOB")
                .listener(new JobExecutionListenerSupport(){
                    //所有處理結束后調用
                    @Override
                    public void afterJob(JobExecution jobExecution) {
                        if(jobExecution.getStatus() == BatchStatus.COMPLETED){
                            System.out.println("OK");
                        }
                    }
                })
                .flow(myStep())
                .end()
                .build();
    }
}

結束

最后直接運行spring boot程序即可


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM