本案例旨在讓新手從0開始完成一個批量任務的開發
第一步:建表,入參
CREATE TABLE `music_info` (
`id` int(10) NOT NULL AUTO_INCREMENT COMMENT '主鍵id',
`singer_name` varchar(100) NOT NULL COMMENT '歌手名',
`music_size` varchar(100) NOT NULL COMMENT '歌曲大小',
`music_name` varchar(100) NOT NULL COMMENT '歌曲名',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8;
insert into `music_info`(`id`,`singer_name`,`music_size`,`music_name`) values (1,'小三','3.2M','起風了'),(2,'劉德華','3.0M','忘情水'),(3,'豬點點','5.0M','會寫程序的小豬');
第二步:搭建SpringBoot項目,使用IDEA或者Spring官網均可
第三步:導入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
第四步:啟動類上加注解@EnableBatchProcessing
@SpringBootApplication
@EnableBatchProcessing
public class SpringbatchApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbatchApplication.class, args);
}
}
第五步:添加配置文件內容
#開發配置
#數據庫連接參數的配置
spring.datasource.driverClassName = com.mysql.jdbc.Driver
spring.datasource.url = jdbc:mysql://localhost:3306/test?serverTimezone=UTC
spring.datasource.username = root
spring.datasource.password = 123456
#項目啟動時的建表sql腳本,該腳本由Spring Batch提供
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
#在項目啟動時進行執行建表sql
#是否生成執行狀態記錄的表結構
spring.batch.initialize-schema=always
#禁止Spring Batch自動執行,既需要用戶觸發才能執行
spring.batch.job.enabled=true
第六步:創建實體類
public class Music {
// 主鍵id
private Integer id;
// 歌手名
private String singer_name;
// 歌曲大小
private String music_size;
// 歌曲名
private String music_name;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getSinger_name() {
return singer_name;
}
public void setSinger_name(String singer_name) {
this.singer_name = singer_name;
}
public String getMusic_size() {
return music_size;
}
public void setMusic_size(String music_size) {
this.music_size = music_size;
}
public String getMusic_name() {
return music_name;
}
public void setMusic_name(String music_name) {
this.music_name = music_name;
}
@Override
public String toString() {
return "Music{" +
"id=" + id +
", singer_name='" + singer_name + '\'' +
", music_size='" + music_size + '\'' +
", music_name='" + music_name + '\'' +
'}';
}
}
第七步:創建數據持久層代碼
@Mapper
public interface MusicDao {
//通過id查詢數據庫記錄
@Select("select id , singer_name , music_size , music_name from music_info where id = #{id};")
public List<Music> queryInfoById(Map<String , Integer> map);
}
第八步:編寫批量代碼
@Configuration
public class BatchJobDemo {
@Autowired
JobBuilderFactory jobBuilderFactory;
@Autowired
StepBuilderFactory stepBuilderFactory;
@Autowired
SqlSessionFactory sqlSessionFactory;
private static final String JOB = "job";
private static final String STEP = "step";
//配置一個Job
@Bean(name = JOB)
Job job() {
return jobBuilderFactory.get(JOB)
.start(step())
.build();
}
//配置一個Step
@Bean(name = STEP)
Step step() {
return stepBuilderFactory.get(STEP)
.<Music, Music>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.build();
}
//配置itemReader
@Bean("itemReader")
@StepScope
MyBatisCursorItemReader<Music> itemReader() {
System.out.println("開始查詢數據庫");
MyBatisCursorItemReader<Music> reader = new MyBatisCursorItemReader<>();
Map<String , Object> map = new HashMap<>();
map.put("id" , 2);
reader.setQueryId("com.example.springbatch.dao.MusicDao.queryInfoById");
reader.setSqlSessionFactory(sqlSessionFactory);
reader.setParameterValues(map);
return reader;
}
//配置itemWriter
@Bean("itemWriter")
@StepScope
FlatFileItemWriter<Music> itemWriter() {
System.out.println("開始寫入文件中");
FlatFileItemWriter<Music> writer = new FlatFileItemWriter<>();
writer.setResource(new FileSystemResource("F:\\music.txt"));//系統目錄
//將Music對象轉換成字符串,並輸出到文件
writer.setLineAggregator(new LineAggregator<Music>() {
@Override
public String aggregate(Music music) {
ObjectMapper mapper = new ObjectMapper();
String str = null;
try {
str = mapper.writeValueAsString(music);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return str;
}
});
return writer;
}
}
第八步:在磁盤根目錄創建文件music.txt
第九步:啟動服務
第十步:文件中的結果
{"id":2,"singer_name":"劉德華","music_size":"3.0M","music_name":"忘情水"}
技術總結
1、 什么是SpringBatch?
是Spring提供的一個數據處理框架,是一個輕量級,全面的批處理框架,旨在開發對企業系統日常運營至關重要的強大批處理應用程序
2、 批處理應用程序大致流程
從數據庫、文件、隊列中讀取大量記錄
以某種方式處理數據
以修改之后的形式寫回數據
3、 SpringBatch總體架構
在SpringBatch中一個Job可以定義很多的步驟Step,在每一個Step中可以定義其專屬的ItemReader用於讀取數據,ItemProcesseor用於處理數據,ItemWriter用於寫入數據,而每一個定義的Job則都在JobRepository中,我們可以通過JobLauncher來啟動某個Job
4、 什么是Job?
Job是一個封裝整個批處理過程的概念,在SpringBatch體系中是一個最頂層的抽象概念,體現在代碼中則是一個最上層的接口
5、 什么是JobLauncher?
該接口主要用於啟動指定了JobParameters的Job,JobParameters和Job一起才能組成一次Job的執行
6、 chunk處理流程
由於一次batch任務可能會有很多數據讀寫操作,一條一條的處理並向數據庫提交的話效率不會很高,因此SpringBatch提出了chunk的概念,設定一個chunk size,SpraingBatch將會一條條處理數據,但是不會提交到數據庫,只有當處理的數據達到了chunk size設定值,才會一起commit
例如:在一個Step中,chunk size設置為10,當ItemReader讀的數據達到10的時候,這一批次就一起傳到ItemWriter,同時Transaction被提交
7、 skip策略和失敗處理
skipLimit():該方法的作用是我們可以設定一個我們允許的Step跳過異常的數量,加入設定為10,那么整個Step運行時,只要出現異常數量不超過10,整個Step就不會失敗
skip():用來指定跳過的異常,因為有些異常的出現, 我們可以忽略
noSkip():指定某些異常出現時,無需跳過,一旦出現,計數器就會累加一次,直到達到上限
8、JobLauncher何時使用?
問題:
1、 如果一個類實現了Job接口,會怎樣,實際開發中沒有出現實現Job接口的情況?
2、 什么是JobInstance、什么是JobParameters、什么是JobExecution?Springboot整合SpringBatch完成基本案例--從數據庫讀取數據並寫入文件