Springboot整合SpringBatch完成基本案例--從數據庫讀取數據並寫入文件


本案例旨在讓新手從0開始完成一個批量任務的開發

 

 

第一步:建表,入參

CREATE TABLE `music_info` (

  `id` int(10) NOT NULL AUTO_INCREMENT COMMENT '主鍵id',

  `singer_name` varchar(100) NOT NULL COMMENT '歌手名',

  `music_size` varchar(100) NOT NULL COMMENT '歌曲大小',

  `music_name` varchar(100) NOT NULL COMMENT '歌曲名',

  PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8;

 

insert  into `music_info`(`id`,`singer_name`,`music_size`,`music_name`) values (1,'小三','3.2M','起風了'),(2,'劉德華','3.0M','忘情水'),(3,'豬點點','5.0M','會寫程序的小豬');

 

第二步:搭建SpringBoot項目,使用IDEA或者Spring官網均可

 

第三步:導入依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>

第四步:啟動類上加注解@EnableBatchProcessing

@SpringBootApplication
@EnableBatchProcessing
public class SpringbatchApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringbatchApplication.class, args);
    }
}

第五步:添加配置文件內容

#開發配置
#數據庫連接參數的配置
spring.datasource.driverClassName = com.mysql.jdbc.Driver
spring.datasource.url = jdbc:mysql://localhost:3306/test?serverTimezone=UTC
spring.datasource.username = root
spring.datasource.password = 123456
#項目啟動時的建表sql腳本,該腳本由Spring Batch提供
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
#在項目啟動時進行執行建表sql
#
是否生成執行狀態記錄的表結構
spring.batch.initialize-schema=always
#禁止Spring Batch自動執行,既需要用戶觸發才能執行
spring.batch.job.enabled=true

第六步:創建實體類

public class Music {
    // 主鍵id
   
private Integer id;

    // 歌手名
   
private String singer_name;

    // 歌曲大小
   
private String music_size;

    // 歌曲名
   
private String music_name;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getSinger_name() {
        return singer_name;
    }

    public void setSinger_name(String singer_name) {
        this.singer_name = singer_name;
    }

    public String getMusic_size() {
        return music_size;
    }

    public void setMusic_size(String music_size) {
        this.music_size = music_size;
    }

    public String getMusic_name() {
        return music_name;
    }

    public void setMusic_name(String music_name) {
        this.music_name = music_name;
    }

    @Override
    public String toString() {
        return "Music{" +
                "id=" + id +
                ", singer_name='" + singer_name + '\'' +
                ", music_size='" + music_size + '\'' +
                ", music_name='" + music_name + '\'' +
                '}';
    }
}

第七步:創建數據持久層代碼

@Mapper
public interface MusicDao {
    //通過id查詢數據庫記錄
   
@Select("select id , singer_name , music_size , music_name from music_info where id = #{id};")
    public List<Music> queryInfoById(Map<String , Integer> map);
}

第八步:編寫批量代碼

@Configuration
public class BatchJobDemo {
    @Autowired
    JobBuilderFactory jobBuilderFactory;
    @Autowired
    StepBuilderFactory stepBuilderFactory;
    @Autowired
    SqlSessionFactory sqlSessionFactory;

    private static final String JOB = "job";

    private static final String STEP = "step";

    //配置一個Job
   
@Bean(name = JOB)
    Job job() {
        return jobBuilderFactory.get(JOB)
                .start(step())
                .build();
    }


    //配置一個Step
   
@Bean(name = STEP)
    Step step() {
        return stepBuilderFactory.get(STEP)
                .<Music, Music>chunk(2)
                .reader(itemReader())
                .writer(itemWriter())
                .build();
    }


    //配置itemReader
   
@Bean("itemReader")
    @StepScope
    MyBatisCursorItemReader<Music> itemReader() {
        System.out.println("開始查詢數據庫");
        MyBatisCursorItemReader<Music> reader = new MyBatisCursorItemReader<>();
        Map<String , Object> map = new HashMap<>();
        map.put("id" , 2);
        reader.setQueryId("com.example.springbatch.dao.MusicDao.queryInfoById");
        reader.setSqlSessionFactory(sqlSessionFactory);
        reader.setParameterValues(map);
        return reader;
    }

    //配置itemWriter
   
@Bean("itemWriter")
    @StepScope
    FlatFileItemWriter<Music> itemWriter() {
        System.out.println("開始寫入文件中");
        FlatFileItemWriter<Music> writer = new FlatFileItemWriter<>();
        writer.setResource(new FileSystemResource("F:\\music.txt"));//系統目錄
       
//Music對象轉換成字符串,並輸出到文件
       
writer.setLineAggregator(new LineAggregator<Music>() {
            @Override
            public String aggregate(Music music) {
                ObjectMapper mapper = new ObjectMapper();
                String str = null;
                try {
                    str = mapper.writeValueAsString(music);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
                return str;
            }
        });
        return writer;
    }
}

第八步:在磁盤根目錄創建文件music.txt

第九步:啟動服務

第十步:文件中的結果

{"id":2,"singer_name":"劉德華","music_size":"3.0M","music_name":"忘情水"}

 

 

技術總結

1、 什么是SpringBatch?

是Spring提供的一個數據處理框架,是一個輕量級,全面的批處理框架,旨在開發對企業系統日常運營至關重要的強大批處理應用程序

2、 批處理應用程序大致流程

從數據庫、文件、隊列中讀取大量記錄

以某種方式處理數據

以修改之后的形式寫回數據

3、 SpringBatch總體架構

在SpringBatch中一個Job可以定義很多的步驟Step,在每一個Step中可以定義其專屬的ItemReader用於讀取數據,ItemProcesseor用於處理數據,ItemWriter用於寫入數據,而每一個定義的Job則都在JobRepository中,我們可以通過JobLauncher來啟動某個Job

4、 什么是Job?

Job是一個封裝整個批處理過程的概念,在SpringBatch體系中是一個最頂層的抽象概念,體現在代碼中則是一個最上層的接口

5、 什么是JobLauncher?

該接口主要用於啟動指定了JobParameters的Job,JobParameters和Job一起才能組成一次Job的執行

6、 chunk處理流程

由於一次batch任務可能會有很多數據讀寫操作,一條一條的處理並向數據庫提交的話效率不會很高,因此SpringBatch提出了chunk的概念,設定一個chunk size,SpraingBatch將會一條條處理數據,但是不會提交到數據庫,只有當處理的數據達到了chunk size設定值,才會一起commit

 

例如:在一個Step中,chunk size設置為10,當ItemReader讀的數據達到10的時候,這一批次就一起傳到ItemWriter,同時Transaction被提交

7、 skip策略和失敗處理

skipLimit():該方法的作用是我們可以設定一個我們允許的Step跳過異常的數量,加入設定為10,那么整個Step運行時,只要出現異常數量不超過10,整個Step就不會失敗

skip():用來指定跳過的異常,因為有些異常的出現, 我們可以忽略

noSkip():指定某些異常出現時,無需跳過,一旦出現,計數器就會累加一次,直到達到上限

8、JobLauncher何時使用?

 

 

 

問題:

1、 如果一個類實現了Job接口,會怎樣,實際開發中沒有出現實現Job接口的情況?

2、 什么是JobInstance、什么是JobParameters、什么是JobExecution?Springboot整合SpringBatch完成基本案例--從數據庫讀取數據並寫入文件


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM