在Spring batch由上至下的結構中Job、Step都是屬於框架級別的的功能,大部分時候都是提供一些配置選項給開發人員使用,而Item中的Reader
、Processor
和Writer
是屬於業務級別的,它開放了一些業務切入的接口。 但是文件的讀寫過程中有很多通用一致的功能Spring Batch為這些相同的功能提供了一致性實現類。
扁平結構文件
扁平結構文件(也稱為矩陣結構文件,后文簡稱為文件)是最常見的一種文件類型。他通常以一行表示一條記錄,字段數據之間用某種方式分割。與標准的格式數據(xml、json等)主要差別在於他沒有結構性描述方案(SXD、JSON-SCHEME),進而沒有結構性分割規范。因此在讀寫此類文件之前需要先設定好字段的分割方法。
文件的字段數據分割方式通常有兩種:使用分隔符或固定字段長度。前者通常使用逗號(,
)之類的符號對字段數據進行划分,后者的每一列字段數據長度是固定的。 框架為文件的讀取提供了FieldSet
用於將文件結構中的信息映射到一個對象。FieldSet
的作用是將文件的數據與類的field
進行綁定(field是Java中常見的概念,不清楚的可以了解Java反射)。
數據讀取
Spring Batch為文件讀取提供了FlatFileItemReader
類,它為文件中的數據的讀取和轉換提供了基本功能。在FlatFileItemReader
中有2個主要的功能接口,一是Resource
、二是LineMapper
。 Resource
用於外部文件獲取,詳情請查看Spring核心——資源管理部分的內容,下面是一個例子:
Resource resource = new FileSystemResource("resources/trades.csv");
在復雜的生產環境中,文件通常由中心化、或者流程式的基礎框架來管理(比如EAI)。因此文件往往需要使用FTP等方式從其他位置獲取。如何遷移文件已經超出了Spring Batch框架的范圍,在Spring的體系中可以參考Spring Integration
項目。
下面是FlatFileItemReader
的屬性,每一個屬性都提供了Setter方法。
屬性名 | 參數類型 | 說明 |
---|---|---|
comments | String[] | 指定文件中的注釋前綴,用於過濾注釋內容行 |
encoding | String | 指定文件的編碼方式,默認為Charset.defaultCharset() |
lineMapper | LineMapper | 利用LineMapper接口將一行字符串轉換為對象 |
linesToSkip | int | 跳過文件開始位置的行數,用於跳過一些字段的描述行 |
recordSeparatorPolicy | RecordSeparatorPolicy | 用於判斷數據是否結束 |
resource | Resource | 指定外部資源文件位置 |
skippedLinesCallback | LineCallbackHandler | 當配置linesToSkip,每執行一次跳過都會被回調一次,會傳入跳過的行數據內容 |
每個屬性都為文件的解析提供了某方面的功能,下面是結構的說明。
LineMapper
這個接口的作用是將字符串轉換為對象:
public interface LineMapper { T mapLine(String line, int lineNumber) throws Exception; }
接口的基本處理邏輯是聚合類(FlatFileItemReader
)傳遞一行字符串以及行號給LineMapper::mapLine
,方法處理后返回一個映射的對象。
LineTokenizer
這個接口的作用是將一行數據轉換為一個FieldSet
結構。對於Spring Batch而言,扁平結構文件的到Java實體的映射都通過FieldSet
來控制,因此讀寫文件的過程需要完成字符串到FieldSet
的轉換:
public interface LineTokenizer { FieldSet tokenize(String line); }
這個接口的含義是:傳遞一行字符串數據,然后獲取一個FieldSet
。
框架為LineTokenizer
提供三個實現類:
DelimitedLineTokenizer
:利用分隔符將數據轉換為FieldSet
。最常見的分隔符是逗號,
,類提供了分隔符的配置和解析方法。FixedLengthTokenizer
:根據字段的長度來解析出FieldSet
結構。必須為記錄定義字段寬度。PatternMatchingCompositeLineTokenizer
:使用一個匹配機制來動態決定使用哪個LineTokenizer
。
FieldSetMapper
該接口是將FieldSet
轉換為對象:
public interface FieldSetMapper { T mapFieldSet(FieldSet fieldSet) throws BindException; }
FieldSetMapper
通常和LineTokenizer
聯合在一起使用:String->FieldSet->Object。
DefaultLineMapper
DefaultLineMapper
是LineMapper
的實現,他實現了從文件到Java實體的映射:
public class DefaultLineMapper implements LineMapper<>, InitializingBean {
private LineTokenizer tokenizer;
private FieldSetMapper fieldSetMapper;
public T mapLine(String line, int lineNumber) throws Exception {
return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
}
public void setLineTokenizer(LineTokenizer tokenizer) {
this.tokenizer = tokenizer;
}
public void setFieldSetMapper(FieldSetMapper fieldSetMapper) {
this.fieldSetMapper = fieldSetMapper;
}
}
在解析文件時數據是按行解析的:
- 傳入一行字符串。
LineTokenizer
將字符串解析為FieldSet
結構。FieldSetMapper
繼續解析為一個Java實體對象返回給調用者。
DefaultLineMapper
是框架提供的默認實現類,看似非常簡單,但是利用組合模式可以擴展出很多功能。
數據自動映射
在轉換過程中如果將FieldSet
的names
屬性與目標類的field
綁定在一起,那么可以直接使用反射實現數據轉換,為此框架提供了BeanWrapperFieldSetMapper
來實現。
DefaultLineMapper<WeatherEntity> lineMapper = new DefaultLineMapper<>(); //創建LineMapper
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); //創建LineTokenizer
tokenizer.setNames(new String[] { "siteId", "month", "type", "value", "ext" }); //設置Field名稱
BeanWrapperFieldSetMapper<WeatherEntity> wrapperMapper
= new BeanWrapperFieldSetMapper<>(); //創建FieldSetMapper
wrapperMapper.setTargetType(WeatherEntity.class); //設置實體,實體的field名稱必須和tokenizer.names一致。
// 組合lineMapper
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(wrapperMapper);
文件讀取總結
上面提到了各種接口和實現,實際上都是圍繞着FlatFileItemReader
的屬性在介紹,雖然內容很多但是實際上就以下幾點:
- 首先要定位文件,Spring Batch提供了
Resource
相關的定位方法。 - 其次是將文件中的行字符串數據轉換為對象,
LineMapper
的功能就是完成這個功能。 - 框架為
LineMapper
提供了DefaultLineMapper
作為默認實現方法,在DefaultLineMapper
中需要組合使用LineTokenizer
和FieldSetMapper
。前者將字符串轉為為一個Field
,后者將Field
轉換為目標對象。 LineTokenizer
有3個實現類可供使用、FieldSetMapper
有一個默認實現類BeanWrapperFieldSetMapper
。
文件讀取可執行源碼
可執行的源碼在下列地址的items子工程中:
- Gitee:https://gitee.com/chkui-com/spring-batch-sample
- Github:https://github.com/chkui/spring-batch-sample
運行之前需要配置數據庫鏈接,參看源碼庫中的README.md。
文件讀取的主要邏輯在org.chenkui.spring.batch.sample.items.FlatFileReader
類:
public class FlatFileReader {
// FeildSet的字段名,設置字段名之后可以直接使用名字作為索引獲取數據。也可以使用索引位置來獲取數據
public final static String[] Tokenizer = new String[] { "siteId", "month", "type", "value", "ext" };
private boolean userWrapper = false;
@Bean
//定義FieldSetMapper用於FieldSet->WeatherEntity
public FieldSetMapper<WeatherEntity> fieldSetMapper() {
return new FieldSetMapper<WeatherEntity>() {
@Override
public WeatherEntity mapFieldSet(FieldSet fieldSet) throws BindException {
if (null == fieldSet) {
return null; // fieldSet不存在則跳過該行處理
} else {
WeatherEntity observe = new WeatherEntity();
observe.setSiteId(fieldSet.readRawString("siteId"));
//Setter
return observe;
}
}
};
}
@Bean
// 配置 Reader
public ItemReader<WeatherEntity> flatFileReader(
@Qualifier("fieldSetMapper") FieldSetMapper<WeatherEntity> fieldSetMapper) {
FlatFileItemReader<WeatherEntity> reader = new FlatFileItemReader<>();
reader.setResource(new FileSystemResource("src/main/resources/data.csv")); // 讀取資源文件
DefaultLineMapper<WeatherEntity> lineMapper = new DefaultLineMapper<>(); // 初始化 LineMapper實現類
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); // 創建LineTokenizer接口實現
tokenizer.setNames(Tokenizer); // 設定每個字段的名稱,如果不設置需要使用索引獲取值
lineMapper.setLineTokenizer(tokenizer); // 設置tokenizer工具
if (userWrapper) { //使用 BeanWrapperFieldSetMapper 使用反射直接轉換
BeanWrapperFieldSetMapper<WeatherEntity> wrapperMapper = new BeanWrapperFieldSetMapper<>();
wrapperMapper.setTargetType(WeatherEntity.class);
fieldSetMapper = wrapperMapper;
}
lineMapper.setFieldSetMapper(fieldSetMapper);
reader.setLineMapper(lineMapper);
reader.setLinesToSkip(1); // 跳過的初始行,用於過濾字段行
reader.open(new ExecutionContext());
return reader;
}
}
按字段長度格讀取文件
除了按照分隔符,有些文件可以字段數據的占位長度來提取數據。按照前面介紹的過程,實際上只要修改LineTokenizer接口即可,框架提供了FixedLengthTokenizer
類:
@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
//Range用於設定數據的長度。
tokenizer.setColumns(new Range(1-12),
new Range(13-15),
new Range(16-20),
new Range(21-29));
return tokenizer;
}
寫入扁平結構文件
將數據寫入到文件與讀取的過程正好相反:將對象轉換為字符串。
LineAggregator
與LineMapper
相對應的是LineAggregator
,他的功能是將實體轉換為字符串:
public interface LineAggregator<T> {
public String aggregate(T item);
}
PassThroughLineAggregator
框架為LineAggregator
接口提供了一個非常簡單的實現類——PassThroughLineAggregator
,其唯一實現就是使用對象的toString
方法:
public class PassThroughLineAggregator<T> implements LineAggregator<T> {
public String aggregate(T item) {
return item.toString();
}
}
DelimitedLineAggregator
LineAggregator
的另外一個實現類是DelimitedLineAggregator
。與PassThroughLineAggregator
簡單直接使用toString
方法不同的是,DelimitedLineAggregator
需要一個轉換接口FieldExtractor
:
DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
lineAggregator.setFieldExtractor(fieldExtractor);
FieldExtractor
FieldExtractor
用於實體類到collection
結構的轉換。它可以和LineTokenizer
進行類比,前者是將實體類轉換為扁平結構的數據,后者是將String
轉換為一個FieldSet
結構。
public interface FieldExtractor<T> {
Object[] extract(T item);
}
框架為FieldExtractor
接口提供了一個基於反射的實現類BeanWrapperFieldExtractor
,其過程就是將實體對象轉換為列表:
BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] {"field1", "field2"});
setName
方法用於指定要轉換的field
列表。
輸出文件處理
文件讀取的邏輯非常簡單:文件存在打開文件並寫入數據,當文件不存在拋出異常。但是寫入文件明顯不能這么簡單粗暴。新建一個JobInstance
時最直觀的操作是:存在同名文件就拋出異常,不存在則創建文件並寫入數據。但是這樣做顯然有很大的問題,當批處理過程中出現問題需要restart
,此時並不會從頭開始處理所有的數據,而是要求文件存在並接着繼續寫入。為了確保這個過程FlatFileItemWriter
默認會在新JobInstance
運行時刪除已有文件,而運行重啟時繼續在文件末尾寫入。FlatFileItemWriter
可以使用shouldDeleteIfExists
、appendAllowed
、shouldDeleteIfEmpty
來有針對性的控制文件。
文件寫入可執源碼
文件寫入主要代碼在org.chenkui.spring.batch.sample.items.FlatFileWriter
:
public class FlatFileWriter {
private boolean useBuilder = true;
@Bean
public ItemWriter<MaxTemperatureEntiry> flatFileWriter() {
BeanWrapperFieldExtractor<MaxTemperatureEntiry> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] { "siteId", "date", "temperature" }); //設置映射field
fieldExtractor.afterPropertiesSet(); //參數檢查
DelimitedLineAggregator<MaxTemperatureEntiry> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(","); //設置輸出分隔符
lineAggregator.setFieldExtractor(fieldExtractor); //設置FieldExtractor處理器
FlatFileItemWriter<MaxTemperatureEntiry> fileWriter = new FlatFileItemWriter<>();
fileWriter.setLineAggregator(lineAggregator);
fileWriter.setResource(new FileSystemResource("src/main/resources/out-data.csv")); //設置輸出文件位置
fileWriter.setName("outpufData");
if (useBuilder) {//使用builder方式創建
fileWriter = new FlatFileItemWriterBuilder<MaxTemperatureEntiry>().name("outpufData")
.resource(new FileSystemResource("src/main/resources/out-data.csv")).lineAggregator(lineAggregator)
.build();
}
return fileWriter;
}
}
文件的寫入過程與讀取過程完全對稱相反:先用FieldExtractor
將對象轉換為一個collection
結構(列表),然后用lineAggregator
將collection
轉化為帶分隔符的字符串。
代碼說明
- 代碼中的測試數據來自數據分析交流項目bi-process-example,是NOAA的2015年全球天氣監控數據。為了便於源碼存儲進行了大量的刪減,原始數據有百萬條,如有需要使用下列方式下載: curl -O ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/2015.csv.gz #數據文件 curl -O ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt # 文件結構及類型說明
- 代碼實現了讀取文件、處理文件、寫入文件的整個過程。處理文件的過程是只獲取監控的最高溫度信息(
Type=TMAX
),其他都過濾。 - 本案例的代碼使用
org.chenkui.spring.batch.sample.flatfile.FlatFileItemApplication::main
方法運行,使用的是Command Runner的方式執行(運行方式的說明見Item概念及使用代碼的命令行方式運行、Java內嵌運行)。