使用 Spring Batch 構建企業級批處理應用


https://www.ibm.com/developerworks/cn/java/j-lo-springbatch1/index.html

https://www.ibm.com/developerworks/cn/java/j-lo-springbatch2/

引言

總述

本系列文章旨在通過示例搭建以及特性介紹,詳細講述如何利用 Spring Batch 開發企業批處理應用。本系列文章共分為三部分,第一部分初步介紹了批處理以及 Spring Batch 的相關概念,同時搭建了一個簡單的基於 Spring Batch 的批處理應用。第二部分介紹了 Step Flow 以及並發支持。第三部分則主要介紹了 Spring Batch 對任務監控的支持。下面讓我們進入第一部分內容。

什么是批處理

在現代企業應用當中,面對復雜的業務以及海量的數據,除了通過龐雜的人機交互界面進行各種處理外,還有一類工作,不需要人工干預,只需要定期讀入大批量數據,然后完成相應業務處理並進行歸檔。這類工作即為“批處理”。

從上面的描述可以看出,批處理應用有如下幾個特點:

  • 數據量大,少則百萬,多則上億的數量級。
  • 不需要人工干預,由系統根據配置自動完成。
  • 與時間相關,如每天執行一次或每月執行一次。

同時,批處理應用又明顯分為三個環節:

  • 讀數據,數據可能來自文件、數據庫或消息隊列等
  • 數據處理,如電信支撐系統的計費處理
  • 寫數據,將輸出結果寫入文件、數據庫或消息隊列等

因此,從系統架構上,應重點考慮批處理應用的事務粒度、日志監控、執行、資源管理(尤其存在並發的情況下)。從系統設計上,應重點考慮數據讀寫與業務處理的解耦,提高復用性以及可測試性。

什么是 Spring Batch

Spring Batch 作為 Spring 的子項目,是一款基於 Spring 的企業批處理框架。通過它可以構建出健壯的企業批處理應用。Spring Batch 不僅提供了統一的讀寫接口、豐富的任務處理方式、靈活的事務管理及並發處理,同時還支持日志、監控、任務重啟與跳過等特性,大大簡化了批處理應用開發,將開發人員從復雜的任務配置管理過程中解放出來,使他們可以更多地去關注核心的業務處理過程。

另外我們還需要知道,Spring Batch 是一款批處理應用框架,不是調度框架。它只關注批處理任務相關的問題,如事務、並發、監控、執行等,並不提供相應的調度功能。因此,如果我們希望批處理任務定期執行,可結合 Quartz 等成熟的調度框架實現。

下面將通過一個示例詳細介紹如何使用 Spring Batch 搭建批處理應用。這個示例比較簡單,對系統中所有用戶發送一封繳費提醒通知。此處,我們簡單地將繳費提醒輸出到控制台。當然,隨着介紹的深入,我將逐漸豐富該功能,使其最終完整展示 Spring Batch 的各種特性。

環境搭建

首先,從 Spring 官方網站下載 Spring Batch 發布包(見 參考資源)。本文基於 Spring Batch 2.1.6(當前最新版本為 2.1.8)以及 Spring 2.5.6 版本構建。我們可以看到 Spring Batch 共包含 spring-batch-core 和 spring-batch-infrastructure 兩個包。spring-batch-core 主要包含批處理領域相關類,而 spring-batch-infrastructure 提供了一個基礎訪問處理框架。

接下來,讓我們新建一個 Eclipse 工程,並將 Spring Batch 以及 Spring 相關包添加到依賴環境,如 圖 1 所示

圖 1. 依賴環境

圖 1. 依賴環境

環境搭建完成后,讓我們看一下如何一步步構建一個批處理應用。

構建應用

如“引言”中所述 Spring Batch 按照關注點的不同,將整個批處理過程分為三部分:讀、處理、寫,從而將批處理應用進行合理解耦。同時,Spring Batch 還針對讀、寫操作提供了多種實現,如消息、文件、數據庫。對於數據庫,還提供了 Hibernate、iBatis、JPA 等常見 ORM 框架的讀、寫接口支持。

對象定義

首先我們需要編寫用戶以及消息類,比較簡單,如清單 1 和 清單 2 所示:

清單 1. User 類
1
2
3
4
5
6
7
8
9
10
package org.springframework.batch.sample;
 
public class User {
     private String name;
     private Integer age;
     public String getName() {return name;}
     public void setName(String name) {this.name = name;}
     public Integer getAge() {return age;}
     public void setAge(Integer age) {this.age = age;}
}
清單 2. Message 類
1
2
3
4
5
6
7
package org.springframework.batch.sample;
 
public class Message {
     private String content;
     public String getContent() {return content;}
     public void setContent(String content) {this.content = content;}
}

讀寫及處理接口

首先,所有 Spring Batch 的讀操作均需要實現 ItemReader 接口,而且 Spring Batch 為我們提供了多種默認實現,尤其是基於 ORM 框架的讀接口,同時支持基於游標和分頁兩類操作。因此,多數情況下我們並不需要手動編寫 ItemReader 類,而是直接使用相應實現類即可。

在該示例中,我們使用 org.springframework.batch.item.file.FlatFileItemReader 類從文件中進行信息讀入,用戶信息格式定義如 清單 3 所示。

清單 3. 用戶信息
1
2
3
4
5
6
7
8
9
10
User1,20
User2,21
User3,22
User4,23
User5,24
User6,25
User7,26
User8,27
User9,28
User10,29

該類封裝了文件讀操作,僅僅需要我們手動設置 LineMapper 與訪問文件路徑即可。Spring Batch 通過 LineMapper 可以將文件中的一行映射為一個對象。我們不難發現,Spring Batch 將文件操作封裝為類似 Spring JDBC 風格的接口,這也與 Spring 一貫倡導的接口統一是一致的。此處我們使用 org.springframework.batch.item.file.mapping.DefaultLineMapper 進行行映射。讀操作的配置信息如 清單 4 所示:

清單 4. message_job.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
< beans:bean id = "messageReader"
       class = "org.springframework.batch.item.file.FlatFileItemReader" >
     < beans:property name = "lineMapper" ref = "lineMapper" >
     </ beans:property >
     < beans:property name = "resource"
     value = "classpath:/users.txt" ></ beans:property >
</ beans:bean >
< beans:bean id = "lineMapper"
     class = "org.springframework.batch.item.file.mapping.DefaultLineMapper" >
     < beans:property name = "lineTokenizer" >
         < beans:bean
class = "org.springframework.batch.item.file.transform.DelimitedLineTokenizer" >
         </ beans:bean >
     </ beans:property >
     < beans:property name = "fieldSetMapper" >
         < beans:bean class = "org.springframework.batch.sample.UserMapper" >
         </ beans:bean >
     </ beans:property >
</ beans:bean >

從清單我們可以知道,DefaultLineMapper 需要設置 lineTokenizer 和 fieldSetMapper 兩個屬性,首先通過 lineTokenizer 完成文件行拆分,並封裝為一個屬性結果集,因為我們使用“,”分隔用戶屬性,所以需要將 lineTokenizer 設置為 DelimitedLineTokenizer。最后通過 fieldSetMapper 完成將結果集封裝為一個 POJO 對象。具體實現如 清單 5 所示:

清單 5. UserMapper 類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package org.springframework.batch.sample;
 
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;
 
public class UserMapper implements FieldSetMapper< User > {
     public User mapFieldSet(FieldSet fs) throws BindException {
         User u = new User();
         u.setName(fs.readString(0));
         u.setAge(fs.readInt(1));
         return u;
     }
}

該接口的實現方式與 Spring JDBC 的 RowMapper 極其相似。

接下來,再讓我們看一下如何實現寫操作。Spring Batch 所有寫操作均需要實現 ItemWriter 接口。該接口只有一個方法 void write(List<? extends T> items),參數是輸出結果的列表。之所以如此定義,是為了便於我們進行批量操作,以提高性能。每次傳入的列表由事務提交粒度確定,也就是說 Spring Batch 每次將提交的結果集傳入寫操作接口。因為我們要做的僅僅是將繳費通知輸出到控制台,所以,寫操作實現如 清單 6 所示:

清單 6. MessagesItemWriter 類
1
2
3
4
5
6
7
8
9
10
11
12
13
package org.springframework.batch.sample;
 
import java.util.List;
import org.springframework.batch.item.ItemWriter;
 
public class MessagesItemWriter implements ItemWriter< Message >{
     public void write(List<? extends Message> messages) throws Exception {
         System.out.println("write results");
         for (Message m : messages) {
             System.out.println(m.getContent());
         }
     }
}

同 ItemReader 一樣,Spring Batch 也為我們提供了多樣的寫操作支持,具體可閱讀 Spring Batch 參考手冊,此處不再贅述。

最后,再看一下如何實現業務處理。Spring Batch 提供了 ItemProcessor 接口用於完成相應業務處理。在本示例中,即為根據用戶信息生成一條繳費通知信息,如 清單 7 所示:

清單 7. MessagesItemProcessor 類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package org.springframework.batch.sample;
 
import org.springframework.batch.item.ItemProcessor;
 
public class MessagesItemProcessor implements ItemProcessor< User , Message> {
 
     public Message process(User user) throws Exception {
         Message m = new Message();
         m.setContent("Hello " + user.getName()
                 + ",please pay promptly at the end of this month.");
         return m;
     }
 
}

任務定義

通過上面一節,我們已經完成了批處理任務的讀數據、處理過程、寫數據三個過程。那么,我們如何將這三部分結合在一起完成批處理任務呢?

Spring Batch 將批處理任務稱為一個 Job,同時,Job 下分為多個 Step。Step 是一個獨立的、順序的處理步驟,包含該步驟批處理中需要的所有信息。多個批處理 Step 按照一定的流程組成一個 Job。通過這樣的設計方式,我們可以靈活配置 Job 的處理過程。

接下來,讓我們看一下如何配置繳費通知的 Job,如 清單 8 所示:

清單 8. message_job.xml
1
2
3
4
5
6
7
8
9
10
< job id = "messageJob" >
   < step id = "messageStep" >
     < tasklet >
        < chunk reader = "messageReader" processor = "messageProcessor"
            writer = "messageWriter" commit-interval = "5"
            chunk-completion-policy = "" >
         </ chunk >
     </ tasklet >
    </ step >
</ job >

如上,我們定義了一個名為“messageJob”的 Job,該 Job 僅包含一個 Step。在配置 Step 的過程中,我們不僅要指定讀數據、處理、寫數據相關的 bean,還要指定 commit-interval 和 chunk-completion-policy 屬性。前者指定了該 Step 中事務提交的粒度,取值為 5 即表明每當處理完畢讀入的 5 條數據時,提交一次事務。后者指定了 Step 的完成策略,即當什么情況發生時表明該 Step 已經完成,可以轉入后續處理。由於沒有明確指定相應的類,Spring Batch 使用默認策略,即當讀入數據為空時認為 Step 結束。

最后,我們還需要配置一個 JobRepository 並為其指定一個事務管理器,該類用於對 Job 進行管理,如 清單 9 所示:

清單 9. message_job.xml
1
2
3
4
5
6
7
< beans:bean id = "jobRepository"
class = "org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" >
         < beans:property name = "transactionManager" ref = "transactionManager" />
</ beans:bean >
 
< beans:bean id = "transactionManager"
class = "org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

因為我們整個示例不需要數據庫操作,所以選擇了使用 MapJobRepositoryFactoryBean 和 ResourcelessTransactionManager。

所有配置完成以后,進入最后一步——任務執行。

任務執行

那么如何運行一個 Job 呢? Spring Batch 提供了 JobLauncher 接口用於運行 Job,並提供了一個默認實現 SimpleJobLauncher。先讓我們看一下具體執行代碼,如 清單 10 所示:

清單 10. Main 類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Main {
     public static void main(String[] args) {
         ClassPathXmlApplicationContext c =
                  new ClassPathXmlApplicationContext("message_job.xml");
         SimpleJobLauncher launcher = new SimpleJobLauncher();
         launcher.setJobRepository((JobRepository) c.getBean("jobRepository"));
         launcher.setTaskExecutor(new SimpleAsyncTaskExecutor());
         try {
              launcher.run((Job) c.getBean("messageJob"), new JobParameters());
         } catch (Exception e) {
         e.printStackTrace();
         }
     }
}

首先,我們需要為 JobLauncher 指定一個 JobRepository,該類負責創建一個 JobExecution 對象來執行 Job,此處直接從上下文獲取即可。其次,需要指定一個任務執行器,我們使用 Spring Batch 提供的 SimpleAsyncTaskExecutor。最后,通過 run 方法來執行指定的 Job,該方法包含兩個參數,需要執行的 Job 以及執行參數。您可以通過運行示例工程查看運行結果。由於 MessageItemWriter 在每次輸出結果前,先打印了一行提示,因此您可以明顯看出輸出分 2 組進行打印,即事務被提交了 2 次(因為我們設置的事務粒度為 5。)。

從業務功能上考慮,同一任務應該盡量避免重復執行(即相同條件下的任務只能成功運行一次),試想如果本示例中發送繳費通知過多只能導致用戶不滿,那么電信計費批處理任務重復執行則將導致重復計費,從而使用戶遭受損失。幸運的是,Spring Batch 已經為我們考慮好了這些。

對於 Spring Batch 來說,JobParameters 相同的任務只能成功運行一次。您如果在示例 Main 類中連續運行同一 Job,將會得到如下異常(見 清單 11 ):

清單 11. 異常信息
1
2
3
org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException:
A job instance already exists and is complete for parameters={}. 
If you want to run this job again, change the parameters.

因此,如果我們希望該任務是周期執行的(如每月執行一次),那么必須保證周期內參數是唯一。假如該客戶要求我們每月為用戶發送一次繳費通知。我們的任務執行可以如 清單 12 所示:

清單 12. Main 類
1
2
3
4
5
Map< String ,JobParameter> parameters = new HashMap< String ,JobParameter>();
parameters.put(RUN_MONTH_KEY,new JobParameter("2011-10"));
launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters));
parameters.put(RUN_MONTH_KEY,new JobParameter("2011-11"));
launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters));

在示例中,我將執行月份作為 Job 的參數傳入,分別執行了 10、11 月兩個月的任務。

任務重試

既然相同參數的任務只能成功執行一次,那么,如果任務失敗該如何處理?此時,需要考慮的是,既然任務步驟有事務提交粒度,那么可能任務已經提交了部分處理結果,這部分不應該被重復處理。也就是說,此時應該有重試操作。

在 Spring Batch 中,通過配置可以實現步驟 Step 的重試,如 清單 13 所示:

清單 13. message_job.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
< job id = "messageJob" restartable = "true" >
     < step id = "messageStep" >
         < tasklet >
             < chunk reader = "messageReader" processor = "messageProcessor"
                                                    writer = "messageWriter"
                 commit-interval = "5" chunk-completion-policy = "" retry-limit = "2" >
                 < retryable-exception-classes >
                     < include class = "java.lang.RuntimeException" />
                 </ retryable-exception-classes >
             </ chunk >
         </ tasklet >
     </ step >
</ job >

我們可以看到,主要分兩部分:首先,需要設置重試次數,其次是當執行過程中捕獲到哪些異常時需要重試。如果在執行過程中捕獲到重試異常列表中的異常信息,則進行重試操作。如果重試操作達到最大次數仍提示異常,則認為任務執行失敗。對於異常信息的配置,除了通過 include 配置包含列表外,也可以通過 exclude 配置排除列表。

由於通過配置進行的 Step 重試是自動的,因此較難控制(多用於網絡訪問異常等不需要人工干預的情況)。可以考慮一下本示例,如果有一個用戶的信息有問題,名字為空,不能發送繳費通知,步驟重試便不合適了,此時我們可以對 Job 進行重試操作。

Spring Batch 允許重復執行未成功的 Job,而每次執行即為一次重試操作。示例代碼如 清單 14 所示:

清單 14. Main 類
1
2
3
4
5
Map< String ,JobParameter> parameters = new HashMap< String ,JobParameter>();
parameters.put(RUN_MONTH_KEY,new JobParameter("2011-10"));
launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters));
Thread.sleep(10000);
launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters));

您可以通過如下步驟查看運行結果:首先,將 users.txt 文件中的第 7 行(之所以指定該行,便於驗證事務提交以及重復執行的起始位置)的用戶名修改為空。其次,運行示例。最后,在程序出現異常提示時,更新第 7 行的用戶名(為了便於演示,程序在兩次任務執行過程中等待 10 秒鍾)。

您可以在控制台中很明顯的看到,任務先打印了 5 條記錄(第一次事務提交),然后出現異常信息,待我們將錯誤更正后,又打印了 5 條記錄,任務最終成功完成。

從輸出結果,我們可以知道 Spring Batch 是從出錯的事務邊界內第一條記錄重復執行的,這樣便確保了數據完整性,而且所有這一切對於用戶均是透明的。

那么 Spring Batch 是如何做到這一步的呢?這與 Spring Batch 的運行時管理是分不開的。

運行時管理

Spring Batch 提供了如 表 1 所示的類用於記錄每個 Job 的運行信息:

表 1. 運行時類信息

Spring Batch 通過 JobRepository 接口維護所有 Job 的運行信息,此外 JobLauncher 的 run 方法也返回一個 JobExecution 對象,通過該對象可以方便的獲得 Job 其他的運行信息,代碼如 清單 15 所示:

清單 15. Main 類
1
2
3
4
5
6
7
Map< String ,JobParameter> parameters = new HashMap< String ,JobParameter>();
parameters.put(RUN_MONTH_KEY,new JobParameter("2011-10"));
JobExecution je =
        launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters));
System.out.println(je);
System.out.println(je.getJobInstance());
System.out.println(je.getStepExecutions());

輸出信息如 清單 16 所示:

清單 16. 輸出結果
1
2
3
4
5
6
7
8
9
10
11
JobExecution: id=0, version=2, startTime=Tue Nov 15 21:00:09 CST 2011,
endTime=Tue Nov 15 21:00:09 CST 2011, lastUpdated=Tue Nov 15 21:00:09 CST 2011,
status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=,
job=[JobInstance: id=0, version=0, JobParameters=[{run.month=2011-10}], Job=[messageJob]]
 
  JobInstance: id=0, version=0, JobParameters=[{run.month=2011-10}], Job=[messageJob]
 
  [StepExecution: id=1, version=5, name=messageStep, status=COMPLETED,
  exitStatus=COMPLETED, readCount=10, filterCount=0, writeCount=10 readSkipCount=0,
   writeSkipCount=0, processSkipCount=0, commitCount=3 , rollbackCount=0,
   exitDescription=]

從日志您可以發現事務一共提交了 3 次,這與前面的說明是不一致的。之所以會如此是因為當事務提交粒度恰好可以被記錄數整除時,事務會有一次空提交。

關於 Spring Batch 運行時信息管理,將在講解 Job 監控時再詳細介紹,此處不再贅述,你也可以查看 Spring Batch 參考資料了解相關信息。

總結

本文通過一個簡單示例演示了如何構建 Spring Batch 應用,同時介紹了 Spring Batch 的相關核心概念。希望您通過本文可以掌握 Spring Batch 的基本功能。在接下來的文章中,我將繼續介紹 Spring Batch 的兩個重要特性:Job 流和並發。

 

下載資源

 

相關主題

  • 本系列 第 2 部分:主要介紹了 Spring Batch 的 Step Flow 以及並發處理兩項重要特性。
  • Spring Batch 主頁,可以初步了解 Spring Batch 的基本架構。
  • Spring Batch 發布包,您可以在這里找到各個版本的 Spring Batch 發布包。
  • Spring Batch 入門,教你如何入門。
  • Spring Batch 參考手冊,詳細了解 Spring Batch 框架。
  • Spring 參考手冊,Spring Framework 知識學習。
  • Spring Richclient 中的安全認證管理”(developerWorks,2011 年 7 月):作為企業級開發框架,Spring Richclient 為我們提供了完善的安全認證管理功能,使我們能夠方便構建安全的企業級應用。本文將詳細介紹 Spring Richclient 中安全認證管理的實現方式以及使用方法。
  • Struts2、Spring、Hibernate 高效開發的最佳實踐”(developerWorks,2011 年 8 月):Struts2、Spring、Hibernate(SSH)是最常用的 Java EE Web 組件層的開發技術搭配,網絡中和許多 IT 技術書籍中都有它們的開發教程,但是通常的教程都會讓很多程序員陷入痛苦的配置與修改配置的過程。本文利用 SSH 中的技術特性,利用 Java 反射技術,按照規約優於配置的原理,基於 SSH 設定編寫了一個通用開發框架,這使得開發者可以專注於業務邏輯的開發。
  • 如何將基於 Struts、Spring 和 Hibernate 的應用從 Tomcat 遷移到 WebSphere Application Server”(developerWorks,2011 年 11 月):本文向讀者介紹基於 Eclipse 開發的 Struts、Spring 和 Hibernate 開源應用和開發環境的特點,並通過實例介紹從 Tomcat 遷移到 WebSphere 所遇到的問題及其解決方案。
  • 基於 Spring 和 iBATIS 的動態可更新多數據源持久層”(developerWorks,2012 年 2 月):開發擁有多重數據源的項目時,經常希望能夠通過用戶界面來動態配置數據源。本文針對這一問題提出了創新的解決方案,通過使用 Spring+iBATIS 的組合,來實現可動態更新的多重數據源的持久層,從而可以通過用戶界面自主地管理所需的數據源。
  • developerWorks Java 技術專區:這里有數百篇關於 Java 編程各個方面的文章。

前言

在本系列文章的第 1 部分,我們搭建了一個用戶繳費通知的批處理任務。盡管這個簡單的應用展現了 Spring Batch 的基本功能,但是它與真實的應用相去甚遠。在實際應用中,我們的 Job 可能必須要包含多個 Step,為了提高性能,我們可能需要考慮 Job 的並發問題。Spring Batch 在這些方面又提供了哪些好的特性呢?讓我們繼續。

Step Flow

通過前文我們已經知道,Step 是一個獨立的、順序的處理步驟,包含完整的輸入、處理以及輸出。但是在企業應用中,我們面對的更多情況是多個步驟按照一定的順序進行處理。因此如何維護步驟之間的執行順序是我們需要考慮的。Spring Batch 提供了 Step Flow 來解決這個問題。

示例改進

讓我們回到用戶繳費通知的 Job。客戶提出了進一步的需求:計費、扣費、繳費通知要確保順序執行。首先,為每個用戶生成賬單,然后從用戶余額上進行扣除,對於余額不足的用戶,發送繳費通知。下面看一下如何使用 Step Flow 實現該需求。

在講解 Step Flow 之前,我們先對第 1 部分的示例進行改進,將其由文件操作遷移到數據庫上,這樣便於我們后續的講解。數據庫初始化腳本見 init_db_mysql.sql(位於示例代碼包 batch_sample 根目錄下),具體配置如清單 1 所示:

清單 1. billing_job.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
< beans:bean id = "jobRepository"
    class = "org.springframework.batch.core.repository.support.JobRepositoryFactoryBean" >
    < beans:property name = "dataSource" ref = "dataSource" />
    < beans:property name = "transactionManager" ref = "transactionManager" />
</ beans:bean >
< beans:bean id = "userDbReader"
     class = "org.springframework.batch.item.database.JdbcPagingItemReader" >
     < beans:property name = "dataSource" ref = "dataSource" />
     < beans:property name = "rowMapper" ref = "userDbMapper" />
     < beans:property name = "queryProvider" ref = "userQueryProvider" />
</ beans:bean >
< beans:bean id = "userDbMapper"
     class = "org.springframework.batch.sample.UserDbMapper" />
< beans:bean id = "userQueryProvider"
     class = "org.springframework.batch.item.database.support.MySqlPagingQueryProvider" >
     < beans:property name = "selectClause" value = "u.id,u.name,u.age,u.balance" />
     < beans:property name = "fromClause" value = "users u" />
     < beans:property name = "sortKey" value = "u.id" />
</ beans:bean >
< beans:bean id = "messageDbWriter"
     class = "org.springframework.batch.item.database.JdbcBatchItemWriter" >
     < beans:property name = "dataSource" ref = "dataSource" />
     < beans:property name = "sql"
     value = "insert into messages(id,user_id,content) values(:id,:user.id,:content)" />
     < beans:property name = "itemSqlParameterSourceProvider"
         ref = "itemSqlParameterSourceProvider" />
</ beans:bean >
< beans:bean id = "itemSqlParameterSourceProvider"
class = "org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"
  />

我們分別使用 Spring Batch 提供的 JdbcPagingItemReader 和 JdbcBatchItemWriter 進行讀寫。同時,我將 jobRepository 修改為 JobRepositoryFactoryBean,因此,運行示例前,您還需要執行 Spring Batch 提供的 schema-mysql.sql(位於 core 包 org\springframework\batch\core 目錄下)。相關內容將在第 3 部分詳細講解,此處不再贅述。

第一個流程

在配置 Step 時,我們可以指定其 next 屬性,該屬性指向另一個 Step。通過配置 Step 的 next 屬性,我們便可以輕易實現上述流程。具體如清單 2 所示

清單 2. billing_job.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
< job id = "billingJob" restartable = "true" >
     < step id = "billingStep" next = "payStep" >
         < tasklet >
             < chunk reader = "userDbReader" processor = "billingProcessor"
              writer = "billDbWriter" commit-interval = "5" chunk-completion-policy = "" >
             </ chunk >
         </ tasklet >
     </ step >
     < step id = "payStep" next = "messageStep" >
         < tasklet >
           < chunk reader = "billDbReader" processor = "payProcessor" writer = "payDbWriter"
             commit-interval = "5" chunk-completion-policy = ""  skip-limit = "100" >
< skippable-exception-classes >
       < include class = "org.springframework.batch.sample.MoneyNotEnoughException" />
                 </ skippable-exception-classes >
             </ chunk >
         </ tasklet >
     </ step >
     < step id = "messageStep" >
         < tasklet >
             < chunk reader = "billArrearsDbReader" processor = "messageProcessor"
                     writer = "messageDbWriter" commit-interval = "5"
                     chunk-completion-policy = "" >
             </ chunk >
         </ tasklet >
     </ step >
</ job >

我們將 billStep 的 next 設置為 payStep,將 payStep 的 next 設置為 messageStep,同時分別指定了讀、處理、寫接口。Spring Batch 在運行 billingJob 時,首先執行 billingStep,查找用戶信息生成賬單費用,然后執行 payStep,查找賬單信息生成扣費記錄,如果用戶余額不足則跳過。最后,查找欠費賬單,生成繳費通知。只有當上一步執行成功后,才會執行下一步。

billStep 和 payStep 的 ItemProcessor 實現分別如清單 3 和清單 4 所示:

清單 3.BillingItemProcessor 類
1
2
3
4
5
6
7
8
9
10
11
12
13
public class BillingItemProcessor implements ItemProcessor#< User , Bill> {
 
     public Bill process(User item) throws Exception {
         Bill b = new Bill();
         b.setUser(item);
         b.setFees(70.00);
         b.setPaidFees(0.0);
         b.setUnpaidFees(70.00);
         b.setPayStatus(0);/*unpaid*/
         return b;
     }
 
}
清單 4.PaymentItemProcessor 類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class PaymentItemProcessor implements ItemProcessor< Bill , PayRecord> {
  public PayRecord process(Bill item) throws Exception {
         if (item.getUser().getBalance() <= 0) {
             return null;
         }
         if (item.getUser().getBalance() >= item.getUnpaidFees()) {
             // create payrecord
             PayRecord pr = new PayRecord();
             pr.setBill(item);
             pr.setPaidFees(item.getUnpaidFees());
             // update balance
             item.getUser().setBalance(item.getUser().getBalance() -
                  item.getUnpaidFees());
             // update bill
             item.setPaidFees(item.getUnpaidFees());
             item.setUnpaidFees(0.0);
             item.setPayStatus(1);/* paid */
             return pr;
         } else {
             throw new MoneyNotEnoughException();
         }
     }
}

在清單 3 中,我們為每個用戶生成一條 70 元的賬單,已繳費用為 0,未繳費用為 70。在清單 4 中,將賬單金額從用戶余額中扣除,並更新賬單已繳和未繳費用,如果余額不足,提示異常(通過清單 2 可知,我們對於此類異常進行了跳過處理)。

此外,我們現在的繳費通知需要基於欠費賬單生成,因此,我們需要新提供一個繳費通知的 ItemProcessor,具體如清單 5 所示:

清單 5.ArrearsMessagesItemProcessor 類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ArrearsMessagesItemProcessor implements
         ItemProcessor< Bill , Message> {
 
     public Message process(Bill item) throws Exception {
         if (item.getPayStatus() == 0) {/*unpaid*/
             Message m = new Message();
             m.setUser(item.getUser());
             m.setContent("Hello " + item.getUser().getName()
                     + ",please pay promptly at end of this month.");
             return m;
         }
         return null;
     }
 
}

每個 Step 的讀寫接口可參照 billing_job.xml,均使用 Spring Batch 提供的實現類,此處不再贅述(此處需要特別注意 payDbWriter,由於扣費時,我們需要同時生成扣費記錄,並更新用戶和賬單,因此我們使用了 CompositeItemWriter)。至此,我們已經完成了第一步,實現了基本的多步驟順序處理,您可以運行 Main2,並通過數據庫查看運行結果(bills、payrecords、messages)。

條件流程和流程決策

通過上面的 Step Flow,我們已經滿足了客戶的初步需求,但是客戶又提出進一步要求:能否當所有用戶費用均足夠的情況下,不再執行繳費通知處理。因為查詢一遍欠費賬單在一定程度上還是降低了處理性能。Spring Batch 提供了條件流程和流程決策來支持類似應用場景。

首先,讓我們看一下如何使用條件流程來實現該需求。Step 通過在 next 元素上設置 on 屬性來支持條件流程,on 屬性取值為 Step 的結束狀態,如 COMPLETED、FAILED 等,同時還支持 * 以及 ? 通配符,具體可閱讀 參考手冊

由於我們希望當存在余額不足的情況時,也就是 payStep 的跳過條數大於 0 時,再執行繳費通知 Step,因此,我們需要特殊指定一種結束狀態。此處,我們可以為 Step 添加一個監聽器,以返回指定的結束狀態。

修改后的 payStep 如清單 6 所示,監聽器實現如清單 7 所示:

清單 6. billing_job.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
< step id = "payStep" >
     < tasklet >
         < chunk reader = "billDbReader" processor = "payProcessor" writer = "payDbWriter"
             commit-interval = "5" chunk-completion-policy = "" skip-limit = "100" >
             < skippable-exception-classes >
                 < include
         class = "org.springframework.batch.sample.MoneyNotEnoughException" />
             </ skippable-exception-classes >
         </ chunk >
     </ tasklet >
< next on = "COMPLETED WITH SKIPS" to = "messageStep" />
     < listeners >
         < listener ref = "payStepCheckingListener" ></ listener >
     </ listeners >
</ step >
清單 7. PayStepCheckingListener 類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class PayStepCheckingListener extends StepExecutionListenerSupport {
 
     @Override
     public ExitStatus afterStep(StepExecution stepExecution) {
         String exitCode = stepExecution.getExitStatus().getExitCode();
         if (!exitCode.equals(ExitStatus.FAILED.getExitCode())
                 && stepExecution.getSkipCount() > 0) {
             return new ExitStatus("COMPLETED WITH SKIPS");
         } else {
             return null;
         }
     }
 
}

接下來,再讓我們看一下如何使用流程決策來實現該功能。多數情況下,Step 的結束狀態並不能夠滿足較為復雜的條件流程,此時便用到了流程決策器。通過它,我們可以根據 Job 和 Step 的各種執行情況返回相應的執行狀態來控制流程。

首先,我們需要定義一個流程決策器,代碼如清單 8 所示:

清單 8. MessagesDecider 類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class MessagesDecider implements JobExecutionDecider {
 
     public FlowExecutionStatus decide(JobExecution jobExecution,
             StepExecution stepExecution) {
         String exitCode = stepExecution.getExitStatus().getExitCode();
         if (!exitCode.equals(ExitStatus.FAILED.getExitCode())
                 && stepExecution.getSkipCount() > 0) {
             return new FlowExecutionStatus("COMPLETED WITH SKIPS");
         } else {
             return FlowExecutionStatus.COMPLETED;
         }
     }
 
}

與 StepExecutionListener 不同,該類的 decide 方法返回一個 FlowExecutionStatus 對象。與之對應,Job 配置修改為如清單 9 所示:

清單 9. billing_job2.xml
1
2
3
4
5
6
7
8
9
10
11
12
< job id = "billingJob" restartable = "true" >
     < step id = "billingStep" next = "payStep" >
     </ step >
     < step id = "payStep" next = "decider" >
     </ step >
     < decision id = "decider" decider = "messagesDecider" >
         < next on = "COMPLETED WITH SKIPS" to = "messageStep" />
         < end on = "COMPLETED" />
     </ decision >
     < step id = "messageStep" >
     </ step >
</ job >

可以看到 payStep 的 next 變成了 decider,在 decider 中根據返回結果確定執行路徑:如果存在跳過的情況,執行 messageStep,否則直接結束 Job(注意:此處我們用到了 end 元素)。

通過上面的講述,我們大體了解了 Spring Batch 對於條件流程的支持(此外,我們可以通過設置 Step 的 next 屬性為先前執行的 Step,從而實現支持循環的 Job,但是筆者並不認為這是實現循環任務的一個好方案,故在此處不做詳細講解),接下來再讓我們看一下批處理中另一項重要特征——並發。

並發處理

如果我們的批處理任務足夠簡單,硬件配置及網絡環境也足夠好,那么我們完全可以將批處理任務設計為單線程,但現實是企業應用對於硬件的要求要比硬件自身發展快的多,更何況還有那么多的企業要在較差的硬件環境中運行自己的企業應用並希望擁有一個可以接受的性能。因此在企業應用中,尤其是涉及到大批量數據處理,並發是不可避免的。那么,Spring Batch 在並發方面又提供了哪些功能支持呢?

首先,Spring Batch 提供了 Step 內的並發,這也是最簡單的一種並發處理支持。通過為 Step 設置 task-executor 屬性,我們便可以使當前 Step 以並發方式執行。同時,還可以通過 throttle-limit 設置並發線程數(默認為 4)。也就是說您不必修改任何業務處理邏輯,僅僅通過修改配置即可以實現同步到異步的切換。

如果我們希望示例中的 billingStep 以並發方式執行,且並發任務數為 5,那么只需要做如下配置即可,見清單 10:

清單 10. billing_job3.xml
1
2
3
4
5
6
7
8
9
10
< step id = "billingStep" next = "payStep" >
     < tasklet task-executor = "taskExecutor" throttle-limit = "5" >
         < chunk reader = "userDbReader" processor = "billingProcessor"
         writer = "billDbWriter" commit-interval = "5" chunk-completion-policy = "" >
         </ chunk >
     </ tasklet >
</ step >
< beans:bean id = "taskExecutor"
     class = "org.springframework.core.task.SimpleAsyncTaskExecutor" >
</ beans:bean >

從清單可以看出,我們為 billingStep 指定了一個異步任務執行器 SimpleAsyncTaskExecutor,該執行器將會按照配置創建指定數目的線程來進行數據處理。通過這種方式,避免了我們手動創建並管理線程的工作,使我們只需要關注業務處理本身。

需要補充說明的是,Spring Core 為我們提供了多種執行器實現(包括多種異步執行器),我們可以根據實際情況靈活選擇使用。當然,像我們此處需要並發處理時,必須使用異步執行器。幾種主要實現如表 1 所示:

表 1. 任務執行器列表

其次,Spring Batch 還支持 Step 間的並發,這是通過 Split Flow 實現的。讓我們看看 Split Flow 是如何使用的。在此之前,讓我們設想一下,假如客戶基於上面的示例提出進一步需求:每月為用戶生成扣費通知,並生成賬單、扣費、繳費通知(對於費用不足的情況)。

當然,要實現上述需求有很多種方式,比如,按照生成賬單、扣費通知、扣費、繳費通知的順序串行執行,然而,此種處理方式勢必會降低性能,即使我們可以使用 Step 多線程處理來提高性能,可仍不是最優方式。那么我們該如何改進呢?顯然,我們可以將生成扣費通知和扣費並行執行,因為這兩步是完全獨立的。修改后的 billing_job 如清單 11 所示:

清單 11. billing_job3.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
< job id = "billingJob" restartable = "true" >
     < step id = "billingStep" next = "splitStep" >
         < tasklet task-executor = "taskExecutor" throttle-limit = "5" >
             < chunk reader = "userDbReader" processor = "billingProcessor"
         writer = "billDbWriter" commit-interval = "5" chunk-completion-policy = "" >
             </ chunk >
         </ tasklet >
     </ step >
     < split id = "splitStep" task-executor = "taskExecutor"  next = "decider" >
         < flow >
             < step id = "billingMessageStep" >
                 < tasklet >
                   < chunk reader = "billDbReader" processor = "billMessageItemProcessor"
                     writer = "messageDbWriter" commit-interval = "5"
                             chunk-completion-policy = "" >
                     </ chunk >
                 </ tasklet >
             </ step >
         </ flow >
         < flow >
             < step id = "payStep" >
                 < tasklet >
                    < chunk reader = "billDbReader" processor = "payProcessor"
                    writer = "payDbWriter" commit-interval = "5" chunk-completion-policy = ""
                    skip-limit = "100" >
                     < skippable-exception-classes >
                       < include
                   class = "org.springframework.batch.sample.MoneyNotEnoughException" />
                       </ skippable-exception-classes >
                     </ chunk >
                 </ tasklet >
             </ step >
         </ flow >
     </ split >
     < decision id = "decider" decider = "messagesDecider" >
         < next on = "COMPLETED WITH SKIPS" to = "paymentMessageStep" />
         < end on = "COMPLETED" />
     </ decision >
     < step id = "paymentMessageStep" >
         < tasklet >
             < chunk reader = "billArrearsDbReader" processor = "messageProcessor"
                     writer = "messageDbWriter" commit-interval = "5"
                     chunk-completion-policy = "" >
             </ chunk >
         </ tasklet >
     </ step >
</ job >

從清單 10 可以看出,billingStep 的下一步變成了一個 split 元素,該元素下包含兩個 flow。“flow”顧名思義包含一系列可執行的 step,示例中兩個 flow 分別包含 billingMessageStep(生成扣費通知)和 payStep 兩個 step。Spring Batch 執行 split 時,將會並行執行其下所有 flow,而且只有當所有 step 均執行完畢之后,才會執行 split 元素的下一步,當然,前提是您為 split 元素指定的"task-executor"為 SimpleAsyncTaskExecutor,該屬性默認為 SyncTaskExecutor,即串行執行。

通過上述兩種方式,我們可以實現 Job 的並發處理,但顯然該方式有其局限性,即僅限於單機。

讓我們設想一下如下場景:在上述繳費任務中,用戶生成賬單非常慢(也許是因為業務處理過於復雜,也許因為生成賬單的過程中同時處理了好多關聯信息)。這種場景我們該怎么優化呢?顯然,即便我們將該步驟配置為並行,那么它的優化空間也是有限的,因為線程並發到一定數量之后必定受限於系統硬件配置。這個時候,我們自然會想到修改部署方式,將耗時操作分配到多個機器上並行執行。那么基於 Spring Batch 我們該如何實現呢?此處便用到了 PartitionStep。讓我們看一下它是如何執行的,其時序圖如圖 1 所示:

圖 1. PartitionStep 序列圖
圖 1. PartitionStep 序列圖

從圖中我們可以看到,PartitionStep 並不負責讀、寫數據,它只是根據配置的策略(PartitionHandler)將 StepExecution 進行分解,並委派到指定的 Step 上並行執行(該 Step 可能是本地,也可能是遠程),執行完畢后,將所有執行結果進行合並(由 StepExecutionAggregator 完成)作為自身的執行結果。利用 PartitionStep,在委派 Step 為遠程調用的情況下,我們可以很容易通過增加從機數目的方式來提高任務運行效率,大大提高了系統的可伸縮性。而且此種方式並不會影響 PartitionStep 所在 Job 的執行順序,因為 PartitionStep 只有當所有委派 Step 完成之后,才會繼續往下執行。

不過使用 PartitionStep 需要注意以下幾點:

  • 由於數據的讀寫以及處理均在從機上進行,因此需要確保並發的從機之間不會重復讀取數據(當然,這個問題是所有批處理應用采用主從和集群架構時所必須考慮的問題,而並非只有 Spring Batch 才會有)。
  • 確保分解到各個從機上的 StepExecution 是不同的。在 StepExecutionSplitter 的默認實現 SimpleStepExecutionSplitter 中,首先通過一個 Partitioner 得到分解后的 ExecutionContext,然后針對每個 ExecutionContext,創建 StepExecution(當然,如果 Step 為重復執行,那么將會得到上次運行的 ExecutionContext 和 StepExecution,而非重新創建)。

從第二點可以看出,通過在 ExecutionContext 設置唯一的信息,我們便可以保證每個從機讀取的數據是不同的。

主從方式的具體配置如清單 12 所示:

清單 12. partition.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
< beans:bean name = "step"
     class = "org.springframework.batch.core.partition.support.PartitionStep" >
     < beans:property name = "partitionHandler" >
         < beans:bean
class = "org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler" >
             < beans:property name = "step" ref = "remoteStep" />
             < beans:property name = "gridSize" value = "10" />
             < beans:property name = "taskExecutor" ref = "taskExecutor" />
         </ beans:bean >
     </ beans:property >
     < beans:property name = "stepExecutionSplitter" >
       < beans:bean
class = "org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter" >
             < beans:constructor-arg ref = "jobRepository" />
             < beans:constructor-arg ref = "messageStep" />
             < beans:constructor-arg ref = "simplePartitioner" />
         </ beans:bean >
     </ beans:property >
     < beans:property name = "jobRepository" ref = "jobRepository" />
</ beans:bean >
 
< step id = "messageStep" >
     < tasklet task-executor = "taskExecutor" >
         < chunk reader = "messageReader" processor = "messageProcessor"
         writer = "messageWriter" commit-interval = "5" chunk-completion-policy = ""
                 retry-limit = "2" >
             < retryable-exception-classes >
                 < include class = "java.lang.RuntimeException" />
             </ retryable-exception-classes >
         </ chunk >
     </ tasklet >
</ step >
< beans:bean id = "remoteStep"
     class = "org.springframework.remoting.httpinvoker.HttpInvokerProxyFactoryBean" >
     < beans:property name = "serviceInterface"
         value = "org.springframework.batch.core.Step" />
     < beans:property name = "serviceUrl"
          value = "${batch.remote.base.url}/steps/messageStep" />
</ beans:bean >

此處只采用 Spring Batch 的默認實現,將 Step 發送到一台從機上執行,當然,您完全可以基於 Spring Batch 當前接口,輕易擴展出分發到 N 台從機上執行的實現。

此外,在耗時的 Step 比較獨立的情況下(如發送扣費通知的 Step,后續 Step 不會依賴扣費通知 Step 的任何輸出結果),我們還可以采用另一種主從架構。在主機上配置一個標准的 Step,其 ItemWriter 負責將讀取的記錄以 Message 的形式發送給消息中間件(當然,該方案並未充分利用 Spring Batch 的特性,而是由消息中間件完成並發處理)。

總結

通過本文的講解,您已經基本了解了 Spring Batch 中對流程、條件以及並發的支持。利用 Spring Batch 提供的這些特性,我們完全可以構建出高性能、高可擴展性和可維護性的批處理應用。在本系列文章的最后一部分,我將繼續給您介紹 Spring Batch 關於批處理監控方面的內容。

 

相關主題

  • 本系列 第 1 部分:一步步了解如何開發基於 Spring Batch 的批處理程序和相關核心概念。
  • Spring Batch 主頁,可以初步了解 Spring Batch 的基本架構。
  • Spring Batch 發布包,您可以在這里找到各個版本的 Spring Batch 發布包。
  • Spring Batch 入門:教你如何入門。
  • Spring Batch 參考手冊,詳細了解 Spring Batch 框架。
  • Spring 參考手冊,Spring Framework 知識學習。
  • Spring Richclient 中的安全認證管理”(developerWorks,2011 年 7 月):作為企業級開發框架,Spring Richclient 為我們提供了完善的安全認證管理功能,使我們能夠方便構建安全的企業級應用。本文將詳細介紹 Spring Richclient 中安全認證管理的實現方式以及使用方法。
  • Struts2、Spring、Hibernate 高效開發的最佳實踐”(developerWorks,2011 年 8 月):Struts2、Spring、Hibernate(SSH)是最常用的 Java EE Web 組件層的開發技術搭配,網絡中和許多 IT 技術書籍中都有它們的開發教程,但是通常的教程都會讓很多程序員陷入痛苦的配置與修改配置的過程。本文利用 SSH 中的技術特性,利用 Java 反射技術,按照規約優於配置的原理,基於 SSH 設定編寫了一個通用開發框架,這使得開發者可以專注於業務邏輯的開發。
  • 如何將基於 Struts、Spring 和 Hibernate 的應用從 Tomcat 遷移到 WebSphere Application Server”(developerWorks,2011 年 11 月):本文向讀者介紹基於 Eclipse 開發的 Struts、Spring 和 Hibernate 開源應用和開發環境的特點,並通過實例介紹從 Tomcat 遷移到 WebSphere 所遇到的問題及其解決方案。
  • 基於 Spring 和 iBATIS 的動態可更新多數據源持久層”(developerWorks,2012 年 2 月):開發擁有多重數據源的項目時,經常希望能夠通過用戶界面來動態配置數據源。本文針對這一問題提出了創新的解決方案,通過使用 Spring+iBATIS 的組合,來實現可動態更新的多重數據源的持久層,從而可以通過用戶界面自主地管理所需的數據源。
  • developerWorks Java 技術專區:這里有數百篇關於 Java 編程各個方面的文章。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM