springbatch的封裝與使用


springbatch

主要實現批量數據的處理,我對batch進行的封裝,提出了jobBase類型,具體job需要實現它即可。Spring Batch 不僅提供了統一的讀寫接口、豐富的任務處理方式、靈活的事務管理及並發處理,同時還支持日志、監控、任務重啟與跳過等特性,大大簡化了批處理應用開發,將開發人員從復雜的任務配置管理過程中解放出來,使他們可以更多地去關注核心的業務處理過程。

幾個組件

  • job
  • step
  • read
  • write
  • listener
  • process
  • validator

JobBase定義了幾個公用的方法

 /**
  * springBatch的job基礎類.
  */
 public abstract class JobBase<T> {
 
   /**
    * 批次.
    */
   protected int chunkCount = 5000;
   /**
    * 監聽器.
    */
   private JobExecutionListener jobExecutionListener;
   /**
    * 處理器.
    */
   private ValidatingItemProcessor<T> validatingItemProcessor;
   /**
    * job名稱.
    */
   private String jobName;
   /**
    * 檢驗器.
    */
   private Validator<T> validator;
   @Autowired
   private JobBuilderFactory job;
   @Autowired
   private StepBuilderFactory step;
 
 
   /**
    * 初始化.
    *
    * @param jobName                 job名稱
    * @param jobExecutionListener    監聽器
    * @param validatingItemProcessor 處理器
    * @param validator               檢驗
    */
   public JobBase(String jobName,
                  JobExecutionListener jobExecutionListener,
                  ValidatingItemProcessor<T> validatingItemProcessor,
                  Validator<T> validator) {
     this.jobName = jobName;
     this.jobExecutionListener = jobExecutionListener;
     this.validatingItemProcessor = validatingItemProcessor;
     this.validator = validator;
   }
 
   /**
    * job初始化與啟動.
    */
   public Job getJob() throws Exception {
     return job.get(jobName).incrementer(new RunIdIncrementer())
         .start(syncStep())
         .listener(jobExecutionListener)
         .build();
   }
 
   /**
    * 執行步驟.
    *
    * @return
    */
   public Step syncStep() throws Exception {
     return step.get("step1")
         .<T, T>chunk(chunkCount)
         .reader(reader())
         .processor(processor())
         .writer(writer())
         .build();
   }
 
   /**
    * 單條處理數據.
    *
    * @return
    */
   public ItemProcessor<T, T> processor() {
     validatingItemProcessor.setValidator(processorValidator());
     return validatingItemProcessor;
   }
 
   /**
    * 校驗數據.
    *
    * @return
    */
   @Bean
   public Validator<T> processorValidator() {
     return validator;
   }
 
   /**
    * 批量讀數據.
    *
    * @return
    * @throws Exception
    */
   public abstract ItemReader<T> reader() throws Exception;
 
   /**
    * 批量寫數據.
    *
    * @return
    */
   @Bean
   public abstract ItemWriter<T> writer();
 
 }

主要規定了公用方法的執行策略,而具體的job名稱,讀,寫還是需要具體JOB去實現的。

具體Job實現

 @Configuration
 @EnableBatchProcessing
 public class SyncPersonJob extends JobBase<Person> {
   @Autowired
   private DataSource dataSource;
   @Autowired
   @Qualifier("primaryJdbcTemplate")
   private JdbcTemplate jdbcTemplate;
 
   /**
    * 初始化,規則了job名稱和監視器.
    */
   public SyncPersonJob() {
     super("personJob", new PersonJobListener(), new PersonItemProcessor(), new BeanValidator<>());
   }
 
   @Override
   public ItemReader<Person> reader() throws Exception {
     StringBuffer sb = new StringBuffer();
     sb.append("select * from person");
     String sql = sb.toString();
     JdbcCursorItemReader<Person> jdbcCursorItemReader =
         new JdbcCursorItemReader<>();
     jdbcCursorItemReader.setSql(sql);
     jdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
     jdbcCursorItemReader.setDataSource(dataSource);
 
     return jdbcCursorItemReader;
   }
 
 
   @Override
   @Bean("personJobWriter")
   public ItemWriter<Person> writer() {
     JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
     writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
     String sql = "insert into person_export " + "(id,name,age,nation,address) "
         + "values(:id, :name, :age, :nation,:address)";
     writer.setSql(sql);
     writer.setDataSource(dataSource);
     return writer;
   }
 
 }

寫操作需要定義自己的bean的聲明

注意,需要為每個job的write啟個名稱,否則在多job時,write將會被打亂

  /**
   * 批量寫數據.
   *
   * @return
   */
  @Override
  @Bean("personVerson2JobWriter")
  public ItemWriter<Person> writer() {
   
  }

添加一個api,手動觸發

 @Autowired
  SyncPersonJob syncPersonJob;

  @Autowired
  JobLauncher jobLauncher;

  void exec(Job job) throws Exception {
    JobParameters jobParameters = new JobParametersBuilder()
        .addLong("time", System.currentTimeMillis())
        .toJobParameters();
    jobLauncher.run(job, jobParameters);
  }

  @RequestMapping("/run1")
  public String run1() throws Exception {
    exec(syncPersonJob.getJob());
    return "personJob success";
  }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM