spring batch(二):核心部分(1):配置Spring batch


chapter 3、Batch configuration

1、spring batch 的命名空間

spring xml中指定batch的前綴作為命名空間。

示例:

 

Xml代碼   收藏代碼
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  4.     xmlns:batch="http://www.springframework.org/schema/batch"  
  5.     xsi:schemaLocation="http://www.springframework.org/schema/beans  
  6.     http://www.springframework.org/schema/beans/spring-beans.xsd  
  7.     http://www.springframework.org/schema/batch  
  8.     http://www.springframework.org/schema/batch/spring-batch.xsd">  
  9.       
  10.     <batch:job id="importProductsJob">  
  11.     (...)  
  12.     </batch:job>  
  13. </beans>  

 

 

當指定batch命名空間之后,bean中的聲明都需要加上batch的前綴,例如:<batch:job id="importProductsJob">

spring的命名空間的前綴可以指定任意的名稱,這里采用batch作為前綴,為了方便理解。

 

2、spring batch XML的主要標簽有:job、step、tasklet、chunk、job-repository

3、Job配置。job元素是整個配置的頂級元素,它的屬性有:

a、id

b、restartable

c、incrementer

d、abstract

e、parent

f、job-repository

 

restartable 屬性如果是false,則程序不允許重啟。如果發生重啟,會拋出JobRestartException異常。

 

Java代碼   收藏代碼
  1. <batch:job id="importProductsJob" restartable="false">  
  2.     (...)  
  3. </batch:job>  

 job除了這些屬性外,還可以配置驗證器<batch:validator ref="parameterValidator" />,用來校驗工作參數(job parameters),可以實現JobParametersValidator接口。

如果無法通過驗證,會拋出JobParametersInvalidException異常。spring batch提供了一個默認的實現類DefaultJobParametersValidator,完成絕大部分的工作。如果還是無法滿足需求,可以自己編碼實現接口。

實例:

 

Java代碼   收藏代碼
  1. <batch:job id="importProductsJob">  
  2.     (...)  
  3.     <batch:validator ref="validator"/>  
  4. </batch:job>  
  5. <bean id="validator" class="org.springframework.batch.core.job.DefaultJobParametersValidator">  
  6.     <property name="requiredKeys">  
  7.         <set>  
  8.             <value>date</value>  
  9.         </set>  
  10.     </property>  
  11.     <property name="optionalKeys">  
  12.         <set>  
  13.             <value>productId</value>  
  14.         </set>  
  15.     </property>  
  16. </bean>  

 

4、step步驟的配置。

step的屬性:

a、next

b、parent

c、abstract

示例:

 

Java代碼   收藏代碼
  1. <job id="importProductsJob">  
  2.     <step id="decompress" next="readWrite">  
  3.         (...)  
  4.     </step>  
  5.     <step id="readWrite">  
  6.         (...)  
  7.     </step>  
  8. </job>  

 

5、tasklet和chunk的配置。

tasklet和chunk是用來指定處理過程的。

一個tasklet對應於一個事務性的、潛在可重復的過程中發生的一個步驟。

你可以自己實現Tasklet 接口,定義自己的tasklet。這個特性很有用,比如:用於解壓文件,執行系統命令,執行存儲過程等待。

你也可以使用系統tasklet的屬性有:

a、ref指定應用的bean

b、transaction-manager事物管理器

c、start-limittasklet重試retry的次數

d、allow-start-if-complete如果tasklet成功完成之后,是否可以進行重試retry操作。

示例:

 

Java代碼   收藏代碼
  1. <batch:job id="importProductsJob">  
  2.     (...)  
  3.     <batch:step id="readWriteStep">  
  4.         <batch:tasklet  
  5.             transaction-manager="transactionManager"  
  6.             start-limit="3"  
  7.             allow-start-if-complete="true">  
  8.             (...)  
  9.         </batch:tasklet>  
  10.     </batch:step>  
  11. </batch:job>  
  12. <bean id="transactionManager" class="(...)">  
  13.     (...)  
  14. </bean>  

 

chunk ,ChunkOrientedTasklet 類實現類“塊處理(chunk processing)”。

配置tasklet很簡單,但是配置“塊處理”就會復雜一點,因為它涉及的內容更多。

chunk的屬性有:

a、reader

b、processor

c、writer

d、commit-interval事物提交一次處理的items的數量。也是chunk的大小。

e、skip-limit跳躍的次數

f、skip-policy跳躍的策略:要實現SkipPolicy接口

g、retry-policy重試的策略:要實現RetryPolicy接口

h、retry-limit最大的重試次數

i、cache-capacity重試的緩存策略

j、reader-transactional-queue從一個擁有事務的JMS的queue讀取item數據

k、processor-transactional處理器是否包含事務處理

l、chunk-completion-policychunk的完成策略

示例:

 

Java代碼   收藏代碼
  1. <batch:job id="importProductsJob">  
  2.     (...)  
  3.     <batch:step id="readWrite">  
  4.         <batch:tasklet>  
  5.             <batch:chunk  
  6.                 reader="productItemReader"  
  7.                 processor="productItemProcessor"  
  8.                 writer="productItemWriter"  
  9.                 commit-interval="100"   
  10.                 skip-limit="20"  
  11.                 retry-limit="3"  
  12.                 cache-capacity="100"  
  13.                 chunk-completion-policy="timeoutCompletionPolicy"/>  
  14.         </batch:tasklet>  
  15.     </batch:step>  
  16. </batch:job>  
  17. <bean id="productItemReader" class="(...)">  
  18. (...)  
  19. </bean>  
  20. <bean id="productItemProcessor" class="(...)">  
  21. (...)  
  22. </bean>  
  23. <bean id="productItemWriter" class="(...)">  
  24. (...)  
  25. </bean>  
  26. <bean id="timeoutCompletionPolicy"  
  27.     class="org.springframework.batch.repeat.policy.TimeoutTerminationPolicy">  
  28.         <constructor-arg value="60"/>  
  29. </bean>  

 

chunk還有幾個子標簽,包括:reader、processor、writer、skip-policy、retry-policy

示例:

 

Java代碼   收藏代碼
  1. <batch:job id="importProductsJob">  
  2.     (...)  
  3.     <batch:step id="readWrite">  
  4.         <batch:tasklet>  
  5.             <batch:chunk commit-interval="100">  
  6.                 <batch:reader>  
  7.                     <bean class="(...)">  
  8.                         (...)  
  9.                     </bean>  
  10.                 </batch:reader>  
  11.                 <batch:processor>  
  12.                     <bean class="(...)">  
  13.                         (...)  
  14.                     </bean>  
  15.                 </batch:processor>  
  16.                 <batch:writer>  
  17.                     <bean class="(...)">  
  18.                         (...)  
  19.                     </bean>  
  20.                 </batch:writer>  
  21.             </batch:chunk>  
  22.         </batch:tasklet>  
  23.     </batch:step>  
  24. </batch:job>  

 

chunk的一些其他額外的子標簽:retry-listeners、skippable-exception-classes、retryable-exception-classes、streams

示例:

 

Xml代碼   收藏代碼
  1. <batch:job id="importProductsJob">  
  2.     (...)  
  3.     <batch:step id="readWrite">  
  4.         <batch:tasklet>  
  5.             <batch:chunk commit-interval="100"  
  6.                 skip-limit="10">  
  7.                 <skippable-exception-classes>  
  8.                     <include class="org.springframework.batch.item.file.FlatFileParseException"/>  
  9.                     <exclude class="java.io.FileNotFoundException"/>  
  10.                 </skippable-exception-classes>  
  11.             </batch:chunk>  
  12.         </batch:tasklet>  
  13.     </batch:step>  
  14. </batch:job>  

 

 

在chunk里面配置streams

示例:

 

Xml代碼   收藏代碼
  1. <batch:job id="importProductsJob">  
  2.     (...)  
  3.     <batch:step id="readWrite">  
  4.             <batch:tasklet>  
  5.                 <batch:chunk reader="productItemReader" writer="compositeWriter"/>  
  6.                 <streams>  
  7.                     <stream ref="productItemWriter1"/>  
  8.                     <stream ref="productItemWriter2"/>  
  9.                 </streams>  
  10.             </batch:tasklet>  
  11.     </batch:step>  
  12. </batch:job>  
  13.   
  14. <bean id="compositeWriter"  
  15.     class="org.springframework.batch.item.support.CompositeItemWriter">  
  16.     <property name="delegates">  
  17.         <list>  
  18.             <ref bean="productItemWriter1"/>  
  19.             <ref bean="productItemWriter2"/>  
  20.         </list>  
  21.     </property>  
  22. </bean>  

 

使用一個復合的stream形式,如上例的itemWriter,內部是writer1和writer2是stream的,需要在chunk里面的streams元素里面注冊。

 

6、事務配置。

    事務是spring batch的一個重要主題。主要作用於batch處理程序的健壯性,chunk處理的合並來完成它的工作。因為事務調用的對象類型不同,你可以在不同的層次配置事務。

示例:

Xml代碼   收藏代碼
  1. <batch:job id="importProductsJob">  
  2.     (...)  
  3.     <batch:step id="readWrite">  
  4.         <batch:tasklet transaction-manager="transactionManager" (...)>  
  5.             (...)  
  6.         </batch:tasklet>  
  7.     </batch:step>  
  8. </batch:job>  

 事務,有幾個定義的屬性:行為,隔離 isolation,超時 timeout。

spring batch提供transaction-attributes標簽來描述這些事務的屬性。

 示例:

Xml代碼   收藏代碼
  1. <batch:tasklet>  
  2.     <batch:chunk reader="productItemReader"  
  3.         writer="productItemReader"  
  4.         commit-interval="100"/>  
  5.         <batch:transaction-attributes isolation="DEFAULT"  
  6.         propagation="REQUIRED"  
  7.         timeout="30"/>  
  8.     </batch:chunk>  
  9. </batch:tasklet>  

 isolation 定義數據庫的隔離級別以及對外部事務的可見性。

spring的事務處理是基於java的異常體系的。java里面的異常分為兩類:檢查型異常(extends Exception)和非檢查型異常(extends RuntimeException)

spring對檢查型異常進行commit提交處理,對非檢查型異常進行rollback回滾處理。

spring batch運行你對不需要觸發rollback回滾動作的異常進行定義。你可以在tasklet的no-rollback-exception-class元素中進行指定。

 示例:

Xml代碼   收藏代碼
  1. <batch:tasklet>  
  2.     (...)  
  3.     <batch:no-rollback-exception-classes>  
  4.         <batch:include  
  5.             class="org.springframework.batch.item.validator.ValidationException"/>  
  6.     </batch:no-rollback-exception-classes>  
  7. </batch:tasklet>  

 對於JMS,spring batch提供chunk的reader-transactional-queue 屬性,提供事務處理。

 

7、配置job倉庫

job倉庫(job repository)是spring batch底層基礎的一個關鍵特征,它為spring batch的運行提供信息。

job倉庫必須實現JobRepository接口,spring batch只提供了一個具體的實現類:SimpleJobRepository。

spring batch為DAO提供了兩種實現:

    a、內存無持久化

    b、jdbc元數據的持久化

第一種“內存無持久化”的方式可以用來spring batch的測試和開發,但是不能應用於生產環境。因為內存中的東西會丟失。

設定job倉庫的參數

    (1)、配置內存模式的job倉庫:Configuring an in-memory job repository

示例:

Xml代碼   收藏代碼
  1. <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">  
  2.     <property name="transactionManager-ref" ref="transactionManager"/>  
  3. </bean>  
  4.   
  5. <bean id="transactionManager"    class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>  
  6.   
  7. <batch:job id="importInvoicesJob"    job-repository="jobRepository">  
  8.     (...)  
  9. </batch:job>  

 由於job倉庫是內存模式的,所以事務管理器采用ResourcelessTransactionManager。 這個類是NOOP (NO OPeration)無操作的PlatformTransactionManager接口實現。

     (2)、持久化的job倉庫:Configuring a persistent job repository

關系型數據庫的job倉庫的屬性如下:data-source,transaction-manager,isolation-level-for-create,max-varchar-length,table-prefix,lob-handler

示例:

Xml代碼   收藏代碼
  1. <bean id="dataSource"  
  2.     class="org.apache.commons.dbcp.BasicDataSource"  
  3.     destroy-method="close">  
  4.     <property name="driverClassName" value="${batch.jdbc.driver}" />  
  5.     <property name="url" value="${batch.jdbc.url}" />  
  6.     <property name="username" value="${batch.jdbc.user}" />  
  7.     <property name="password" value="${batch.jdbc.password}" />  
  8. </bean>  
  9.   
  10. <bean id="transactionManager" lazy-init="true"  
  11.     class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
  12.     <property name="dataSource" ref="dataSource" />  
  13. </bean>  
  14.   
  15. <batch:job-repository id="jobRepository"  
  16.     data-source="dataSource"  
  17.     transaction-manager="transactionManager"  
  18.     isolation-level-for-create="SERIALIZABLE"  
  19.     table-prefix="BATCH_"  
  20. />  
  21.   
  22. <batch:job id="importInvoicesJob" job-repository="jobRepository">  
  23.     (...)  
  24. </batch:job>  

 [問題:如果在不同的物理節點上面運行同樣的job會發生什么呢?]

前提:使用同一份spring batch的元數據,即采用同一份數據庫上面的表結構。

當創建job實例和執行信息的元數據的時候,job倉庫扮演了一個集中維護的工作庫的角色。當job並發運行的時候,spring batch 的job倉庫,可以防止創建相同的工作情況。這個功能依賴於底層數據庫的事務能力,完成job的同步任務。我們設置job倉庫的屬性isolation-level-for-create="SERIALIZABLE",來避免發生job的並發問題。由於有這種防護措施,你可以把你的spring batch的job分布到各個不同的物理節點,保證你的job實例不會被創建兩次。

     job倉庫是spring batch底層結構中的重要部分,它記錄了batch處理的信息,並且跟蹤job運行的成功與失敗。

 

 8、spring batch配置的高級主題。

(1)、使用step作用域。當使用SpEL的時候,step作用域很有用,來實現屬性的晚綁定。

(2)、Spring表達式語言:Spring Expression Language (SpEL) 

Spring3.× 開始提供。

 step的作用的實例范圍包括:jobParameters、jobExecutionContext、stepExecutionContext

 示例:

 

Xml代碼   收藏代碼
  1. <bean id="decompressTasklet"  
  2.             class="com.manning.sbia.ch01.batch.DecompressTasklet"  
  3.             scope="step">  
  4.     <property name="inputResource"  
  5.     value="#{jobParameters['inputResource']}" />  
  6.     <property name="targetDirectory"  
  7.     value="#{jobParameters['targetDirectory']}" />  
  8.     <property name="targetFile"  
  9.     value="#{jobParameters['targetFile']}" />  
  10. </bean>  

 SpEL由 #{ 和 } 組成

 

 在jobExecutionContext 和 stepExecutionContext也可以應用SpEL表達式。

 (3)、使用Linteners提供的更多的處理。

Spring batch在job和step級別上面提供listener。

Spring batch提供的listener的類型:

a、Job listener:在job級別監聽處理過程

b、Step listeners:在step級別監聽處理過程

c、Item listeners:監聽item的retry重試和repeat重做動作

一、Job listener 可以監聽job的運行,在job運行前和后添加動作。可以利用 listener標簽,在job標簽下面作為子元素進行添加。

示例1:

 

Xml代碼   收藏代碼
  1. <batch:job id="importProductsJob">  
  2.     <batch:listeners>  
  3.     <batch:listener ref="importProductsJobListener"/>  
  4.     </batch:listeners>  
  5. </batch:job>  

 importProductsJobListener不管job運行成功還是失敗,它都會在job運行的開始和結束的時候,接收job的通知,進行監聽。

 

 

還可以通過普通的POJO的java對象來做監聽器,只需要進行簡單的配置即可。

示例2:

 

Xml代碼   收藏代碼
  1. <batch:listeners>  
  2.     <batch:listener ref="importProductsJobListener" after-job-method="afterJob" before-job-method="beforeJob"/>  
  3. </batch:listeners>  

 可以在listener里面的ref指定引用的POJO的bean,通過after-job-method="afterJob" before-job-method="beforeJob" 來指定job之前和之后的執行方法。不過,被指定的這兩個方法的參數都需要是:JobExecution jobExecution。 這個2個方法的返回值都是void

 

還有一種方法是利用“注釋”來配置listener,spring batch會自己發現並運行該類。

 

二、Step listener

    Step有一系列的listener來跟蹤step的處理過程。這里所有的listener接口都繼承了StepListener接口。

Spring batch提供的Step的listener有:

a、ChunkListener:在chunk執行的之前和之后調用。

b、ItemProcessListener:在 ItemProcessor得到一個item之前和之后調用,在ItemProcessor拋出一個異常的時候調用。

c、ItemReadListener:在讀取item之前和讀取item之后調用,或者在讀取item的過程中觸發異常的時候調用。

d、ItemWriteListener:在一個item輸出之前和之后調用,或者在item輸出的過程中調用。

e、SkipListener:當讀取、處理和輸出的過程中產生了skip跳躍一個item的時候調用。

f、StepExecutionListener:在step運行之前和之后調用。

 接口代碼:

Java代碼   收藏代碼
  1. public interface StepExecutionListener extends StepListener {  
  2.     void beforeStep(StepExecution stepExecution);  
  3.     ExitStatus afterStep(StepExecution stepExecution);  
  4. }  
  5.   
  6. public interface ChunkListener extends StepListener {  
  7.     void beforeChunk();  
  8.     void afterChunk();  
  9. }  
  10.   
  11.   
  12. public interface ItemProcessListener<T, S> extends StepListener {  
  13.     void beforeProcess(T item);  
  14.     void afterProcess(T item, S result);  
  15.     void onProcessError(T item, Exception e);  
  16. }  
  17.   
  18. public interface ItemReadListener<T> extends StepListener {  
  19.     void beforeRead();  
  20.     void afterRead(T item);  
  21.     void onReadError(Exception ex);  
  22. }  
  23.   
  24. public interface ItemWriteListener<S> extends StepListener {  
  25.     void beforeWrite(List<? extends S> items);  
  26.     void afterWrite(List<? extends S> items);  
  27.     void onWriteError(Exception exception, List<? extends S> items);  
  28. }  
  29.   
  30. public interface SkipListener<T,S> extends StepListener {  
  31.     void onSkipInRead(Throwable t);  
  32.     void onSkipInProcess(T item, Throwable t);  
  33.     void onSkipInWrite(S item, Throwable t);  
  34. }  

 Step listener 作為tasklet標簽的一個子標簽進行配置。

 上面這些所有的Step listener都可以作為tasklet標簽的子標簽以相同的方式和等級進行配置。

示例:

Xml代碼   收藏代碼
  1. <bean id="importProductsJobListener"  
  2.     class="test.case01.java.batch.listener.job.ImportProductsJobListener" />  
  3.       
  4. <bean id="productStepExecutionListener"  
  5.     class="test.case01.java.batch.listener.step.ProductStepExecutionListener" />  
  6.   
  7. <bean id="productChunkListener"  
  8.     class="test.case01.java.batch.listener.step.chunk.ProductChunkListener" />  
  9.       
  10. <bean id="productItemProcessListener"  
  11.     class="test.case01.java.batch.listener.step.chunk.item.ProductItemProcessListener" />  
  12.       
  13. <batch:job id="importProducts" restartable="false">  
  14.     <batch:step id="readWriteProducts">  
  15.         <batch:tasklet>  
  16.             <batch:chunk reader="reader" writer="writer" processor="processor"  
  17.                 commit-interval="100" skip-limit="5">  
  18.                 <batch:skippable-exception-classes>  
  19.                     <batch:include  
  20.                         class="org.springframework.batch.item.file.FlatFileParseException" />  
  21.                 </batch:skippable-exception-classes>  
  22.             </batch:chunk>  
  23.             <batch:listeners>  
  24.                 <!-- here configure three kinds listeners for StepExecutionListener ,  ChunkListener , ItemProcessListener  for example.-->  
  25.                 <batch:listener ref="productStepExecutionListener" />  
  26.                 <batch:listener ref="productChunkListener" />  
  27.                 <batch:listener ref="productItemProcessListener" />  
  28.             </batch:listeners>  
  29.         </batch:tasklet>  
  30.     </batch:step>  
  31.     <batch:validator ref="parameterValidator" />  
  32.     <batch:listeners>  
  33.         <batch:listener ref="importProductsJobListener" />  
  34.     </batch:listeners>  
  35. </batch:job>  

 

 三、retry重試和repeat重做的listener

repeat listener擁有的方法名稱:before、after、close(在一個item最后一次repeat重做之后調用,不管repeat成功與否)、onError、open

retry listener擁有的方法名稱:close(在一個item上面最后一次嘗試retry之后調用,不管retry成功與否)、onError、open

接口代碼:

Java代碼   收藏代碼
  1. public interface RepeatListener {  
  2.     void before(RepeatContext context);  
  3.     void after(RepeatContext context, RepeatStatus result);  
  4.     void open(RepeatContext context);  
  5.     void onError(RepeatContext context, Throwable e);  
  6.     void close(RepeatContext context);  
  7. }  
  8.   
  9. public interface RetryListener {  
  10.     <T> void open(RetryContext context, RetryCallback<T> callback);  
  11.     <T> void onError(RetryContext context,  
  12.     RetryCallback<T> callback, Throwable e);  
  13.     <T> void close(RetryContext context,  
  14.     RetryCallback<T> callback, Throwable e);  
  15. }  

 這2個接口,和上面的step listener的配置位置和方式一致,都是tasklet標簽的子標簽位置

 

四、配置繼承關系

spring的XML提供配置的繼承。使用abstract和parent兩個屬性。從一個parent的bean繼承,表示該bean可以利用parent bean中的所有的屬性,並且可以覆蓋這些屬性。

示例:

Xml代碼   收藏代碼
  1. <bean id="parentBean" abstract="true">  
  2.     <property name="propertyOne" value="(...)"/>  
  3. </bean>  
  4.   
  5. <bean id="childBean" parent="parentBean">  
  6.     <property name="propertyOne" value="(...)"/>  
  7.     <property name="propertyTwo" value="(...)"/>  
  8. </bean>  

 這種繼承關系是由spring提供的,在spring batch里面也可以使用。

listeners標簽,提供merge屬性,可以用來合並parent和自身的listener

 示例:

Xml代碼   收藏代碼
  1. <job id="parentJob" abstract="true">  
  2.     <listeners>  
  3.         <listener ref="globalListener"/>  
  4.     <listeners>  
  5. </job>  
  6.   
  7. <job id="importProductsJob" parent="parentJob">  
  8.     (...)  
  9.     <listeners merge="true">  
  10.         <listener ref="specificListener"/>  
  11.     <listeners>  
  12. </job>  

 spring XML的這種繼承關系,使得spring batch的XML配置更簡單。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM