批處理任務的主要業務邏輯都是在Step
中去完成的。可以將Job
理解為運行Step
的框架,而Step
理解為業務功能。
Step配置
Step
是Job
中的工作單元,每一個Step
涵蓋了單行記錄的處理閉環。下圖是一個Step
的簡要結構:
一個Step
通常涵蓋三個部分:讀數據(Reader)、處理數據(Processor)和寫數據(Writer)。但是並不是所有的Step
都需要自身來完成數據的處理,比如存儲過程等方式是通過外部功能來完成,因此Spring Batch提供了2種Step的處理方式:1)面向分片的ChunkStep
,2)面向過程的TaskletStep
。但是基本上大部分情況下都是使用面向分片的方式來解決問題。
面向分片的處理過程
在Step
中數據是按記錄(按行)處理的,但是每條記錄處理完畢之后馬上提交事物反而會導致IO的巨大壓力。因此Spring Batch提供了數據處理的分片功能。設置了分片之后,一次工作會從Read開始,然后交由給Processor處理。處理完畢后會進行聚合,待聚合到一定的數量的數據之后一次性調用Write將數據提交到物理數據庫。其過程大致為:
在Spring Batch中所謂的事物和數據事物的概念一樣,就是一次性提交多少數據。如果在聚合數據期間出現任何錯誤,所有的這些數據都將不執行寫入。
面向對象配置Step
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
return this.jobBuilderFactory.get("sampleJob")
.repository(jobRepository)
.start(sampleStep)
.build();
}
@Bean
public Step sampleStep(PlatformTransactionManager transactionManager) {
return this.stepBuilderFactory.get("sampleStep")
.transactionManager(transactionManager)
.<String, String>chunk(10) //分片配置
.reader(itemReader()) //reader配置
.writer(itemWriter()) //write配置
.build();
}
觀察sampleStep方法:
- reader: 使用ItemReader提供讀數據的方法。
- write:ItemWrite提供寫數據的方法。
- transactionManager:使用默認的
PlatformTransactionManager
對事物進行管理。當配置好事物之后Spring Batch會自動對事物進行管理,無需開發人員顯示操作。 - chunk:指定一次性數據提交的記錄數,因為任務是基於Step分次處理的,當累計到chunk配置的次數則進行一次提交。提交的內容除了業務數據,還有批處理任務運行相關的元數據。
是否使用ItemProcessor
是一個可選項。如果沒有Processor可以將數據視為讀取並直接寫入。
提交間隔
Step
使用PlatformTransactionManager
管理事物。每次事物提交的間隔根據chunk
方法中配置的數據執行。如果設置為1,那么在每一條數據處理完之后都會調用ItemWrite
進行提交。提交間隔設置太小,那么會浪費需要多不必要的資源,提交間隔設置的太長,會導致事物鏈太長占用空間,並且出現失敗會導致大量數據回滾。因此設定一個合理的間隔是非常必要的,這需要根據實際業務情況、性能要求、以及數據安全程度來設定。如果沒有明確的評估目標,設置為10~20較為合適。
配置Step重啟
前文介紹了Job
的重啟,但是每次重啟對Step
也是有很大的影響的,因此需要特定的配置。
限定重啟次數
某些Step
可能用於處理一些先決的任務,所以當Job再次重啟時這Step
就沒必要再執行,可以通過設置startLimit來限定某個Step
重啟的次數。當設置為1時候表示僅僅運行一次,而出現重啟時將不再執行:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.startLimit(1)
.build();
}
重啟已經完成任務的Step
在單個JobInstance
的上下文中,如果某個Step
已經處理完畢(COMPLETED)那么在默認情況下重啟之后這個Step
並不會再執行。可以通過設置allow-start-if-complete
為true告知框架每次重啟該Step
都要執行:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.allowStartIfComplete(true)
.build();
}
配置略過邏輯
某些時候在任務處理單個記錄時中出現失敗並不應該停止任務,而應該跳過繼續處理下一條數據。是否跳過需要根據業務來判定,因此框架提供了跳過機制交給開發人員使用。如何配置跳過機制:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(FlatFileParseException.class)
.build();
}
代碼的含義是當處理過程中拋出FlatFileParseException
異常時就跳過該條記錄的處理。skip-limit
(skipLimit方法)配置的參數表示當跳過的次數超過數值時則會導致整個Step
失敗,從而停止繼續運行。還可以通過反向配置的方式來忽略某些異常:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(Exception.class)
.noSkip(FileNotFoundException.class)
.build();
}
skip
表示要當捕捉到Exception異常就跳過。但是Exception有很多繼承類,此時可以使用noSkip
方法指定某些異常不能跳過。
設置重試邏輯
當處理記錄出個異常之后並不希望他立即跳過或者停止運行,而是希望可以多次嘗試執行直到失敗:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.build();
}
retry(DeadlockLoserDataAccessException.class)
表示只有捕捉到該異常才會重試,retryLimit(3)
表示最多重試3次,faultTolerant()
表示啟用對應的容錯功能。
事物回滾控制
默認情況下,無論是設置了重試(retry)還是跳過(skip),只要從Writer
拋出一個異常都會導致事物回滾。如果配置了skip機制,那么在Reader
中拋出的異常不會導致回滾。有些從Writer
拋出一個異常並不需要回滾數據,noRollback
屬性為Step
提供了不必進行事物回滾的異常配置:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.noRollback(ValidationException.class) //不必回滾的異常
.build();
}
事物數據讀取的緩存
一次Setp
分為Reader
、Processor
和Writer
三個階段,這些階段統稱為Item
。默認情況下如果錯誤不是發生在Reader階段,那么沒必要再去重新讀取一次數據。但是某些場景下需要Reader部分也需要重新執行,比如Reader是從一個JMS隊列中消費消息,當發生回滾的時候消息也會在隊列上重放,因此也要將Reader納入到回滾的事物中,根據這個場景可以使用readerIsTransactionalQueue
來配置數據重讀:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.readerIsTransactionalQueue() //數據重讀
.build();
}
事物屬性
事物的屬性包括隔離等級(isolation)、傳播方式(propagation)以及過期時間(timeout)。關於事物的控制詳見Spring Data Access的說明,下面是相關配置的方法:
@Bean
public Step step1() {
//配置事物屬性
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setPropagationBehavior(Propagation.REQUIRED.value());
attribute.setIsolationLevel(Isolation.DEFAULT.value());
attribute.setTimeout(30);
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.transactionAttribute(attribute) //設置事物屬性
.build();
}
向Step注冊 ItemStream
ItemStream
是用於每一個階段(Reader、Processor、Writer)的“生命周期回調數據處理器”,后續的文章會詳細介紹ItemStream
。在4.×版本之后默認注入注冊了通用的ItemStream
。
有2種方式將ItemStream
注冊到Step
中,一是使用stream
方法:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(compositeItemWriter())
.stream(fileItemWriter1())
.stream(fileItemWriter2())
.build();
}
二是使用相關方法的代理:
@Bean
public CompositeItemWriter compositeItemWriter() {
List<ItemWriter> writers = new ArrayList<>(2);
writers.add(fileItemWriter1());
writers.add(fileItemWriter2());
CompositeItemWriter itemWriter = new CompositeItemWriter();
itemWriter.setDelegates(writers);
return itemWriter;
}
StepExecution攔截器
在Step
執行的過程中會產生各種各樣的事件,開發人員可以利用各種Listener
接口對Step
及Item
進行監聽。通常在創建一個Step的時候添加攔截器:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(reader())
.writer(writer())
.listener(chunkListener()) //添加攔截器
.build();
}
Spring Batch提供了多個接口以滿足不同事件的監聽。
StepExecutionListener
StepExecutionListener
可以看做一個通用的Step
攔截器,他的作用是在Step開始之前和結束之后進行攔截處理:
public interface StepExecutionListener extends StepListener {
void beforeStep(StepExecution stepExecution); //Step執行之前
ExitStatus afterStep(StepExecution stepExecution); //Step執行完畢之后
}
在結束的時候開發人員可以自己定義返回的ExitStatus
,用於配合流程控制(見后文)實現對整個Step執行過程的控制。
ChunkListener
ChunkListener
是在數據事物發生的兩端被觸發。chunk
的配置決定了處理多少項記錄才進行一次事物提交,ChunkListener
的作用就是對一次事物開始之后或事物提交之后進行攔截:
public interface ChunkListener extends StepListener {
void beforeChunk(ChunkContext context); //事物開始之后,ItemReader調用之前
void afterChunk(ChunkContext context); //事物提交之后
void afterChunkError(ChunkContext context); //事物回滾之后
}
如果沒有設定chunk也可以使用ChunkListener
,它會被TaskletStep
調用(TaskletStep
見后文)。
ItemReadListener
該接口用於對Reader
相關的事件進行監控:
public interface ItemReadListener<T> extends StepListener {
void beforeRead();
void afterRead(T item);
void onReadError(Exception ex);
}
beforeRead
在每次Reader
調用之前被調用,afterRead
在每次Reader
成功返回之后被調用,而onReadError
會在出現異常之后被調用,可以將其用於記錄異常日志。
ItemProcessListener
ItemProcessListener
和ItemReadListener
類似,是圍繞着ItemProcessor
進行處理的:
public interface ItemProcessListener<T, S> extends StepListener {
void beforeProcess(T item); //processor執行之前
void afterProcess(T item, S result); //processor直線成功之后
void onProcessError(T item, Exception e); //processor執行出現異常
}
ItemWriteListener
ItemWriteListener
的功能和ItemReadListener
、ItemReadListener
類似,但是需要注意的是它接收和處理的數據對象是一個List
。List
的長度與chunk配置相關。
public interface ItemWriteListener<S> extends StepListener {
void beforeWrite(List<? extends S> items);
void afterWrite(List<? extends S> items);
void onWriteError(Exception exception, List<? extends S> items);
}
SkipListener
ItemReadListener
、ItemProcessListener
和ItemWriteListener
都提供了錯誤攔截處理的機制,但是沒有處理跳過(skip)的數據記錄。因此框架提供了SkipListener
來專門處理那么被跳過的記錄:
public interface SkipListener<T,S> extends StepListener {
void onSkipInRead(Throwable t); //Read期間導致跳過的異常
void onSkipInProcess(T item, Throwable t); //Process期間導致跳過的異常
void onSkipInWrite(S item, Throwable t); //Write期間導致跳過的異常
}
SkipListener
的價值是可以將那些未能成功處理的記錄在某個位置保存下來,然后交給其他批處理進一步解決,或者人工來處理。Spring Batch保證以下2個特征:
- 跳過的元素只會出現一次。
SkipListener
始終在事物提交之前被調用,這樣可以保證監聽器使用的事物資源不會被業務事物影響。
TaskletStep
面向分片(Chunk-oriented processing )的過程並不是Step的唯一執行方式。比如用數據庫的存儲過程來處理數據,這個時候使用標准的Reader、Processor、Writer會很奇怪,針對這些情況框架提供了TaskletStep
。
TaskletStep
是一個非常簡單的接口,僅有一個方法——execute
。TaskletStep
會反復的調用這個方法直到獲取一個RepeatStatus.FINISHED
返回或者拋出一個異常。所有的Tasklet
調用都會包裝在一個事物中。
注冊一個TaskletStep
非常簡單,只要添加一個實現了Tasklet
接口的類即可:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.tasklet(myTasklet()) //注入Tasklet的實現
.build();
}
TaskletStep
還支持適配器處理等,詳見官網說明。
TaskletAdapter
As with other adapters for the ItemReader
and ItemWriter
interfaces, the Tasklet
interface contains an implementation that allows for adapting itself to any pre-existing class: TaskletAdapter
. An example where this may be useful is an existing DAO that is used to update a flag on a set of records. The TaskletAdapter
can be used to call this class without having to write an adapter for the Tasklet
interface.
The following example shows how to define a TaskletAdapter
in Java:
Java Configuration
@Bean
public MethodInvokingTaskletAdapter myTasklet() {
MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();
adapter.setTargetObject(fooDao());
adapter.setTargetMethod("updateFoo");
return adapter;
}
Example Tasklet
Implementation
Many batch jobs contain steps that must be done before the main processing begins in order to set up various resources or after processing has completed to cleanup those resources. In the case of a job that works heavily with files, it is often necessary to delete certain files locally after they have been uploaded successfully to another location. The following example (taken from the Spring Batch samples project) is a Tasklet
implementation with just such a responsibility:
public class FileDeletingTasklet implements Tasklet, InitializingBean {
private Resource directory;
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
File dir = directory.getFile();
Assert.state(dir.isDirectory());
File[] files = dir.listFiles();
for (int i = 0; i < files.length; i++) {
boolean deleted = files[i].delete();
if (!deleted) {
throw new UnexpectedJobExecutionException("Could not delete file " +
files[i].getPath());
}
}
return RepeatStatus.FINISHED;
}
public void setDirectoryResource(Resource directory) {
this.directory = directory;
}
public void afterPropertiesSet() throws Exception {
Assert.notNull(directory, "directory must be set");
}
}
The preceding tasklet
implementation deletes all files within a given directory. It should be noted that the execute
method is called only once. All that is left is to reference the tasklet
from the step
.
The following example shows how to reference the tasklet
from the step
in Java:
Java Configuration
@Bean
public Job taskletJob() {
return this.jobBuilderFactory.get("taskletJob")
.start(deleteFilesInDir())
.build();
}
@Bean
public Step deleteFilesInDir() {
return this.stepBuilderFactory.get("deleteFilesInDir")
.tasklet(fileDeletingTasklet())
.build();
}
@Bean
public FileDeletingTasklet fileDeletingTasklet() {
FileDeletingTasklet tasklet = new FileDeletingTasklet();
tasklet.setDirectoryResource(new FileSystemResource("target/test-outputs/test-dir"));
return tasklet;
}
控制Step執行流程
順序執行
默認情況下。Step與Step之間是順序執行的,如下圖:
順序執行通過next
方法來標記:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(stepA())
.next(stepB()) //順序執行
.next(stepC())
.build();
}
條件執行
在順序執行的過程中,在整個執行鏈條中有一個Step
執行失敗則整個Job
就會停止。但是通過條件執行,可以指定各種情況下的執行分支:
為了實現更加復雜的控制,可以通過Step
執行后的退出命名來定義條件分之。先看一個簡單的代碼:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(stepA()) //啟動時執行的step
.on("*").to(stepB()) //默認跳轉到stepB
.from(stepA()).on("FAILED").to(stepC()) //當返回的ExitStatus為"FAILED"時,執行。
.end()
.build();
}
這里使用來表示默認處理,是一個通配符表示處理任意字符串,對應的還可以使用?表示匹配任意字符。在Spring Batch(1)——數據批處理概念一文中介紹了Step的退出都會有ExitStatus
,命名都來源於它。下面是一個更加全面的代碼。
- 配置攔截器處理ExitCode:
public class SkipCheckingListener extends StepExecutionListenerSupport {
public ExitStatus afterStep(StepExecution stepExecution) {
String exitCode = stepExecution.getExitStatus().getExitCode();
if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
stepExecution.getSkipCount() > 0) { //當Skip的Item大於0時,則指定ExitStatus的內容
return new ExitStatus("COMPLETED WITH SKIPS");
}
else {
return null;
}
}
}
攔截器指示當有一個以上被跳過的記錄時,返回的ExitStatus
為"COMPLETED WITH SKIPS"。對應的控制流程:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1()).on("FAILED").end() //執行失敗直接退出
.from(step1()).on("COMPLETED WITH SKIPS").to(errorPrint1()) //有跳過元素執行 errorPrint1()
.from(step1()).on("*").to(step2()) //默認(成功)情況下執行 Step2
.end()
.build();
}
Step的停機退出機制
Spring Batch為Job
提供了三種退出機制,這些機制為批處理的執行提供了豐富的控制方法。在介紹退出機制之前需要回顧一下 數據批處理概念一文中關於StepExecution
的內容。在StepExecution
中有2個表示狀態的值,一個名為status
,另外一個名為exitStatus
。前者也被稱為BatchStatus
。
前面以及介紹了ExitStatus
的使用,他可以控制Step執行鏈條的條件執行過程。除此之外BatchStatus
也會參與到過程的控制。
End退出
默認情況下(沒有使用end
、fail
方法結束),Job
要順序執行直到退出,這個退出稱為end
。這個時候,BatchStatus
=COMPLETED
、ExitStatus
=COMPLETED
,表示成功執行。
除了Step
鏈式處理自然退出,也可以顯示調用end
來退出系統。看下面的例子:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1()) //啟動
.next(step2()) //順序執行
.on("FAILED").end()
.from(step2()).on("*").to(step3()) //條件執行
.end()
.build();
}
上面的代碼,step1
到step2
是順序執行,當step2
的exitStatus
返回"FAILED"時則直接End退出。其他情況執行Step3
。
Fail退出
除了end
還可以使用fail
退出,這個時候,BatchStatus
=FAILED
、ExitStatus
=EARLY TERMINATION
,表示執行失敗。這個狀態與End
最大的區別是Job
會嘗試重啟執行新的JobExecution
。看下面代碼的例子:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1()) //執行step1
.next(step2()).on("FAILED").fail() //step2的ExitStatus=FAILED 執行fail
.from(step2()).on("*").to(step3()) //否則執行step3
.end()
.build();
}
在指定的節點中斷
Spring Batch還支持在指定的節點退出,退出后下次重啟會從中斷的點繼續執行。中斷的作用是某些批處理到某個步驟后需要人工干預,當干預完之后又接着處理:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
//如果step1的ExitStatus=COMPLETED則在step2中斷
.start(step1()).on("COMPLETED").stopAndRestart(step2())
//否則直接退出批處理
.end()
.build();
}
程序化流程的分支
可以直接進行編碼來控制Step
之間的扭轉,Spring Batch提供了JobExecutionDecider
接口來協助分支管理:
public class MyDecider implements JobExecutionDecider {
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
String status;
if (someCondition()) {
status = "FAILED";
}
else {
status = "COMPLETED";
}
return new FlowExecutionStatus(status);
}
}
接着將MyDecider
作為過濾器添加到配置過程中:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1())
.next(decider()).on("FAILED").to(step2())
.from(decider()).on("COMPLETED").to(step3())
.end()
.build();
}
流程分裂
在線性處理過程中,流程都是一個接着一個執行的。但是為了滿足某些特殊的需要,Spring Batch提供了執行的過程分裂並行Step
的方法。參看下面的Job
配置:
@Bean
public Job job() {
Flow flow1 = new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();//並行流程1
Flow flow2 = new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();//並行流程2
return this.jobBuilderFactory.get("job")
.start(flow1)
.split(new SimpleAsyncTaskExecutor()) //創建一個異步執行任務
.add(flow2)
.next(step4()) //2個分支執行完畢之后再執行step4。
.end()
.build();
}
這里表示flow1和flow2會並行執行,待2者執行成功后執行step4。
數據綁定
在Job
或Step
的任何位置,都可以獲取到統一配置的數據。比如使用標准的Spring Framework方式:
@Bean
public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
當我們通過配置文件(application.properties中 input.file.name=filepath
)或者jvm參數(-Dinput.file.name=filepath
)指定某些數據時,都可以通過這種方式獲取到對應的配置參數。
此外,也可以從JobParameters
從獲取到Job
運行的上下文參數:
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
無論是JobExecution
還是StepExecution
,其中的內容都可以通過這種方式去獲取參數,例如:
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
或者
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
Step Scope
注意看上面的代碼例子,都有一個@StepScope
注解。這是為了進行后期綁定進行的標識。因為在Spring的IoCs容器進行初始化的階段並沒有任何的*Execution
在執行,進而也不存在任何*ExecutionContext
,所以這個時候根本無法注入標記的數據。所以需要使用注解顯式的告訴容器直到Step
執行的階段才初始化這個@Bean
。
Job Scope
Job Scope的概念和 Step Scope類似,都是用於標識在到了某個執行時間段再添加和注入Bean。@JobScope
用於告知框架知道JobInstance
存在時候才初始化對應的@Bean
:
@JobScope
@Bean
// 初始化獲取 jobParameters中的參數
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
@JobScope
@Bean
// 初始化獲取jobExecutionContext中的參數
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext中的參數['input.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}