Spring Batch is a framework for processing large volumes of data: it reads a large amount of data, applies some processing to it, and then writes the result out in a specified form.
Spring Batch is made up of the following main components:
- JobRepository — the repository in which Jobs are registered and their execution metadata is stored
- JobLauncher — the interface used to launch a Job
- Job — the actual task to be executed, consisting of one or more Steps
- Step — a step contains an ItemReader, an ItemProcessor, and an ItemWriter
- ItemReader — the interface for reading data
- ItemProcessor — the interface for processing data
- ItemWriter — the interface for writing data out
These main Spring Batch components simply need to be registered as Spring beans. To enable batch support, you also have to annotate a configuration class with @EnableBatchProcessing. Spring Batch ships with a large number of ItemReader and ItemWriter implementations for reading from and writing to different data sources, while data processing and validation are done through implementations of the ItemProcessor interface.
Spring Boot support
Spring Boot's auto-configuration for Spring Batch lives in the org.springframework.boot.autoconfigure.batch package.
Spring Boot automatically initializes the database tables that Spring Batch uses to store batch execution records.
Spring Batch also pulls in the HSQLDB driver automatically; keep or exclude it depending on your needs.
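If you want to control whether these metadata tables are created on startup, that can be done in application.properties. The exact property name depends on the Spring Boot version, so the snippet below is only a hedged sketch (spring.batch.initializer.enabled in Boot 1.x, spring.batch.initialize-schema or spring.batch.jdbc.initialize-schema in later versions):

```properties
# Boot 1.x: disable automatic creation of the BATCH_* metadata tables
spring.batch.initializer.enabled=false
# Boot 2.x equivalent (use whichever matches your Boot version)
# spring.batch.initialize-schema=never
```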
Below is an example of using Spring Batch with Spring Boot:
1. Entity class
```java
public class Person {

    @Size(max = 4, min = 2) // validate the field with a JSR-303 annotation
    private String name;

    private int age;

    private String nation;

    private String address;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getNation() {
        return nation;
    }

    public void setNation(String nation) {
        this.nation = nation;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }
}
```
2. Validator
```java
public class CsvBeanValidator<T> implements Validator<T>, InitializingBean {

    private javax.validation.Validator validator;

    @Override
    public void afterPropertiesSet() throws Exception {
        // initialize the JSR-303 Validator that will be used to validate the data
        ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
        validator = validatorFactory.usingContext().getValidator();
    }

    @Override
    public void validate(T value) throws ValidationException {
        // validate the item with the JSR-303 Validator's validate method
        Set<ConstraintViolation<T>> constraintViolations = validator.validate(value);
        if (constraintViolations.size() > 0) {
            StringBuilder message = new StringBuilder();
            for (ConstraintViolation<T> constraintViolation : constraintViolations) {
                message.append(constraintViolation.getMessage() + "\n");
            }
            throw new ValidationException(message.toString());
        }
    }
}
```
3. ItemProcessor
```java
public class CsvItemProcessor extends ValidatingItemProcessor<Person> {

    @Override
    public Person process(Person item) throws ValidationException {
        super.process(item); // super.process(item) must be called for the custom validator to run

        // simple transformation: if the nation is "漢族", store "01", otherwise store "02"
        if (item.getNation().equals("漢族")) {
            item.setNation("01");
        } else {
            item.setNation("02");
        }
        return item;
    }
}
```
4. Job listener (a listener only needs to implement the JobExecutionListener interface and override its beforeJob and afterJob methods)
```java
public class CsvJobListener implements JobExecutionListener {

    long startTime;
    long endTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        System.out.println("Job started");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        endTime = System.currentTimeMillis();
        System.out.println("Job finished");
        System.out.println("Elapsed time: " + (endTime - startTime) + "ms");
    }
}
```
5. Configuration
```java
@Configuration
@EnableBatchProcessing
public class CsvBatchConfig {

    @Bean
    public ItemReader<Person> reader() throws Exception {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>(); // read the file with FlatFileItemReader
        reader.setResource(new ClassPathResource("people.csv")); // set the path of the CSV file
        reader.setLineMapper(new DefaultLineMapper<Person>() {{ // map the CSV columns to the domain class
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames(new String[] { "name", "age", "nation", "address" });
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                setTargetType(Person.class);
            }});
        }});
        return reader;
    }

    @Bean
    public ItemProcessor<Person, Person> processor() {
        CsvItemProcessor processor = new CsvItemProcessor(); // use the custom ItemProcessor implementation
        processor.setValidator(csvBeanValidator()); // assign the validator to the processor
        return processor;
    }

    @Bean
    public ItemWriter<Person> writer(DataSource dataSource) { // Spring injects existing beans as parameters; Spring Boot has already defined a DataSource
        JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>(); // write to the database with the JDBC-batching JdbcBatchItemWriter
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
        String sql = "insert into person " + "(id,name,age,nation,address) "
                + "values(hibernate_sequence.nextval, :name, :age, :nation, :address)";
        writer.setSql(sql); // the SQL statement to execute in batch
        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDatabaseType("oracle");
        return jobRepositoryFactoryBean.getObject();
    }

    @Bean
    public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
        return jobLauncher;
    }

    @Bean
    public Job importJob(JobBuilderFactory jobs, Step s1) {
        return jobs.get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(s1)                     // the step to run
                .end()
                .listener(csvJobListener())   // bind the listener
                .build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer,
            ItemProcessor<Person, Person> processor) {
        return stepBuilderFactory
                .get("step1")
                .<Person, Person>chunk(65000) // commit 65,000 items per chunk
                .reader(reader)               // bind the reader to the step
                .processor(processor)         // bind the processor to the step
                .writer(writer)               // bind the writer to the step
                .build();
    }

    @Bean
    public CsvJobListener csvJobListener() {
        return new CsvJobListener();
    }

    @Bean
    public Validator<Person> csvBeanValidator() {
        return new CsvBeanValidator<Person>();
    }
}
```
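The reader above expects a comma-delimited people.csv on the classpath whose columns appear in the order given to the DelimitedLineTokenizer (name, age, nation, address), and the writer's SQL assumes that the person table and the hibernate_sequence sequence already exist in the Oracle schema. A hypothetical people.csv (the rows below are made-up sample values; only the format matters) could look like this:

```csv
張三,25,漢族,北京
李四,30,回族,上海
```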
6. application.properties
```properties
spring.datasource.driverClassName=oracle.jdbc.OracleDriver
spring.datasource.url=jdbc\:oracle\:thin\:@localhost\:1521\:xe
spring.datasource.username=boot
spring.datasource.password=boot

spring.batch.job.enabled=true

logging.level.org.springframework.web=DEBUG
```
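The listings above do not show the application entry point. A minimal Spring Boot main class is enough; the sketch below is an assumption (the class name Application is not part of the original example). With spring.batch.job.enabled=true, Boot launches every Job bean automatically on startup.

```java
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        // start the Spring context; with spring.batch.job.enabled=true the importJob runs on startup
        SpringApplication.run(Application.class, args);
    }
}
```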
The example above triggers the batch job automatically on startup. If you want to trigger it manually instead, comment out the @Configuration annotation on CsvBatchConfig so that this configuration class no longer takes effect, and create a new TriggerBatchConfig configuration class whose content is identical to CsvBatchConfig except for the ItemReader bean definition (it becomes step-scoped and reads the file name from the job parameters). You also need to set spring.batch.job.enabled=false in application.properties so that the job is no longer launched automatically on startup.
```java
@Configuration
@EnableBatchProcessing
public class TriggerBatchConfig {

    @Bean
    @StepScope // a step-scoped bean is created at step execution time, so job parameters can be injected
    public FlatFileItemReader<Person> reader(@Value("#{jobParameters['input.file.name']}") String pathToFile) throws Exception {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        reader.setResource(new ClassPathResource(pathToFile)); // the file name now comes from the job parameter input.file.name
        reader.setLineMapper(new DefaultLineMapper<Person>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames(new String[] { "name", "age", "nation", "address" });
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                setTargetType(Person.class);
            }});
        }});
        return reader;
    }

    @Bean
    public ItemProcessor<Person, Person> processor() {
        CsvItemProcessor processor = new CsvItemProcessor();
        processor.setValidator(csvBeanValidator());
        return processor;
    }

    @Bean
    public ItemWriter<Person> writer(DataSource dataSource) {
        JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
        String sql = "insert into person " + "(id,name,age,nation,address) "
                + "values(hibernate_sequence.nextval, :name, :age, :nation, :address)";
        writer.setSql(sql);
        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDatabaseType("oracle");
        return jobRepositoryFactoryBean.getObject();
    }

    @Bean
    public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
        return jobLauncher;
    }

    @Bean
    public Job importJob(JobBuilderFactory jobs, Step s1) {
        return jobs.get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(s1)
                .end()
                .listener(csvJobListener())
                .build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer,
            ItemProcessor<Person, Person> processor) {
        return stepBuilderFactory
                .get("step1")
                .<Person, Person>chunk(65000)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public CsvJobListener csvJobListener() {
        return new CsvJobListener();
    }

    @Bean
    public Validator<Person> csvBeanValidator() {
        return new CsvBeanValidator<Person>();
    }
}
```
Controller code
```java
@RestController
public class DemoController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job importJob;

    public JobParameters jobParameters;

    @RequestMapping("/read")
    public String imp(String fileName) throws Exception {
        String path = fileName + ".csv";
        // "time" makes each set of job parameters unique so the job can be run repeatedly;
        // "input.file.name" is picked up by the @StepScope reader in TriggerBatchConfig
        jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .addString("input.file.name", path)
                .toJobParameters();
        jobLauncher.run(importJob, jobParameters);
        return "ok";
    }
}
```
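With this controller in place, a request such as GET /read?fileName=people launches importJob with input.file.name set to people.csv; the @StepScope reader in TriggerBatchConfig then resolves that job parameter and reads the corresponding file from the classpath.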