異常處理及重啟機制
1.對於chunk類型的Step,spring batch為我們提供了用於管理它的狀態
2.狀態的管理是通過ItemStream接口來實現的
3.ItemStream接口:
(1)open():每一次step執行會調用
(2)Update():每一個chunk去執行都會調用
(3)Close():所有的chunk執行完畢會調用
構造例子
准備個cvs文件,在第33條數據,添加一條錯誤名字信息 ;當讀取到這條數據時,拋出異常終止程序。
ItemReader測試代碼
@Component("restartDemoReader")
public class RestartDemoReader implements ItemStreamReader<Customer> {
// 記錄當前讀取的行數
private Long curLine = 0L;
// 重啟狀態初始值
private boolean restart = false;
private FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
// 持久化信息到數據庫
private ExecutionContext executionContext;
RestartDemoReader
public () {
reader.setResource(new ClassPathResource("restartDemo.csv"));
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});
DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper((fieldSet -> {
return Customer.builder().id(fieldSet.readLong("id"))
.firstName(fieldSet.readString("firstName"))
.lastName(fieldSet.readString("lastName"))
.birthdate(fieldSet.readString("birthdate"))
.build();
}));
lineMapper.afterPropertiesSet();
reader.setLineMapper(lineMapper);
}
@Override
public Customer read() throws Exception, UnexpectedInputException, ParseException,
NonTransientResourceException {
Customer customer = null;
this.curLine++;
//如果是重啟,則從上一步讀取的行數繼續往下執行
if (restart) {
reader.setLinesToSkip(this.curLine.intValue()-1);
restart = false;
System.out.println("Start reading from line: " + this.curLine);
}
reader.open(this.executionContext);
customer = reader.read();
//當匹配到wrongName時,顯示拋出異常,終止程序
if (customer != null) {
if (customer.getFirstName().equals("wrongName"))
throw new RuntimeException("Something wrong. Customer id: " + customer.getId());
} else {
curLine--;
}
return customer;
}
/**
* 判斷是否是重啟job
* @param executionContext
* @throws ItemStreamException
*/
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
this.executionContext = executionContext;
// 如果是重啟job,從數據庫讀取重啟的行數,從重啟行數開始重新執行
if (executionContext.containsKey("curLine")) {
this.curLine = executionContext.getLong("curLine");
this.restart = true;
}
// 如果不是重啟job,初始化行數,從第一行開始執行
else {
this.curLine = 0L;
executionContext.put("curLine", this.curLine.intValue());
}
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
// 每執行完一個批次chunk,打印當前行數
System.out.println("update curLine: " + this.curLine);
executionContext.put("curLine", this.curLine);
}
@Override
public void close() throws ItemStreamException {
}
}
Job配置
以10條記錄為一個批次,進行讀取
@Configuration
public class RestartDemoJobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("flatFileDemoWriter")
private ItemWriter<? super Customer> flatFileDemoWriter;
@Autowired
@Qualifier("restartDemoReader")
private ItemReader<Customer> restartDemoReader;
@Bean
public Job restartDemoJob(){
return jobBuilderFactory.get("restartDemoJob")
.start(restartDemoStep())
.build();
}
@Bean
public Step restartDemoStep() {
return stepBuilderFactory.get("restartDemoStep")
.<Customer,Customer>chunk(10)
.reader(restartDemoReader)
.writer(flatFileDemoWriter)
.build();
}
}
當第一次執行時,程序在33行拋出異常異常,curline值是30;
這時,可以查詢數據庫 batch_step_excution表,發現curline值已經以 鍵值對形式,持久化進數據庫(上文以10條數據為一個批次;故33條數據異常時,curline值為30)
![img]https://cdnimg.copyfuture.com/imagesLocal/202003/10/20200310003237548qhevzzsoefofcuj_3.jpg)
接下來,更新wrongName,再次執行程序;
程序會執行open方法,判斷數據庫step中map是否存在curline,如果存在,則是重跑,即讀取curline,從該批次開始往下繼續執行;
容錯機制
Spring batch的容錯機制是一種與事務機制相結合的機制,它主要包括有3種操作:
- restart
- restart是針對job來使用,是重啟job的一個操作。默認情況下,當任務出現異常時,SpringBatch會結束任務,當使用相同參數重啟任務時,SpringBatch會去執行未執行的剩余任務
- retry
- retry是對job的某一step而言,處理一條數據item的時候發現有異常,則重試一次該數據item的step的操作。
- skip
- skip是對job的某一個step而言,處理一條數據item的時候發現有異常,則跳過該數據item的step的操作。
restart示例代碼如下,當第一次執行的時候,上下文中沒有該字段,拋出異常,第二次執行,已存在該字段,執行成功
retry、skip示例如下,更改一下之前step的配置,參考代碼如下:
@Bean
public Step stepForTranscation(StepBuilderFactory stepBuilderFactory, @Qualifier("stepForTranscationReader")ListItemReader<String> reader,
@Qualifier("stepForTranscationProcessor")ItemProcessor<String, String> processor, @Qualifier("stepForTranscationWriter")ItemWriter<String> writer) {
return stepBuilderFactory.get("stepForTranscation")
.<String, String> chunk(3)
.reader(reader)
.processor(processor)
.writer(writer).
faultTolerant().
retryLimit(3).
retry(DataIntegrityViolationException.class).
skipLimit(1).
skip(DataIntegrityViolationException.class).
startLimit(3)
.build();
}
這里設置了允許重試的次數為3次,允許跳過的數據最多為1條,如果job失敗了,運行重跑次數最多為3次。
在skip后面配置跳過錯誤的監聽器SkipListener
public class Mr implements SkipListener<String, String>{ // 發生讀操作跳過錯誤時,需要執行的監聽 public void onSkipInRead(Throwable t){ }
// 發生寫操作跳過錯誤時,需要執行的監聽
public void onSkipInWrite(String item, Throwable t){
}
// 處理數據時跳過錯誤時,需要執行的監聽
public void onSkipInProcess (String item, Throwable t){
System.out.println(item + "occur exception" + t);
}
}
重新運行程序,可以得到新的結果:
這次可以看到,12條數據中總共有11條數據進入到數據庫,而過長的008008008008數據,則因為設置了skip,所以容錯機制允許它不進入數據庫,這次的Spring batch最終沒有因為回滾而中斷。
查閱一下Spring batch的持久化數據表:
可以看出,的確是有一條數據被跳過了,但因為是我們允許它跳過的,所以整個job順利完成,即COMPLETED。
參考:https://blog.csdn.net/chihe9907/article/details/100601523