在 Spring Batch 中進行數據及參數傳遞的方法。
1 引言
本文是 Spring Batch 系列文章的第9篇,有興趣的可見文章:
- 數據批處理神器-Spring Batch(1)簡介及使用場景
- 快速了解組件-spring batch(2)之helloworld
- 快速使用組件-spring batch(3)讀文件數據到數據庫
- 決戰數據庫-spring batch(4)數據庫到數據庫
- 便捷的數據讀寫-spring batch(5)結合beetlSql進行數據讀寫
- 增量同步-spring batch(6)動態參數綁定與增量同步
- 調度與監控-spring batch(7)結合xxl-job進行批處理
- mongo同步-spring batch(8)的mongo讀寫組件使用
前面文章以實例的方式對 Spring Batch 進行批處理進行詳細說明,相信大家對字符串、文件,關系型數據庫及 NoSQL 數據庫的讀取,處理,寫入流程已比較熟悉。有小伙伴就問,針對這個任務流程,期間有多個步驟,從任務( Job )啟動,到作業步( Step )的執行,其中又包含讀組件、處理組件、寫組件,那么,針對這個流程,若中間需要傳遞自定義的數據,該如何處理?本文將對 Spring Batch 進行數據傳遞的方法進行描述,依然會使用代碼實例的方式進行講解。包括以下幾個內容:
- 基於 Mybatis-plus 集成多數據源的數據庫訪問
- 使用 ExecutionContext 共享數據
- StepScope 動態綁定參數傳遞
2 開發環境
- JDK環境: jdk1.8
- Spring Boot: 2.1.4.RELEASE
- Spring Batch:4.1.2.RELEASE
- 開發IDE: IDEA
- 構建工具Maven: 3.3.9
- 日志組件logback:1.2.3
- lombok:1.18.6
- MySQL: 5.6.26
- Mybatis-plus: 3.4.0
本示例源碼已放至github:https://github.com/mianshenglee/spring-batch-example/tree/master/spring-batch-param
,請結合示例代碼進行閱讀。
3 基於 Mybatis-plus 集成多數據源的數據庫訪問
本示例還是使用原來示例功能,從源數據庫讀取用戶數據,處理數據,然后寫入到目標數據庫。其中會在任務啟動時傳遞參數,並在作業步中傳遞參數。之前已經介紹過如何使用 beetlsql 進行多數據源配置([便捷的數據讀寫-spring batch(5)結合beetlSql進行數據讀寫][5]),實現數據批處理。還有很多朋友使用 Mybatis 或 Mybatis-plus 進行數據庫讀寫,因此,有必要提一下 Spring Batch 如何結合 Mybatis 或 Mybatis-plus 配置多數據源操作。本示例以 Mybatis-plus 為例。
示例工程中的sql
目錄有相應的數據庫腳本,其中源數據庫mytest.sql
腳本創建一個test_user
表,並有相應的測試數據。目標數據庫 my_test1.sql
與 mytest.sql
表結構一致,spring-batch-mysql.sql
是 Spring Batch 本身提供的數據庫腳本。
3.1 pom 文件中引入 Mybatis-plus
<!--mybatis-plus-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.0</version>
</dependency>
3.2 配置及使用多數據源
本示例會涉及三個數據庫,分別是 Spring Batch 本身數據庫,需要批處理的源數據庫,批處理的目標數據庫。因此需要處理多個數據庫,利用多套源策略,可以很簡單就完成多套數據源的處理。簡單來說主要分為以下幾個步驟:
- 配置多數據源連接信息
- 根據不同數據源划分
mapper
包,entity
包,mapper.xml
文件包 - 根據不同數據源配置獨立的
SqlSessionFactory
- 根據不同的應用場景,使用不同的
mapper
關於多數據源多套源策略的詳細配置過程,可以參考我的另一篇文章《搞定SpringBoot多數據源(1):多套源策略》
4 ExecutionContext 傳遞參數
關於 Spring Batch 的讀數據( ItemReader )、處理數據( ItemProcessor )、寫數據( ItemWriter )的配置流程,可以參考前面系列文章,本文不再詳細描述。我們需要記住的是,當一個作業( Job )啟動,Spring Batch 是通過作業名稱( Job name)及 作業參數( JobParameters )作為唯一標識來區分不同的作業。一個 Job 下可以有多個作業步( Step ),每個 Step 中就是有具體的操作邏輯(讀、處理、寫)。在 Job 和 Step 下的各個操作步驟間,如何傳遞,,這里就需要理解 ExecutionContext 的概念。
4.1 ExecutionContext 概念
在 Job 的運行及 Step 的運行過程中,Spring Batch 提供 ExecutionContext 進行運行數據持久化,利用它,可以根據業務進行數據共享,如用來重啟的靜態數據與狀態數據。如下圖:
Execution Context 本質上來講就是一個 Map<String,Object>
,它是Spring Batch 框架提供的持久化與控制的 key/value 對,可以讓開發者在 Step 運行或Job 運行過程中保存需要進行持久化的狀態,它可以。分為兩類,一類是Job 運行的上下文(對應數據表:BATCH_JOB_EXECUTION_CONTEXT),另一類是Step Execution的上下文(對應數據表BATCH_STEP_EXECUTION_CONTEXT)。兩類上下文關系:一個 Job 運行對應一個 Job Execution 的上下文(如上圖中藍色部分的 ExecutionContext ),每個 Step 運行對應一個 Step Execution 上下文(如上圖中粉色部分的 ExecutionContext );同一個 Job 中的 Step Execution 共用 Job Execution 的上下文。也就是說,它們的作用范圍有區別。因此,如果同一個 Job 的不同 Step 間需要共享數據時,可以通過 Job Execution 的上下文共享數據。根據 ExecutionContext 的共享數據特性,則可以實現在不同步驟間傳遞數據。
4.2 ExecutionContext 傳遞數據
一個 Job 啟動后,會生成一個 JobExecution ,用於存放和記錄 Job 運行的信息,同樣,在 Step 啟動后,也會有對應的 StepExecution 。如前面所說,在 JobExecution 和 StepExecution 中都會有一個 ExecutionContext ,用於存儲上下文。因此,數據傳遞的思路就是確定數據使用范圍,然后通過 ExecutionContext 傳入數據,然后就可以在對應的范圍內共享數據。如當前示例,需要 Job 范圍內共享數據,在讀組件( ItemReader )和寫組件( ItemWriter )中傳遞讀與寫數據的數量( size ),在 Job 結束時,輸出讀及寫的數據量。實際上 Spring Batch 會自動計算讀寫數量,本示例僅為了顯示數據共享功能。
那么,如何獲取對應的 Execution ?,Spring Batch 提供了 JobExecutionListener 和 StepExecutionListener 監聽器接口,通過實現監聽器接口,分別可以在開啟作業前( beforeJob )和 完成作業后( afterJob )afterJob ),開啟作業步前( beforeStep)及 完成作業步后( afterStep )獲取對應的 Execution ,然后進行操作。
4.2.1 實現監聽器接口
在自定義的 UserItemReader 和 UserItemWriter 中,實現 StepExecutionListener 接口,其中使用 StepExecution 作為成員,從 beforeStep 中獲取。如下:
public class UserItemWriter implements ItemWriter<TargetUser>, StepExecutionListener {
private StepExecution stepExecution;
//...略
@Override
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
讀組件( UserItemReader )也使用同樣的方式。而在作業結束后,獲取參數,則可以繼承 JobExecutionListenerSupport ,實現自己感興趣的方法,也從參數中獲取 JobExecution,然后獲取參數進行處理。
public class ParamJobEndListener extends JobExecutionListenerSupport {
@Override
public void afterJob(JobExecution jobExecution) {}
}
4.2.2 設置用於傳遞的數據
由於我們需要在 Job 范圍內傳遞參數,獲取到 StepExecution 后,可以獲得相應的 JobExecution ,進而獲取 Job 對應的 executionContext,這樣,就可以在 Job 范圍內共享參數數據了。如下是在讀組件中進行配置
ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext();
executionContext.put(SyncConstants.PASS_PARAM_READ_NUM, items.size());
同樣在寫組件中,獲取到 ExecutionContext 后,可以對參數進行處理。本示例中,是通過對 ItemReader 傳遞的處理數目參數進行累加處理,得到結果。
@Override
public void write(List<? extends TargetUser> items) {
ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext();
Object currentWriteNum = executionContext.get(SyncConstants.PASS_PARAM_WRITE_NUM);
if (Objects.nonNull(currentWriteNum)) {
log.info("currentWriteNum:{}", currentWriteNum);
executionContext.put(SyncConstants.PASS_PARAM_WRITE_NUM, items.size()+(Integer)currentWriteNum);
} else {
executionContext.put(SyncConstants.PASS_PARAM_WRITE_NUM, items.size());
}
最后在作業結束后,在實現 JobExecutionListenerSupport 的接口中,afterJob 函數中,對參數進行輸出。
public class ParamJobEndListener extends JobExecutionListenerSupport {
@Override
public void afterJob(JobExecution jobExecution) {
ExecutionContext executionContext = jobExecution.getExecutionContext();
Integer writeNum = (Integer)executionContext.get(SyncConstants.PASS_PARAM_WRITE_NUM);
log.info(LogConstants.LOG_TAG + "writeNum:{}",writeNum);
}
}
5 StepScope 動態綁定參數傳遞
5.1 StepScope及后期綁定
前面說到在 Job 及 Step 范圍內,使用 ExecutionContext 進行數據共享,但,如果需要在 Job 啟動前設置參數,並且每次啟動輸入的參數是動態變化的(比如增量同步時,日期是基於上一次同步的時間或者ID),也就是說,每次運行,需要根據參數新建一個操作步驟(如 ItemReader、ItemWriter等),我們知道,由於在 Spring IOC 中加載的Bean,默認都是單例模式的,因此,需要每次運行新建,運行完銷毀,新建是在運行時進行的。這就需要用到StepScope 及后期綁定技術。
在之前的示例中,已出現過 StepScope,它的作用是提供了操作步驟的作用范圍,某個 Spring Bean 使用注解StepScope,則表示此 Bean 在作業步( Step )開始的時候初始化,在 Step 結束的時候銷毀,也就是說 Bean的作用范圍是在 Step 這個生命周期中。而 Spring Batch 通過屬性后期綁定技術,在運行期獲取屬性值,並使用 SPEL 的表達式進行屬性綁定。而在 StepScope 中,Spring Batch 框架提供 JobParameters,JobExecutionContext,StepExecutionContext,當然也可以使用 Spring 容器中的 Bean ,如 JobExecution ,StepExecution。
5.2 作業參數傳遞及動態獲取 StepExecution
一個 Job 是由 Job name 及 JobParameters 作為唯一標識的,也就是說只有 job name 和 JobParameters 不一致時,Spring Batch 才會啟動一個新的 Job,一致的話就當作是同一個 Job ,若 此 Job 未執行過,則執行;若已執行過且是 FAILED 狀態,則嘗試重新運行此 Job ,若已執行過且是 COMPLETED 狀態,則會報錯。
本示例中,Job 啟動時輸入時間參數,在 ItemReader 中使用 StepScope 注解,然后把時間參數綁定到 ItemReader 中,同時綁定 StepExecution ,以便於在 ItemReader 對時間參數及 StepExecution 進行操作。
5.2.1 設置時間參數
在使用 JobLauncher 啟動 Job 時,是需要輸入 jobParameters 作為參數的。因此可以創建此對象,並設置參數。
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time",timMillis)
.toJobParameters();
5.2.2 動態綁定參數
在配置 Step 時,需要創建ItemReader 的 Bean,為了使用動態參數,在 ItemReader 中設置 Map 存放參數,並設置 StepExecution 為成員,以便於后面使用 ExecutionContext。
public class UserItemReader implements ItemReader<User> {
protected Map<String, Object> params;
private StepExecution stepExecution;
public void setStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
使用 StepScope 進行配置:
@Bean
@StepScope
public ItemReader paramItemReader(@Value("#{stepExecution}") StepExecution stepExecution,
@Value("#{jobParameters['time']}") Long timeParam) {
UserItemReader userItemReader = new UserItemReader();
//設置參數
Map<String, Object> params = CollUtil.newHashMap();
Date datetime = new Date(timeParam);
params.put(SyncConstants.PASS_PARAM_DATETIME, datetime);
userItemReader.setParams(params);
userItemReader.setStepExecution(stepExecution);
return userItemReader;
}
注意:此時 ItemReader 不可再使用實現 StepExecutionListener 的方式來對 stepExecution 賦值,由於 ItemReader 是動態綁定的,StepExecutionListener 將不再起作用,因此需要在后期綁定中來綁定 stepExecution Bean 的方式來賦值。
5.2.3 設置及傳遞參數
ItemReader 獲取到 StepExecution 后即可獲取 ExecutionContext,然后可以像前面說的使用 ExecutionContext 方式進行數據傳遞。如下:
ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext();
//readNum參數
executionContext.put(SyncConstants.PASS_PARAM_READ_NUM, items.size());
//datetime參數
executionContext.put(SyncConstants.PASS_PARAM_DATETIME,params.get(SyncConstants.PASS_PARAM_DATETIME));
6.總結
在 Job 和 Step 不同的數據范圍中,可使用 ExecutionContext 共享數據。本文以傳遞處理數量為例,使用 Mybatis-plus,基於 ExecutionContext ,結合 StepScope及后期綁定技術,實現在 Job 啟動傳入參數,然后在 ItemReader、ItemProcessor、ItemWriter 及 Job 完成后的數據共享及傳遞。如果你在使用 Spring Batch 過程中需要進行數據共享與傳遞,請試試這種方式吧。
往期文章
- 還在手工生成數據庫文檔?3個步驟自動完成了解一下
- Python 處理 Excel 文件
- MinIO 的分布式部署
- 利用MinIO輕松搭建靜態資源服務
- 搞定SpringBoot多數據源(3):參數化變更源
- 搞定SpringBoot多數據源(2):動態數據源
- 搞定SpringBoot多數據源(1):多套源策略
- java開發必學知識:動態代理
- 2019 讀過的好書推薦
如果文章內容對你有幫助,歡迎轉發分享~
我的公眾號(搜索Mason技術記錄
),獲取更多技術記錄: