前序文章陸續介紹了批處理的基本概念,Job使用、Step控制、Item的結構以及扁平文件的讀寫。本文將接着前面的內容說明數據庫如何進行批處理讀寫。
數據讀取
數據庫是絕大部分系統要用到的數據存儲工具,因此針對數據庫執行批量數據處理任務也是很常見的需求。數據的批量處理與常規業務開發不同,如果一次性讀取百萬條,對於任何系統而言肯定都是不可取的。為了解決這個問題Spring Batch提供了2套數據讀取方案:
- 基於游標讀取數據
- 基於分頁讀取數據
游標讀取數據
對於有經驗大數據工程師而言數據庫游標的操作應該是非常熟悉的,因為這是從數據庫讀取數據流標准方法,而且在Java中也封裝了ResultSet
這種面向游標操作的數據結構。
ResultSet
一直都會指向結果集中的某一行數據,使用next
方法可以讓游標跳轉到下一行數據。Spring Batch同樣使用這個特性來控制數據的讀取:
- 在初始化時打開游標。
- 每一次調用
ItemReader::read
方法就從ResultSet
獲取一行數據並執行next
。 - 返回可用於數據處理的映射結構(map、dict)。
在一切都執行完畢之后,框架會使用回調過程調用ResultSet::close
來關閉游標。由於所有的業務過程都綁定在一個事物之上,所以知道到Step
執行完畢或異常退出調用執行close
。下圖展示了數據讀取的過程:
SQL語句的查詢結果稱為數據集(對於大部分數據庫而言,其SQL執行結果會產生臨時的表空間索引來存放數據集)。游標開始會停滯在ID=2的位置,一次ItemReader
執行完畢后會產生對應的實體FOO2
,然后游標下移直到最后的ID=6。最后關閉游標。
JdbcCursorItemReader
JdbcCursorItemReader
是使用游標讀取數據集的ItemReader
實現類之一。它使用JdbcTemplate
中的DataSource
控制ResultSet
,其過程是將ResultSet
的每行數據轉換為所需要的實體類。
JdbcCursorItemReader
的執行過程有三步:
- 通過
DataSource
創建JdbcTemplate
。 - 設定數據集的SQL語句。
- 創建
ResultSet
到實體類的映射。 大致如下:
//隨風溜達的向日葵 chkui.com
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
除了上面的代碼,JdbcCursorItemReader
還有其他屬性:
屬性名稱 | 說明 |
---|---|
ignoreWarnings | 標記當執行SQL語句出現警告時,是輸出日志還是拋出異常,默認為true——輸出日志 |
fetchSize | 預通知JDBC驅動全量數據的個數 |
maxRows | 設置ResultSet從數據庫中一次讀取記錄的上限 |
queryTimeout | 設置執行SQL語句的等待超時時間,單位秒。當超過這個時間會拋出DataAccessException |
verifyCursorPosition | 對游標位置進行校驗。由於在RowMapper::mapRow方法中ResultSet是直接暴露給使用者的,因此有可能在業務代碼層面調用了ResultSet::next方法。將這個屬性設置為true,在框架中會有一個位置計數器與ResultSet保持一致,當執行完Reader后位置不一致會拋出異常。 |
saveState | 標記讀取的狀態是否被存放到ExecutionContext中。默認為true |
driverSupportsAbsolute | 告訴框架是指直接使用ResultSet::absolute方法來指定游標位置,使用這個屬性需要數據庫驅動支持。建議在支持absolute特性的數據庫上開啟這個特性,能夠明顯的提升性能。默認為false |
setUseSharedExtendedConnection | 標記讀取數據的游標是否與Step其他過程綁定成同一個事物。默認為false,表示讀取數據的游標是單獨建立連接的,具有自身獨立的事物。如果設定為true需要用ExtendedConnectionDataSourceProxy包裝DataSource用於管理事物過程。此時游標的創建標記為'READ_ONLY'、'HOLD_CURSORS_OVER_COMMIT'。需要注意的是該屬性需要數據庫支持3.0以上的JDBC驅動。 |
可執行源碼
源碼在下列地址的items子項目:
- Gitee:https://gitee.com/chkui-com/spring-batch-sample
- Github:https://github.com/chkui/spring-batch-sample
執行JdbcCursorItemReader
的代碼在org.chenkui.spring.batch.sample.items.JdbcReader
。啟動位置是org.chenkui.spring.batch.sample.database.cursor.JdbcCurosrApplication
。
在運行代碼之前請先在數據庫中執行以下DDL語句,並添加部分測試數據。
CREATE TABLE `tmp_test_weather` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`siteid` varchar(64) NOT NULL COMMENT '業務主鍵',
`month` varchar(64) NOT NULL COMMENT '日期',
`type` varchar(64) NOT NULL COMMENT '氣象類型',
`value` int(11) NOT NULL COMMENT '值',
`ext` varchar(255) DEFAULT NULL COMMENT '擴展數據',
PRIMARY KEY (`id`)
) ;
運行代碼:
//隨風溜達的向日葵 chkui.com
public class JdbcReader {
@Bean
public RowMapper<WeatherEntity> weatherEntityRowMapper() {
return new RowMapper<WeatherEntity>() {
public static final String SITEID_COLUMN = "siteId"; // 設置映射字段
public static final String MONTH_COLUMN = "month";
public static final String TYPE_COLUMN = "type";
public static final String VALUE_COLUMN = "value";
public static final String EXT_COLUMN = "ext";
@Override
// 數據轉換
public WeatherEntity mapRow(ResultSet resultSet, int rowNum) throws SQLException {
WeatherEntity weatherEntity = new WeatherEntity();
weatherEntity.setSiteId(resultSet.getString(SITEID_COLUMN));
weatherEntity.setMonth(resultSet.getString(MONTH_COLUMN));
weatherEntity.setType(WeatherEntity.Type.valueOf(resultSet.getString(TYPE_COLUMN)));
weatherEntity.setValue(resultSet.getInt(VALUE_COLUMN));
weatherEntity.setExt(resultSet.getString(EXT_COLUMN));
return weatherEntity;
}
};
}
@Bean
public ItemReader<WeatherEntity> jdbcCursorItemReader(
@Qualifier("weatherEntityRowMapper") RowMapper<WeatherEntity> rowMapper, DataSource datasource) {
JdbcCursorItemReader<WeatherEntity> itemReader = new JdbcCursorItemReader<>();
itemReader.setDataSource(datasource); //設置DataSource
//設置讀取的SQL
itemReader.setSql("SELECT siteId, month, type, value, ext from TMP_TEST_WEATHER");
itemReader.setRowMapper(rowMapper); //設置轉換
return itemReader;
}
}
HibernateCursorItemReader
在Java體系中數據庫操作常見的規范有JPA
或ORM
,Spring Batch提供了HibernateCursorItemReader
來實現HibernateTemplate
,它可以通過Hibernate框架進行游標的控制。
需要注意的是:使用Hibernate框架來處理批量數據到目前為止一直都有爭議,核心原因是Hibernate最初是為在線聯機事物型系統開發的。不過這並不意味着不能使用它來處理批數據,解決此問題就是讓Hibernate使用StatelessSession
用來保持游標,而不是standard session
一次讀寫,這將導致Hibernate的緩存機制和數據臟讀檢查失效,進而影響批處理的過程。關於Hibernate的狀態控制機制請閱讀官方文檔。
HibernateCursorItemReader
使用過程與JdbcCursorItemReader
沒多大差異都是逐條讀取數據然后控制狀態鏈接關閉。只不過他提供了Hibernate所使用的HSQL方案。
@Bean
public ItemReader<WeatherEntity> hibernateCursorItemReader(SessionFactory sessionFactory) {
HibernateCursorItemReader<WeatherEntity> itemReader = new HibernateCursorItemReader<>();
itemReader.setName("hibernateCursorItemReader");
itemReader.setQueryString("from WeatherEntity tmp_test_weather");
itemReader.setSessionFactory(sessionFactory);
return itemReader;
}
或
public ItemReader<WeatherEntity> hibernateCursorItemReader(SessionFactory sessionFactory) {
return new HibernateCursorItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.sessionFactory(sessionFactory)
.queryString("from CustomerCredit")
.build();
}
如果沒有特別的需要,不推薦使用Hibernate。
StoredProcedureItemReader
存儲過程是在同一個數據庫中處理大量數據的常用方法。StoredProcedureItemReader
的執行過程和JdbcCursorItemReader
一致,但是底層邏輯是先執行存儲過程,然后返回存儲過程執行結果游標。不同的數據庫存儲過程游標返回會有一些差異:
- 作為一個
ResultSet
返回。(SQL Server, Sybase, DB2, Derby以及MySQL) - 參數返回一個
ref-cursor
實例。比如Oracle、PostgreSQL數據庫,這類數據庫存儲過程是不會直接return任何內容的,需要從傳參獲取。 - 返回存儲過程調用后的返回值。
針對以上3個類型,配置上有一些差異:
//隨風溜達的向日葵 chkui.com
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_processor_weather");
reader.setRowMapper(new weatherEntityRowMapper());
reader.setRefCursorPosition(1);//第二種類型需要指定ref-cursor的參數位置
reader.setFunction(true);//第三種類型需要明確的告知reader通過返回獲取
return reader;
}
使用存儲過程處理數據的好處是可以實現針對庫內的數據進行合並、分割、排序等處理。如果數據在同一個數據庫,性能也明顯好於通過Java處理。
分頁讀取數據
相對於游標,還有一個辦法是進行分頁查詢。分頁查詢意味着再進行批處理的過程中同一個SQL會多次執行。在聯機型事物系統中分頁查詢常用於列表功能,每一次查詢需要指定開始位置和結束位置。
JdbcPagingItemReader
分頁查詢的默認實現類是JdbcPagingItemReader
,它的核心功能是用分頁器PagingQueryProvider
進行分頁控制。由於不同的數據庫分頁方法差別很大,所以針對不同的數據庫有不同的實現類。框架提供了SqlPagingQueryProviderFactoryBean
用於檢查當前數據庫並自動注入對應的PagingQueryProvider
。
JdbcPagingItemReader
會從數據庫中一次性讀取一整頁的數據,但是調用Reader
的時候還是會一行一行的返回數據。框架會自行根據運行情況確定什么時候需要執行下一個分頁的查詢。
分頁讀取數據執行源碼
- Gitee:https://gitee.com/chkui-com/spring-batch-sample
- Github:https://github.com/chkui/spring-batch-sample
執行JdbcPagingItemReader
的代碼在org.chenkui.spring.batch.sample.items.pageReader
。啟動位置是org.chenkui.spring.batch.sample.database.paging.JdbcPagingApplication
:
//隨風溜達的向日葵 chkui.com
public class pageReader {
final private boolean wrapperBuilder = false;
@Bean
//設置 queryProvider
public SqlPagingQueryProviderFactoryBean queryProvider(DataSource dataSource) {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setDataSource(dataSource);
provider.setSelectClause("select id, siteid, month, type, value, ext");
provider.setFromClause("from tmp_test_weather");
provider.setWhereClause("where id>:start");
provider.setSortKey("id");
return provider;
}
@Bean
public ItemReader<WeatherEntity> jdbcPagingItemReader(DataSource dataSource,
PagingQueryProvider queryProvider,
RowMapper<WeatherEntity> rowMapper) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("start", "1");
JdbcPagingItemReader<WeatherEntity> itemReader;
if (wrapperBuilder) {
itemReader = new JdbcPagingItemReaderBuilder<WeatherEntity>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(rowMapper)
.pageSize(1000)
.build();
} else {
itemReader = new JdbcPagingItemReader<>();
itemReader.setName("weatherEntityJdbcPagingItemReader");
itemReader.setDataSource(dataSource);
itemReader.setQueryProvider(queryProvider);
itemReader.setParameterValues(parameterValues);
itemReader.setRowMapper(rowMapper);
itemReader.setPageSize(1000);
}
return itemReader;
}
}
數據寫入
Spring Batch為不同類型的文件的寫入提供了多個實現類,但並沒有為數據庫的寫入提供任何實現類,而是交由開發者自己去實現接口。理由是:
- 數據庫的寫入與文件寫入有巨大的差別。對於一個
Step
而言,在寫入一份文件時需要保持對文件的打開狀態從而能夠高效的向隊尾添加數據。如果每次都重新打開文件,從開始位置移動到隊尾會耗費大量的時間(很多文件流無法在open時就知道長度)。當整個Step
結束時才能關閉文件的打開狀態,框架提供的文件讀寫類都實現了這個控制過程。 - 另外無論使用何種方式將數據寫入文件都是"逐行進行"的(流數據寫入、字符串逐行寫入)。因此當數據寫入與整個
Step
綁定為事物時還需要實現一個控制過程是:在寫入數據的過程中出現異常時要擦除本次事物已經寫入的數據,這樣才能和整個Step
的狀態保持一致。框架中的類同樣實現了這個過程。 - 但是向數據庫寫入數據並不需要類似於文件的尾部寫入控制,因為數據庫的各種鏈接池本身就保證了鏈接->寫入->釋放的高效執行,也不存在向隊尾添加數據的問題。而且幾乎所有的數據庫驅動都提供了事物能力,在任何時候出現異常都會自動回退,不存在擦除數據的問題。
因此,對於數據庫的寫入操作只要按照常規的批量數據寫入的方式即可,開發者使用任何工具都可以完成這個過程。
寫入數據一個簡單的實現
實現數據寫入方法很多,這和常規的聯機事務系統沒任何區別。下面直接用JdbcTemplate
實現了一個簡單的數據庫寫入過程。
執行數據庫寫入的核心代碼在org.chenkui.spring.batch.sample.items.JdbcWriter
。啟動位置是org.chenkui.spring.batch.sample.database.output.JdbcWriterApplication
。
//隨風溜達的向日葵 chkui.com
public class JdbcWriter {
@Bean
public ItemWriter<WeatherEntity> jdbcBatchWriter(JdbcTemplate template) {
return new ItemWriter<WeatherEntity>() {
final private static String INSERt_SQL =
"INSERT INTO tmp_test_weather(siteid, month, type, value, ext) VALUES(?,?,?,?,?)";
@Override
public void write(List<? extends WeatherEntity> items) throws Exception {
List<Object[]> batchArgs = new ArrayList<>();
for (WeatherEntity entity : items) {
Object[] objects = new Object[5];
objects[0] = entity.getSiteId();
objects[1] = entity.getMonth();
objects[2] = entity.getType().name();
objects[3] = entity.getValue();
objects[4] = entity.getExt();
batchArgs.add(objects);
}
template.batchUpdate(INSERt_SQL, batchArgs);
}
};
}
}
組合使用案例
下面是一些組合使用過程,簡單實現了文件到數據庫、數據庫到文件的過程。文件讀寫的過程已經在文件讀寫中介紹過,這里會重復使用之前介紹的文件讀寫的功能。
下面的案例是將data.csv
中的數據寫入到數據庫,然后再將數據寫入到out-data.csv
。案例組合使用已有的item
完成任務:flatFileReader
、jdbcBatchWriter
、jdbcCursorItemReader
、simpleProcessor
、flatFileWriter
。這種Reader
、Processor
、Writer
組合的方式也是完成一個批處理工程的常見開發方式。
案例的運行代碼在org.chenkui.spring.batch.sample.database.complex
包中,使用了2個Step
來完成任務,一個將數據讀取到數據庫,一個將數據進行過濾,然后再寫入到文件:
//隨風溜達的向日葵 chkui.com
public class FileComplexProcessConfig {
@Bean
// 配置Step1
public Step file2DatabaseStep(StepBuilderFactory builder,
@Qualifier("flatFileReader") ItemReader<WeatherEntity> reader,
@Qualifier("jdbcBatchWriter") ItemWriter<WeatherEntity> writer) {
return builder.get("file2DatabaseStep") // 創建
.<WeatherEntity, WeatherEntity>chunk(50) // 分片
.reader(reader) // 讀取
.writer(writer) // 寫入
.faultTolerant() // 開啟容錯處理
.skipLimit(20) // 跳過設置
.skip(Exception.class) // 跳過異常
.build();
}
@Bean
// 配置Step2
public Step database2FileStep(StepBuilderFactory builder,
@Qualifier("jdbcCursorItemReader") ItemReader<WeatherEntity> reader,
@Qualifier("simpleProcessor") ItemProcessor<WeatherEntity, MaxTemperatureEntiry> processor,
@Qualifier("flatFileWriter") ItemWriter<MaxTemperatureEntiry> writer) {
return builder.get("database2FileStep") // 創建
.<WeatherEntity, MaxTemperatureEntiry>chunk(50) // 分片
.reader(reader) // 讀取
.processor(processor) //
.writer(writer) // 寫入
.faultTolerant() // 開啟容錯處理
.skipLimit(20) // 跳過設置
.skip(Exception.class) // 跳過異常
.build();
}
@Bean
public Job file2DatabaseJob(@Qualifier("file2DatabaseStep") Step step2Database,
@Qualifier("database2FileStep") Step step2File, JobBuilderFactory builder) {
return builder.get("File2Database").start(step2Database).next(step2File).build();
}
}