springbatch---->Using springbatch (Part 2)


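  Here we work through a more in-depth Spring Batch example: unzip a file, read its contents, filter them, and write the result to a database. If you have fallen into darkness, all you can do is wait calmly until your eyes adjust to it.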

 

A springbatch usage example

  First, let us list the concepts involved in Spring Batch.

1、Job repository
    An infrastructure component that persists job execution metadata
2、Job launcher 
    An infrastructure component that starts job executions
3、Job 
    An application component that represents a batch process
4、Step 
    A phase in a job; a job is a sequence of steps
5、Tasklet 
    A transactional, potentially repeatable process occurring in a step
6、Item 
    A record read from or written to a data source
7、Chunk 
    A list of items of a given size
8、Item reader 
    A component responsible for reading items from a data source
9、Item processor 
    A component responsible for processing (transforming, validating, or filtering) a read item before it’s written
10、Item writer 
    A component responsible for writing a chunk to a data source

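   This project builds on the previous post; see springbatch---->Using springbatch (Part 1) for details. The flow: unzip the received zip file into a txt file, filter out the records whose age is greater than 30, and store the remaining records in the database in the required format. Only new or modified files are listed below.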

 

I. XML changes: add a job.xml file and modify the batch.xml file

  • Content added to batch.xml:
<!-- Job that reads the file and writes it to the database -->
<job id="readFlatFileJob">
    <step id="decompress" next="readWriter">
        <tasklet ref="decompressTasklet"/>
    </step>
    <step id="readWriter" next="clean">
        <tasklet>
            <chunk reader="reader" writer="writer" commit-interval="100" processor="processor"/>
        </tasklet>
    </step>
    <step id="clean">
        <tasklet ref="cleanTasklet"/>
    </step>
</job>

  About commit-interval: Number of items to process before issuing a commit. When the number of items read reaches the commit interval number, the entire corresponding chunk is written out through the item writer and the transaction is committed.
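The commit-interval behaviour described above can be illustrated with a plain-Java sketch. This is a simplified model, not Spring Batch's implementation: it ignores transactions, skip, and retry, and the names `ChunkLoopSketch`, `RecordingWriter`, `run`, and `process` are all hypothetical. It assumes the interval counts items read, so items dropped by the processor still count toward it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ChunkLoopSketch {

    /** Hypothetical stand-in for an item writer: records every chunk it receives. */
    static final class RecordingWriter {
        final List<List<Integer>> chunks = new ArrayList<>();

        void write(List<Integer> chunk) {
            // In the real framework one write call is followed by one commit
            chunks.add(new ArrayList<>(chunk));
        }
    }

    /** Hypothetical processor: drops odd numbers (returns null), keeps even ones. */
    static Integer process(Integer item) {
        return item % 2 == 0 ? item : null;
    }

    /** Reads item by item, buffers processed items, and writes once per commitInterval items read. */
    static void run(Iterator<Integer> reader, int commitInterval, RecordingWriter writer) {
        List<Integer> chunk = new ArrayList<>();
        int readCount = 0;
        while (reader.hasNext()) {
            Integer processed = process(reader.next());
            readCount++;
            if (processed != null) { // null means "filtered out"
                chunk.add(processed);
            }
            if (readCount == commitInterval) {
                writer.write(chunk);
                chunk.clear();
                readCount = 0;
            }
        }
        if (readCount > 0) { // last, possibly smaller chunk
            writer.write(chunk);
        }
    }

    public static void main(String[] args) {
        RecordingWriter writer = new RecordingWriter();
        run(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8).iterator(), 3, writer);
        System.out.println(writer.chunks); // prints [[2], [4, 6], [8]]
    }
}
```

With commit-interval="100" as configured in the job above, the same idea applies with 100 items read per write.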

  • The new job.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Decompress the zip file -->
    <bean id="decompressTasklet" class="spring.batch.readFile.DecompressTasklet">
        <property name="inputResource" value="file:file/file.zip"/>
        <property name="targetDirectory" value="file"/>
        <property name="targetFile" value="file.txt"/>
    </bean>

    <!-- Read the text file -->
    <bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="lineMapper" ref="lineMapper"/>
        <property name="linesToSkip" value="1"/> <!-- Skip the first line, i.e. start reading from the second line -->
        <property name="resource" value="file:file/file.txt"/>
    </bean>

    <!-- Filter the records that were read -->
    <bean id="processor" class="spring.batch.readFile.FileProcessor"/>

    <!-- Write the records to the database -->
    <bean id="writer" class="spring.batch.readFile.FileWriter">
        <constructor-arg ref="dataSource"/>
    </bean>

    <!-- Delete the zip file -->
    <bean id="cleanTasklet" class="spring.batch.readFile.FileCleanTasklet">
        <property name="resource" value="file:file/file.zip"/>
    </bean>

    <!-- Split each line of the file on | and map it to an object -->
    <bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="lineTokenizer" ref="delimitedLineTokenizer"/>
        <property name="fieldSetMapper">
            <bean class="spring.batch.readFile.ReadFileMapper"/>
        </property>
    </bean>
    <bean id="delimitedLineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
        <constructor-arg name="delimiter" value="|"/>
    </bean>

    <!-- Simple data source configuration -->
    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://127.0.0.1:3306/csiilearn"/>
        <property name="username" value="root"/>
        <property name="password" value="*****"/>
    </bean>
</beans>

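In fact, for reading and writing formatted (flat) files, Spring Batch provides the FlatFileItemReader and FlatFileItemWriter classes. They can be used directly; only their properties need to be configured.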

 

II. The newly added Java classes

 

  • DecompressTasklet: decompresses the zip file
package spring.batch.readFile;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.core.io.Resource;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.util.zip.ZipInputStream;

/**
 * @Author: huhx
 * @Date: 2017-11-01 9:39 AM
 */
public class DecompressTasklet implements Tasklet {
    private Resource inputResource;
    private String targetDirectory;
    private String targetFile;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("file path: " + inputResource.getFile().getAbsolutePath());
        File targetDirectoryAsFile = new File(targetDirectory);
        if (!targetDirectoryAsFile.exists()) {
            FileUtils.forceMkdir(targetDirectoryAsFile);
        }
        File target = new File(targetDirectory, targetFile);
        // try-with-resources closes the streams even if copying fails
        try (ZipInputStream zis = new ZipInputStream(
                new BufferedInputStream(inputResource.getInputStream()))) {
            // Every entry is written to the same target file, so the archive is expected to hold a single entry
            while (zis.getNextEntry() != null) {
                // FileOutputStream creates the target file if it does not exist yet
                try (BufferedOutputStream dest = new BufferedOutputStream(new FileOutputStream(target))) {
                    IOUtils.copy(zis, dest);
                }
            }
        }
        if (!target.exists()) {
            throw new IllegalStateException("Could not decompress anything from the archive!");
        }
        return RepeatStatus.FINISHED;
    }

    public void setInputResource(Resource inputResource) {
        this.inputResource = inputResource;
    }

    public void setTargetDirectory(String targetDirectory) {
        this.targetDirectory = targetDirectory;
    }

    public void setTargetFile(String targetFile) {
        this.targetFile = targetFile;
    }
}
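To try the tasklet, a zip archive with a single text entry is needed. The following is a minimal sketch using only `java.util.zip` and `java.nio.file`, showing one way to build such an archive and to extract its first entry. The class and method names (`ZipRoundTripSketch`, `createZip`, `extractFirstEntry`) are hypothetical and not part of the project.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class ZipRoundTripSketch {

    /** Writes a zip archive with a single entry holding the given text. */
    static void createZip(Path zipFile, String entryName, String content) throws IOException {
        try (ZipOutputStream zos = new ZipOutputStream(Files.newOutputStream(zipFile))) {
            zos.putNextEntry(new ZipEntry(entryName));
            zos.write(content.getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
        }
    }

    /** Copies the first entry of the archive to target; returns false if the archive has no entries. */
    static boolean extractFirstEntry(Path zipFile, Path target) throws IOException {
        try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(zipFile))) {
            if (zis.getNextEntry() == null) {
                return false;
            }
            // The stream reports end-of-file at the end of the current entry
            Files.copy(zis, target, StandardCopyOption.REPLACE_EXISTING);
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("zip-sketch");
        Path zip = dir.resolve("file.zip");
        Path txt = dir.resolve("file.txt");
        createZip(zip, "file.txt", "name|age|address|birthday\n");
        System.out.println(extractFirstEntry(zip, txt));
        System.out.print(new String(Files.readAllBytes(txt), StandardCharsets.UTF_8));
    }
}
```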
  • People: the JavaBean mapped to the database table
import java.io.Serializable;
import java.util.Date;

public class People implements Serializable {
    private String username;
    private int age;
    private String address;
    private Date birthday;

    // getters and setters omitted
}
  • ReadFileMapper: maps the fields read from the file
package spring.batch.readFile;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

/**
 * @Author: huhx
 * @Date: 2017-11-01 10:11 AM
 */
public class ReadFileMapper implements FieldSetMapper<People> {

    @Override
    public People mapFieldSet(FieldSet fieldSet) throws BindException {
        People people = new People();
        people.setUsername(fieldSet.readString(0));
        people.setAge(fieldSet.readInt(1));
        people.setAddress(fieldSet.readString(2));
        people.setBirthday(fieldSet.readDate(3));
        return people;
    }
}
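As a rough mental model of what the tokenizer and the mapper do together for one line, here is a plain-Java sketch. It is not how DelimitedLineTokenizer or FieldSet are implemented; `LineMappingSketch`, `Person`, and `mapLine` are hypothetical names, `Person` uses `LocalDate` instead of `java.util.Date` for brevity, and the sample line is a romanized version of one record from the data file.

```java
import java.time.LocalDate;

public class LineMappingSketch {

    /** Hypothetical, simplified counterpart of the People bean. */
    static final class Person {
        final String username;
        final int age;
        final String address;
        final LocalDate birthday;

        Person(String username, int age, String address, LocalDate birthday) {
            this.username = username;
            this.age = age;
            this.address = address;
            this.birthday = birthday;
        }
    }

    /** Splits one line on '|' and converts each field to its target type. */
    static Person mapLine(String line) {
        // '|' is a regex metacharacter, so it must be escaped for String.split
        String[] fields = line.split("\\|");
        if (fields.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields but got " + fields.length + ": " + line);
        }
        return new Person(
                fields[0],
                Integer.parseInt(fields[1]),
                fields[2],
                LocalDate.parse(fields[3])); // ISO format, e.g. 1995-10-15
    }

    public static void main(String[] args) {
        Person p = mapLine("Wang Zhaojun|23|Wuhan|1995-10-15");
        // prints: Wang Zhaojun 23 Wuhan 1995-10-15
        System.out.println(p.username + " " + p.age + " " + p.address + " " + p.birthday);
    }
}
```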
  • FileProcessor: further processes the records that were read; here it filters them
package spring.batch.readFile;

import org.springframework.batch.item.ItemProcessor;

/**
 * @Author: huhx
 * @Date: 2017-11-01 9:43 AM
 */
public class FileProcessor implements ItemProcessor<People, People> {
    @Override
    public People process(People item) throws Exception {
        return needsToBeFiltered(item) ? null : item;
    }

    // Records with an age above 30 are filtered out (process() returns null for them)
    private boolean needsToBeFiltered(People item) {
        return item.getAge() > 30;
    }
}
  • FileWriter: writes the filtered records to the database
package spring.batch.readFile;

import org.springframework.batch.item.ItemWriter;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;
import java.util.List;

/**
 * @Author: huhx
 * @Date: 2017-11-01 9:42 AM
 */
public class FileWriter implements ItemWriter<People> {
    private static final String INSERT_PRODUCT = "INSERT INTO batch_user (user_name, age, address, birthday) VALUES(?, ?, ?, ?)";

    private JdbcTemplate jdbcTemplate;

    public FileWriter(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public void write(List<? extends People> items) throws Exception {
        for (People people : items) {
            jdbcTemplate.update(INSERT_PRODUCT, people.getUsername(), people.getAge(), people.getAddress(), people.getBirthday());
        }
    }
}
  • FileCleanTasklet: final cleanup step of the flow; deletes the zip file
package spring.batch.readFile;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.core.io.Resource;

import java.io.File;

/**
 * @Author: huhx
 * @Date: 2017-11-01 9:44 AM
 */
public class FileCleanTasklet implements Tasklet {
    private Resource resource;

    public void setResource(Resource resource) {
        this.resource = resource;
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
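        File file = resource.getFile();
        // deleteOnExit() only schedules the deletion for JVM shutdown; the file still exists when this step finishes
        file.deleteOnExit();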
        return RepeatStatus.FINISHED;
    }
}
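`File.deleteOnExit()` registers the file for deletion when the JVM terminates normally, so the archive is still on disk while the job runs. If the zip should be gone as soon as the clean step completes, one alternative is `java.nio.file.Files.deleteIfExists`. The sketch below demonstrates it on a temporary file; `ImmediateDeleteSketch` and `deleteNow` are hypothetical names, and wiring this into the tasklet (for example via `resource.getFile().toPath()`) is left as an assumption.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class ImmediateDeleteSketch {

    /** Deletes the file right away; returns true only if a file was actually removed. */
    static boolean deleteNow(Path file) throws IOException {
        return Files.deleteIfExists(file);
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("file", ".zip");
        System.out.println(deleteNow(tmp));    // prints true: the file existed and was removed
        System.out.println(Files.exists(tmp)); // prints false
        System.out.println(deleteNow(tmp));    // prints false: nothing left to delete
    }
}
```

Unlike `File.delete()`, which only returns false on failure, `Files.deleteIfExists` throws an `IOException` when the file exists but cannot be removed, which would make the step fail visibly.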

 

III. Modify the JobLaunch.java file

Job job = (Job) context.getBean("readFlatFileJob");
  • Content of the file extracted from the zip:
name|age|address|birthday
李元芳|32|黃岡|1985-12-15
王昭君|23|武漢|1995-10-15
狄仁傑|21|天津|1958-12-12
孫悟空|34|黃岡|1985-12-25
牛魔王|26|武漢|1999-09-12
孫尚香|27|天津|1969-12-12
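  • Data in the batch_user table after the run (shown as a screenshot in the original post): given the filter in FileProcessor, only the records with an age of 30 or less are expected to be inserted.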
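Which records should reach the table can be worked out from the sample file and the filter rule (age > 30 is dropped) with a small plain-Java sketch. It does not use Spring Batch; `AgeFilterSketch` and `survivingNames` are hypothetical names, and the names and cities in the sample lines are romanized here to keep the source ASCII-only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AgeFilterSketch {

    /** Mirrors needsToBeFiltered in FileProcessor: a record is dropped when its age is above 30. */
    static boolean needsToBeFiltered(int age) {
        return age > 30;
    }

    /** Skips the header line, then collects the name of every record that passes the filter. */
    static List<String> survivingNames(List<String> lines) {
        List<String> names = new ArrayList<>();
        for (int i = 1; i < lines.size(); i++) { // start at 1, like linesToSkip="1"
            String[] fields = lines.get(i).split("\\|");
            int age = Integer.parseInt(fields[1]);
            if (!needsToBeFiltered(age)) {
                names.add(fields[0]);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "name|age|address|birthday",
                "Li Yuanfang|32|Huanggang|1985-12-15",
                "Wang Zhaojun|23|Wuhan|1995-10-15",
                "Di Renjie|21|Tianjin|1958-12-12",
                "Sun Wukong|34|Huanggang|1985-12-25",
                "Niu Mowang|26|Wuhan|1999-09-12",
                "Sun Shangxiang|27|Tianjin|1969-12-12");
        // prints [Wang Zhaojun, Di Renjie, Niu Mowang, Sun Shangxiang]
        System.out.println(survivingNames(lines));
    }
}
```

Under these assumptions, four of the six sample records pass the filter; the two records with ages 32 and 34 are dropped.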

 
