springbatch---->springbatch的使用(三)


  這里我們對上篇博客的例子做一個修改性的測試來學習一下springbatch的一些關於chunk的一些有用的特性。我漸漸能意會到,深刻並不等於接近事實。

 

springbatch的學習

一、chunk的skip-limit屬性的使用

  關於這個屬性的介紹:Maximum number of skips during processing of the step. If processing reaches the skip limit, the next exception thrown on item processing (read, process, or write) causes the step to fail.

我們修改batch.xml里面的關於readWriter里面的設置屬性。如下:

<!-- old -->
<step id="readWriter" next="clean">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="100" processor="processor">
        </chunk>
    </tasklet>
</step>    

<!-- new -->
<step id="readWriter" next="clean">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="100" skip-limit="2" processor="processor">
            <skippable-exception-classes>
                <include class="org.springframework.batch.item.file.FlatFileParseException"/>
            </skippable-exception-classes>
        </chunk>
    </tasklet>
</step>

  FlatFileParseException:Exception thrown when errors are encountered parsing flat files.修改的解壓文件的內容,讓它有一條數據是錯誤的。如下:這個日期肯定是錯誤的,當然這里是為了測試skip-limit屬性才做如此的方法處理。其實面對這樣的數據,可以放在process里面進行過濾處理的。

運行后的結果,數據庫的數據如下:

可以看到上述的那條錯誤數據沒有插入到表中,但是正常的數據已經插入到數據庫中。如果增加解壓文件的錯誤條數。比如3條的時候。控制台會報錯:org.springframework.batch.core.step.skip.SkipLimitExceededException: Skip limit of '2' exceeded。數據庫表的數據也沒有成功的插入。

 

二、chunk的skip-policy屬性的使用

  如果在意異常數量的話,用上述的skip-limit比較方便和簡單。如果不在意異常數量的話,我們可以自己定義忽略的策略,也就是這段要學習的部分。修改batch.xml里面的關於readWriter里面的設置屬性如下:

<step id="readWriter" next="clean">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="100" skip-policy="skipPolicy" processor="processor"/>
    </tasklet>
</step>
<step id="clean">
    <tasklet ref="cleanTasklet"/>
</step>

 在job.xml中聲明定義skipPolicy,內容如下

<bean id="skipPolicy" class="spring.batch.readFile.ExceptionSkipPolicy">
    <constructor-arg value="org.springframework.batch.item.file.FlatFileParseException"/>
</bean>

ExceptionSkipPolicy是我們自定義的異常策略實現類

package spring.batch.readFile;

import org.springframework.batch.core.step.skip.SkipLimitExceededException;
import org.springframework.batch.core.step.skip.SkipPolicy;

/**
 * @Author: huhx
 * @Date: 2017-11-01 下午 4:58
 */
public class ExceptionSkipPolicy implements SkipPolicy {

    private Class<? extends Exception> exceptionClassToSkip;

    public ExceptionSkipPolicy(Class<? extends Exception> exceptionClassToSkip) {
        super();
        this.exceptionClassToSkip = exceptionClassToSkip;
    }

    @Override
    public boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException {
        return exceptionClassToSkip.isAssignableFrom(t.getClass());
    }
}

修改解壓文件的內容,其實就是上述的錯誤3條數據的內容。如下

運行之后的數據庫數據如下:

 

三、SkipListener監聽skip的數據

我們基於上述做的修改,現在的job節點xml配置如下:

<job id="readFlatFileJob">
    <step id="decompress" next="readWriter">
        <tasklet ref="decompressTasklet"/>
    </step>
    <step id="readWriter" next="clean">
        <tasklet>
            <chunk reader="reader" writer="writer" commit-interval="100" skip-policy="skipPolicy" processor="processor"/>
            <listeners>
                <listener ref="skipListener"/>
            </listeners>
        </tasklet>
    </step>
    <step id="clean">
        <tasklet ref="cleanTasklet"/>
    </step>
</job>

job.xml中配置skipListener

<bean id="skipListener" class="spring.batch.readFile.FileSkipListener"/>

FileSkipListener的代碼如下:

package spring.batch.readFile;

import org.apache.commons.io.FileUtils;
import org.springframework.batch.core.annotation.OnSkipInProcess;
import org.springframework.batch.core.annotation.OnSkipInRead;
import org.springframework.batch.core.annotation.OnSkipInWrite;
import org.springframework.batch.item.file.FlatFileParseException;

import java.io.File;
import java.io.IOException;

/**
 * @Author: huhx
 * @Date: 2017-11-01 下午 5:32
 */
public class FileSkipListener {
    private File file = new File("file/log.txt");

    @OnSkipInRead
    public void readLog(Throwable t) throws IOException {
        if (t instanceof FlatFileParseException) {
            FlatFileParseException ffpe = (FlatFileParseException) t;
            String dataLog = "from read " + ffpe.getInput() + ", line number = " + ffpe.getLineNumber() + "\n";
            FileUtils.write(file, dataLog, true);
        }
    }

    @OnSkipInProcess
    public void processLog(People people, Throwable t) throws IOException {
        if (t instanceof FlatFileParseException) {
            FlatFileParseException ffpe = (FlatFileParseException) t;
            String dataLog = "from process " + ffpe.getInput() + ", line number = " + ffpe.getLineNumber() + "\n";
            String peopleInfo = people.getUsername() + ", address " + people.getBirthday() + "\n";
            FileUtils.write(file, dataLog + peopleInfo, true);
        }
    }

    @OnSkipInWrite
    public void writeLog(People people, Throwable t) throws IOException {
        if (t instanceof FlatFileParseException) {
            FlatFileParseException ffpe = (FlatFileParseException) t;
            String dataLog = "from write " + ffpe.getInput() + ", line number = " + ffpe.getLineNumber() + "\n";
            String peopleInfo = people.getUsername() + ", address " + people.getBirthday() + "\n";
            FileUtils.write(file, dataLog + peopleInfo, true);
        }
    }
}

對於上述錯誤的幾條記錄,我們記日志在log.txt里面。現在log.txt的內容如下:

from read 李元芳|32|黃岡|1985-12-99, line number = 2
from read 王昭君|百里|武漢|1995-10-89, line number = 3
from read 狄仁傑|21|天津|1958-12-99, line number = 4

springbatch提供的SkipListener接口去監聽skip的數據項。

public interface SkipListener<T,S> extends StepListener {
    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);
}

 當然比較方便的一種做法,就是springbatch提供的上述的@OnSkipInRead,@OnSkipInProcess and @OnSkipInWrite注解方式。

 

四、類似於上述的skip策略,springbatch還支持retry(重試)的功能

定義重試的方式有二種,和skip的類似。這里我們列舉如下:

  • 默認retry策略的可以定義重試次數的方式:
<tasklet>
    <chunk reader="reader" writer="writer" commit-interval="100" retry-limit="3">
        <retryable-exception-classes>
            <include class="org.springframework.daoOptimisticLockingFailureException" />
        </retryable-exception-classes>
    </chunk>
</tasklet>
  • 自定義重試策略的方式:
<tasklet>
    <chunk reader="reader" writer="writer" commit-interval="100" retry-policy="retryPolicy" />
</tasklet>

retryPolicy的定義如下:

<bean id="retryPolicy" class="org.springframework.retry.policy.ExceptionClassifierRetryPolicy">
    <property name="policyMap">
        <map>
            <entry key="org.springframework.dao.ConcurrencyFailureException">
                <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                    <property name="maxAttempts" value="3"/>
                </bean>
            </entry>
            <entry key="org.springframework.dao.DeadlockLoserDataAccessException">
                <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                    <property name="maxAttempts" value="5"/>
                </bean>
            </entry>
        </map>
    </property>
</bean>

當然,retry也有類似於skip的SkipListener。操作及用法如下

package spring.batch.readFile;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.retry.RetryCallback; import org.springframework.retry.RetryContext; import org.springframework.retry.listener.RetryListenerSupport; /** * @Author: huhx * @Date: 2017-11-01 下午 7:00 */ public class Slf4jRetryListener extends RetryListenerSupport { private static final Logger LOG = LoggerFactory.getLogger(Slf4jRetryListener.class); @Override public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) { LOG.error("retried operation",throwable); } }

 

友情鏈接

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM