spring batch的使用和定時器Quart的使用


 Spring Batch是一個基於Spring的企業級批處理框架,它通過配合定時器Quartz來輕易實現大批量的數據讀取或插入,並且全程自動化,無需人員管理。

在使用spring batch之前,得對spring batch的流程有一個基本了解

每個batch它都包含了一個job,而一個job中卻有可能包含多個step,整個batch中干活的是step,batch主要是用來對數據的操作,所以step就有三個操作數據的東西,一個是ItemReader用來讀取數據的,一個是ItemProcessor用來處理數據的,一個是ItemWriter用來寫數據(可以是文件也可以是插入sql語句),JobLauncher用來啟動Job,JobRepository是上述處理提供的一種持久化機制,它為JobLauncher,Job,和Step實例提供CRUD操作。

 

pom.xml  三個batch的jar包

 

[html]  view plain  copy
  1. <span style="white-space:pre;">     </span><dependency>  
  2.          <groupId>org.springframework</groupId>  
  3.          <artifactId>spring-batch-core</artifactId>  
  4.          <version>2.1.8.RELEASE</version>  
  5.         </dependency>  
  6.           
  7.         <dependency>  
  8.           <groupId>org.springframework</groupId>  
  9.           <artifactId>spring-batch-infrastructure</artifactId>  
  10.           <version>2.1.8.RELEASE</version>  
  11.     <span style="white-space:pre;"</span></dependency>  
  12.           
  13.          <dependency>  
  14.           <groupId>org.springframework</groupId>  
  15.           <artifactId>spring-batch-test</artifactId>  
  16.           <version>2.1.8.RELEASE</version>  
  17.         </dependency>    

 

batch.xml

 

[html]  view plain  copy
  1. <beans xmlns="http://www.springframework.org/schema/beans"  
  2.     xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  3.     xsi:schemaLocation="http://www.springframework.org/schema/batch  
  4.         http://www.springframework.org/schema/batch/spring-batch-2.1.xsd  
  5.         http://www.springframework.org/schema/beans   
  6.         http://www.springframework.org/schema/beans/spring-beans-3.1.xsd  
  7.     ">  
  8.   
  9.   
  10.     <bean id="jobLauncher"  
  11.         class="org.springframework.batch.core.launch.support.SimpleJobLauncher">  
  12.         <property name="jobRepository" ref="jobRepository" />  
  13.     </bean>  
  14.   
  15.     <bean id="jobRepository"  
  16.         class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">  
  17.         <property name="validateTransactionState" value="false" />  
  18.   
  19.     </bean>  
  20. <span style="white-space:pre;">     </span><!--一個job-->  
  21.         <batch:job id="writerteacherInterview">  
  22.                 <batch:step id="teacherInterview">  
  23.                     <batch:tasklet>  
  24.                         <batch:chunk reader="jdbcItemReaderTeacherInterview" writer="teacherInterviewItemWriter"  
  25.                             processor="teacherInterviewProcessor" commit-interval="10">  
  26.                         </batch:chunk>  
  27.                     </batch:tasklet>  
  28.                 </batch:step>  
  29.             </batch:job>  
  30.   
  31.   
  32.   
  33.   
  34.   
  35.     <!--job的讀取數據操作-->  
  36.     <bean id="jdbcItemReaderTeacherInterview"  
  37.         class="org.springframework.batch.item.database.JdbcCursorItemReader"  
  38.         scope="step">  
  39.         <property name="dataSource" ref="dataSource" />  
  40.         <property name="sql"  
  41.             value="select distinct teacherName ,count(teacherName) as num from examininterviewrecord   where pdate >'${detail_startime}' and pdate < '${detail_endtime}'  GROUP BY teacherName " />  
  42.         <property name="rowMapper" ref="teacherInterviewMapper">  
  43.         </property>  
  44.     </bean>  
  45.   
  46.   
  47. </beans>  

 

讀取數據    teacherInterviewMapper

[java]  view plain  copy
  1. package com.yc.batch;  
  2.   
  3. import java.sql.ResultSet;  
  4. import java.sql.SQLException;  
  5. import org.springframework.jdbc.core.RowMapper;  
  6. import org.springframework.stereotype.Component;  
  7. import com.yc.vo.TeacherInterviewdetail;  
  8. import com.yc.vo.TeacherWorkdetail;  
  9. import com.yc.vo.Workdetail;  
  10. @Component("teacherInterviewMapper")    
  11. public class TeacherInterviewMapper implements RowMapper {    
  12.     @Override  
  13.     public Object mapRow(ResultSet rs, int rowNum) throws SQLException {    
  14.           
  15.         TeacherInterviewdetail TId=new TeacherInterviewdetail();  
  16.         TId.setTeacherName(rs.getString("teacherName"));  
  17.          TId.setNum(rs.getInt("num"));  
  18.         return TId;    
  19.     }  
  20. }   

 

 

處理數據  teacherInterviewProcessor ,這個處理數據方法,一般都是在這里在這里進行一些數據的加工,比如有些數據沒有讀到,你也可以在這個方法和后面那個寫入數據的類里面寫,所以就導致了這個類里面你可以什么都不敢,直接把數據拋到后面去,讓后面的寫數據類來處理;我這里就是處理數據的這個類什么都沒寫,但是最好還是按它的規則來!

[java]  view plain  copy
  1. package com.yc.batch;  
  2.   
  3. import org.hibernate.engine.transaction.jta.platform.internal.SynchronizationRegistryBasedSynchronizationStrategy;  
  4. import org.springframework.batch.item.ItemProcessor;  
  5. import org.springframework.stereotype.Component;  
  6. import org.springframework.stereotype.Service;  
  7.   
  8. import com.yc.vo.TeacherInterviewdetail;  
  9. import com.yc.vo.TeacherWorkdetail;  
  10. import com.yc.vo.Workdetail;  
  11.   
  12.   
  13. //業務層  
  14. @Component("teacherInterviewProcessor")  
  15. public class TeacherInterviewProcessor implements ItemProcessor<TeacherInterviewdetail, TeacherInterviewdetail> {  
  16.   
  17.     @Override  
  18.     public TeacherInterviewdetail process(TeacherInterviewdetail teacherInterviewdetail) throws Exception {  
  19.            
  20.         return teacherInterviewdetail;  
  21.     }  
  22. }  

 

 

寫數據 teacherInterviewItemWriter 這個類里面主要是把數據寫進一個文件里,同時我這個類里面還有一些數據處理

 

[java]  view plain  copy
  1. package com.yc.batch;  
  2.   
  3. import java.io.InputStream;  
  4.   
  5. import java.text.NumberFormat;  
  6. import java.util.ArrayList;  
  7. import java.util.List;  
  8. import java.util.Properties;  
  9. import javax.annotation.Resource;  
  10. import org.springframework.batch.item.ItemWriter;  
  11. import org.springframework.stereotype.Component;  
  12. import org.springframework.stereotype.Service;  
  13. import com.yc.biz.ExamineeClassBiz;  
  14. import com.yc.biz.WorkBiz;  
  15. import com.yc.utils.CsvUtils;  
  16. import com.yc.vo.TeacherInterviewdetail;  
  17. import com.yc.vo.TeacherWorkdetail;  
  18. import com.yc.vo.Workdetail;  
  19. import net.sf.ehcache.util.PropertyUtil;  
  20. //寫  
  21. @Component("teacherInterviewItemWriter")  
  22. public class TeacherInterviewItemWriter implements ItemWriter<TeacherInterviewdetail>{  
  23.   
  24.     @Override  
  25.     public void write(List<? extends TeacherInterviewdetail> teacherInterviewdetails) throws Exception {  
  26.         Properties props = new Properties();  
  27.         InputStream in= PropertyUtil.class.getClassLoader().getResourceAsStream("connectionConfig.properties");  
  28.         props.load(in);  
  29.         String time=props.getProperty("detail_time");  
  30.         CsvUtils cu=new CsvUtils();  
  31.          List<Object> works=new ArrayList<Object>();  
  32.          for(TeacherInterviewdetail t:teacherInterviewdetails){  
  33.              works.add(t);  
  34.          }  
  35.            
  36.         String path=this.getClass().getResource("/").getPath();  
  37.         path=path.substring(0,path.lastIndexOf("/"));  
  38.         path=path.substring(0,path.lastIndexOf("/"));  
  39.         path=path.substring(0,path.lastIndexOf("/"));  
  40.         path=path.substring(0,path.lastIndexOf("/"));  
  41.         cu.writeCsv(path+"/csv/teacherInterview_"+time+".csv",works );    
  42.     }   
  43. }  


我這里有用到一個吧數據寫進CSV文件的jar包

 

 

[html]  view plain  copy
  1. <span style="white-space:pre;">         </span><dependency>  
  2.           <groupId>net.sourceforge.javacsv</groupId>  
  3.           <artifactId>javacsv</artifactId>  
  4.           <version>2.0</version>  
  5.         </dependency>  

 

 

CsvUtils幫助類的寫入CSV文件方法

 

[cpp]  view plain  copy
  1. /**  
  2.      * 寫入CSV文件  
  3.      * @throws IOException  
  4.      */    
  5.     public void writeCsv(String path,List<Object> t) throws IOException{    
  6.         String csvFilePath = path;  
  7.         String filepath=path.substring(0,path.lastIndexOf("/"));  
  8.         File f=new File(filepath);  
  9.         if(!f.exists()){  
  10.             f.mkdirs();  
  11.         }  
  12.         File file=new File(path);  
  13.         if(!file.exists()){  
  14.             file.createNewFile();  
  15.         }  
  16.         CsvWriter wr =new CsvWriter(csvFilePath,',',Charset.forName("GBK"));  
  17.         try {   
  18.             for(Object obj:t){  
  19.                 String[] contents=obj.toString().split(",");  
  20.                 wr.writeRecord(contents);    
  21.             }  
  22.             wr.close();    
  23.         } catch (IOException e) {    
  24.             e.printStackTrace();    
  25.         }    
  26.     }    


就這樣一個基本的batch流程就跑起來了,它通過從數據里讀取一些數據,然后經過處理后,被存進服務器下的一個文件里面,之后像這種數據的讀取就不需要去數據庫里面

 

查詢了,而是可以直接通過讀取CSV文件來處理這個業務。一般使用這個的都會配一個定時器,讓它們每隔一段時間跑一次,從而獲得較新的數據

下面是定時器的配置

定時器的配置非常簡單,我是使用注解方式來配置的

定時器任務類

 

[java]  view plain  copy
  1. package com.yc.task.impl;  
  2. import javax.transaction.Transactional;  
  3. import org.springframework.batch.core.JobParametersInvalidException;  
  4. import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;  
  5. import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;  
  6. import org.springframework.batch.core.repository.JobRestartException;  
  7. import org.springframework.batch.item.ItemProcessor;  
  8. import org.springframework.beans.factory.annotation.Autowired;  
  9. import org.springframework.scheduling.annotation.Scheduled;  
  10. import org.springframework.stereotype.Component;  
  11. import org.springframework.stereotype.Service;  
  12.   
  13. import com.yc.batch.ClassBatch;  
  14. import com.yc.batch.MessageItemBatch;  
  15. import com.yc.batch.TeacherInterviewBatch;  
  16. import com.yc.batch.TearcherBatch;  
  17. import com.yc.po.Work;  
  18. import com.yc.task.WorkTask;  
  19. import com.yc.vo.Workdetail;  
  20. @Service  
  21. public class WorkTaskImpl implements WorkTask{  
  22.   
  23.     @Autowired  
  24.     private TeacherInterviewBatch teacherInterviewBatch;//教師訪談記錄  
  25.     public void setTeacherInterviewBatch(TeacherInterviewBatch teacherInterviewBatch) {  
  26.         this.teacherInterviewBatch = teacherInterviewBatch;  
  27.     }  
  28.       
  29.     @Scheduled(cron= "0 30 22 * * ?")   //每天晚上十點30執行一次  這個注解會讓框架會自動把這個方法看成任務啟動方法   
  30.     @Override  
  31.     public void task() {  
  32.         try {  
  33.             teacherInterviewBatch.test();//教師訪談  
  34.         } catch (Exception e) {  
  35.             e.printStackTrace();  
  36.         }  
  37.           
  38.     }  
  39.   
  40. }  

定時器所真正要執行的方法

 

 

[java]  view plain  copy
  1. package com.yc.batch;  
  2.   
  3. import javax.annotation.Resource;  
  4. import org.apache.commons.jexl2.Main;  
  5. import org.springframework.batch.core.Job;  
  6. import org.springframework.batch.core.JobExecution;  
  7. import org.springframework.batch.core.JobParameters;  
  8. import org.springframework.batch.core.JobParametersBuilder;  
  9. import org.springframework.batch.core.JobParametersInvalidException;  
  10. import org.springframework.batch.core.launch.JobLauncher;  
  11. import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;  
  12. import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;  
  13. import org.springframework.batch.core.repository.JobRestartException;  
  14. import org.springframework.beans.factory.annotation.Autowired;  
  15. import org.springframework.stereotype.Component;  
  16.   
  17. @Component  
  18. public class TeacherInterviewBatch {  
  19.   
  20.     private Job job;  
  21.     private JobLauncher launcher;  
  22.   
  23.     @Resource(name="writerteacherInterview")  
  24.     public void setJob(Job job) {  
  25.         this.job = job;  
  26.     }  
  27.   
  28.     @Autowired   
  29.     public void setLauncher(JobLauncher launcher) {  
  30.         this.launcher = launcher;  
  31.     }  
  32.   
  33.   
  34.     public void test() throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException{  
  35.   
  36.         JobParameters jobParameters =  
  37.                 new JobParametersBuilder()  
  38.                 .addLong("time",System.currentTimeMillis()).toJobParameters();  
  39.   
  40.         JobExecution result = launcher.run(job, jobParameters);  
  41.     }  
  42.   
  43. }  


就這樣batch就被定時器調度起來了,每天十點准時使用batch來操作數據

 

 

 

 

 

轉自:https://blog.csdn.net/pttaoge/article/details/76684656


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM