SpringBatch其它文章直通車:
Spring Batch(2)——Job配置與運行
Spring Batch(3)——Step控制
Spring Batch(4)——Item概念及使用代碼
Spring Batch(5)——文件讀寫
Spring Batch(6)——數據庫批數據讀寫
Spring Batch (7) ——異常處理及容錯機制
Spring Batch(8) -- Listeners
Spring Batch : 在不同steps間傳遞數據
批處理的核心場景
- 從某個位置讀取大量的記錄,位置可以是數據庫、文件或者外部推送隊列(MQ)。
- 根據業務需要實時處理讀取的數據。
- 將處理后的數據寫入某個位置,可以第一條一樣,可是數據庫、文件或者推送到隊列。
Spring Batch能解決的批處理場景
Spring Batch為批處理提供了一個輕量化的解決方案,它根據批處理的需要迭代處理各種記錄,提供事物功能。但是Spring Batch僅僅適用於"脫機"場景,在處理的過程中不能和外部進行任何交互,也不允許有任何輸入。
Spring Batch的目標
- 開發人員僅關注業務邏輯,底層框架的交互交由Spring Batch去處理。
- 能夠清晰分離業務與框架,框架已經限定了批處理的業務切入點,業務開發只需關注這些切入點(Read、Process、Write)。
- 提供開箱即用的通用接口。
- 快速輕松的融入Spring 框架,基於Spring Framework能夠快速擴展各種功能。
- 所有現有核心服務都應易於更換或擴展,而不會對基礎架構層產生任何影響。
Spring Batch結構
如上圖,通常情況下一個獨立的JVM程序就是僅僅用於處理批處理,而不要和其他功能重疊。 在最后一層基礎設置(Infrastructure)部分主要分為3個部分。JobLauncher
、Job
以及Step
。每一個Step
又細分為ItemReader
、ItemProcessor
、ItemWirte
。使用Spring Batch主要就是知道每一個基礎設置負責的內容,然后在對應的設施中實現對應的業務。
Spring Batch 批處理原則與建議
當我們構建一個批處理的過程時,必須注意以下原則:
- 通常情況下,批處理的過程對系統和架構的設計要夠要求比較高,因此盡可能的使用通用架構來處理批量數據處理,降低問題發生的可能性。Spring Batch是一個是一個輕量級的框架,適用於處理一些靈活並沒有到海量的數據。
- 批處理應該盡可能的簡單,盡量避免在單個批處理中去執行過於復雜的任務。我們可以將任務分成多個批處理或者多個步驟去實現。
- 保證數據處理和物理數據緊密相連。籠統的說就是我們在處理數據的過程中有很多步驟,在某些步驟執行完時應該就寫入數據,而不是等所有都處理完。
- 盡可能減少系統資源的使用、尤其是耗費大量資源的IO以及跨服務器引用,盡量分配好數據處理的批量。
- 定期分析系統的IO使用情況、SQL語句的執行情況等,盡可能的減少不必要的IO操作。優化的原則有:
- 盡量在一次事物中對同一數據進行讀取或寫緩存。
- 一次事物中,盡可能在開始就讀取所有需要使用的數據。
- 優化索引,觀察SQL的執行情況,盡量使用主鍵索引,盡量避免全表掃描或過多的索引掃描。
- SQL中的where盡可能通過主鍵查詢。
- 不要在批處理中對相同的數據執行2次相同的操作。
- 對於批處理程序而言應該在批處理啟動之前就分配足夠的內存,以免處理的過程中去重新申請新的內存頁。
- 對數據的完整性應該從最差的角度來考慮,每一步的處理都應該建立完備的數據校驗。
- 對於數據的總量我們應該有一個和數據記錄在數據結構的某個字段 上。
- 所有的批處理系統都需要進行壓力測試。
- 如果整個批處理的過程是基於文件系統,在處理的過程中請切記完成文件的備份以及文件內容的校驗。
批處理的通用策略
和軟件開發的設計模式一樣,批處理也有各種各樣的現成模式可供參考。當一個開發(設計)人員開始執行批處理任務時,應該將業務邏輯拆分為一下的步驟或者板塊分批執行:
- 數據轉換:某個(某些)批處理的外部數據可能來自不同的外部系統或者外部提供者,這些數據的結構千差萬別。在統一進行批量數據處理之前需要對這些數據進行轉換,合並為一個統一的結構。因此在數據開始真正的執行業務處理之前,可以先搭建批處理任務將這些數據統一轉換。
- 數據校驗:批處理是對大量數據進行處理,並且數據的來源千差萬別,所以批處理的輸入數據需要對數據的完整性性進行校驗(比如校驗字段數據是否缺失)。另外批處理輸出的數據也需要進行合適的校驗(例如處理了100條數據,校驗100條數據是否校驗成功)
- 提取數據:批處理的工作是逐條從數據庫或目標文件讀取記錄(records),提取時可以通過一些規則從數據源中進行數據篩選。
- 數據實時更新處理:根據業務要求,對實時數據進行處理。某些時候一行數據記錄的處理需要綁定在一個事物之下。
- 輸出記錄到標准的文檔格式:數據處理完成之后需要根據格式寫入到對應的外部數據系統中。
以上五個步驟是一個標准的數據批處理過程,Spring batch框架為業務實現提供了以上幾個功能入口。
數據額外處理
某些情況需要實現對數據進行額外處理,在進入批處理之前通過其他方式將數據進行處理。主要內容有:
- 排序:由於批處理是以獨立的行數據(record)進行處理的,在處理的時候並不知道記錄前后關系。因此如果需要對整體數據進行排序,最好事先使用其他方式完成。
- 分割:數據拆分也建議使用獨立的任務來完成。理由類似排序,因為批處理的過程都是以行記錄為基本處理單位的,無法再對分割之后的數據進行擴展處理。
- 合並:理由如上。
常規數據源
批處理的數據源通常包括:
- 數據庫驅動鏈接(鏈接到數據庫)對數據進行逐條提取。
- 文件驅動鏈接,對文件數據進行提取
- 消息驅動鏈接,從MQ、kafka等消息系統提取數據。
典型的處理過程
- 在業務停止的窗口期進行批數據處理,例如銀行對賬、清結算都是在12點日切到黎明之間。簡稱為離線處理。
- 在線或並發批處理,但是需要對實際業務或用戶的響應進行考量。
- 並行處理多種不同的批處理作業。
- 分區處理:將相同的數據分為不同的區塊,然后按照相同的步驟分為許多獨立的批處理任務對不同的區塊進行處理。
- 以上處理過程進行組合。
在執行2,3點批處理時需要注意事物隔離等級。
Spring Batch批處理的核心概念
下圖是批處理的核心流程圖。
(圖片來源於網絡)
Spring Batch同樣按照批處理的標准實現了各個層級的組件。並且在框架級別保證數據的完整性和事物性。
如圖所示,在一個標准的批處理任務中組要涵蓋的核心概念有JobLauncher
、Job
、Step
,一個Job
可以涵蓋多個Step
,一個Job
對應一個啟動的JobLauncher
。一個Step
中分為ItemReader
、ItemProcessor
、ItemWriter
,根據字面意思它們分別對應數據提取、數據處理和數據寫入。此外JobLauncher
、Job
、Step
也稱之為批處理的元數據(Metadata),它們會被存儲到JobRepository
中。
Job
簡單的說Job
是封裝一個批處理過程的實體,與其他的Spring項目類似,Job
可以通過XML或Java類配置,稱職為”Job Configuration“.如下圖Job
是單個批處理的最頂層。
為了便於理解,可以建立的理解為Job
就是每一步(Step
)實例的容器。他結合了多個Step
,為它們提供統一的服務同時也為Step
提供個性化的服務,比如步驟重啟。通常情況下Job的配置包含以下內容
:
- Job的名稱
- 定義和排序
Step
執行實例。 - 標記每個
Step
是否可以重啟。
Spring Batch為Job接口提供了默認的實現——SimpleJob
類,在類中實現了一些標准的批處理方法。下面的代碼展示了如可申明一個Job
。
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob") //get中命名了Job的名稱
.start(playerLoad()) //playerLoad、gameLoad、playerSummarization都是Step
.next(gameLoad())
.next(playerSummarization())
.end()
.build();
}
JobInstance
JobInstance
是指批處理作業運行的實例。例如一個批處理必須在每天執行一次,系統在2019年5月1日執行了一次我們稱之為2019-05-01的實例,類似的還會有2019-05-02、2019-05-03實例。在特定的運行實踐中,一個Job
只有一個JobInstance
以及對應的JobParameters
,但是可以有多個JobExecution
。(JobParameters
、JobExecution
見后文)。同一個JobInstance
具有相同的上下文(ExecutionContext
內容見后文)。
JobParameters
前面討論了JobInstance
與Job
的區別,但是具體的區別內容都是通過JobParameters
體現的。一個JobParameters
對象中包含了一系列Job運行相關的參數,這些參數可以用於參考或者用於實際的業務使用。對應的關系如下圖:
當我們執行2個不同的JobInstance
時JobParameters
中的屬性都會有差異。可以簡單的認為一個JobInstance
的標識就是Job
+JobParameters
。
JobExecution
JobExecution
可以理解為單次運行Job
的容器。一次JobInstance
執行的結果可能是成功、也可能是失敗。但是對於Spring Batch框架而言,只有返回運行成功才會視為完成一次批處理。例如2019-05-01執行了一次JobInstance
,但是執行的過程失敗,因此第二次還會有一個“相同的”的JobInstance
被執行。
Job
可以定義批處理如何執行,JobInstance
純粹的就是一個處理對象,把所有的內容、對象組織在一起,主要是為了當面臨問題時定義正確的重啟參數。而JobExecution
是運行時的“容器”,記錄動態運行時的各種屬性和上線文,主要有一下內容:
屬性 | 說明 |
---|---|
status | 狀態類名為BatchStatus,它指示了執行的狀態。在執行的過程中狀態為BatchStatus#STARTED,失敗:BatchStatus#FAILED,完成:BatchStatus#COMPLETED |
startTime | java.util.Date對象,標記批處理任務啟動的系統時間,批處理任務未啟動數據為空 |
endTime | java.util.Date對象,結束時間無論是否成功都包含該數據,如未處理完為空 |
exitStatus | ExitStatus類,記錄運行結果。 |
createTime | java.util.Date,JobExecution的創建時間,某些使用execution已經創建但是並未開始運行。 |
lastUpdate | java.util.Date,最后一次更新時間 |
executionContext | 批處理任務執行的所有用戶數據 |
failureExceptions | 記錄在執行Job時的異常,對於排查問題非常有用 |
對應的每次執行的結果會在元數據庫中體現為:
BATCH_JOB_INSTANCE:
JOB_INST_ID | JOB_NAME |
---|---|
1 | EndOfDayJob |
BATCH_JOB_EXECUTION_PARAMS:
JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | DATE_VAL | IDENTIFYING |
---|---|---|---|---|
1 | DATE | schedule.Date | 2019-01-01 | TRUE |
BATCH_JOB_EXECUTION:
JOB_EXEC_ID | JOB_INST_ID | START_TIME | END_TIME | STATUS |
---|---|---|---|---|
1 | 1 | 2019-01-01 21:00 | 2017-01-01 21:30 | FAILED |
當某個Job
批處理任務失敗之后會在對應的數據庫表中路對應的狀態。假設1月1號執行的任務失敗,技術團隊花費了大量的時間解決這個問題到了第二天21才繼續執行這個任務。
BATCH_JOB_INSTANCE:
JOB_INST_ID | JOB_NAME |
---|---|
1 | EndOfDayJob |
2 | EndOfDayJob |
BATCH_JOB_EXECUTION_PARAMS:
JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | DATE_VAL | IDENTIFYING |
---|---|---|---|---|
1 | DATE | schedule.Date | 2019-01-01 | TRUE |
2 | DATE | schedule.Date | 2019-01-01 | TRUE |
3 | DATE | schedule.Date | 2019-01-02 | TRUE |
BATCH_JOB_EXECUTION:
JOB_EXEC_ID | JOB_INST_ID | START_TIME | END_TIME | STATUS |
---|---|---|---|---|
1 | 1 | 2019-01-01 21:00 | 2017-01-01 21:30 | FAILED |
2 | 1 | 2019-01-02 21:00 | 2017-01-02 21:30 | COMPLETED |
3 | 2 | 2019-01-02 21:31 | 2017-01-02 22:29 | COMPLETED |
從數據上看好似JobInstance
是一個接一個順序執行的,但是對於Spring Batch並沒有進行任何控制。不同的JobInstance
很有可能是同時在運行(相同的JobInstance
同時運行會拋出JobExecutionAlreadyRunningException
異常)。
Step
Step
是批處理重復運行的最小單元,它按照順序定義了一次執行的必要過程。因此每個Job
可以視作由一個或多個多個Step
組成。一個Step
包含了所有所有進行批處理的必要信息,這些信息的內容是由開發人員決定的並沒有統一的標准。一個Step
可以很簡單,也可以很復雜。他可以是復雜業務的組合,也有可能僅僅用於遷移數據。與JobExecution
的概念類似,Step
也有特定的StepExecution
,關系結構如下:
StepExecution
StepExecution
表示單次執行Step的容器,每次Step
執行時都會有一個新的StepExecution
被創建。與JobExecution
不同的是,當某個Step
執行失敗后並不會再次嘗試重新執行該Step
。StepExecution
包含以下屬性:
屬性 | 說明 |
---|---|
status | 狀態類名為BatchStatus,它指示了執行的狀態。在執行的過程中狀態為BatchStatus#STARTED,失敗:BatchStatus#FAILED,完成:BatchStatus#COMPLETED |
startTime | java.util.Date對象,標記StepExecution啟動的系統時間,未啟動數據為空 |
endTime | java.util.Date對象,結束時間,無論是否成功都包含該數據,如未處理完為空 |
exitStatus | ExitStatus類,記錄運行結果。 |
createTime | java.util.Date,JobExecution的創建時間,某些使用execution已經創建但是並未開始運行。 |
lastUpdate | java.util.Date,最后一次更新時間 |
executionContext | 批處理任務執行的所有用戶數據 |
readCount | 成功讀取數據的次數 |
wirteCount | 成功寫入數據的次數 |
commitCount | 成功提交數據的次數 |
rollbackCount | 回歸數據的次數,有業務代碼觸發 |
readSkipCount | 當讀數據發生錯誤時跳過處理的次數 |
processSkipCount | 當處理過程發生錯誤,跳過處理的次數 |
filterCount | 被過濾規則攔截未處理的次數 |
writeSkipCount | 寫數據失敗,跳過處理的次數 |
ExecutionContext
前文已經多次提到ExecutionContext
。可以簡單的認為ExecutionContext
提供了一個Key/Value機制,在StepExecution
和JobExecution
對象的任何位置都可以獲取到ExecutionContext
中的任何數據。最有價值的作用是記錄數據的執行位置,以便發生重啟時候從對應的位置繼續執行:
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition())
比如在任務中有一個名為“loadData”的Step
,他的作用是從文件中讀取數據寫入到數據庫,當第一次執行失敗后,數據庫中有如下數據:
BATCH_JOB_INSTANCE:
JOB_INST_ID | JOB_NAME |
---|---|
1 | EndOfDayJob |
BATCH_JOB_EXECUTION_PARAMS:
JOB_INST_ID | TYPE_CD | KEY_NAME | DATE_VAL |
---|---|---|---|
1 | DATE | schedule.Date | 2019-01-01 |
BATCH_JOB_EXECUTION:
JOB_EXEC_ID | JOB_INST_ID | START_TIME | END_TIME | STATUS |
---|---|---|---|---|
1 | 1 | 2017-01-01 21:00 | 2017-01-01 21:30 | FAILED |
BATCH_STEP_EXECUTION:
STEP_EXEC_ID | JOB_EXEC_ID | STEP_NAME | START_TIME | END_TIME | STATUS |
---|---|---|---|---|---|
1 | 1 | loadData | 2017-01-01 21:00 | 2017-01-01 21:30 | FAILED |
BATCH_STEP_EXECUTION_CONTEXT:
STEP_EXEC_ID | SHORT_CONTEXT |
---|---|
1 | {piece.count=40321} |
在上面的例子中,Step
運行30分鍾處理了40321個“pieces”,我們姑且認為“pieces”表示行間的行數(實際就是每個Step完成循環處理的個數)。這個值會在每個commit
之前被更新記錄在ExecutionContext
中(更新需要用到StepListener
后文會詳細說明)。當我們再次重啟這個Job
時並記錄在BATCH_STEP_EXECUTION_CONTEXT中的數據會加載到ExecutionContext
中,這樣當我們繼續執行批處理任務時可以從上一次中斷的位置繼續處理。例如下面的代碼在ItemReader
中檢查上次執行的結果,並從中斷的位置繼續執行:
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
log.debug("Initializing for restart. Restart data is: " + executionContext);
long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
LineReader reader = getReader();
Object record = "";
while (reader.getPosition() < lineCount && record != null) {
record = readLine();
}
}
ExecutionContext
是根據JobInstance
進行管理的,因此只要是相同的實例都會具備相同的ExecutionContext(無論是否停止)。此外通過以下方法都可以獲得一個ExecutionContext
:
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
但是這2個ExecutionContext
並不相同,前者是在一個Step
中每次Commit
數據之間共享,后者是在Step
與Step
之間共享。
JobRepository
JobRepository
是所有前面介紹的對象實例的持久化機制。他為JobLauncher
、Job
、Step
的實現提供了CRUD操作。當一個Job
第一次被啟動時,一個JobExecution
會從數據源中獲取到,同時在執行的過程中StepExecution
、JobExecution
的實現都會記錄到數據源中。擋在程序啟動時使用@EnableBatchProcessing
注解,JobRepository
會進行自動化配置。
JobLauncher
JobLauncher
為Job
的啟動運行提供了一個邊界的入口,在啟動Job
的同時還可以定制JobParameters
:
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
事務模型描述
1、step之間事務獨立
2、step划分成多個chunk執行,chunk事務彼此獨立,互不影響;chunk開始開啟一個事務,正常結束提交。chunk表示給定數量的item的操作集合,主要屬性commit-interval,表示數量達到多少條提交一次。
圖-job總體事務
圖-step內部事務
3、chunk定義:默認設置commitInterval=N,即讀取N條數據為一個chunk(采用默認SimpleCompletionPolicy),或者reader里面所讀取的item==null,或者滿足自定義完成策略