Spring Batch是一個基於Spring的企業級批處理框架,它通過配合定時器Quartz來輕易實現大批量的數據讀取或插入,並且全程自動化,無需人員管理。
在使用spring batch之前,得對spring batch的流程有一個基本了解

每個batch它都包含了一個job,而一個job中卻有可能包含多個step,整個batch中干活的是step,batch主要是用來對數據的操作,所以step就有三個操作數據的東西,一個是ItemReader用來讀取數據的,一個是ItemProcessor用來處理數據的,一個是ItemWriter用來寫數據(可以是文件也可以是插入sql語句),JobLauncher用來啟動Job,JobRepository是上述處理提供的一種持久化機制,它為JobLauncher,Job,和Step實例提供CRUD操作。
pom.xml 三個batch的jar包
- <span style="white-space:pre;"> </span><dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-batch-core</artifactId>
- <version>2.1.8.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-batch-infrastructure</artifactId>
- <version>2.1.8.RELEASE</version>
- <span style="white-space:pre;"> </span></dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-batch-test</artifactId>
- <version>2.1.8.RELEASE</version>
- </dependency>
batch.xml
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/batch
- http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
- http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
- ">
- <bean id="jobLauncher"
- class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
- <property name="jobRepository" ref="jobRepository" />
- </bean>
- <bean id="jobRepository"
- class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
- <property name="validateTransactionState" value="false" />
- </bean>
- <span style="white-space:pre;"> </span><!--一個job-->
- <batch:job id="writerteacherInterview">
- <batch:step id="teacherInterview">
- <batch:tasklet>
- <batch:chunk reader="jdbcItemReaderTeacherInterview" writer="teacherInterviewItemWriter"
- processor="teacherInterviewProcessor" commit-interval="10">
- </batch:chunk>
- </batch:tasklet>
- </batch:step>
- </batch:job>
- <!--job的讀取數據操作-->
- <bean id="jdbcItemReaderTeacherInterview"
- class="org.springframework.batch.item.database.JdbcCursorItemReader"
- scope="step">
- <property name="dataSource" ref="dataSource" />
- <property name="sql"
- value="select distinct teacherName ,count(teacherName) as num from examininterviewrecord where pdate >'${detail_startime}' and pdate < '${detail_endtime}' GROUP BY teacherName " />
- <property name="rowMapper" ref="teacherInterviewMapper">
- </property>
- </bean>
- </beans>
讀取數據 teacherInterviewMapper
- package com.yc.batch;
- import java.sql.ResultSet;
- import java.sql.SQLException;
- import org.springframework.jdbc.core.RowMapper;
- import org.springframework.stereotype.Component;
- import com.yc.vo.TeacherInterviewdetail;
- import com.yc.vo.TeacherWorkdetail;
- import com.yc.vo.Workdetail;
- @Component("teacherInterviewMapper")
- public class TeacherInterviewMapper implements RowMapper {
- @Override
- public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
- TeacherInterviewdetail TId=new TeacherInterviewdetail();
- TId.setTeacherName(rs.getString("teacherName"));
- TId.setNum(rs.getInt("num"));
- return TId;
- }
- }
處理數據 teacherInterviewProcessor ,這個處理數據方法,一般都是在這里在這里進行一些數據的加工,比如有些數據沒有讀到,你也可以在這個方法和后面那個寫入數據的類里面寫,所以就導致了這個類里面你可以什么都不敢,直接把數據拋到后面去,讓后面的寫數據類來處理;我這里就是處理數據的這個類什么都沒寫,但是最好還是按它的規則來!
- package com.yc.batch;
- import org.hibernate.engine.transaction.jta.platform.internal.SynchronizationRegistryBasedSynchronizationStrategy;
- import org.springframework.batch.item.ItemProcessor;
- import org.springframework.stereotype.Component;
- import org.springframework.stereotype.Service;
- import com.yc.vo.TeacherInterviewdetail;
- import com.yc.vo.TeacherWorkdetail;
- import com.yc.vo.Workdetail;
- //業務層
- @Component("teacherInterviewProcessor")
- public class TeacherInterviewProcessor implements ItemProcessor<TeacherInterviewdetail, TeacherInterviewdetail> {
- @Override
- public TeacherInterviewdetail process(TeacherInterviewdetail teacherInterviewdetail) throws Exception {
- return teacherInterviewdetail;
- }
- }
寫數據 teacherInterviewItemWriter 這個類里面主要是把數據寫進一個文件里,同時我這個類里面還有一些數據處理
- package com.yc.batch;
- import java.io.InputStream;
- import java.text.NumberFormat;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Properties;
- import javax.annotation.Resource;
- import org.springframework.batch.item.ItemWriter;
- import org.springframework.stereotype.Component;
- import org.springframework.stereotype.Service;
- import com.yc.biz.ExamineeClassBiz;
- import com.yc.biz.WorkBiz;
- import com.yc.utils.CsvUtils;
- import com.yc.vo.TeacherInterviewdetail;
- import com.yc.vo.TeacherWorkdetail;
- import com.yc.vo.Workdetail;
- import net.sf.ehcache.util.PropertyUtil;
- //寫
- @Component("teacherInterviewItemWriter")
- public class TeacherInterviewItemWriter implements ItemWriter<TeacherInterviewdetail>{
- @Override
- public void write(List<? extends TeacherInterviewdetail> teacherInterviewdetails) throws Exception {
- Properties props = new Properties();
- InputStream in= PropertyUtil.class.getClassLoader().getResourceAsStream("connectionConfig.properties");
- props.load(in);
- String time=props.getProperty("detail_time");
- CsvUtils cu=new CsvUtils();
- List<Object> works=new ArrayList<Object>();
- for(TeacherInterviewdetail t:teacherInterviewdetails){
- works.add(t);
- }
- String path=this.getClass().getResource("/").getPath();
- path=path.substring(0,path.lastIndexOf("/"));
- path=path.substring(0,path.lastIndexOf("/"));
- path=path.substring(0,path.lastIndexOf("/"));
- path=path.substring(0,path.lastIndexOf("/"));
- cu.writeCsv(path+"/csv/teacherInterview_"+time+".csv",works );
- }
- }
我這里有用到一個吧數據寫進CSV文件的jar包
- <span style="white-space:pre;"> </span><dependency>
- <groupId>net.sourceforge.javacsv</groupId>
- <artifactId>javacsv</artifactId>
- <version>2.0</version>
- </dependency>
CsvUtils幫助類的寫入CSV文件方法
- /**
- * 寫入CSV文件
- * @throws IOException
- */
- public void writeCsv(String path,List<Object> t) throws IOException{
- String csvFilePath = path;
- String filepath=path.substring(0,path.lastIndexOf("/"));
- File f=new File(filepath);
- if(!f.exists()){
- f.mkdirs();
- }
- File file=new File(path);
- if(!file.exists()){
- file.createNewFile();
- }
- CsvWriter wr =new CsvWriter(csvFilePath,',',Charset.forName("GBK"));
- try {
- for(Object obj:t){
- String[] contents=obj.toString().split(",");
- wr.writeRecord(contents);
- }
- wr.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
就這樣一個基本的batch流程就跑起來了,它通過從數據里讀取一些數據,然后經過處理后,被存進服務器下的一個文件里面,之后像這種數據的讀取就不需要去數據庫里面
查詢了,而是可以直接通過讀取CSV文件來處理這個業務。一般使用這個的都會配一個定時器,讓它們每隔一段時間跑一次,從而獲得較新的數據
下面是定時器的配置
定時器的配置非常簡單,我是使用注解方式來配置的
定時器任務類
- package com.yc.task.impl;
- import javax.transaction.Transactional;
- import org.springframework.batch.core.JobParametersInvalidException;
- import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
- import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
- import org.springframework.batch.core.repository.JobRestartException;
- import org.springframework.batch.item.ItemProcessor;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.stereotype.Component;
- import org.springframework.stereotype.Service;
- import com.yc.batch.ClassBatch;
- import com.yc.batch.MessageItemBatch;
- import com.yc.batch.TeacherInterviewBatch;
- import com.yc.batch.TearcherBatch;
- import com.yc.po.Work;
- import com.yc.task.WorkTask;
- import com.yc.vo.Workdetail;
- @Service
- public class WorkTaskImpl implements WorkTask{
- @Autowired
- private TeacherInterviewBatch teacherInterviewBatch;//教師訪談記錄
- public void setTeacherInterviewBatch(TeacherInterviewBatch teacherInterviewBatch) {
- this.teacherInterviewBatch = teacherInterviewBatch;
- }
- @Scheduled(cron= "0 30 22 * * ?") //每天晚上十點30執行一次 這個注解會讓框架會自動把這個方法看成任務啟動方法
- @Override
- public void task() {
- try {
- teacherInterviewBatch.test();//教師訪談
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
定時器所真正要執行的方法
- package com.yc.batch;
- import javax.annotation.Resource;
- import org.apache.commons.jexl2.Main;
- import org.springframework.batch.core.Job;
- import org.springframework.batch.core.JobExecution;
- import org.springframework.batch.core.JobParameters;
- import org.springframework.batch.core.JobParametersBuilder;
- import org.springframework.batch.core.JobParametersInvalidException;
- import org.springframework.batch.core.launch.JobLauncher;
- import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
- import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
- import org.springframework.batch.core.repository.JobRestartException;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- @Component
- public class TeacherInterviewBatch {
- private Job job;
- private JobLauncher launcher;
- @Resource(name="writerteacherInterview")
- public void setJob(Job job) {
- this.job = job;
- }
- @Autowired
- public void setLauncher(JobLauncher launcher) {
- this.launcher = launcher;
- }
- public void test() throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException{
- JobParameters jobParameters =
- new JobParametersBuilder()
- .addLong("time",System.currentTimeMillis()).toJobParameters();
- JobExecution result = launcher.run(job, jobParameters);
- }
- }
就這樣batch就被定時器調度起來了,每天十點准時使用batch來操作數據
轉自:https://blog.csdn.net/pttaoge/article/details/76684656
