Spring Batch : 在不同steps間傳遞數據


參考文檔:

How can we share data between the different steps of a Job in Spring Batch?

Job Scoped Beans in Spring Batch

https://github.com/spring-projects/spring-batch/issues/1335

有三種方式:

  1. Use StepContext and promote it to JobContext and you have access to it from each step, you must as noted obey limit in size
  2. Create @JobScope bean and add data to that bean, @Autowire it where needed and use it (drawback is that it is in-memory structure and if job fails data is lost, migh cause problems with restartability)
  3. We had larger datasets needed to be processed across steps (read each line in csv and write to DB, read from DB, aggregate and send to API) so we decided to model data in new table in same DB as spring batch meta tables, keep ids in JobContext and access when needed and delete that temporary table when job finishes successfully.

使用 JobContext

通過step_execution 或者 job_execution來在不同step中傳遞數據。

但是如果數據量大的話,這將不是一種好的方式.因為spring batch默認會通過job repository將 step_executionjob_execution進行持久化.

Use ExecutionContextPromotionListener:

首先在第一個Step中放到StepExecution

public class YourItemWriter implements ItemWriter<Object> {
    private StepExecution stepExecution;
    public void write(List<? extends Object> items) throws Exception {
        // Some Business Logic

        // put your data into stepexecution context
        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("someKey", someObject);
    }
    @BeforeStep
    public void saveStepExecution(Final StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

然后使用promotionListener在監聽器中將其升級到JobExecution中:

@Bean
public Step step1() {
        return stepBuilder
        .get("step1")<Company,Company>  chunk(10)
        .reader(reader()).processor(processor()).writer(writer())
        .listener(promotionListener()).build();
}

@Bean
public ExecutionContextPromotionListener promotionListener() {
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    listener.setKeys(new String[] {"someKey"});
    listener.setStrict(true);
    return listener;
}

在第二個Step中使用ExecutionContext獲取:

public class RetrievingItemWriter implements ItemWriter<Object> {
    private Object someObject;
    public void write(List<? extends Object> items) throws Exception {
        // ...
    }
    @BeforeStep
    public void retrieveInterstepData(StepExecution stepExecution) {
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        this.someObject = jobContext.get("someKey");
    }
}

如果你使用的是tasklets的話,可以這樣獲取:

List<YourObject> yourObjects = (List<YourObject>) chunkContent.getStepContext().getJobExecutionContext().get("someKey");

用自己定義的bean傳遞數據

創建一個data holder

@Component
@JobScope
public class PublicCompanyHolder {
 
 private List<PublicCompanyInfo> publicCompanyList;
 
 public List<PublicCompanyInfo> getPublicCompanyList() {
  return publicCompanyList;
 }
 
 public void setPublicCompanyList(List<PublicCompanyInfo> publicCompanyList) {
  this.publicCompanyList = publicCompanyList;
 }
  
}

在step 1中設置數據:

@Component("pubTasklet")
public class PubTasklet implements Tasklet {
 
 @Autowired
 private PublicCompanyHolder publicCompanyHolder;
 
 public RepeatStatus execute(StepContribution contribution,
   ChunkContext chunkContext) throws Exception {
 
  List<PublicCompanyInfo> infoContainer = new ArrayList<PublicCompanyInfo>();
 
  for (int i=0; i < 10; i++) {
   PublicCompanyInfo info = new PublicCompanyInfo();
   info.setPublicCompanyId("ID-" + i);
   info.setPublicCompanyName("Name*" + i);
   infoContainer.add(info);
  }
  publicCompanyHolder.setPublicCompanyList(infoContainer);
 
  return RepeatStatus.FINISHED;
 }
 
}

在step2中取數據:

@Component("pubTasklet2")
public class PubTasklet2 implements Tasklet {
 @Autowired
 private PublicCompanyHolder publicCompanyHolder;
 
 public RepeatStatus execute(StepContribution contribution,
   ChunkContext chunkContext) throws Exception {
  System.out.println("received holder:" + publicCompanyHolder.getPublicCompanyList());
  
  return RepeatStatus.FINISHED;
 }
 
}

job配置

  • define.xml
  <job id="pubJob" restartable="true">
  <step id="step1" next="step2">
   <tasklet ref="pubTasklet" />
  </step>
  <step id="step2" next="step1">  // if you do not want to loop, remove next
   <tasklet ref="pubTasklet2" />
  </step>
  <listeners>
   <listener ref="pubListener" />
  </listeners>
 </job>
  • Job Config
public Job build(Step step1, Step step2) {
    return jobBuilderFactory.get("jobName")
            .incrementer(new RunIdIncrementer())
            .start(step1)
            .next(step2)
            .build();
}
public Step step1() {
    return stepBuilderFactory.get("step1Name")
            .tasklet(new PubTasklet())
            .build();
}

public Step step2() {
    return stepBuilderFactory.get("step2Name")
            .tasklet(new PubTasklet2())
            .build();
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM