springbatch實現一個完整的flow


其中涉及到了Spring Batch的幾個主要組成部分,JobRepository、JobLauncher、ItemReader、ItemProcessor、ItemWriter、Step、Job等。

JobRepository:存儲任務執行的狀態信息,有內存模式和數據庫模式;
JobLauncher:用於執行Job,並返回JobInstance;
ItemReader:讀操作抽象接口;
ItemProcessor:處理邏輯抽象接口;
ItemWriter:寫操作抽象接口;
Step:組成一個Job的各個步驟;
Job:可被多次執行的任務,每次執行返回一個JobInstance。
其中 JobRepository、JobLauncher無需配置(第二個例子會簡化該配置),Spring Boot 的自配置已經實現,當然也可以自定義。
FlatFileItemReader 和 FlatFileItemWriter 就是框架實現好的文件讀和寫操作,分別采用了兩種創建方式:構造器和建造器,Spring官方推薦使用后者。文件與對象的映射則是通過LineMapper,實現與 Spring JDBC 的 RowMapper 極其相似,完成配置關系后,ItemReader會讀取(文件/數據庫/消息隊列)並填充對象給ItemProcessor使用,ItemProcessor通過處理返回的對象則會丟給ItemWriter寫入(文件/數據庫/消息隊列)。

JobFlow - 一個job 可以有多個step ,每個step 有自己的邏輯,Flow可以對多個step進行排序,判斷。

 

實例:基於銀行信用卡每月自動生成賬單,自動扣費

git地址傳送門lsr-batch-processing模塊

建表SQL在resource下

pom(基於springboot 2.1.0)

<dependencies>
        <!--######################### 定義 spring batch 版本 #########################-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <!--######################### spring boot web 依賴 #########################-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--######################### 定義 mysql 版本 #########################-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!--######################### 定義 jpa 版本 #########################-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <!--######################### 定義 log4j 版本 #########################-->
        <!-- 支持log4j2的模塊,注意把spring-boot-starter和spring-boot-starter-web包中的logging去掉 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <!--######################### 定義 test 版本 #########################-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

實體准備:

UserInfo(用戶信息)

package cn.lsr.entity;

import javax.persistence.*;
import java.io.Serializable;

/**
 * @Description: 用戶信息
 * @Package: lsr-microservice
 * @author: Hacker_lsr@126.com
 **/
@Entity
@Table(name = "user_info")
public class UserInfo implements Serializable {
    @Id
    @GeneratedValue
    @Column(name = "id")
    private Integer id ;
    @Column(name = "userId")
    private Integer userId;
    @Column(name = "name")
    private String name ;
    @Column(name = "age")
    private Integer age;
    @Column(name = "description")
    private String description;

    //get set
}

UserAccount(賬戶類)

package cn.lsr.entity;

import javax.persistence.*;
import java.math.BigDecimal;
import java.util.Date;

/**
 * @Description: 賬戶
 * @Package: lsr-microservice
 * @author: Hacker_lsr@126.com
 **/
@Entity
@Table(name = "user_account")
public class UserAccount {
    @Id
    @GeneratedValue
    @Column(name = "id")
    private Integer id;
    @Column(name = "username")
    private String username;
    @Column(name = "accountBalance")
    private BigDecimal accountBalance;
    @Column(name = "accountStatus")
    private Boolean accountStatus;
    @Column(name = "createTime")
    private Date createTime;

    //get set
}

MonthBill(月賬單)

package cn.lsr.entity;

import javax.persistence.*;
import java.math.BigDecimal;
import java.util.Date;

/**
 * @Description: 月賬單
 * @Package: lsr-microservice
 * @author: Hacker_lsr@126.com
 **/
@Entity
@Table(name = "month_bill")
public class MonthBill {
    @Id
    @GeneratedValue
    @Column(name = "id")
    private Integer id;
    /**
     * 用戶ID
     */
    @Column(name = "userId")
    private Integer userId;
    /**
     * 總費用
     */
    @Column(name = "totalFee")
    private BigDecimal totalFee;

    /**
     * 是否已還款
     */
    @Column(name = "isPaid")
    private Boolean isPaid;

    /**
     * 是否通知
     */
    @Column(name = "isNotice")
    private Boolean isNotice;

    /**
     * 賬單生成時間
     */
    @Column(name = "createTime")
    private Date createTime;

    // get set
}

ConsumeRecord(消費記錄)

package cn.lsr.entity;


import javax.persistence.*;
import java.math.BigDecimal;

/**
 * @Description: 消費記錄
 * @Package: lsr-microservice
 * @author: Hacker_lsr@126.com
 **/
@Entity
@Table(name = "consume_record")
public class ConsumeRecord {
    @Id
    @GeneratedValue
    @Column(name = "id")
    private Integer id;
    /**
     * 用戶Id
     */
    @Column(name = "userId")
    private Integer userId;
    /**
     * 花費金額
     */
    @Column(name = "consumption")
    private BigDecimal consumption;
    /**
     * 是否生成賬單
     */
    @Column(name = "isGenerateBill")
    private Boolean isGenerateBill;

    // get set 
}

DAO層

package cn.lsr.repository;

import cn.lsr.entity.UserInfo;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

/**
 * @Description: 用戶信息
 * @Package: lsr-microservice
 * @author: Hacker_lsr@126.com
 **/
@Repository
public interface UserInfoRepository extends JpaRepository<UserInfo,Integer> {
}

#########################################

package cn.lsr.repository;

import cn.lsr.entity.UserAccount;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

/**
 * @Description: 賬戶
 * @Package: lsr-microservice
 * @author: Hacker_lsr@126.com
 **/
@Repository
public interface UserAccountRepository extends JpaRepository<UserAccount,Integer> {
}

#########################################

package cn.lsr.repository;

import cn.lsr.entity.MonthBill;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;

import java.util.Date;
import java.util.List;

/**
 * @Description: 月賬單
 * @Package: lsr-microservice
 * @author: Hacker_lsr@126.com
 **/
@Repository
public interface MonthBillRepository extends JpaRepository<MonthBill,Integer> {
    @Query("select m from MonthBill m where m.isNotice = false and m.isPaid = false and m.createTime between ?1 and ?2")
    List<MonthBill> seleMothBillNoPlayAll(Date start, Date end);
}

#########################################

package cn.lsr.repository;

import cn.lsr.entity.ConsumeRecord;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

/**
 * @Description: 消費記錄
 * @Package: lsr-microservice
 * @author: Hacker_lsr@126.com
 **/
@Repository
public interface ConsumeRecordRepository extends JpaRepository<ConsumeRecord,Integer> {
}

新建FlowBatchConfig(batch業務類)

package cn.lsr.flow;

import cn.lsr.entity.ConsumeRecord;
import cn.lsr.entity.MonthBill;
import cn.lsr.entity.UserAccount;
import cn.lsr.excepiton.MoneyException;
import cn.lsr.repository.ConsumeRecordRepository;
import cn.lsr.repository.MonthBillRepository;
import cn.lsr.repository.UserAccountRepository;
import cn.lsr.util.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.persistence.EntityManagerFactory;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
import java.util.Optional;

/**
 * @Description: 信用卡賬單批處理
 * @Package: lsr-microservice
 * @author: Hacker_lsr@126.com
 **/
@Configuration
public class FlowBatchConfig {
    private static final Logger log = LoggerFactory.getLogger(FlowBatchConfig.class);

    private EntityManagerFactory entityManagerFactory;

    private StepBuilderFactory stepBuilderFactory;

    private JobBuilderFactory jobBuilderFactory;

    public FlowBatchConfig(EntityManagerFactory entityManagerFactory,StepBuilderFactory stepBuilderFactory,JobBuilderFactory jobBuilderFactory){
        this.entityManagerFactory=entityManagerFactory;
        this.stepBuilderFactory=stepBuilderFactory;
        this.jobBuilderFactory=jobBuilderFactory;
    }

    /**
     * 生成信用卡賬單
     * @return
     */
    @Bean
    public Step generateVisaBillStep(ConsumeRecordRepository consumeRecordRepository){
        return  stepBuilderFactory.get("generateBillStep")
                .<ConsumeRecord, MonthBill>chunk(10)
                .reader(new JpaPagingItemReader<ConsumeRecord>(){{
                    setQueryString("from ConsumeRecord");
                    setEntityManagerFactory(entityManagerFactory);
                }})
                .processor((ItemProcessor<ConsumeRecord,MonthBill>) data->{
                    if (data.getGenerateBill()){
                        // 已生成的不會生成月賬單
                        return null;
                    }else {
                        MonthBill monthBill = new MonthBill();
                        //組裝賬單
                        monthBill.setUserId(data.getUserId());
                        monthBill.setPaid(false);
                        monthBill.setNotice(false);
                        //計算利息
                        monthBill.setTotalFee(data.getConsumption().multiply(BigDecimal.valueOf(1.5d)));
                        monthBill.setCreateTime(new Date());
                        //是否生成賬單
                        data.setGenerateBill(true);
                        consumeRecordRepository.save(data);
                        return monthBill;
                    }
                })
                .writer(new JpaItemWriter<MonthBill>(){{
                    setEntityManagerFactory(entityManagerFactory);
                }})
                .build();
    }

    /**
     * 自動扣費的
     * @param monthBillRepository 月賬單
     * @param userAccountRepository 賬戶余額
     * @return
     */
    @Bean
    public Step autoDeductionStep(MonthBillRepository monthBillRepository,UserAccountRepository userAccountRepository){
        return stepBuilderFactory.get("autoDeductionStep")
                .<MonthBill, UserAccount>chunk(10)
                .reader(new JpaPagingItemReader<MonthBill>(){{
                    setQueryString("from MonthBill");
                    setEntityManagerFactory(entityManagerFactory);
                }})
                .processor((ItemProcessor<MonthBill,UserAccount>) data->{
                    if (data.getPaid()||data.getNotice()){
                        // 如果通知||已還款
                        return null;
                    }
                    // 根據賬單信息查找賬戶信息
                    Optional<UserAccount> optionalUserAccount = userAccountRepository.findById(data.getUserId());
                    if (optionalUserAccount.isPresent()){
                        UserAccount userAccount = optionalUserAccount.get();
                        //賬戶狀態檢查
                        if(userAccount.getAccountStatus()==true){
                            //余額
                            if (userAccount.getAccountBalance().compareTo(data.getTotalFee()) > -1){
                                userAccount.setAccountBalance(userAccount.getAccountBalance().subtract(data.getTotalFee()));
                                //已還款
                                data.setPaid(true);
                                //已通知
                                data.setNotice(true);
                            }else{
                                // 余額不足
                                throw new MoneyException();
                            }
                        }else{
                            //狀態異常
                            //設置通知
                            data.setNotice(true);
                            System.out.println(String.format("Message sent to UserID %s ——> your water bill this month is %s¥",data.getUserId(),data.getTotalFee()));

                        }
                        monthBillRepository.save(data);
                        return userAccount;
                    }else {
                        //賬戶不存在
                        log.error(String.format("用戶ID %s,的用戶不存在",data.getUserId()));
                        return null;
                    }
                })
                .writer(new JpaItemWriter<UserAccount>(){{
                    setEntityManagerFactory(entityManagerFactory);
                }})
                .build();
    }
    /**
     * 余額不足,扣款失敗通知
     * @return
     */
    @Bean
    public Step visaPaymentNoticeStep(MonthBillRepository monthBillRepository){
        return stepBuilderFactory.get("visaPaymentNoticeStep")
                .tasklet((s,c)->{
                    List<MonthBill> monthBills = monthBillRepository.seleMothBillNoPlayAll(DateUtils.getBeginDayOfMonth(), DateUtils.getEndDayOfMonth());
                    monthBills.forEach(mo->{
                        System.out.println(String.format("Message sent to UserID %s ——> your water bill this month is ¥%s,please pay for it",
                                mo.getUserId(), mo.getTotalFee()));
                    });
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    public static void main(String[] args) {
        System.out.println(DateUtils.getBeginDayOfMonth()+"@"+DateUtils.getEndDayOfMonth());
    }
    /**
     * 流程開始
     * @param generateVisaBillStep 生成月賬單
     * @param autoDeductionStep 自動扣費
     * @param visaPaymentNoticeStep 賬戶余額不足
     * @return
     */
    @Bean
    public Job flowJob(Step generateVisaBillStep,Step autoDeductionStep,Step visaPaymentNoticeStep){
        return jobBuilderFactory.get("flowJob")
                .listener(new JobExecutionListener() {
                    private long time;
                    @Override
                    public void beforeJob(JobExecution jobExecution) {
                        time = System.currentTimeMillis();
                    }

                    @Override
                    public void afterJob(JobExecution jobExecution) {
                        System.out.println(String.format("任務耗時:%sms", System.currentTimeMillis() - time));
                    }
                })
                .flow(generateVisaBillStep)
                .next(autoDeductionStep)
                .next((jobExecution,stepExecution)->{
                    if (stepExecution.getExitStatus().equals(ExitStatus.COMPLETED)&&stepExecution.getCommitCount()>0){
                        return new FlowExecutionStatus("NOTICE USER");
                    }else {
                        return new FlowExecutionStatus(stepExecution.getStatus().toString());
                    }
                })
                .on("COMPLETED").end()
                .on("NOTICE USER").to(visaPaymentNoticeStep)
                .end()
                .build();
    }

}

Service層

FlowJobService

package cn.lsr.serivce;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.Date;

/**
 * @Description: 流程job
 * @Package: lsr-microservice
 * @author: Hacker_lsr@126.com
 **/
@Service
public class FlowJobService {
    private JobLauncher jobLauncher;
    private Job flowJob;
    @Autowired
    public FlowJobService(JobLauncher jobLauncher,Job flowJob){
        this.jobLauncher=jobLauncher;
        this.flowJob=flowJob;
    }
    @Scheduled(fixedRate = 24 * 60 * 60 * 1000)
    public void run() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        JobParameters jobParameters = new JobParametersBuilder().addDate("time", new Date()).toJobParameters();
        jobLauncher.run(flowJob, jobParameters);
    }
}

Controller層

package cn.lsr.controller;

import cn.lsr.serivce.FlowJobService;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Description: 流程控制器
 * @Package: lsr-microservice
 * @author: Hacker_lsr@126.com
 **/
@RestController
public class FlowJobController {
    @Autowired
    private FlowJobService flowJobService;

    @GetMapping("/run")
    public void run() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        flowJobService.run();
    }
}

啟動類注意注解使用 @EnableBatchProcessing @EnableScheduling

package cn;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * @Description: Batch啟動類
 * @Package: lsr-microservice
 * @author: Hacker_lsr@126.com
 **/
@EnableBatchProcessing
@EnableScheduling
@SpringBootApplication
public class BatchServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(BatchServiceApplication.class, args);
        Logger logger = LoggerFactory.getLogger(BatchServiceApplication.class);
        logger.info("********************************");
        logger.info("**** 啟動 batch-service 成功 ****");
        logger.info("********************************");
    }
}

 

啟動自動運行,基於Service層的 @Scheduled(fixedRate = 24 * 60 * 60 * 1000) / 也可以手動訪問xxx:端口號/run

附加:

exception類

package cn.lsr.excepiton;

/**
 * @Description: 資金異常
 * @Package: lsr-microservice
 * @author: Hacker_lsr@126.com
 **/
public class MoneyException extends Exception{
    public MoneyException(){}

    public MoneyException(String message) {
        super(message);
    }

    public MoneyException(String message, Throwable cause) {
        super(message, cause);
    }

    public MoneyException(Throwable cause) {
        super(cause);
    }
}

Util工具類

package cn.lsr.util;

import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;

/**
 * @Description: 時間工具類
 * @Package: lsr-microservice
 * @author: Hacker_lsr@126.com
 **/
public class DateUtils {

    /**
     * 獲取本月的開始時間
     */
    public static Date getBeginDayOfMonth() {
        Calendar calendar = Calendar.getInstance();
        calendar.set(getNowYear(), getNowMonth() - 1, 1);
        return getDayStartTime(calendar.getTime());
    }

    /**
     * 獲取本月的結束時間
     */
    public static Date getEndDayOfMonth() {
        Calendar calendar = Calendar.getInstance();
        calendar.set(getNowYear(), getNowMonth() - 1, 1);
        int day = calendar.getActualMaximum(5);
        calendar.set(getNowYear(), getNowMonth() - 1, day);
        return getDayEndTime(calendar.getTime());
    }

    /**
     * 獲取某個日期的開始時間
     */
    private static Timestamp getDayStartTime(Date d) {
        Calendar calendar = Calendar.getInstance();
        if (null != d) calendar.setTime(d);
        calendar.set(calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH), calendar.get(Calendar.DAY_OF_MONTH), 0, 0, 0);
        calendar.set(Calendar.MILLISECOND, 0);
        return new Timestamp(calendar.getTimeInMillis());
    }

    /**
     * 獲取某個日期的結束時間
     */
    private static Timestamp getDayEndTime(Date d) {
        Calendar calendar = Calendar.getInstance();
        if (null != d) calendar.setTime(d);
        calendar.set(calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH), calendar.get(Calendar.DAY_OF_MONTH), 23, 59, 59);
        calendar.set(Calendar.MILLISECOND, 999);
        return new Timestamp(calendar.getTimeInMillis());
    }

    /**
     * 獲取今年是哪一年
     */
    private static Integer getNowYear() {
        Date date = new Date();
        GregorianCalendar gc = (GregorianCalendar) Calendar.getInstance();
        gc.setTime(date);
        return Integer.valueOf(gc.get(1));
    }

    /**
     * 獲取本月是哪一月
     */
    private static int getNowMonth() {
        Date date = new Date();
        GregorianCalendar gc = (GregorianCalendar) Calendar.getInstance();
        gc.setTime(date);
        return gc.get(2) + 1;
    }
}

application.properties

server.port=8010

spring.batch.job.enabled=false
spring.datasource.driver-class-name = com.mysql.jdbc.Driver
spring.datasource.url = jdbc:mysql://192.168.0.104:3306/springbatch
spring.datasource.username = root
spring.datasource.password = lishirui
# JPA
#表示在控制台輸出hibernate讀寫數據庫時候的SQL。
spring.jpa.show-sql = true
spring.jpa.hibernate.ddl-auto = update
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL5InnoDBDialect
spring.jpa.properties.hibernate.format_sql = true
#字段映射 _ 問題
spring.jpa.hibernate.naming.physical-strategy = org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl

# 項目啟動時創建數據表(用於記錄批處理執行狀態)的 SQL 腳本,該腳本由Spring Batch提供
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
# 項目啟動時執行建表 SQL
spring.batch.initialize-schema=always
# 默認情況下,項目啟動時就會自動執行配置好的批處理操作。這里將其設為不自動執行,后面我們通過手動觸發執行批處理
#當遇到同樣名字的時候,是否允許覆蓋注冊
spring.main.allow-bean-definition-overriding: true 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM