其中涉及到了Spring Batch的幾個主要組成部分,JobRepository、JobLauncher、ItemReader、ItemProcessor、ItemWriter、Step、Job等。
JobRepository:存儲任務執行的狀態信息,有內存模式和數據庫模式;
JobLauncher:用於執行Job,並返回JobInstance;
ItemReader:讀操作抽象接口;
ItemProcessor:處理邏輯抽象接口;
ItemWriter:寫操作抽象接口;
Step:組成一個Job的各個步驟;
Job:可被多次執行的任務,每次執行返回一個JobInstance。
其中 JobRepository、JobLauncher無需配置(第二個例子會簡化該配置),Spring Boot 的自配置已經實現,當然也可以自定義。
FlatFileItemReader 和 FlatFileItemWriter 就是框架實現好的文件讀和寫操作,分別采用了兩種創建方式:構造器和建造器,Spring官方推薦使用后者。文件與對象的映射則是通過LineMapper,實現與 Spring JDBC 的 RowMapper 極其相似,完成配置關系后,ItemReader會讀取(文件/數據庫/消息隊列)並填充對象給ItemProcessor使用,ItemProcessor通過處理返回的對象則會丟給ItemWriter寫入(文件/數據庫/消息隊列)。
JobFlow - 一個job 可以有多個step ,每個step 有自己的邏輯,Flow可以對多個step進行排序,判斷。
實例:基於銀行信用卡每月自動生成賬單,自動扣費
git地址傳送門:lsr-batch-processing模塊
建表SQL在resource下
pom(基於springboot 2.1.0)
<dependencies> <!--######################### 定義 spring batch 版本 #########################--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <!--######################### spring boot web 依賴 #########################--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--######################### 定義 mysql 版本 #########################--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!--######################### 定義 jpa 版本 #########################--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <!--######################### 定義 log4j 版本 #########################--> <!-- 支持log4j2的模塊,注意把spring-boot-starter和spring-boot-starter-web包中的logging去掉 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <!--######################### 定義 test 版本 #########################--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies>
實體准備:
UserInfo(用戶信息)
package cn.lsr.entity; import javax.persistence.*; import java.io.Serializable; /** * @Description: 用戶信息 * @Package: lsr-microservice * @author: Hacker_lsr@126.com **/ @Entity @Table(name = "user_info") public class UserInfo implements Serializable { @Id @GeneratedValue @Column(name = "id") private Integer id ; @Column(name = "userId") private Integer userId; @Column(name = "name") private String name ; @Column(name = "age") private Integer age; @Column(name = "description") private String description; //get set }
UserAccount(賬戶類)
package cn.lsr.entity; import javax.persistence.*; import java.math.BigDecimal; import java.util.Date; /** * @Description: 賬戶 * @Package: lsr-microservice * @author: Hacker_lsr@126.com **/ @Entity @Table(name = "user_account") public class UserAccount { @Id @GeneratedValue @Column(name = "id") private Integer id; @Column(name = "username") private String username; @Column(name = "accountBalance") private BigDecimal accountBalance; @Column(name = "accountStatus") private Boolean accountStatus; @Column(name = "createTime") private Date createTime; //get set }
MonthBill(月賬單)
package cn.lsr.entity; import javax.persistence.*; import java.math.BigDecimal; import java.util.Date; /** * @Description: 月賬單 * @Package: lsr-microservice * @author: Hacker_lsr@126.com **/ @Entity @Table(name = "month_bill") public class MonthBill { @Id @GeneratedValue @Column(name = "id") private Integer id; /** * 用戶ID */ @Column(name = "userId") private Integer userId; /** * 總費用 */ @Column(name = "totalFee") private BigDecimal totalFee; /** * 是否已還款 */ @Column(name = "isPaid") private Boolean isPaid; /** * 是否通知 */ @Column(name = "isNotice") private Boolean isNotice; /** * 賬單生成時間 */ @Column(name = "createTime") private Date createTime; // get set }
ConsumeRecord(消費記錄)
package cn.lsr.entity; import javax.persistence.*; import java.math.BigDecimal; /** * @Description: 消費記錄 * @Package: lsr-microservice * @author: Hacker_lsr@126.com **/ @Entity @Table(name = "consume_record") public class ConsumeRecord { @Id @GeneratedValue @Column(name = "id") private Integer id; /** * 用戶Id */ @Column(name = "userId") private Integer userId; /** * 花費金額 */ @Column(name = "consumption") private BigDecimal consumption; /** * 是否生成賬單 */ @Column(name = "isGenerateBill") private Boolean isGenerateBill; // get set }
DAO層
package cn.lsr.repository; import cn.lsr.entity.UserInfo; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; /** * @Description: 用戶信息 * @Package: lsr-microservice * @author: Hacker_lsr@126.com **/ @Repository public interface UserInfoRepository extends JpaRepository<UserInfo,Integer> { } ######################################### package cn.lsr.repository; import cn.lsr.entity.UserAccount; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; /** * @Description: 賬戶 * @Package: lsr-microservice * @author: Hacker_lsr@126.com **/ @Repository public interface UserAccountRepository extends JpaRepository<UserAccount,Integer> { } ######################################### package cn.lsr.repository; import cn.lsr.entity.MonthBill; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query; import org.springframework.stereotype.Repository; import java.util.Date; import java.util.List; /** * @Description: 月賬單 * @Package: lsr-microservice * @author: Hacker_lsr@126.com **/ @Repository public interface MonthBillRepository extends JpaRepository<MonthBill,Integer> { @Query("select m from MonthBill m where m.isNotice = false and m.isPaid = false and m.createTime between ?1 and ?2") List<MonthBill> seleMothBillNoPlayAll(Date start, Date end); } ######################################### package cn.lsr.repository; import cn.lsr.entity.ConsumeRecord; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; /** * @Description: 消費記錄 * @Package: lsr-microservice * @author: Hacker_lsr@126.com **/ @Repository public interface ConsumeRecordRepository extends JpaRepository<ConsumeRecord,Integer> { }
新建FlowBatchConfig(batch業務類)
package cn.lsr.flow; import cn.lsr.entity.ConsumeRecord; import cn.lsr.entity.MonthBill; import cn.lsr.entity.UserAccount; import cn.lsr.excepiton.MoneyException; import cn.lsr.repository.ConsumeRecordRepository; import cn.lsr.repository.MonthBillRepository; import cn.lsr.repository.UserAccountRepository; import cn.lsr.util.DateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.*; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.job.flow.FlowExecutionStatus; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.database.JpaItemWriter; import org.springframework.batch.item.database.JpaPagingItemReader; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.persistence.EntityManagerFactory; import java.math.BigDecimal; import java.util.Date; import java.util.List; import java.util.Optional; /** * @Description: 信用卡賬單批處理 * @Package: lsr-microservice * @author: Hacker_lsr@126.com **/ @Configuration public class FlowBatchConfig { private static final Logger log = LoggerFactory.getLogger(FlowBatchConfig.class); private EntityManagerFactory entityManagerFactory; private StepBuilderFactory stepBuilderFactory; private JobBuilderFactory jobBuilderFactory; public FlowBatchConfig(EntityManagerFactory entityManagerFactory,StepBuilderFactory stepBuilderFactory,JobBuilderFactory jobBuilderFactory){ this.entityManagerFactory=entityManagerFactory; this.stepBuilderFactory=stepBuilderFactory; this.jobBuilderFactory=jobBuilderFactory; } /** * 生成信用卡賬單 * @return */ @Bean public Step generateVisaBillStep(ConsumeRecordRepository consumeRecordRepository){ return stepBuilderFactory.get("generateBillStep") .<ConsumeRecord, MonthBill>chunk(10) .reader(new JpaPagingItemReader<ConsumeRecord>(){{ setQueryString("from ConsumeRecord"); setEntityManagerFactory(entityManagerFactory); }}) .processor((ItemProcessor<ConsumeRecord,MonthBill>) data->{ if (data.getGenerateBill()){ // 已生成的不會生成月賬單 return null; }else { MonthBill monthBill = new MonthBill(); //組裝賬單 monthBill.setUserId(data.getUserId()); monthBill.setPaid(false); monthBill.setNotice(false); //計算利息 monthBill.setTotalFee(data.getConsumption().multiply(BigDecimal.valueOf(1.5d))); monthBill.setCreateTime(new Date()); //是否生成賬單 data.setGenerateBill(true); consumeRecordRepository.save(data); return monthBill; } }) .writer(new JpaItemWriter<MonthBill>(){{ setEntityManagerFactory(entityManagerFactory); }}) .build(); } /** * 自動扣費的 * @param monthBillRepository 月賬單 * @param userAccountRepository 賬戶余額 * @return */ @Bean public Step autoDeductionStep(MonthBillRepository monthBillRepository,UserAccountRepository userAccountRepository){ return stepBuilderFactory.get("autoDeductionStep") .<MonthBill, UserAccount>chunk(10) .reader(new JpaPagingItemReader<MonthBill>(){{ setQueryString("from MonthBill"); setEntityManagerFactory(entityManagerFactory); }}) .processor((ItemProcessor<MonthBill,UserAccount>) data->{ if (data.getPaid()||data.getNotice()){ // 如果通知||已還款 return null; } // 根據賬單信息查找賬戶信息 Optional<UserAccount> optionalUserAccount = userAccountRepository.findById(data.getUserId()); if (optionalUserAccount.isPresent()){ UserAccount userAccount = optionalUserAccount.get(); //賬戶狀態檢查 if(userAccount.getAccountStatus()==true){ //余額 if (userAccount.getAccountBalance().compareTo(data.getTotalFee()) > -1){ userAccount.setAccountBalance(userAccount.getAccountBalance().subtract(data.getTotalFee())); //已還款 data.setPaid(true); //已通知 data.setNotice(true); }else{ // 余額不足 throw new MoneyException(); } }else{ //狀態異常 //設置通知 data.setNotice(true); System.out.println(String.format("Message sent to UserID %s ——> your water bill this month is %s¥",data.getUserId(),data.getTotalFee())); } monthBillRepository.save(data); return userAccount; }else { //賬戶不存在 log.error(String.format("用戶ID %s,的用戶不存在",data.getUserId())); return null; } }) .writer(new JpaItemWriter<UserAccount>(){{ setEntityManagerFactory(entityManagerFactory); }}) .build(); } /** * 余額不足,扣款失敗通知 * @return */ @Bean public Step visaPaymentNoticeStep(MonthBillRepository monthBillRepository){ return stepBuilderFactory.get("visaPaymentNoticeStep") .tasklet((s,c)->{ List<MonthBill> monthBills = monthBillRepository.seleMothBillNoPlayAll(DateUtils.getBeginDayOfMonth(), DateUtils.getEndDayOfMonth()); monthBills.forEach(mo->{ System.out.println(String.format("Message sent to UserID %s ——> your water bill this month is ¥%s,please pay for it", mo.getUserId(), mo.getTotalFee())); }); return RepeatStatus.FINISHED; }) .build(); } public static void main(String[] args) { System.out.println(DateUtils.getBeginDayOfMonth()+"@"+DateUtils.getEndDayOfMonth()); } /** * 流程開始 * @param generateVisaBillStep 生成月賬單 * @param autoDeductionStep 自動扣費 * @param visaPaymentNoticeStep 賬戶余額不足 * @return */ @Bean public Job flowJob(Step generateVisaBillStep,Step autoDeductionStep,Step visaPaymentNoticeStep){ return jobBuilderFactory.get("flowJob") .listener(new JobExecutionListener() { private long time; @Override public void beforeJob(JobExecution jobExecution) { time = System.currentTimeMillis(); } @Override public void afterJob(JobExecution jobExecution) { System.out.println(String.format("任務耗時:%sms", System.currentTimeMillis() - time)); } }) .flow(generateVisaBillStep) .next(autoDeductionStep) .next((jobExecution,stepExecution)->{ if (stepExecution.getExitStatus().equals(ExitStatus.COMPLETED)&&stepExecution.getCommitCount()>0){ return new FlowExecutionStatus("NOTICE USER"); }else { return new FlowExecutionStatus(stepExecution.getStatus().toString()); } }) .on("COMPLETED").end() .on("NOTICE USER").to(visaPaymentNoticeStep) .end() .build(); } }
Service層
FlowJobService
package cn.lsr.serivce; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import java.util.Date; /** * @Description: 流程job * @Package: lsr-microservice * @author: Hacker_lsr@126.com **/ @Service public class FlowJobService { private JobLauncher jobLauncher; private Job flowJob; @Autowired public FlowJobService(JobLauncher jobLauncher,Job flowJob){ this.jobLauncher=jobLauncher; this.flowJob=flowJob; } @Scheduled(fixedRate = 24 * 60 * 60 * 1000) public void run() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException { JobParameters jobParameters = new JobParametersBuilder().addDate("time", new Date()).toJobParameters(); jobLauncher.run(flowJob, jobParameters); } }
Controller層
package cn.lsr.controller; import cn.lsr.serivce.FlowJobService; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; /** * @Description: 流程控制器 * @Package: lsr-microservice * @author: Hacker_lsr@126.com **/ @RestController public class FlowJobController { @Autowired private FlowJobService flowJobService; @GetMapping("/run") public void run() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException { flowJobService.run(); } }
啟動類注意注解使用 @EnableBatchProcessing @EnableScheduling
package cn; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; /** * @Description: Batch啟動類 * @Package: lsr-microservice * @author: Hacker_lsr@126.com **/ @EnableBatchProcessing @EnableScheduling @SpringBootApplication public class BatchServiceApplication { public static void main(String[] args) { SpringApplication.run(BatchServiceApplication.class, args); Logger logger = LoggerFactory.getLogger(BatchServiceApplication.class); logger.info("********************************"); logger.info("**** 啟動 batch-service 成功 ****"); logger.info("********************************"); } }
啟動自動運行,基於Service層的 @Scheduled(fixedRate = 24 * 60 * 60 * 1000) / 也可以手動訪問xxx:端口號/run
附加:
exception類
package cn.lsr.excepiton; /** * @Description: 資金異常 * @Package: lsr-microservice * @author: Hacker_lsr@126.com **/ public class MoneyException extends Exception{ public MoneyException(){} public MoneyException(String message) { super(message); } public MoneyException(String message, Throwable cause) { super(message, cause); } public MoneyException(Throwable cause) { super(cause); } }
Util工具類
package cn.lsr.util; import java.sql.Timestamp; import java.util.Calendar; import java.util.Date; import java.util.GregorianCalendar; /** * @Description: 時間工具類 * @Package: lsr-microservice * @author: Hacker_lsr@126.com **/ public class DateUtils { /** * 獲取本月的開始時間 */ public static Date getBeginDayOfMonth() { Calendar calendar = Calendar.getInstance(); calendar.set(getNowYear(), getNowMonth() - 1, 1); return getDayStartTime(calendar.getTime()); } /** * 獲取本月的結束時間 */ public static Date getEndDayOfMonth() { Calendar calendar = Calendar.getInstance(); calendar.set(getNowYear(), getNowMonth() - 1, 1); int day = calendar.getActualMaximum(5); calendar.set(getNowYear(), getNowMonth() - 1, day); return getDayEndTime(calendar.getTime()); } /** * 獲取某個日期的開始時間 */ private static Timestamp getDayStartTime(Date d) { Calendar calendar = Calendar.getInstance(); if (null != d) calendar.setTime(d); calendar.set(calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH), calendar.get(Calendar.DAY_OF_MONTH), 0, 0, 0); calendar.set(Calendar.MILLISECOND, 0); return new Timestamp(calendar.getTimeInMillis()); } /** * 獲取某個日期的結束時間 */ private static Timestamp getDayEndTime(Date d) { Calendar calendar = Calendar.getInstance(); if (null != d) calendar.setTime(d); calendar.set(calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH), calendar.get(Calendar.DAY_OF_MONTH), 23, 59, 59); calendar.set(Calendar.MILLISECOND, 999); return new Timestamp(calendar.getTimeInMillis()); } /** * 獲取今年是哪一年 */ private static Integer getNowYear() { Date date = new Date(); GregorianCalendar gc = (GregorianCalendar) Calendar.getInstance(); gc.setTime(date); return Integer.valueOf(gc.get(1)); } /** * 獲取本月是哪一月 */ private static int getNowMonth() { Date date = new Date(); GregorianCalendar gc = (GregorianCalendar) Calendar.getInstance(); gc.setTime(date); return gc.get(2) + 1; } }
application.properties
server.port=8010 spring.batch.job.enabled=false spring.datasource.driver-class-name = com.mysql.jdbc.Driver spring.datasource.url = jdbc:mysql://192.168.0.104:3306/springbatch spring.datasource.username = root spring.datasource.password = lishirui # JPA #表示在控制台輸出hibernate讀寫數據庫時候的SQL。 spring.jpa.show-sql = true spring.jpa.hibernate.ddl-auto = update spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL5InnoDBDialect spring.jpa.properties.hibernate.format_sql = true #字段映射 _ 問題 spring.jpa.hibernate.naming.physical-strategy = org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl # 項目啟動時創建數據表(用於記錄批處理執行狀態)的 SQL 腳本,該腳本由Spring Batch提供 spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql # 項目啟動時執行建表 SQL spring.batch.initialize-schema=always # 默認情況下,項目啟動時就會自動執行配置好的批處理操作。這里將其設為不自動執行,后面我們通過手動觸發執行批處理 #當遇到同樣名字的時候,是否允許覆蓋注冊 spring.main.allow-bean-definition-overriding: true
。