Spring batch的學習


  Spring batch是用來處理大量數據操作的一個框架,主要用來讀取大量數據,然后進行一定處理后輸出成指定的形式。

  Spring batch主要有以下部分組成:

  • JobRepository       用來注冊job的容器
  • JobLauncher             用來啟動Job的接口
  • Job                          實際執行的任務,包含一個或多個Step
  • Step                        step包含ItemReader、ItemProcessor和ItemWriter
  • ItemReader              用來讀取數據的接口
  • ItemProcessor          用來處理數據的接口
  • ItemWriter               用來輸出數據的接口

以上Spring Batch的主要組成部分只需要注冊成Spring的Bean即可。若想開啟批處理的支持還需在配置類上使用@EnableBatchProcessing,在Spring Batch中提供了大量的ItemReader和ItemWriter的實現,用來讀取不同的數據來源,數據的處理和校驗都要通過ItemProcessor接口實現來完成。

 

Spring Boot的支持

  Spring Boot對Spring Batch支持的源碼位於org.springframework.boot.autoconfigure.batch下。

  Spring Boot為我們自動初始化了Spring Batch存儲批處理記錄的數據庫。

  spring batch會自動加載hsqldb驅動,根據需求選擇去留。

下面是一個spring boot支持spring batch 的例子:

  1. 實體類

 1 public class Person {
 2     
 3     @Size(max=4,min=2) //使用JSR-303注解來校驗注解
 4     private String name;
 5     
 6     private int age;
 7     
 8     private String nation;
 9     
10     private String address;
11 
12     public String getName() {
13         return name;
14     }
15 
16     public void setName(String name) {
17         this.name = name;
18     }
19 
20     public int getAge() {
21         return age;
22     }
23 
24     public void setAge(int age) {
25         this.age = age;
26     }
27 
28     public String getNation() {
29         return nation;
30     }
31 
32     public void setNation(String nation) {
33         this.nation = nation;
34     }
35 
36     public String getAddress() {
37         return address;
38     }
39 
40     public void setAddress(String address) {
41         this.address = address;
42     }
43 }

  2. 校驗器

 1 public class CsvBeanValidator<T> implements Validator<T>,InitializingBean {
 2     private javax.validation.Validator validator; 
 3     @Override
 4     public void afterPropertiesSet() throws Exception { //使用JSR-303的Validator來校驗我們的數據,在此處進行JSR-303的Validator的初始化
 5         ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
 6         validator = validatorFactory.usingContext().getValidator();
 7     }
 8 
 9     @Override
10     public void validate(T value) throws ValidationException {
11         Set<ConstraintViolation<T>> constraintViolations = validator.validate(value); //使用Validator的validate方法校驗數據
12         if(constraintViolations.size()>0){
13             
14             StringBuilder message = new StringBuilder();
15             for (ConstraintViolation<T> constraintViolation : constraintViolations) {
16                 message.append(constraintViolation.getMessage() + "\n");
17             }
18             throw new ValidationException(message.toString());
19 
20         }
21 
22     }
23 
24 }

  3. ItemProcessor  

 1 public class CsvItemProcessor  extends ValidatingItemProcessor<Person>{
 2 
 3     @Override
 4     public Person process(Person item) throws ValidationException {
 5         super.process(item); //需要執行super.process(item)才會調用自定義校驗器
 6 
 7         if(item.getNation().equals("漢族")){ //對數據做簡單的處理,若民族為漢族,則數據轉換成01,其余轉換成02
 8             item.setNation("01");
 9         }else{
10             item.setNation("02");
11         }
12         return item;
13     }
14 
15 
16 }

  4. Job監聽(監聽器要實現JobExecutionListener接口,並重寫其beforeJob、afterJob方法即可)

 1 public class CsvJobListener implements JobExecutionListener{ 
 2 
 3     long startTime;
 4     long endTime;
 5     @Override
 6     public void beforeJob(JobExecution jobExecution) {
 7         startTime = System.currentTimeMillis();
 8         System.out.println("任務處理開始");
 9     }
10 
11     @Override
12     public void afterJob(JobExecution jobExecution) {
13         endTime = System.currentTimeMillis();
14         System.out.println("任務處理結束");
15         System.out.println("耗時:" + (endTime - startTime) + "ms");
16     }
17 
18 }

  5. 配置

 1 @Configuration    
 2 @EnableBatchProcessing
 3 public class CsvBatchConfig {
 4 
 5     @Bean
 6     public ItemReader<Person> reader() throws Exception {
 7         FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>(); //使用FlatFileItemReader讀取文件
 8         reader.setResource(new ClassPathResource("people.csv")); //使用FlatFileItemReader的setResource方法設置CSV文件的路徑
 9             reader.setLineMapper(new DefaultLineMapper<Person>() {{ //在此處對CVS文件的數據和領域模型類做對應映射
10                 setLineTokenizer(new DelimitedLineTokenizer() {{
11                     setNames(new String[] { "name","age", "nation" ,"address"});
12                 }});
13                 setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
14                     setTargetType(Person.class);
15                 }});
16             }});
17             return reader;
18     }
19     
20     @Bean
21     public ItemProcessor<Person, Person> processor() {
22         CsvItemProcessor processor = new CsvItemProcessor(); //使用自定義的ItemProcessor的實現
23         processor.setValidator(csvBeanValidator()); //為Processor指定校驗器
24         return processor;
25     }
26     
27     
28 
29     @Bean
30     public ItemWriter<Person> writer(DataSource dataSource) {//Spring能讓容器中已有的Bean以參數的形式注入,Spring boot已經定義了DataSource
31         JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>(); //使用JDBC批處理的JdbcBatchItemWriter來寫數據到數據庫
32         writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
33         String sql = "insert into person " + "(id,name,age,nation,address) "
34                 + "values(hibernate_sequence.nextval, :name, :age, :nation,:address)";
35         writer.setSql(sql); //在此設置要執行批處理的sql語句
36         writer.setDataSource(dataSource);
37         return writer;
38     }
39 
40     @Bean
41     public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)
42             throws Exception {
43         JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
44         jobRepositoryFactoryBean.setDataSource(dataSource);
45         jobRepositoryFactoryBean.setTransactionManager(transactionManager);
46         jobRepositoryFactoryBean.setDatabaseType("oracle");
47         return jobRepositoryFactoryBean.getObject();
48     }
49 
50     @Bean
51     public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager)
52             throws Exception {
53         SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
54         jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
55         return jobLauncher;
56     }
57 
58     @Bean
59     public Job importJob(JobBuilderFactory jobs, Step s1) {
60         return jobs.get("importJob")
61                 .incrementer(new RunIdIncrementer())
62                 .flow(s1) //指定step
63                 .end()
64                 .listener(csvJobListener()) //綁定監聽器
65                 .build();
66     }
67 
68     @Bean
69     public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer,
70             ItemProcessor<Person,Person> processor) {
71         return stepBuilderFactory
72                 .get("step1")
73                 .<Person, Person>chunk(65000) //批處理每次提交65000條數據
74                 .reader(reader) //給step綁定reader
75                 .processor(processor) //給step綁定Processor
76                 .writer(writer) //給step綁定writer
77                 .build();
78     }
79 
80 
81 
82     @Bean
83     public CsvJobListener csvJobListener() {
84         return new CsvJobListener();
85     }
86 
87     @Bean
88     public Validator<Person> csvBeanValidator() {
89         return new CsvBeanValidator<Person>();
90     }
91     
92 
93 }

  6.application.xml

1 spring.datasource.driverClassName=oracle.jdbc.OracleDriver
2 spring.datasource.url=jdbc\:oracle\:thin\:@localhost\:1521\:xe
3 spring.datasource.username=boot
4 spring.datasource.password=boot
5 
6 spring.batch.job.enabled=true
7 
8 logging.level.org.springframework.web = DEBUG

 

上面的例子是自動觸發批處理的,當我們需要手動觸發批處理時,需要將CsvBatchConfig類的@Configuration注解注釋掉,讓此配置類不再起效,新建TriggerBatchConfig配置類,內容與CsvBatchConfig完全一致,除了修改定義ItemReader這個Bean;另外,還需要修改application.xml配置文件spring.batch.job.enable=false

 1 @Configuration
 2 @EnableBatchProcessing
 3 public class TriggerBatchConfig {
 4 
 5     @Bean
 6     @StepScope
 7     public FlatFileItemReader<Person> reader(@Value("#{jobParameters['input.file.name']}") String pathToFile) throws Exception {
 8         FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>(); //
 9          reader.setResource(new ClassPathResource(pathToFile)); //
10             reader.setLineMapper(new DefaultLineMapper<Person>() {{ //
11                 setLineTokenizer(new DelimitedLineTokenizer() {{
12                     setNames(new String[] { "name","age", "nation" ,"address"});
13                 }});
14                 setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
15                     setTargetType(Person.class);
16                 }});
17             }});
18            
19             return reader;
20     }
21     
22     @Bean
23     public ItemProcessor<Person, Person> processor() {
24         CsvItemProcessor processor = new CsvItemProcessor(); 
25         processor.setValidator(csvBeanValidator()); 
26         return processor;
27     }
28     
29     
30 
31     @Bean
32     public ItemWriter<Person> writer(DataSource dataSource) {
33         JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>(); 
34         writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
35         String sql = "insert into person " + "(id,name,age,nation,address) "
36                 + "values(hibernate_sequence.nextval, :name, :age, :nation,:address)";
37         writer.setSql(sql); //3
38         writer.setDataSource(dataSource);
39         return writer;
40     }
41 
42     @Bean
43     public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)
44             throws Exception {
45         JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
46         jobRepositoryFactoryBean.setDataSource(dataSource);
47         jobRepositoryFactoryBean.setTransactionManager(transactionManager);
48         jobRepositoryFactoryBean.setDatabaseType("oracle");
49         return jobRepositoryFactoryBean.getObject();
50     }
51 
52     @Bean
53     public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager)
54             throws Exception {
55         SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
56         jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
57         return jobLauncher;
58     }
59 
60     @Bean
61     public Job importJob(JobBuilderFactory jobs, Step s1) {
62         return jobs.get("importJob")
63                 .incrementer(new RunIdIncrementer())
64                 .flow(s1) 
65                 .end()
66                 .listener(csvJobListener()) 
67                 .build();
68     }
69 
70     @Bean
71     public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer,
72             ItemProcessor<Person,Person> processor) {
73         return stepBuilderFactory
74                 .get("step1")
75                 .<Person, Person>chunk(65000) 
76                 .reader(reader) 
77                 .processor(processor) 
78                 .writer(writer) 
79                 .build();
80     }
81 
82 
83 
84     @Bean
85     public CsvJobListener csvJobListener() {
86         return new CsvJobListener();
87     }
88 
89     @Bean
90     public Validator<Person> csvBeanValidator() {
91         return new CsvBeanValidator<Person>();
92     }
93     
94 
95 }

  控制層代碼

 1 @RestController
 2 public class DemoController {
 3     
 4         @Autowired
 5         JobLauncher jobLauncher;
 6 
 7         @Autowired
 8         Job importJob;
 9         public JobParameters   jobParameters;
10         
11         @RequestMapping("/read")
12         public String imp(String fileName) throws Exception{
13             
14             String path = fileName+".csv";
15             jobParameters = new JobParametersBuilder()
16                     .addLong("time", System.currentTimeMillis())
17                     .addString("input.file.name", path)
18                     .toJobParameters();
19             jobLauncher.run(importJob,jobParameters);
20             return "ok";
21         }
22 
23 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM