一、引入依賴
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Spring Boot parent POM; the starters below declare no version of their own -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
</parent>
<groupId>com.qiang.mybaties.plus.test.batch</groupId>
<artifactId>MybatiesPlusTestBatch</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Batch starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- Oracle JDBC driver -->
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc7</artifactId>
<version>12.1.0.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- MyBatis-Plus core library (Spring Boot starter) -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.1.0</version>
</dependency>
<!-- Alibaba Druid database connection pool -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.6</version>
</dependency>
<!-- Redis starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
二、修改配置文件
application.yml
# HTTP port
server:
  port: 8080
spring:
  redis:
    # Redis host
    host: 192.168.17.101
    # Redis port
    port: 6379
    # Redis password
    password: 123456
    # Redis database index
    database: 0
  datasource:
    url: jdbc:oracle:thin:@192.168.17.101:1522:XE
    username: BATCH
    password: 123456
    driver-class-name: oracle.jdbc.OracleDriver
    type: com.alibaba.druid.pool.DruidDataSource
    # Location of the Spring Batch metadata table script
    schema: classpath:/org/springframework/batch/core/schema-oracle10g.sql
  # Create the Spring Batch tables on startup; switch "always" to "never" once they exist
  batch:
    initialize-schema: always
  main:
    allow-bean-definition-overriding: true
# MyBatis-Plus settings
mybatis-plus:
  # Mapper XML locations; separate multiple directories with commas or semicolons
  mapper-locations: classpath:mapper/*.xml
  # Everything below has a default value and may be omitted
  global-config:
    db-config:
      # Primary key type. AUTO: database auto-increment, INPUT: user-supplied ID,
      # ID_WORKER: globally unique numeric ID, UUID: globally unique UUID
      id-type: auto
      # Field strategy. IGNORED: no check, NOT_NULL: non-null check, NOT_EMPTY: non-empty check
      field-strategy: NOT_EMPTY
      # Database type
      db-type: ORACLE
  configuration:
    # Map underscore column names to camelCase Java properties
    map-underscore-to-camel-case: true
    # By default MyBatis skips columns whose value is NULL when mapping a result;
    # true makes it call the setter (or Map.put) for NULL values as well
    call-setters-on-nulls: true
    # Prints executed SQL to stdout; intended for development and testing
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    # First-level (local) cache: cannot be disabled, only scoped to SESSION or STATEMENT
    local-cache-scope: session
    # Global switch for the second-level cache
    cache-enabled: true
三、啟動類開啟注解
@EnableBatchProcessing
package com.qiang.mybaties.plus.test.batch;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
/**
 * Spring Boot entry point of the Spring Batch / MyBatis-Plus demo application.
 *
 * <p>Besides the usual Boot auto-configuration it switches on mapper scanning for the
 * {@code dao} package, Spring's caching annotations and the Spring Batch infrastructure.
 * Created 2020/11/27 9:46.
 *
 * @author 小強崽
 */
@EnableBatchProcessing
@EnableCaching
@MapperScan("com.qiang.mybaties.plus.test.batch.dao")
@SpringBootApplication
public class MybatiesPlusBatchApplication {

    /**
     * Boots the application context.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(MybatiesPlusBatchApplication.class, args);
    }
}
四、測試是否成功
BatchHelloWorldJob
package com.qiang.mybaties.plus.test.batch.job;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * First Spring Batch example: a job named {@code helloWorld} with a single tasklet step
 * that prints a greeting. Created 2020/11/27 14:32.
 *
 * @author 小強崽
 */
@Configuration
public class BatchHelloWorldJob {

    /** Factory for job builders. */
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Factory for step builders. */
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines the {@code helloWorld} job, which starts with (and only contains) {@link #step()}.
     *
     * @return the job definition
     */
    @Bean
    public Job helloWorld() {
        Step onlyStep = step();
        return jobBuilderFactory.get("helloWorld").start(onlyStep).build();
    }

    /**
     * Defines the step named {@code step}: it prints the greeting and finishes.
     *
     * @return the step definition
     */
    @Bean
    public Step step() {
        Tasklet greeting = (contribution, context) -> {
            System.out.println("Hello World!");
            // FINISHED tells the framework not to invoke the tasklet again.
            return RepeatStatus.FINISHED;
        };
        return stepBuilderFactory.get("step").tasklet(greeting).build();
    }
}
結果執行成功
同時數據庫初始化了表
五、任務重復執行
當任務執行一次后,再次啟動時該任務不會重新執行,日誌中會出現如下StepExecution提示。
Step already complete or not restartable, so no action to execute: StepExecution
在測試階段希望程序重復執行。
package com.qiang.mybaties.plus.test.batch.controller;
import com.qiang.mybaties.plus.test.batch.job.BatchHelloWorldJob;
import com.qiang.mybaties.plus.test.batch.response.ResponseResult;
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
 * REST endpoint that launches the hello-world batch job on demand.
 * Created 2020/11/27 14:16.
 *
 * @author 小強崽
 */
@RestController
@RequestMapping("/call/batch")
public class CallBatchJobController {

    @Autowired
    private BatchHelloWorldJob helloWorldJob;

    @Autowired
    private JobLauncher jobLauncher;

    /**
     * Runs the {@code helloWorld} job and reports the resulting batch status.
     *
     * <p>The current time is passed as the {@code date} job parameter, so each request uses
     * different parameters and the job can be executed again.
     *
     * @return a success response wrapping the {@link BatchStatus} of the execution
     * @throws JobParametersInvalidException       propagated from {@link JobLauncher#run}
     * @throws JobExecutionAlreadyRunningException propagated from {@link JobLauncher#run}
     * @throws JobRestartException                 propagated from {@link JobLauncher#run}
     * @throws JobInstanceAlreadyCompleteException propagated from {@link JobLauncher#run}
     */
    @GetMapping("/hello/world")
    public ResponseResult helloWorld() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        JobParametersBuilder parametersBuilder = new JobParametersBuilder();
        parametersBuilder.addDate("date", new Date());
        JobParameters uniqueParameters = parametersBuilder.toJobParameters();

        Job job = helloWorldJob.helloWorld();
        JobExecution execution = jobLauncher.run(job, uniqueParameters);

        BatchStatus outcome = execution.getStatus();
        return ResponseResult.success(outcome);
    }
}
六、SpringBatch結構
Spring Batch運行的基本單位是一個Job,一個Job就做一件批處理的事情。 一個Job包含很多Step,step就是每個job要執行的單個步驟。
Step里面,會有Tasklet,Tasklet是一個任務單元,它是屬於可以重復利用的東西。 然后是Chunk,chunk就是數據塊,需要定義多大的數據量是一個chunk。
Chunk里面就是不斷循環的一個流程,讀數據,處理數據,然后寫數據。Spring Batch會不斷的循環 這個流程,直到批處理數據完成。
七、使用Lambda創建Step
Job名稱和Step名稱不能跟其它的Job和Step重名。Job名也不能跟類名一樣(不區分大小寫),否則會報錯,執行失敗。
package com.qiang.mybaties.plus.test.batch.job;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Same idea as the hello-world job, but the step's tasklet is written as a lambda.
 * Created 2020/12/1 15:34.
 *
 * @author 小強崽
 */
@Configuration
public class BatchLambdaJob {

    /** Factory for job builders. */
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Factory for step builders. */
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines the {@code lambdaJob} job, consisting of {@link #lambdaStep()} only.
     *
     * @return the job definition
     */
    @Bean
    public Job lambdaJob() {
        Step onlyStep = lambdaStep();
        return jobBuilderFactory.get("lambdaJob").start(onlyStep).build();
    }

    /**
     * Builds the step named {@code lambdaStep}; it prints one line and finishes.
     *
     * @return a newly built step
     */
    public Step lambdaStep() {
        return stepBuilderFactory
                .get("lambdaStep")
                .tasklet((contribution, context) -> announce("lambdaStep執行步驟...."))
                .build();
    }

    /** Prints {@code message} and signals that the tasklet is done. */
    private static RepeatStatus announce(String message) {
        System.out.println(message);
        return RepeatStatus.FINISHED;
    }
}
八、多任務執行
package com.qiang.mybaties.plus.test.batch.job;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * A job made of three steps that run one after another. Created 2020/12/1 15:46.
 *
 * @author 小強崽
 */
@Component
public class BatchMultiJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines {@code multiJob}: step 1, then step 2, then step 3.
     *
     * @return the job definition
     */
    @Bean
    public Job multiJob() {
        Step first = multiStep1();
        Step second = multiStep2();
        Step third = multiStep3();
        return jobBuilderFactory.get("multiJob")
                .start(first)
                .next(second)
                .next(third)
                .build();
    }

    private Step multiStep1() {
        return printingStep("multiStep1", "multiStep1執行步驟一操作。。。");
    }

    private Step multiStep2() {
        return printingStep("multiStep2", "multiStep2執行步驟二操作。。。");
    }

    private Step multiStep3() {
        return printingStep("multiStep3", "multiStep3執行步驟三操作。。。");
    }

    /** Builds a tasklet step called {@code stepName} that prints {@code message} and finishes. */
    private Step printingStep(String stepName, String message) {
        return stepBuilderFactory.get(stepName)
                .tasklet((contribution, context) -> {
                    System.out.println(message);
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
多個步驟在執行過程中也可以通過上一個步驟的執行狀態來決定是否執行下一個步驟。
package com.qiang.mybaties.plus.test.batch.job;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * A three-step job in which each following step is reached through an explicit
 * transition on the previous step's exit code. Created 2020/12/1 16:04.
 *
 * @author 小強崽
 */
@Component
public class BatchMultiStatusJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines {@code multiStatusJob}: step 1 leads to step 2 on {@code COMPLETED}, and
     * step 2 leads to step 3 on {@code COMPLETED}.
     *
     * @return the job definition
     */
    @Bean
    public Job multiStatusJob() {
        String completed = ExitStatus.COMPLETED.getExitCode();
        return jobBuilderFactory.get("multiStatusJob")
                .start(multiStatusStep1())
                .on(completed).to(multiStatusStep2())
                .from(multiStatusStep2())
                .on(completed).to(multiStatusStep3())
                .from(multiStatusStep3())
                .end()
                .build();
    }

    private Step multiStatusStep1() {
        return printingStep("multiStatusStep1", "multiStatusStep1執行步驟一操作。。。");
    }

    private Step multiStatusStep2() {
        return printingStep("multiStatusStep2", "multiStatusStep2執行步驟二操作。。。");
    }

    private Step multiStatusStep3() {
        return printingStep("multiStatusStep3", "multiStatusStep3執行步驟三操作。。。");
    }

    /** Builds a tasklet step called {@code stepName} that prints {@code message} and finishes. */
    private Step printingStep(String stepName, String message) {
        return stepBuilderFactory.get(stepName)
                .tasklet((contribution, context) -> {
                    System.out.println(message);
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
當multiStatusStep1()狀態完成時執行multiStatusStep2(),以此類推。ExitStatus類包含以下幾種狀態。
/*
* Copyright 2006-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core;
import org.springframework.util.StringUtils;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
/**
* Value object used to carry information about the status of a
* job or step execution.
*
* ExitStatus is immutable and therefore thread-safe.
*
* NOTE(review): this listing appears to be an abridged excerpt. The constants below call a
* String constructor and the class declares Comparable, but neither that constructor nor
* compareTo is shown here, so the snippet is for reading only.
*
* @author Dave Syer
*
*/
@SuppressWarnings("serial")
public class ExitStatus implements Serializable, Comparable<ExitStatus> {
/**
* Convenient constant value representing unknown state - assumed not
* continuable.
*/
public static final ExitStatus UNKNOWN = new ExitStatus("UNKNOWN");
/**
* Convenient constant value representing continuable state where processing
* is still taking place, so no further action is required. Used for
* asynchronous execution scenarios where the processing is happening in
* another thread or process and the caller is not required to wait for the
* result.
*/
public static final ExitStatus EXECUTING = new ExitStatus("EXECUTING");
/**
* Convenient constant value representing finished processing.
*/
public static final ExitStatus COMPLETED = new ExitStatus("COMPLETED");
/**
* Convenient constant value representing job that did no processing (e.g.
* because it was already complete).
*/
public static final ExitStatus NOOP = new ExitStatus("NOOP");
/**
* Convenient constant value representing finished processing with an error.
*/
public static final ExitStatus FAILED = new ExitStatus("FAILED");
/**
* Convenient constant value representing finished processing with
* interrupted status.
*/
public static final ExitStatus STOPPED = new ExitStatus("STOPPED");
}
九、Flow串行的用法
一個Flow包含多個Step,Job流程中包含Flow類型的時候需要在build()方法前調用end()方法。
package com.qiang.mybaties.plus.test.batch.job;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * Shows how several steps can be grouped into a {@link Flow} that a job then runs as a unit.
 * Created 2020/12/1 16:26.
 *
 * @author 小強崽
 */
@Component
public class BatchFlowJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines {@code flowJob}: the flow (steps 1 and 2) runs first, then step 3.
     * The builder chain calls {@code end()} before {@code build()} because the job
     * starts with a flow.
     *
     * @return the job definition
     */
    @Bean
    public Job flowJob() {
        Flow firstTwoSteps = flow();
        Step lastStep = flowStep3();
        return jobBuilderFactory.get("flowJob")
                .start(firstTwoSteps)
                .next(lastStep)
                .end()
                .build();
    }

    private Step flowStep1() {
        return printingStep("flowStep1", "flowStep1執行步驟一操作。。。");
    }

    private Step flowStep2() {
        return printingStep("flowStep2", "flowStep2執行步驟二操作。。。");
    }

    private Step flowStep3() {
        return printingStep("flowStep3", "flowStep3執行步驟三操作。。。");
    }

    /**
     * Builds the flow named {@code flow}, made of step 1 followed by step 2.
     *
     * @return the flow
     */
    private Flow flow() {
        FlowBuilder<Flow> builder = new FlowBuilder<>("flow");
        return builder
                .start(flowStep1())
                .next(flowStep2())
                .build();
    }

    /** Builds a tasklet step called {@code stepName} that prints {@code message} and finishes. */
    private Step printingStep(String stepName, String message) {
        return stepBuilderFactory.get(stepName)
                .tasklet((contribution, context) -> {
                    System.out.println(message);
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
十、Split並行的用法
Split任務並行處理。
package com.qiang.mybaties.plus.test.batch.job;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.stereotype.Component;
/**
 * Runs two flows through a split backed by a {@link SimpleAsyncTaskExecutor}.
 * Created 2020/12/1 16:47.
 *
 * @author 小強崽
 */
@Component
public class BatchSplitJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines {@code splitJob}: flow 1 (steps 1 and 2) is split with flow 2 (step 3),
     * using a new {@link SimpleAsyncTaskExecutor} as the split's executor.
     *
     * @return the job definition
     */
    @Bean
    public Job splitJob() {
        Flow firstFlow = splitFlow1();
        return jobBuilderFactory.get("splitJob")
                .start(firstFlow)
                .split(new SimpleAsyncTaskExecutor()).add(splitFlow2())
                .end()
                .build();
    }

    private Step splitStep1() {
        return printingStep("splitStep1", "splitStep1執行步驟一操作。。。");
    }

    private Step splitStep2() {
        return printingStep("splitStep2", "splitStep2執行步驟二操作。。。");
    }

    private Step splitStep3() {
        return printingStep("splitStep3", "splitStep3執行步驟三操作。。。");
    }

    /** Flow {@code splitFlow1}: step 1 followed by step 2. */
    private Flow splitFlow1() {
        FlowBuilder<Flow> builder = new FlowBuilder<>("splitFlow1");
        return builder
                .start(splitStep1())
                .next(splitStep2())
                .build();
    }

    /** Flow {@code splitFlow2}: step 3 only. */
    private Flow splitFlow2() {
        FlowBuilder<Flow> builder = new FlowBuilder<>("splitFlow2");
        return builder
                .start(splitStep3())
                .build();
    }

    /** Builds a tasklet step called {@code stepName} that prints {@code message} and finishes. */
    private Step printingStep(String stepName, String message) {
        return stepBuilderFactory.get(stepName)
                .tasklet((contribution, context) -> {
                    System.out.println(message);
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
結果看到任務不是串行處理的,而是異步執行,也就是並行處理。
十一、Decider決策器的使用
日期決策器,判斷今天是周末還是工作日。
package com.qiang.mybaties.plus.test.batch.decider;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.stereotype.Component;
import java.time.DayOfWeek;
import java.time.LocalDate;
/**
 * Decider that routes a job flow depending on whether today is a weekend day or a
 * working day. Created 2020/12/1 16:55.
 *
 * <p>Registered as a Spring bean through {@code @Component}, which needs the
 * {@code org.springframework.stereotype.Component} import in this file.
 *
 * @author 小強崽
 */
@Component
public class DateDecider implements JobExecutionDecider {

    /**
     * Looks at the current date of the default time zone and returns a custom flow status.
     *
     * @param jobExecution  the running job execution (not consulted)
     * @param stepExecution the execution of the preceding step (not consulted)
     * @return status {@code "weekend"} on Saturday or Sunday, otherwise {@code "workingDay"}
     */
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        DayOfWeek today = LocalDate.now().getDayOfWeek();
        System.out.println("今天是: " + today);
        boolean weekend = today == DayOfWeek.SATURDAY || today == DayOfWeek.SUNDAY;
        return new FlowExecutionStatus(weekend ? "weekend" : "workingDay");
    }
}
首先執行deciderStep1(),然后使用自定義的日期決策器,如果返回周末weekend那就執行deciderStep2(),如果返回工作日workingDay那就執行deciderStep3(),無論deciderStep3()的結果是什么都會執行deciderStep4()。
package com.qiang.mybaties.plus.test.batch.job;
import com.qiang.mybaties.plus.test.batch.decider.DateDecider;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * Job whose path after the first step is chosen by a {@link DateDecider}.
 * Created 2020/12/1 17:04.
 *
 * @author 小強崽
 */
@Component
public class BatchDeciderJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DateDecider dateDecider;

    /**
     * Defines {@code deciderJob}.
     *
     * <ul>
     *   <li>step 1 always runs first, then the date decider is consulted;</li>
     *   <li>decider status {@code weekend} leads to step 2;</li>
     *   <li>decider status {@code workingDay} leads to step 3;</li>
     *   <li>whatever step 3's exit code is ({@code "*"}), step 4 follows it.</li>
     * </ul>
     *
     * @return the job definition
     */
    @Bean
    public Job deciderJob() {
        return jobBuilderFactory.get("deciderJob")
                .start(deciderStep1())
                .next(dateDecider)
                .from(dateDecider).on("weekend").to(deciderStep2())
                .from(dateDecider).on("workingDay").to(deciderStep3())
                .from(deciderStep3()).on("*").to(deciderStep4())
                .end()
                .build();
    }

    private Step deciderStep1() {
        return printingStep("deciderStep1", "deciderStep1執行步驟一操作。。。");
    }

    private Step deciderStep2() {
        return printingStep("deciderStep2", "deciderStep2執行步驟二操作。。。");
    }

    private Step deciderStep3() {
        return printingStep("deciderStep3", "deciderStep3執行步驟三操作。。。");
    }

    private Step deciderStep4() {
        return printingStep("deciderStep4", "deciderStep4執行步驟四操作。。。");
    }

    /** Builds a tasklet step called {@code stepName} that prints {@code message} and finishes. */
    private Step printingStep(String stepName, String message) {
        return stepBuilderFactory.get(stepName)
                .tasklet((contribution, context) -> {
                    System.out.println(message);
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
十一、任務嵌套
通過childJobOne()和childJobTwo()方法創建了兩個任務Job。關鍵在於childJobOneStep()方法和childJobTwoStep()方法。在childJobOneStep()方法中,通過JobStepBuilder構建了一個名稱為childJobOneStep的Step,顧名思義,它是一個任務型Step的構造工廠,可以將任務轉換為“特殊”的步驟。在構建過程中,還需要傳入任務執行器JobLauncher、任務倉庫JobRepository和事務管理器PlatformTransactionManager。將任務轉換為特殊的步驟后,將其賦給父任務parentJob即可。
package com.qiang.mybaties.plus.test.batch.job;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.JobStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
/**
 * Nested jobs: a parent job whose two steps each launch a child job.
 * Created 2020/12/1 17:22.
 *
 * @author 小強崽
 */
@Component
public class BatchNestedJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobLauncher jobLauncher;

    /** Job repository handed to the job-wrapping steps. */
    @Autowired
    private JobRepository jobRepository;

    /** Transaction manager handed to the job-wrapping steps. */
    @Autowired
    private PlatformTransactionManager platformTransactionManager;

    /**
     * Defines {@code parentJob}: the step wrapping child job one, then the step wrapping
     * child job two.
     *
     * @return the parent job definition
     */
    @Bean
    public Job parentJob() {
        Step first = childJobOneStep();
        Step second = childJobTwoStep();
        return jobBuilderFactory.get("parentJob")
                .start(first)
                .next(second)
                .build();
    }

    /**
     * Wraps {@link #childJobOne()} in a step named {@code childJobOneStep}.
     *
     * @return the job-launching step
     */
    private Step childJobOneStep() {
        return jobAsStep("childJobOneStep", childJobOne());
    }

    /**
     * Wraps {@link #childJobTwo()} in a step named {@code childJobTwoStep}.
     *
     * @return the job-launching step
     */
    private Step childJobTwoStep() {
        return jobAsStep("childJobTwoStep", childJobTwo());
    }

    /**
     * Child job one: a single tasklet step that prints one line.
     *
     * @return the child job
     */
    private Job childJobOne() {
        return singleTaskletJob("childJobOne", "childJobOneStep", "子任務一執行步驟。。。");
    }

    /**
     * Child job two: a single tasklet step that prints one line.
     *
     * @return the child job
     */
    private Job childJobTwo() {
        return singleTaskletJob("childJobTwo", "childJobTwoStep", "子任務二執行步驟。。。");
    }

    /** Turns {@code childJob} into a step called {@code stepName} via {@link JobStepBuilder}. */
    private Step jobAsStep(String stepName, Job childJob) {
        JobStepBuilder builder = new JobStepBuilder(new StepBuilder(stepName));
        return builder
                .job(childJob)
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)
                .build();
    }

    /** Builds a job called {@code jobName} whose only step prints {@code message} and finishes. */
    private Job singleTaskletJob(String jobName, String stepName, String message) {
        Step onlyStep = stepBuilderFactory.get(stepName)
                .tasklet((contribution, context) -> {
                    System.out.println(message);
                    return RepeatStatus.FINISHED;
                })
                .build();
        return jobBuilderFactory.get(jobName).start(onlyStep).build();
    }
}
十二、讀取數據
Spring Batch讀取數據通過ItemReader接口的實現類來完成,包括FlatFileItemReader文本數據讀取、StaxEventItemReader XML文件數據讀取、JsonItemReader JSON文件數據讀取、JdbcPagingItemReader數據庫分頁數據讀取等實現,更多請參考官網。
12.1 簡單的數據讀取
自定義一個ItemReader的實現類,實現簡單數據的讀取,BatchSimpleItemReader。
package com.qiang.mybaties.plus.test.batch.item.reader.job.impl;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import java.util.Iterator;
import java.util.List;
/**
 * Minimal {@link ItemReader} that hands out the elements of an in-memory list one at a time.
 * The type argument is the type of the items being read. Created 2020/12/2 10:24.
 *
 * @author 小強崽
 */
public class BatchSimpleItemReader implements ItemReader<String> {

    /** Cursor over the items that have not been read yet. */
    private final Iterator<String> remaining;

    /**
     * @param data the items to read, in iteration order
     */
    public BatchSimpleItemReader(List<String> data) {
        remaining = data.iterator();
    }

    /**
     * Returns the next item.
     *
     * @return the next element, or {@code null} once every element has been returned
     *         ({@code null} is how an item reader reports that its input is exhausted)
     * @throws Exception                     declared by the interface; not thrown here
     * @throws UnexpectedInputException      declared by the interface; not thrown here
     * @throws ParseException                declared by the interface; not thrown here
     * @throws NonTransientResourceException declared by the interface; not thrown here
     */
    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        if (!remaining.hasNext()) {
            return null;
        }
        return remaining.next();
    }
}
測試代碼BatchSimpleItemReaderJob,使用chunk()方法。chunk字面上的意思是“塊”的意思,可以簡單理解為數據塊,泛型<String, String>用於指定讀取的數據和輸出的數據類型,構造器入參指定了數據塊的大小,比如指定為2時表示每當讀取2組數據后做一次數據輸出處理。接着reader()方法指定讀取數據的方式,該方法接收ItemReader的實現類。
package com.qiang.mybaties.plus.test.batch.item.reader.job;
import com.qiang.mybaties.plus.test.batch.item.reader.job.impl.BatchSimpleItemReader;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
/**
 * Chunk-oriented job that reads four strings through {@link BatchSimpleItemReader} and
 * prints them. Created 2020/12/2 10:22.
 *
 * @author 小強崽
 */
@Component
public class BatchSimpleItemReaderJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines {@code simpleItemReaderJob}, consisting of {@link #simpleItemReaderStep()}.
     *
     * @return the job definition
     */
    @Bean
    public Job simpleItemReaderJob() {
        Step readStep = simpleItemReaderStep();
        return jobBuilderFactory.get("simpleItemReaderJob").start(readStep).build();
    }

    /**
     * Builds the chunk step: items are read as {@code String}, written as {@code String},
     * with a chunk size of 2, and the writer prints each item of the chunk.
     *
     * @return a newly built step
     */
    public Step simpleItemReaderStep() {
        ItemReader<String> source = simpleItemReader();
        return stepBuilderFactory.get("simpleItemReaderStep")
                .<String, String>chunk(2)
                .reader(source)
                .writer(chunk -> {
                    for (String item : chunk) {
                        System.out.println(item);
                    }
                })
                .build();
    }

    /**
     * Creates the reader over the fixed demo data.
     *
     * @return a new reader positioned before the first item
     */
    public ItemReader<String> simpleItemReader() {
        return new BatchSimpleItemReader(Arrays.asList("存款", "余額", "資金","凍結"));
    }
}
12.2 文本數據讀取
通過FlatFileItemReader
實現文本數據讀取。
准備數據文件,在resources目錄下新建file文件。
// 演示文件數據讀取
1,11,12,13
2,21,22,23
3,31,32,33
4,41,42,43
5,51,52,53
6,61,62,63
file的數據是一行一行以逗號分隔的數據(在批處理業務中,文本類型的數據文件一般都是有一定規律的)。在文本數據讀取的過程中,我們需要將讀取的數據轉換為POJO對象存儲,所以我們需要創建一個與之對應的POJO對象。新建與之對應的實體類(后面的測試代碼BatchFileItemReaderJob以FileVo這個類名引用它),因為file文本中的一行數據經過逗號分隔后為1、11、12、13,所以該POJO包含4個屬性id、field1、field2和field3。
package com.qiang.mybaties.plus.test.batch.item.reader.entity;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import lombok.Data;
import java.io.Serializable;
/**
* @author 小強崽
* @create: 2020-12-04 16:33:11
* @description: 實體類
*/
@Data
public class TestVo extends Model<TestVo> {
/**
* 編號
*/
private Integer id;
/**
* 字段1
*/
private Object field1;
/**
* 字段2
*/
private Object field2;
/**
* 字段3
*/
private Object field3;
/**
* 獲取主鍵值
*
* @return 主鍵值
*/
@Override
protected Serializable pkVal() {
return this.id;
}
}
測試代碼。
package com.qiang.mybaties.plus.test.batch.item.reader.job;
import com.qiang.mybaties.plus.test.batch.item.reader.entity.FileVo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
/**
 * Chunk-oriented job that reads the comma-separated classpath resource {@code file}
 * into {@link FileVo} objects and prints them. Created 2020-12-06 22:36.
 *
 * @author 小強崽
 */
@Component
public class BatchFileItemReaderJob {

    /** Factory for job builders. */
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Factory for step builders. */
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines {@code fileItemReaderJob}, consisting of the single file-reading step.
     *
     * @return the job definition
     */
    @Bean
    public Job fileItemReaderJob() {
        Step readStep = fileItemReaderJobStep();
        return jobBuilderFactory.get("fileItemReaderJob").start(readStep).build();
    }

    /** Builds the chunk step (chunk size 2) whose writer prints every item of the chunk. */
    private Step fileItemReaderJobStep() {
        ItemReader<FileVo> source = fileItemReader();
        return stepBuilderFactory.get("fileItemReaderJobStep")
                .<FileVo, FileVo>chunk(2)
                .reader(source)
                .writer(chunk -> {
                    for (FileVo item : chunk) {
                        System.out.println(item);
                    }
                })
                .build();
    }

    /**
     * Configures the flat-file reader.
     *
     * @return a reader over the {@code file} resource that produces one {@link FileVo} per line
     */
    private ItemReader<FileVo> fileItemReader() {
        // Splits each line on the tokenizer's default delimiter (a comma) and names the
        // resulting tokens, much like a header row would.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("id", "field1", "field2", "field3");

        // Turns the named tokens of one line into a FileVo.
        DefaultLineMapper<FileVo> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(tokens -> {
            FileVo row = new FileVo();
            row.setId(tokens.readInt("id"));
            row.setField1(tokens.readString("field1"));
            row.setField2(tokens.readString("field2"));
            row.setField3(tokens.readString("field3"));
            return row;
        });

        FlatFileItemReader<FileVo> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("file"));
        // The first line of the resource is not data, so it is skipped.
        reader.setLinesToSkip(1);
        reader.setLineMapper(lineMapper);
        return reader;
    }
}
測試結果。
12.3 數據庫數據讀取
創建數據庫表。
-- Demo table for the database-reading example: a numeric primary key plus three text columns.
-- NOTE(review): the table is created in schema ITEMREADER, while the INSERT script that
-- follows refers to an unqualified TEST; confirm both run against the same schema.
CREATE TABLE "ITEMREADER"."TEST" (
"ID" NUMBER(10) NOT NULL ,
"FIELD1" VARCHAR2(255) ,
"FIELD2" VARCHAR2(255) ,
"FIELD3" VARCHAR2(255) ,
PRIMARY KEY ("ID")
);
-- Column comments stored in the data dictionary.
COMMENT ON COLUMN "ITEMREADER"."TEST"."ID" IS '編號';
COMMENT ON COLUMN "ITEMREADER"."TEST"."FIELD1" IS '字段1';
COMMENT ON COLUMN "ITEMREADER"."TEST"."FIELD2" IS '字段2';
COMMENT ON COLUMN "ITEMREADER"."TEST"."FIELD3" IS '字段3';
插入數據。
-- Seeds six demo rows with a single INSERT ALL statement; the trailing SELECT from dual
-- is the subquery that INSERT ALL requires.
insert all
into TEST(id,field1,field2,field3) values(1,'11','12','13')
into TEST(id,field1,field2,field3) values(2,'21','22','23')
into TEST(id,field1,field2,field3) values(3,'31','32','33')
into TEST(id,field1,field2,field3) values(4,'41','42','43')
into TEST(id,field1,field2,field3) values(5,'51','52','53')
into TEST(id,field1,field2,field3) values(6,'61','62','63')
select 1 from dual;
創建實體TestVo。
package com.qiang.mybaties.plus.test.batch.item.reader.entity;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import lombok.Data;
import java.io.Serializable;
/**
* Entity with an id and three value fields, used by the database-reading example.
* Lombok's {@code @Data} generates the accessors.
*
* @author 小強崽
* @create: 2020-12-04 16:33:11
* @description: entity class
*/
@Data
public class TestVo extends Model<TestVo> {
/**
* Row number.
*/
private Integer id;
/**
* Field 1.
*/
private Object field1;
/**
* Field 2.
*/
private Object field2;
/**
* Field 3.
*/
private Object field3;
/**
* Returns the primary-key value.
*
* @return the {@code id} of this entity
*/
@Override
protected Serializable pkVal() {
return this.id;
}
}
創建BatchDataSourceItemReaderJob
,dataSourceItemReader()
方法中的主要步驟就是:通過JdbcPagingItemReader
設置對應的數據源,然后設置數據量、獲取數據的sql語句、排序規則和查詢結果與POJO的映射規則等。方法末尾之所以需要調用JdbcPagingItemReader
的afterPropertiesSet()
方法是因為需要設置JDBC模板。
package com.qiang.mybaties.plus.test.batch.item.reader.job;
import com.qiang.mybaties.plus.test.batch.item.reader.entity.TestVo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.OraclePagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
/**
 * Demo job that pages through the TEST table with a {@link JdbcPagingItemReader}
 * and prints every row it reads.
 *
 * @author: 小強崽
 * @create: 2020/12/4 15:57
 * @description: reading data from a database
 **/
@Component
public class BatchDataSourceItemReaderJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Data source the reader queries.
     */
    @Autowired
    private DataSource dataSource;

    /**
     * Declares the job; it consists of the single step built by {@link #step()}.
     *
     * @return the configured job
     * @throws Exception if the reader cannot be initialised
     */
    @Bean
    public Job dataSourceItemReaderJob() throws Exception {
        return jobBuilderFactory.get("dataSourceItemReaderJob")
                .start(step())
                .build();
    }

    /**
     * Builds the step: items are read from the database and written to standard
     * output in chunks of two.
     *
     * @return the configured step
     * @throws Exception if the reader cannot be initialised
     */
    private Step step() throws Exception {
        return stepBuilderFactory.get("step")
                .<TestVo, TestVo>chunk(2)
                .reader(dataSourceItemReader())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }

    /**
     * Builds the paging reader over the TEST table, ordered by id ascending.
     *
     * @return a fully initialised reader
     * @throws Exception if {@code afterPropertiesSet()} rejects the configuration
     */
    private ItemReader<TestVo> dataSourceItemReader() throws Exception {
        JdbcPagingItemReader<TestVo> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        // Fetch size passed to the underlying JDBC statement.
        reader.setFetchSize(5);
        // Number of rows requested per page.
        reader.setPageSize(5);

        // Oracle dialect; the provider assembles
        // "select id,field1,field2,field3 from TEST" plus the paging/sort clauses.
        OraclePagingQueryProvider oraclePagingQueryProvider = new OraclePagingQueryProvider();
        oraclePagingQueryProvider.setSelectClause("id,field1,field2,field3");
        oraclePagingQueryProvider.setFromClause("from TEST");

        // Paging needs a deterministic order: sort by id ascending.
        Map<String, Order> sort = new HashMap<>(1);
        sort.put("id", Order.ASCENDING);
        oraclePagingQueryProvider.setSortKeys(sort);
        reader.setQueryProvider(oraclePagingQueryProvider);

        // Map each row to a TestVo. Columns are read by label rather than by
        // position so the mapping stays correct if the select clause is
        // reordered or extended.
        reader.setRowMapper((resultSet, rowNum) -> {
            TestVo testVo = new TestVo();
            testVo.setId(resultSet.getInt("id"));
            testVo.setField1(resultSet.getString("field1"));
            testVo.setField2(resultSet.getString("field2"));
            testVo.setField3(resultSet.getString("field3"));
            return testVo;
        });

        // The reader is created by hand here rather than declared as a bean, so
        // its initialisation callback has to be invoked explicitly.
        reader.afterPropertiesSet();
        return reader;
    }
}
測試結果
12.4 XML文件讀取
Spring Batch借助Spring OXM可以輕松地實現xml格式數據文件讀取。在resources目錄下新建file.xml。
<?xml version="1.0" encoding="utf-8" ?>
<tests>
<test>
<id>1</id>
<field1>11</field1>
<field2>12</field2>
<field3>13</field3>
</test>
<test>
<id>2</id>
<field1>21</field1>
<field2>22</field2>
<field3>23</field3>
</test>
<test>
<id>3</id>
<field1>31</field1>
<field2>32</field2>
<field3>33</field3>
</test>
<test>
<id>4</id>
<field1>41</field1>
<field2>42</field2>
<field3>43</field3>
</test>
<test>
<id>5</id>
<field1>51</field1>
<field2>52</field2>
<field3>53</field3>
</test>
<test>
<id>6</id>
<field1>61</field1>
<field2>62</field2>
<field3>63</field3>
</test>
</tests>
xml文件內容由一組一組的<test></test>標簽組成,每個<test>標簽又包含四組子標簽,標簽名稱和XmlVo實體類屬性一一對應。準備好xml文件後,我們在pom中引入spring-oxm和xstream依賴。
@Data
public class FileVo {
private int id;
private String field1;
private String field2;
private String field3;
}
<!-- Spring OXM (object/XML mapping). No version is declared here;
     presumably it is managed by the Spring Boot parent POM - confirm. -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
</dependency>
<!-- XStream, the XML library used through XStreamMarshaller.
     NOTE(review): 1.4.11.1 is an old release; check the XStream security
     advisories before reusing this version. Newer releases presumably need
     explicit type permissions configured on the marshaller - confirm. -->
<dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
<version>1.4.11.1</version>
</dependency>
測試代碼。
package com.qiang.mybaties.plus.test.batch.item.reader.job;
import com.qiang.mybaties.plus.test.batch.item.reader.entity.XmlVo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.oxm.xstream.XStreamMarshaller;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
 * Demo job that reads {@code <test>} fragments from file.xml on the classpath,
 * converts each one to an {@link XmlVo} and prints it.
 *
 * @author: 小強崽
 * @create: 2020/12/8 10:11
 * @description: reading an xml file
 **/
@Component
public class BatchXmlFileItemReaderJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Declares the job, which runs the single XML-reading step.
     *
     * @return the configured job
     */
    @Bean
    public Job xmlFileItemReaderJob() {
        Step onlyStep = xmlFileItemReaderStep();
        return jobBuilderFactory.get("xmlFileItemReaderJob")
                .start(onlyStep)
                .build();
    }

    /**
     * Builds the step: items are read from the XML file and written to standard
     * output in chunks of two.
     *
     * @return the configured step
     */
    private Step xmlFileItemReaderStep() {
        ItemReader<XmlVo> source = xmlFileItemReader();
        return stepBuilderFactory.get("xmlFileItemReaderStep")
                .<XmlVo, XmlVo>chunk(2)
                .reader(source)
                .writer(items -> {
                    for (XmlVo item : items) {
                        System.out.println(item);
                    }
                })
                .build();
    }

    /**
     * Builds the StAX reader over file.xml; every {@code test} element is
     * treated as one item.
     *
     * @return the configured reader
     */
    private ItemReader<XmlVo> xmlFileItemReader() {
        StaxEventItemReader<XmlVo> staxReader = new StaxEventItemReader<>();
        // Converter that turns each fragment into an XmlVo.
        staxReader.setUnmarshaller(xmlVoMarshaller());
        // Name of the element that delimits one item.
        staxReader.setFragmentRootElementName("test");
        // XML file to read from.
        staxReader.setResource(new ClassPathResource("file.xml"));
        return staxReader;
    }

    /**
     * Creates the XStream marshaller with the {@code test} alias bound to
     * {@link XmlVo}.
     *
     * @return the configured marshaller
     */
    private XStreamMarshaller xmlVoMarshaller() {
        Map<String, Class<XmlVo>> aliases = new HashMap<>(1);
        aliases.put("test", XmlVo.class);
        XStreamMarshaller xstreamMarshaller = new XStreamMarshaller();
        xstreamMarshaller.setAliases(aliases);
        return xstreamMarshaller;
    }
}
12.5 JSON數據讀取
在resources目錄下新建file.json文件。
[
{
"id": 1,
"field1": "11",
"field2": "12",
"field3": "13"
},
{
"id": 2,
"field1": "21",
"field2": "22",
"field3": "23"
},
{
"id": 3,
"field1": "31",
"field2": "32",
"field3": "33"
}
]
JSON對象屬性和JsonVo對象屬性一一對應。
package com.qiang.mybaties.plus.test.batch.item.reader.entity;
import lombok.Data;
/**
* Plain data holder with an id and three string fields; accessors are
* generated by Lombok's {@code @Data}.
*
* @author: 小強崽
* @create: 2020/12/8 10:39
* @description: json object
**/
@Data
public class JsonVo {
// Numeric id.
private int id;
// Three text fields.
private String field1;
private String field2;
private String field3;
}
測試代碼。
package com.qiang.mybaties.plus.test.batch.item.reader.job;
import com.qiang.mybaties.plus.test.batch.item.reader.entity.JsonVo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.json.JacksonJsonObjectReader;
import org.springframework.batch.item.json.JsonItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
/**
 * Demo job that reads a JSON array from file.json on the classpath, converts
 * each element to a {@link JsonVo} and prints it.
 *
 * @author: 小強崽
 * @create: 2020/12/8 10:43
 * @description: reading a json file
 **/
@Component
public class BatchJsonFileItemReaderJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Declares the job, which runs the single JSON-reading step.
     *
     * @return the configured job
     */
    @Bean
    public Job jsonFileItemReaderJob() {
        Step onlyStep = jsonFileItemReaderStep();
        return jobBuilderFactory.get("jsonFileItemReaderJob")
                .start(onlyStep)
                .build();
    }

    /**
     * Builds the step: items are read from the JSON file and written to
     * standard output in chunks of two.
     *
     * @return the configured step
     */
    private Step jsonFileItemReaderStep() {
        ItemReader<JsonVo> source = jsonItemReader();
        return stepBuilderFactory.get("step")
                .<JsonVo, JsonVo>chunk(2)
                .reader(source)
                .writer(items -> {
                    for (JsonVo item : items) {
                        System.out.println(item);
                    }
                })
                .build();
    }

    /**
     * Builds the JSON reader over file.json, using Jackson to bind each array
     * element to {@link JsonVo}.
     *
     * @return the configured reader
     */
    private ItemReader<JsonVo> jsonItemReader() {
        JsonItemReader<JsonVo> jsonReader = new JsonItemReader<>(
                // Location of the JSON file.
                new ClassPathResource("file.json"),
                // Target type each JSON object is converted to.
                new JacksonJsonObjectReader<JsonVo>(JsonVo.class));
        // Name assigned to the reader.
        jsonReader.setName("testDataJsonItemReader");
        return jsonReader;
    }
}
測試結果。
12.6 多文本數據讀取
多文本的數據讀取本質還是單文件數據讀取,區別就是多文件讀取需要在單文件讀取的方式上設置一層代理。在resources目錄下新建兩個文件file1和file2。
// file1
1,11,12,13
2,21,22,23
3,31,32,33
4,41,42,43
5,51,52,53
6,61,62,63
十三、常見問題
問題:
Caused by: org.springframework.batch.core.repository.JobExecutionAlreadyRunningException: A job execution for this job is already running: JobExecution: id=152, version=1, startTime=2020-12-02 10:38:40.348, endTime=null, lastUpdated=2020-12-02 10:38:40.349, status=STARTED, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=59, version=0, Job=[simpleItemReaderJob]], jobParameters=[{}]
解決:
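由於上一次運行simpleItemReaderJob時發生異常,批處理沒有正常結束,數據庫中對應的JobExecution記錄仍停留在「運行中」的狀態(即上面異常信息里的status=STARTED、endTime=null),所以再次以相同參數啟動時,Spring Batch會認為該任務仍在運行,數據庫表記錄產生沖突。刪掉相關的記錄,或者恢復數據庫即可解決;後續檢查simpleItemReaderJob並處理其中的異常即可。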
作者(Author):小強崽
來源(Source):https://www.wuduoqiang.com/archives/SpringBoot整合SpringBatch
協議(License):署名-非商業性使用-相同方式共享 4.0 國際 (CC BY-NC-SA 4.0)
版權(Copyright):商業轉載請聯系作者獲得授權,非商業轉載請注明出處。 For commercial use, please contact the author for authorization. For non-commercial use, please indicate the source.