Spring Batch (7) ——異常處理及容錯機制


異常處理及重啟機制

1.對於chunk類型的Step,spring batch為我們提供了用於管理它的狀態

2.狀態的管理是通過ItemStream接口來實現的

3.ItemStream接口:

(1)open():每一次step執行會調用

(2)Update():每一個chunk去執行都會調用

(3)Close():所有的chunk執行完畢會調用

file

構造例子

准備個cvs文件,在第33條數據,添加一條錯誤名字信息 ;當讀取到這條數據時,拋出異常終止程序。

img

ItemReader測試代碼

@Component("restartDemoReader")
public class RestartDemoReader implements ItemStreamReader<Customer> {
 
    // 記錄當前讀取的行數
    private Long curLine = 0L;
    // 重啟狀態初始值
    private boolean restart = false;
 
    private FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
 
    // 持久化信息到數據庫
    private ExecutionContext executionContext;
    RestartDemoReader
    public () {
        
        reader.setResource(new ClassPathResource("restartDemo.csv"));
 
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});
 
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet();
 
        reader.setLineMapper(lineMapper);
    }
 
    @Override
    public Customer read() throws Exception, UnexpectedInputException, ParseException,
            NonTransientResourceException {
 
        Customer customer = null;
 
        this.curLine++;
        //如果是重啟,則從上一步讀取的行數繼續往下執行
        if (restart) {
            reader.setLinesToSkip(this.curLine.intValue()-1);
            restart = false;
            System.out.println("Start reading from line: " + this.curLine);
        }
 
        reader.open(this.executionContext);
 
        customer = reader.read();
        //當匹配到wrongName時,顯示拋出異常,終止程序
        if (customer != null) {
            if (customer.getFirstName().equals("wrongName"))
                throw new RuntimeException("Something wrong. Customer id: " + customer.getId());
        } else {
            curLine--;
        }
        return customer;
    }
 
    /**
     * 判斷是否是重啟job
     * @param executionContext
     * @throws ItemStreamException
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        this.executionContext = executionContext;
				// 如果是重啟job,從數據庫讀取重啟的行數,從重啟行數開始重新執行
        if (executionContext.containsKey("curLine")) {
            this.curLine = executionContext.getLong("curLine");
            this.restart = true;
        }
				// 如果不是重啟job,初始化行數,從第一行開始執行
				else {
            this.curLine = 0L;
            executionContext.put("curLine", this.curLine.intValue());
        }
 
    }
 
    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
		    // 每執行完一個批次chunk,打印當前行數
        System.out.println("update curLine: " + this.curLine);
        executionContext.put("curLine", this.curLine);
 
    }
 
    @Override
    public void close() throws ItemStreamException {
 
    }
}

Job配置

以10條記錄為一個批次,進行讀取

@Configuration
public class RestartDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
 
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
 
    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;
 
    @Autowired
    @Qualifier("restartDemoReader")
    private ItemReader<Customer> restartDemoReader;
 
    @Bean
    public Job restartDemoJob(){
        return jobBuilderFactory.get("restartDemoJob")
                .start(restartDemoStep())
                .build();
 
    }
 
    @Bean
    public Step restartDemoStep() {
        return stepBuilderFactory.get("restartDemoStep")
                .<Customer,Customer>chunk(10)
                .reader(restartDemoReader)
                .writer(flatFileDemoWriter)
                .build();
    }
}

當第一次執行時,程序在33行拋出異常異常,curline值是30;

img

這時,可以查詢數據庫 batch_step_excution表,發現curline值已經以 鍵值對形式,持久化進數據庫(上文以10條數據為一個批次;故33條數據異常時,curline值為30)

![img]https://cdnimg.copyfuture.com/imagesLocal/202003/10/20200310003237548qhevzzsoefofcuj_3.jpg)

接下來,更新wrongName,再次執行程序;

程序會執行open方法,判斷數據庫step中map是否存在curline,如果存在,則是重跑,即讀取curline,從該批次開始往下繼續執行;

img

img

容錯機制

Spring batch的容錯機制是一種與事務機制相結合的機制,它主要包括有3種操作:

  • restart
  • restart是針對job來使用,是重啟job的一個操作。默認情況下,當任務出現異常時,SpringBatch會結束任務,當使用相同參數重啟任務時,SpringBatch會去執行未執行的剩余任務
  • retry
  • retry是對job的某一step而言,處理一條數據item的時候發現有異常,則重試一次該數據item的step的操作。
  • skip
  • skip是對job的某一個step而言,處理一條數據item的時候發現有異常,則跳過該數據item的step的操作。

restart示例代碼如下,當第一次執行的時候,上下文中沒有該字段,拋出異常,第二次執行,已存在該字段,執行成功

img

retry、skip示例如下,更改一下之前step的配置,參考代碼如下:

@Bean
public Step stepForTranscation(StepBuilderFactory stepBuilderFactory, @Qualifier("stepForTranscationReader")ListItemReader<String> reader,
        @Qualifier("stepForTranscationProcessor")ItemProcessor<String, String> processor, @Qualifier("stepForTranscationWriter")ItemWriter<String> writer) {
   return stepBuilderFactory.get("stepForTranscation")
           .<String, String> chunk(3)
           .reader(reader)
           .processor(processor)
           .writer(writer).
           faultTolerant().
           retryLimit(3).
           retry(DataIntegrityViolationException.class).
           skipLimit(1).
           skip(DataIntegrityViolationException.class).
           startLimit(3)
           .build();
}

這里設置了允許重試的次數為3次,允許跳過的數據最多為1條,如果job失敗了,運行重跑次數最多為3次。

在skip后面配置跳過錯誤的監聽器SkipListener

public class Mr implements SkipListener<String, String>{ // 發生讀操作跳過錯誤時,需要執行的監聽 public void onSkipInRead(Throwable t){ }


    // 發生寫操作跳過錯誤時,需要執行的監聽
    public void onSkipInWrite(String item, Throwable t){
    }

    // 處理數據時跳過錯誤時,需要執行的監聽
    public void onSkipInProcess (String item, Throwable t){
        System.out.println(item + "occur exception" + t);
    }

}

重新運行程序,可以得到新的結果:

file

這次可以看到,12條數據中總共有11條數據進入到數據庫,而過長的008008008008數據,則因為設置了skip,所以容錯機制允許它不進入數據庫,這次的Spring batch最終沒有因為回滾而中斷。

查閱一下Spring batch的持久化數據表:

file

可以看出,的確是有一條數據被跳過了,但因為是我們允許它跳過的,所以整個job順利完成,即COMPLETED。

參考:https://blog.csdn.net/chihe9907/article/details/100601523


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM