Batch Runs - Notes on Using Spring Batch


A small exercise based on the Spring Batch demo provided in the official Spring documentation.

The application entry class is no different from an ordinary Spring Boot entry class.

package com.example.batchprocessing;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Although batch processing can be embedded in web apps and WAR files,
 * the simpler approach demonstrated below creates a standalone application.
 * You package everything in a single, executable JAR file,
 * driven by a good old Java main() method.
 *
 * A standard Spring Boot entry point.
 */
@SpringBootApplication
public class BatchProcessingApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchProcessingApplication.class, args);
    }

}

 

Whatever is being batch-processed (data, text, database backups, and so on), a corresponding entity class is needed for the mapping; for example, backing up a database would need a Tables entity class (holding the table name and other data fields).
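As an illustration only, a minimal sketch of such a mapping class for the database-backup case might look like the following (the Tables class and its fields are hypothetical, not part of the guide):

package com.example.batchprocessing;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Hypothetical entity: maps one row of table metadata for a database-backup job
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Tables {

    private String tableName; // name of the table to back up
    private long rowCount;    // number of rows, e.g. for progress reporting

}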

Here the batch job processes a list of user names and converts them all to upper case (turning each one into an "upper-cased person").

package com.example.batchprocessing;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Now that you can see the format of data inputs and outputs,
 * you can write code to represent a row of data
 * (src/main/java/com/example/batchprocessing/Person.java),
 * as the following example shows:
 *
 * You can instantiate the Person class either with first and
 * last name through a constructor or by setting the properties.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Person {

    // Note: Lombok's @AllArgsConstructor follows field declaration order,
    // so this order yields the constructor Person(firstName, lastName) used below.
    private String firstName;
    private String lastName;

}

 

Something has to actually carry out this operation (turning each person into an upper-cased one), so there is a dedicated processor class.

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;

/**
 * PersonItemProcessor implements Spring Batch's ItemProcessor interface.
 * This makes it easy to wire the code into a batch job that you will define later in this guide.
 * According to the interface, you receive an incoming Person object,
 * after which you transform it to an upper-cased Person.
 */
// Intermediate processor that turns each Person into an upper-cased Person
public class PersonItemProcessor implements ItemProcessor<Person,Person> {

    private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

    @Override
    public Person process(final Person person) throws Exception {
        final String firstName = person.getFirstName().toUpperCase();
        final String lastName = person.getLastName().toUpperCase();

        final Person transformedPerson = new Person(firstName,lastName);

        log.info("Converting("+person+")info("+transformedPerson+")");

        return transformedPerson;
    }
}

This implements Spring Batch's ItemProcessor<I, O> interface, where <Person, Person> specifies the types to be read and written.

The documentation notes that the read and write types may differ; here both are Person.
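For instance, a minimal sketch of a processor whose input and output types differ might look like this (a hypothetical processor that turns a raw CSV line, read as a String, into a Person; it is not part of the guide):

package com.example.batchprocessing;

import org.springframework.batch.item.ItemProcessor;

// Hypothetical example where the input type (String) differs from the output type (Person)
public class LineToPersonProcessor implements ItemProcessor<String, Person> {

    @Override
    public Person process(final String line) throws Exception {
        // assumes each line looks like "firstName,lastName"
        final String[] parts = line.split(",");
        return new Person(parts[0].trim(), parts[1].trim());
    }
}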

The overridden process method takes the incoming Person, upper-cases its firstName and lastName, builds a new Person from them, and returns it: return transformedPerson.

 

With the entity class for mapping the data and the intermediate processor for changing it, the batch run itself still has to be configured.

package com.example.batchprocessing;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

import javax.sql.DataSource;

/**
 * For starters, the @EnableBatchProcessing annotation adds many critical
 * beans that support jobs and save you a lot of leg work.
 * This example uses a memory-based database (provided by @EnableBatchProcessing)
 * meaning that, when it is done, the data is gone.
 * It also autowires a couple factories needed further below.
 * Now add the following beans to your BatchConfiguration class to define a
 * reader, a processor, and a writer.
 */
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // Reader: read what needs to be read

    /**
     * reader() creates an ItemReader. It looks for a file called sample-data.csv
     * and parses each line item with enough information to turn it into a Person.
     * @return
     */
    @Bean
    public FlatFileItemReader<Person> reader(){ // flat-file reader
        return new FlatFileItemReaderBuilder<Person>()
                .name("personItemReader")
                .resource(new ClassPathResource("sample-data.csv"))
                .delimited() // comma-delimited input
                .names(new String[]{"firstName","lastName"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>(){{
                    setTargetType(Person.class);
                }})
                .build();
    }

    /**
     * processor() creates an instance of the PersonItemProcessor that you defined earlier,
     * meant to convert the data to upper case.
     * @return
     */
    @Bean
    public PersonItemProcessor processor(){ // the processor that upper-cases each Person
        return new PersonItemProcessor();
    }

    /**
     * writer(DataSource) creates an ItemWriter. This one is aimed at a JDBC
     * destination and automatically gets a copy of the dataSource created by
     * @EnableBatchProcessing. It includes the SQL statement needed to insert a
     * single Person, driven by Java bean properties.
     * @param dataSource
     * @return
     */
    // Writer: write what needs to be written
    @Bean  // JDBC batch item writer
    public JdbcBatchItemWriter<Person> writer(DataSource dataSource){
        return new JdbcBatchItemWriterBuilder<Person>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO people (first_name,last_name) VALUES (:firstName,:lastName)")
                .dataSource(dataSource)
                .build();
    }

    // tag::jobStep[]

    /**
     * The first method defines the job, and the second one defines a single step.
     * Jobs are built from steps, where each step can involve a reader,
     * a processor, and a writer. The last chunk shows the actual job configuration.
     *
     * In this job definition, you need an incrementer, because jobs use a database
     * to maintain execution state. You then list each step (this job has only one
     * step). The job ends, and the Java API produces a perfectly configured job.
     *
     * @param listener the job-completion listener
     * @param step1 the single step of this job
     * @return
     */
    // Define the job, built from its steps
    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step1){
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1)
                .end()
                .build();
    }

    /**
     * In the step definition, you define how much data to write at a time.
     * In this case, it writes up to ten records at a time. Next,
     * you configure the reader, processor, and writer by using the bits
     * injected earlier.
     * @param writer
     * @return
     */
    // Define the step
    @Bean
    public Step step1(JdbcBatchItemWriter<Person> writer){
        return stepBuilderFactory.get("step1")
                .<Person,Person> chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }

}

The configuration contains the reader, which defines what should be read.
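The file it reads, sample-data.csv, sits on the classpath and is a plain comma-separated list of first and last names; its exact contents are not shown in this post, but it looks something like:

Jill,Doe
Joe,Doe
Justin,Doe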

It contains the writer, which defines where the data goes; if you wanted to handle persistence yourself, this is where you would wire in something like a PersonService to store the data.
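As a rough sketch of that idea (PersonService and its save(Person) method are assumed here, they are not part of the guide), an alternative writer bean inside BatchConfiguration could delegate to such a service:

    // Hypothetical alternative to the JDBC writer: delegate persistence to a service bean.
    // Requires: import org.springframework.batch.item.ItemWriter;
    @Bean
    public ItemWriter<Person> serviceWriter(PersonService personService) {
        // ItemWriter has a single write method, so a lambda works; it receives one chunk of items at a time
        return items -> items.forEach(personService::save);
    }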

It contains the intermediate processor, already described above.

And it contains the Job definition, whose parameters are a job-completion listener and a step (step1); steps are covered next.

Every Job is composed of individual steps. For example, this run only upper-cases the names; suppose that, after upper-casing each Person, you also needed to write that Person's phone number after the user name.

You might then add a readPhoneNo() method that fetches the corresponding phone number from another file, plus a database query that brings back the already upper-cased name.

Those database lookups for the new upper-cased names would be added to the PersonItemProcessor, and the configuration class would gain a writePersonAndPhone method that uses another kind of writer to write the result wherever it should go.

 

Upper-casing the names and writing the phone number after each upper-cased name can be split into two operations, that is, two steps,

yet still be executed as one Job, roughly as sketched below.
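A rough sketch of such a two-step job inside BatchConfiguration might look like the following (step2 and the phone-related reader/processor/writer beans are hypothetical, only meant to show how steps are chained; imports for ItemReader, ItemProcessor and ItemWriter from org.springframework.batch.item would be needed):

    // Hypothetical second step: enrich each upper-cased Person with a phone number
    @Bean
    public Step step2(ItemReader<Person> phoneReader,
                      ItemProcessor<Person, Person> phoneProcessor,
                      ItemWriter<Person> personAndPhoneWriter) {
        return stepBuilderFactory.get("step2")
                .<Person, Person>chunk(10)
                .reader(phoneReader)
                .processor(phoneProcessor)
                .writer(personAndPhoneWriter)
                .build();
    }

    // One Job that runs both steps in sequence: upper-case first, then append the phone number
    @Bean
    public Job importUserAndPhoneJob(JobCompletionNotificationListener listener,
                                     Step step1, Step step2) {
        return jobBuilderFactory.get("importUserAndPhoneJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .start(step1)
                .next(step2)
                .build();
    }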

 

Besides all of that, you also need notification of batch events: whether the Job finished or not, which step it reached, and a hook for recording batch-processing logs.

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

/**
 * What is this class used for?
 * The last bit of batch configuration is a way to get notified
 * when the job completes.
 *
 * This Class listens for when a job is BatchStatus.COMPLETED
 * and then uses JdbcTemplate to inspect the results.
 */
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

    private final JdbcTemplate jdbcTemplate;

    @Autowired
    public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate){
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void afterJob(JobExecution jobExecution){
        if(jobExecution.getStatus()==BatchStatus.COMPLETED){
            log.info("!!!Job Finished! Time to verify the results");

            jdbcTemplate.query("SELECT first_name,last_name FROM people",
                    (rs,row)-> new Person(
                            rs.getString(1),
                            rs.getString(2)
                    )).forEach(person -> log.info("Found <"+person+"> in the database."));
        }
    }
}

The listener above checks whether the batch run completed; here it queries the database to look at the data after the batch changes.

You can also add corresponding verification and tests here to confirm that the batch processing ran without errors.
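For example, a minimal integration-test sketch along those lines (assuming the spring-batch-test and spring-boot-starter-test dependencies are on the classpath; the class below is illustrative, not from the original post):

package com.example.batchprocessing;

import org.junit.jupiter.api.Test;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.batch.test.context.SpringBatchTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import static org.junit.jupiter.api.Assertions.assertEquals;

// Launches the whole job against the in-memory database and checks that it completed
@SpringBatchTest
@SpringBootTest
class BatchProcessingApplicationTests {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    void jobCompletesSuccessfully() throws Exception {
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
        assertEquals("COMPLETED", jobExecution.getExitStatus().getExitCode());
    }
}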

 

As for the result of the batch run: it is simply executed as a Spring Boot command-line operation.

In a follow-up step this could be changed to be triggered from a method call, and the batch processing could be given a visual interface.
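One possible sketch of triggering the job from a method instead of at startup (a hypothetical component; it assumes spring.batch.job.enabled=false is set so the job no longer runs automatically when the application boots):

package com.example.batchprocessing;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.stereotype.Component;

// Hypothetical helper that launches the batch job on demand,
// e.g. from a scheduled task or a web controller
@Component
public class ImportUserJobRunner {

    private final JobLauncher jobLauncher;
    private final Job importUserJob;

    public ImportUserJobRunner(JobLauncher jobLauncher, Job importUserJob) {
        this.jobLauncher = jobLauncher;
        this.importUserJob = importUserJob;
    }

    public void run() throws Exception {
        // a fresh timestamp parameter makes each launch a new job instance
        JobParameters parameters = new JobParametersBuilder()
                .addLong("launchTime", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(importUserJob, parameters);
    }
}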

 

