根據spring官網文檔提供的spring batch的demo進行小的測驗
啟動類與原springboot啟動類無異
package com.example.batchprocessing; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * Although batch processing can be embedded in web apps and WAR files, * the simpler approach demonstrated below creates a standalone application. * You package everything in a single, executable JAR file, * driven by a good old Java main() method. * * springboot */ @SpringBootApplication public class BatchProcessingApplication { public static void main(String[] args) { SpringApplication.run(BatchProcessingApplication.class, args); } }
批量處理的不管是數據,文本,數據庫備份等,需要對應的有實體類進行映射,比如要備份數據庫得有Tables實體類(里面含有表名等一些數據字段)
這里是批量處理一個用戶角色名,使其全部改為大寫(成為一個大寫的人)
package com.example.batchprocessing; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * 現在您可以看到數據輸入和輸出的格式,您可以編寫代碼來表示一行數據, * 如下面的示例(來自src/main/java/com/ example/batchprocessing/person.java)所示 * Now that you can see the format of data inputs and outputs, * you can write code to represent a row of data, * as the following example shows: * * You can instantiate the Person class either with first and * last name through a constructor or by setting the properties. */ @Data @NoArgsConstructor @AllArgsConstructor public class Person { private String lastName; private String firstName; }
哪個去執行這個操作(變為大寫的人),得有專門的處理類
package com.example.batchprocessing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.item.ItemProcessor; /** * PersonItemProcessor implements Spring Batch's ItemProcessor(項目處理器)interface. * This makes it easy to wire the code into a batch job that you will define later in this guide. * According to the interface, you receive(接收) an incoming Person object, * after which you transform it to an upper-cased Person.(然后將其轉換成大寫的人 ^_^) */ //人類中間處理機 public class PersonItemProcessor implements ItemProcessor<Person,Person> { private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class); @Override public Person process(final Person person) throws Exception { final String firstName = person.getFirstName().toUpperCase(); final String lastName = person.getLastName().toUpperCase(); final Person transformedPerson = new Person(firstName,lastName); log.info("Converting("+person+")info("+transformedPerson+")"); return transformedPerson; } }
這里實現的是spring batch里的ItemProcessor<~,~> 其中的<Person,Person>表示要讀,寫的數據類型
在文檔里說明了讀與寫的類型可以是不同的.這里是相同的Person類型.
其中重寫的process方法,獲取了傳入的Person類,將其firstName,lastName進行大寫更改.生成新的Person,進行數據回傳. return transformedPerson.
有了映射數據實體類,有了更改數據的中間處理器,還需要批量處理,需要進行配置
package com.example.batchprocessing; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import javax.sql.DataSource; /** * For starters(初學者),the @EnableBatchProcessing annotation adds many critical * beans that support jobs and save you a lot of leg work. * This example uses a memory-based database (provided by @EnableBatchProcessing) * meaning that, when it is done, the data is gone. * It also autowires a couple factories needed further below. * Now add the following beans to your BatchConfiguration class to define a * reader, a processor, and a writer. */ @Configuration @EnableBatchProcessing public class BatchConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; //讀該讀的 /** * reader() creates and ItemReader. It looks for a file called sample-data.csv * and parses each line item with enough information to turn it into a Person. * @return */ @Bean public FlatFileItemReader<Person> reader(){ //單調文件 return new FlatFileItemReaderBuilder<Person>() .name("personItemReader") .resource(new ClassPathResource("sample-data.csv")) .delimited() //界限 .names(new String[]{"firstName","lastName"}) .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>(){{ setTargetType(Person.class); }}) .build(); } /** * processor() creates an instance of the PersonItemProcessor that you defined earlier, * meant to converth the data to upper case. * @return */ @Bean public PersonItemProcessor processor(){ //人類中間處理器 使之成為大寫的人 return new PersonItemProcessor(); } /** * write(DataSource) creates an ItemWriter.This one is aimed at a JDBC * destination and automatically gets a copy of the dataSource created by * @EnableBatchProcessing. It includes the SQL statement needed to isnert a * single Person,driven by Java bean properties. * @param dataSource * @return */ //寫該寫的 @Bean //JDBC批量項目寫入器 public JdbcBatchItemWriter<Person> writer(DataSource dataSource){ return new JdbcBatchItemWriterBuilder<Person>() .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()) .sql("INSERT INTO people (first_name,last_name) VALUES (:firstName,:lastName)") .dataSource(dataSource) .build(); } // tag::jobStep[] /** * The first method defines the job,and the second one defines a single step. * Jobs are built from steps,(工作是一個步驟一個步驟建立起來的),where each step can involve * a reader, a processor, and a writer. * 每個step步驟都涉及到讀取器,處理器,寫入器 * * / /** * The last chunk shows the actual job configuratoin * */ /** * * The first method defines the job,and the second one defines a single step. * Jobs are built from steps, where each step can involve a reader, * a processor, and a writer. * * In this job definition, you need an incrementer, because jobs use a database * to maintain execution state, You then list each step,(though this job has only one step) * The job ends,and the Java API produces(產生生成) a perfectly configured job. * * @param listener 監聽器 * @param step1 步驟1 * @return */ //設定工作 包含步驟 @Bean public Job importUserJob(JobCompletionNotificationListener listener, Step step1){ return jobBuilderFactory.get("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(step1) .end() .build(); } /** * In the step definition, you define how much data to write at a time. * In this case,it writes up to ten records at a time, Next, * you configure the reader,processor, and writer by using the bits * injected earlier. * @param writer * @return */ //設定步驟 @Bean public Step step1(JdbcBatchItemWriter<Person> writer){ return stepBuilderFactory.get("step1") .<Person,Person> chunk(10) .reader(reader()) .processor(processor()) .writer(writer) .build(); } }
其中配置中包含讀取器,設定了要讀哪些,即讀該讀的.
包含了寫入器,即寫到哪里,按理說這里如果要做持久存儲,需要在這里配置一些PersonService進行數據的存儲的操作.
包含了中間處理器,在上面已經說過了.
包含了Job配置,其中參數為一個工作完成監聽器,一個步驟step1,下面講到step.
每個Job(工作)是需要一項一項的step組成的,例如這次只是更改為大寫的人名,而需要更改為大寫的人之后,還需要將另一個對應着該Person的電話寫在用戶名后面
那么可能需要要加入一個readPhoneNo()方法,從另一個文件中獲取其對應的電話.再需要一個數據庫連接查詢回來的已經更改為大寫的人名,
在中間處理器PersonItemProcessor中加入這些查詢數據庫獲取新大寫人名,並再加入到配置類中一個writePersonAndPhone方法,使用另一種write方式寫到想寫到地方.
而這個改為大寫的人,和將電話寫在大寫的人名后面可以分為兩步操作,即兩個step.
但是可以作為一個Job來執行.
有了這些當然還要有批處理的事件提醒,是完成了Job還是沒有,完成到哪一步,以及通過這個事件提醒來記錄一些批處理日志.
package com.example.batchprocessing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.listener.JobExecutionListenerSupport; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; /** * What's this Class use to? * The last bit of batch configuration is a way to get notified * when the job completes. * * This Class listens for when a job is BatchStatus.COMPLETED * and then uses JdbcTemplate to inspect the results. */ @Component public class JobCompletionNotificationListener extends JobExecutionListenerSupport { private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class); private final JdbcTemplate jdbcTemplate; @Autowired public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate){ this.jdbcTemplate = jdbcTemplate; } @Override public void afterJob(JobExecution jobExecution){ if(jobExecution.getStatus()==BatchStatus.COMPLETED){ log.info("!!!Job Finished! Time to verify the results"); jdbcTemplate.query("SELECT first_name,last_name FROM people", (rs,row)-> new Person( rs.getString(1), rs.getString(2) )).forEach(person -> log.info("Found <"+person+"> in the database.")); } } }
上面的事件提醒為查看是否完成了批處理,這里通過查詢數據庫,查看批處理更改后的數據.
也可以進行相應的校驗及測試,查看是否批處理無誤.
批量處理結果,其只是執行一個springboot的命令操作.
在后續步驟中,可以更改為某個方法實現.將批量處理可視化界面化.