簡介
A lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.
Spring Batch provides reusable functions that are essential in processing large volumes of records, including logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management. It also provides more advanced technical services and features that will enable extremely high-volume and high performance batch jobs through optimization and partitioning techniques. Simple as well as complex, high-volume batch jobs can leverage the framework in a highly scalable manner to process significant volumes of information.
大體翻譯如下:
一個輕量的、廣泛的批處理框架,該框架的設計目的是為了支持對企業系統日常運營至關重要的批處理應用程序的開發。
Spring Batch 提供了在處理大量記錄時必需的可復用功能,包括日志記錄/跟蹤、事務管理、任務處理統計、任務重啟、任務跳過和資源管理。它也提供了更加高級的技術服務和特征,通過優化和分區的方式獲得極高容量和高性能的批處理任務。簡單和復雜的大容量批處理任務都可以以高度可擴展的方式利用該框架來處理大量的信息
處理架構
Spring Batch 的處理結構如下所示:
其中,任務的處理是在 Step
這個階段定義的。在 Step
中,需要定義數據的讀取、數據的處理、數據的寫出操作,在這三個階段中,數據的處理是真正進行數據處理的地方。具體 Step
的流程如下圖所示:
Reader
(架構圖中的Item Reader
):主要的任務是定義數據的讀取操作,包括讀取文件的位置、對讀取首先要進行的划分(如以 ',' 作為分隔符)、將讀取到的文件映射到相關對象的屬性字段等Process
(架構圖中的Item Processor
):這里是真正對數據進行處理的地方,數據的處理邏輯都在這里定義Writer
(架構圖中的Item Writer
):這個階段的主要任務是定義數據的輸出操作,包括將數據寫入到數據庫等
使用前的准備
在使用 Spring Batch 之前,首先需要創建 Spring Batch 需要的元數據表和它需要使用的元數據類型,這些可以在數據庫中進行定義,這些元數據表和元數據類型是和 Spring Batch 中的域對象緊密相關的。
元數據表
元數據表的關聯關系如下所示:

相關的表解釋如下:
BATCH_JOB_INSTANCE
:與這個表相對應的是JobInstance
域對象,這個域對象是整個層次結構的頂層,表示具體的任務BATCH_JOB_EXECUTION_PARAMS
:與這個表對應的是JobParameters
域對象,它包含了 0 個或多個 key-value 鍵值對,作為每次運行任務時使用的參數,通過JobParameters
對象和Job
對象,可以得到唯一確定的JobInstance
實例BATCH_JOB_EXECUTION
:與這個表對應的是JobExecution
域對象,每次運行一個任務時,都會創建一個新的JobExecution
對象BATCH_STEP_EXECUTION
:與這個表對應的是StepExecution
對象,這個對象與JobExecution
類似,與JobExecution
相關聯的地方在於一個JobExecution
可以有多個StepExecution
BATCH_JOB_EXECUTION_CONTEXT
:這個表存儲的是每個Job
的執行上下文信息BATCH_STEP_EXECUTION_CONTEXT
:這個表存儲的是每個Job
中每個Step
的執行上下文信息
元數據類型
BATCH_JOB_INSTANCE
、BATCH_JOB_EXECUTION
、BATCH_STEP_EXECUTION
這三個表都包含了以 _ID
結尾的列,這個列會作為它們所在表的實際主鍵。然而,這個列不是由數據庫產生的,而是由單獨的序列來產生的,這是因為:在插入數據之后,需要在插入的數據上設置給定的鍵,這樣才能確保它們在 Java
應用中的唯一性。盡管較新的 JDBC
支持主鍵自增,但是為了能夠更好地兼容,因此還是有必要為這三個數據表設置對應的序列類型。
定義元數據類型的 SQL
如下:
CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_SEQ;
由於有的數據庫(如MySQL
)不支持 SEQUENCE
這種類型,一般的做法是創建一個表來代理 SEQUENCE
:
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (ID BIGINT NOT NULL) ENGINE = InnoDB;
INSERT INTO BATCH_STEP_EXECUTION_SEQ values(0);
CREATE TABLE BATCH_JOB_EXECUTION_SEQ (ID BIGINT NOT NULL) ENGINE =InnoDB;
INSERT INTO BATCH_JOB_EXECUTION_SEQ values(0);
CREATE TABLE BATCH_JOB_SEQ (ID BIGINT NOT NULL) ENGINE = InnoDB;
INSERT INTO BATCH_JOB_SEQ values(0);
最終創建元數據表和元數據類型的 SQL
腳本如下:
PostgreSQL
:https://raw.githubusercontent.com/LiuXianghai-coder/Test-Repo/master/SQL/batch_postgresql_meta.sql
MySQL
:https://raw.githubusercontent.com/LiuXianghai-coder/Test-Repo/master/SQL/batch_mysql_meta.sql
開始使用
在這個例子中,需要實現的功能為:從一個文件中批量地讀取數據,將這些數據進行相應的轉換(小寫字母變大寫),再將它們寫入到數據庫中。
對應的數據文本如下(sample-data.csv):
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
創建實體類
首先,創建處理的數據對應的實體和關聯的數據表,如下所示:
// 與數據
public class Person {
private final String lastName;
private final String firstName;
public Person(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}
// 省略一部分 getter 方法
}
對應的實體表如下所示:
-- 以 PostgreSQL 為例
CREATE TABLE people
(
person_id SERIAL8 NOT NULL PRIMARY KEY,
first_name VARCHAR(20),
last_name VARCHAR(20)
);
處理的核心邏輯
接下來,需要定義對於每條數據的處理邏輯,處理邏輯對應的類需要實現 org.springframework.batch.item.ItemProcessor<I, O>
接口,其中,I
范型表示要處理的實體類類型,O
表示經過處理之后返回的結果類型。由於這里只是對 Peroson
類的名和姓進行大寫的轉換,因此輸入類型和輸出類型都是 Person
具體的處理邏輯如下:
import org.springframework.batch.item.ItemProcessor;
/*
處理數據的邏輯類,對應架構中的 Item Processor 部分
*/
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
@Override
public Person process(Person person) {
final String firstName = person.getFirstName().toUpperCase();
final String lastName = person.getLastName().toUpperCase();
// 可以在這個處理邏輯中定義一些其它的操作。。。。。。。。。
return new Person(firstName, lastName);
}
}
數據的讀取
有了處理的核心邏輯部分之后,剩下的重要部分就是數據的輸入和輸出了,正如上文所描述的那樣,輸入部分定義了數據的來源、對初始數據進行處理等任務。
數據的讀取部分如下所示:
@Bean(name = "reader") // 可以把這個 Bean 放在任意的一個配置類或組件類中
/*
由於讀取的數據來源是來自一般的文件,因此采用 FlatFileItemReader 的實現類;
其它的實現類可以查看 org.springframework.batch.item.ItemReader 的實現類
*/
public FlatFileItemReader<Person> reader() {
return new FlatFileItemReaderBuilder<Person>() // 以構建者模式的方式構建新的 FlatFileItemReader 對象
.name("personItemReader")
.resource(new ClassPathResource("sample-data.csv")) // 讀取數據的來源,這里表示在類路徑的 resourcec 目錄下的 sample-data.csv 文件
.delimited().delimiter(",") // 指定每行的數據字段的分割符為 ,
.names("firstName", "lastName") // 將分割的字段映射到 firstName 和 lastName 屬性字段
.fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
setTargetType(Person.class);
}}) // 這些分割的屬性字段對應的類,使用 {{}} 的方式來將初始化的 BeanWrapperFieldSetMapper 調用 setTargetType 方法,可能是一個比較簡潔的方式,但這種方式可能會導致內存泄漏
.build(); // 通過設置的屬性構造 FlatFileItemReader 對象
}
數據的寫出
數據的寫出部分如下所示:
@Bean(name = "writer")
/*
由於這里的寫出是寫入的數據庫中,因此采用 JdbcBatchItemWriter 的實現類進行寫出;
其它的實現類可以查看 org.springframework.batch.item.ItemWriter 的實現類
*/
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Person>() // 以構建者模式的方式創建 JdbcBatchItemWriter 實例對象
.itemSqlParameterSourceProvider(
new BeanPropertyItemSqlParameterSourceProvider<Person>() // 提供執行相關 SQL 需要的參數,這里以實體類(輸出類)的方式存儲需要的參數
)
.sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)") // 寫入數據庫中具體執行的 SQL
.dataSource(dataSource) // 設置數據源,這個對象可以手動創建,但是一般在配置相關的 datasource 屬性之后,Spring 會自動生成這個類型的 Bean
.build();
}
整合到 Step
上文介紹過,一個 Step
包含了數據的讀取、數據的處理、數據的寫出三個部分,而一個批處理任務可以由多個 Step
來組成。現在,需要做的是將上文提到的寫入、處理、寫出三個部分組合成為一個 Step
具體代碼如下所示:
@Bean(name = "step1") // Step 也是一個 Bean,這里將它命名為 step1
// Step 類是批處理任務的執行單元
public Step step1(JdbcBatchItemWriter<Person> writer) {
return stepBuilderFactory // 由 Spring 容器自行注入;注意這里使用的工廠模式
.get("step1") // 創建一個會創建名為 step1 的 StepBuilder 對象;注意這里的構建者模式的使用
.<Person, Person>chunk(10) // 這個 Step 一次處理的數據的數量,前綴 <I, O> 范型表示的含義與 Item Process 中的一致,因此這里兩個范型都設置為 Person
.reader(reader()) // 數據的讀取部分
.processor(processor()) // 數據的處理部分
.writer(writer) // 寫出部分,由於 writer 需要注入 DataSurce 對象,因此直接作為一個注入參數參數並使用會更加方便;當然,reader 和 process 也可以通過注入參數的方式直接使用,因為它們都被定義成了 Spring 中的 Bean
.build();
}
任務執行監聽器
如果想要在任務執行前或者任務執行之后做一些相關的處理操作,那么設置對應的任務執行監聽器會很有用。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
@Component
public class JobCompletionNotificationListener implements JobExecutionListener {
// 日志打印對象
private final static Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
private final JdbcTemplate jdbcTemplate;
@Autowired
public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override // 執行任務之前的一些操作
public void beforeJob(JobExecution jobExecution) {
log.info("Ready to execution job.......");
}
@Override // 在任務執行完成之后執行的一些操作,這里是執行完成之后查詢寫入到數據庫中的結果
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("!!! JOB FINISHED! Time to verify the results");
jdbcTemplate.query("SELECT first_name, last_name FROM people",
(rs, rowNum) -> new Person(rs.getString(1), rs.getString(2))
).forEach(person -> log.info("Found <" + person.toString() + "> in the database."));
}
}
}
創建 Job
批處理的最頂層的抽象便是 Job
,Job
是一個批處理任務,現在整合上文的內容,創建一個 Job
@Bean(name = "importUserJob")
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
return jobBuilderFactory // 這個工廠對象是由 Spring 自動注入的;同樣地,使用的是工廠模式
.get("importUserJob") // 創建一個會創建 importUserJob 任務的 JobBuilder 對象;構建者模式
.incrementer(new RunIdIncrementer()) // 增加這個 Job 的參數信息,具體可以參見 Spring Batch 的元數據信息
.listener(listener) // 添加之前創建的任務執行監聽器,使得在任務開始和結束時執行相應的操作
.flow(step1) // 添加上文定義的 step1 處理
.end() // 任務結束
.build();
}
如果想要添加多個 Step
,那么可以按照下面的方式進行添加:
@Bean(name = "importUserJob")
public Job importUserJob(JobCompletionNotificationListener listener) {
return jobBuilderFactory
.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.start(step1) // 定義的 step1
.next(step2) // 定義的 step2
.build();
}
值得注意的是,由於上文定義的任務執行監聽器監聽的是任務(即 Job
) 的狀態,因此當添加多個 Step
時,只有在完成最后的 Step
之后才會觸發這個事件監聽。
啟動批處理任務
如果想要啟動批處理任務,首先需要創建一個配置類,如下所示:
mport org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
@EnableBatchProcessing // 使得能夠開啟批處理任務處理,這樣 JobBuilderFactory 才能夠被 Spring 注入
public class TestConfiguration {
private final JobBuilderFactory jobBuilderFactory;
@Resource(name = "step1")
private Step step1;
@Resource(name = "step2")
private Step step2;
public TestConfiguration(JobBuilderFactory jobBuilderFactory) {
this.jobBuilderFactory = jobBuilderFactory;
}
@Bean(name = "importUserJob") // 具體的任務 Bean,這個 Bean 會在 Spring 容器啟動的時候進行加載,因此任務也會執行
public Job importUserJob(JobCompletionNotificationListener listener) {
return jobBuilderFactory
.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.start(step1)
.next(step2)
.build();
}
}
除了在配置類中加上 @EnableBatchProcessing
開啟批處理任務之外,在配置文件 application.yml
文件中也需要做相應的配置:
spring:
batch:
job:
enabled: true # 使得能夠開啟批處理任務
現在,啟動 Spring 應用程序(可以使用 SpringApplication.run()
方法來啟動),你會發現正在進行批處理任務:

任務執行完成之后,查看數據庫的寫入內容:

可以發現,處理過的數據已經成功寫入到數據庫中了
查看執行的日志,可能如下所示:

以上就是有關 Spring Batch 的一些基本使用,希望它可以幫到你
項目地址:https://github.com/LiuXianghai-coder/Spring-Study/tree/master/spring-batch
參考: