Optimizing large file processing in Spring Boot


Large file processing in Spring Boot

Business background

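A scheduled task, run at initialization, calls a third-party API to fetch data. The API performs fuzzy queries, so the business side prepares the large set of keywords in advance and puts them in a TEXT file, one keyword per line. In the project, a scheduled task processes this file. It reads the keywords, calls the API for each one, and loads the returned data into the local DB.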

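  1. The business side uploads the file to a fixed path on the file server.
  2. The scheduled task is triggered and downloads the file to the local service. The project then reads and loads the file.
  3. The API is called and the returned data is saved to the database.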
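Steps 2 and 3 can be sketched in plain Java as below. This is only an illustration under assumptions. `KeywordJob`, `searchApi` and `saveToDb` are hypothetical names standing in for the real third-party client and DB layer, which the article does not show. Step 1 (the upload) happens outside the application.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical single-threaded baseline of the scheduled job (names are illustrative)
final class KeywordJob {

    /**
     * Reads one keyword per line, calls the search API for it and persists the rows.
     *
     * @param keywordFile reader over the downloaded TEXT file
     * @param searchApi   stand-in for the third-party fuzzy-search call
     * @param saveToDb    stand-in for the DB insert
     * @return number of rows handed to saveToDb
     */
    static int run(Reader keywordFile,
                   Function<String, List<String>> searchApi,
                   Consumer<List<String>> saveToDb) {
        int saved = 0;
        try (BufferedReader reader = new BufferedReader(keywordFile)) {
            String line;
            while ((line = reader.readLine()) != null) {   // stream the file line by line
                String keyword = line.trim();
                if (keyword.isEmpty()) {
                    continue;                               // ignore blank lines
                }
                List<String> rows = searchApi.apply(keyword);
                saveToDb.accept(rows);
                saved += rows.size();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return saved;
    }
}
```

In this form every keyword is read and processed one after another on a single thread. That sequential processing is the slowness described in the next section.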

Problems in the actual implementation

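When there are many keywords to search and the file is large, reading it on a single thread is slow. The data therefore cannot be processed and persisted quickly.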

Solution:

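  1. Add a separate thread pool to the Spring Boot project, dedicated to batch jobs and kept apart from the core business threads. The two then do not affect each other, which makes the system safer.
  2. Read the local, already-downloaded file with multiple threads (implementation below). This is not recommended for small files, where it may actually take longer.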

Implementation in the project

1. Spring Boot configuration with configurable thread counts (application.properties):

# Thread pool settings
async.film-job.core-pool-size=20
async.film-job.max-pool-size=100
async.film-job.keep-alive-seconds=10
async.film-job.queue-capacity=200
async.film-job.thread-name-prefix=async-Thread-film-service-

# Number of threads used to read the file
file.thread.num=5

Entity class (FileThreadVO):

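import lombok.Data;
import lombok.experimental.Accessors;

import java.io.InputStream;
import java.util.List;

@Data
@Accessors(chain = true)
public class FileThreadVO<T> {
    private InputStream is;        // stream the segment is read from
    private Integer start_line;    // first line of the segment (1-based)
    private Integer end_line;      // last line of the segment (inclusive)
    private List<T> result;        // lines read for this segment
}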

2. AsyncFilmServiceConfig, the thread pool configuration class:


import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

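/**
 * Thread pool configuration. Use it with @Async("asyncFilmService").
 * @EnableAsync enables support for asynchronous tasks, and @ConfigurationProperties binds a group of related configuration entries into one class.
 * The prefix attribute of @ConfigurationProperties is the prefix of the entries in the configuration file (application.properties / application.yml).
 * Together with Lombok's @Setter, it lets the configured values be injected into the matching fields of this class.
 **/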

@Setter
@ConfigurationProperties(prefix = "async.film-job")
@EnableAsync
@Configuration
public class AsyncFilmServiceConfig {
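    /**
     * Core pool size (number of threads kept by default)
     */
    private int corePoolSize;
    /**
     * Maximum pool size
     */
    private int maxPoolSize;
    /**
     * Idle time allowed before a thread is reclaimed (unit: seconds)
     */
    private int keepAliveSeconds;
    /**
     * Capacity of the task queue
     */
    private int queueCapacity;
    /**
     * Thread name prefix
     */
    private String threadNamePrefix;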

    @Bean
    public ThreadPoolTaskExecutor asyncFilmService() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
        threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
        threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds);
        threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
        threadPoolTaskExecutor.setThreadNamePrefix(threadNamePrefix);
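        // Rejection policy: when both the pool and the queue are full, run the task on the caller's thread
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // On shutdown, let running and queued tasks finish instead of interrupting them (default false)
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        // Let idle core threads time out and exit as well (default false)
        threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);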
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}
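Under the hood, ThreadPoolTaskExecutor delegates to java.util.concurrent.ThreadPoolExecutor. The sketch below is a rough plain-JDK counterpart of what the bean above configures. `FilmPoolFactory` and its methods are illustrative names. The mapping assumes a positive queueCapacity, for which Spring creates a bounded LinkedBlockingQueue.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative plain-JDK counterpart of the asyncFilmService bean
final class FilmPoolFactory {

    static ThreadPoolExecutor create(int corePoolSize, int maxPoolSize, int keepAliveSeconds,
                                     int queueCapacity, String threadNamePrefix) {
        AtomicInteger counter = new AtomicInteger();
        // Names threads prefix + 1, prefix + 2, ... like setThreadNamePrefix does
        ThreadFactory factory = r -> new Thread(r, threadNamePrefix + counter.incrementAndGet());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),       // bounded queue (queueCapacity > 0)
                factory,
                new ThreadPoolExecutor.CallerRunsPolicy());     // pool and queue full: caller runs the task
        pool.allowCoreThreadTimeOut(true);                      // same as setAllowCoreThreadTimeOut(true)
        return pool;
    }

    /** shutdown() lets running and queued tasks finish (unlike shutdownNow()), then waits up to the timeout. */
    static boolean shutdownGracefully(ThreadPoolExecutor pool, long timeoutSeconds) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With the values from application.properties, the call would be `FilmPoolFactory.create(20, 100, 10, 200, "async-Thread-film-service-")`. Spring itself only blocks at shutdown like shutdownGracefully does if an await-termination period is also configured.

A ThreadPoolExecutor only grows past corePoolSize once the queue is full. With queue-capacity=200, the pool therefore stays at 20 threads until 200 tasks are waiting.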

2.2 The @Async annotation

One point to note about @Async with a custom thread pool:

  1. @Async has only one attribute, value, of type String. It names a bean of type Executor or TaskExecutor, and the asynchronous task is executed on that thread pool, for example @Async("asyncFilmService").

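Cases where @Async does not take effect:

● Async support is not enabled. With Spring Boot, @EnableAsync must be declared on the startup class or on a @Configuration class (AsyncFilmServiceConfig above has it).
● The async method is called from another method of the same class (self-invocation bypasses the Spring proxy).
● The class containing the async method is not annotated with @Component (or a similar stereotype), so Spring cannot scan it.
● The bean is created manually with new instead of being injected with @Autowired, @Resource or a similar annotation.
● The async method is not public, or is static.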
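The second case above (self-invocation) comes from how @Async is applied. Spring puts a proxy in front of the bean (a JDK dynamic proxy or a CGLIB subclass), and a call on `this` inside the class never passes through that proxy.

The sketch below imitates the mechanism with a hand-written java.lang.reflect.Proxy. It is not Spring's implementation, and the interface and method names are made up. To keep the sketch short, the proxy waits for the pooled call with get(), whereas a real @Async method returns to the caller immediately.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration of proxy-based interception and why self-invocation bypasses it
final class SelfInvocationDemo {

    interface KeywordTask {
        String asyncWork();   // plays the role of an @Async method
        String caller();      // a normal method that calls asyncWork() on `this`
    }

    static final class KeywordTaskImpl implements KeywordTask {
        public String asyncWork() { return Thread.currentThread().getName(); }
        // Self-invocation: a plain Java call on `this`, no proxy involved
        public String caller() { return asyncWork(); }
    }

    // Hand-rolled stand-in for an async proxy: only asyncWork() is sent to the pool
    static KeywordTask proxy(KeywordTask target, ExecutorService pool) {
        return (KeywordTask) Proxy.newProxyInstance(
                KeywordTask.class.getClassLoader(),
                new Class<?>[] {KeywordTask.class},
                (p, method, args) -> method.getName().equals("asyncWork")
                        ? pool.submit((Callable<Object>) () -> method.invoke(target, args)).get()
                        : method.invoke(target, args));
    }

    /** Returns {thread used through the proxy, thread used via self-invocation}. */
    static String[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "async-pool"));
        try {
            KeywordTask task = proxy(new KeywordTaskImpl(), pool);
            return new String[] {task.asyncWork(), task.caller()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] r = demo();
        System.out.println("through proxy:   " + r[0]);
        System.out.println("self-invocation: " + r[1]);
    }
}
```

Calling asyncWork() through the proxy runs on async-pool. caller() reaches asyncWork() through `this`, so it stays on the calling thread (main when run from main). Moving the async method into a separate bean and injecting that bean avoids the problem.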

2.3 Diagram: how the thread pool executes submitted tasks

[Figure: ThreadPoolExecutor task submission flow (two diagrams; images not available)]
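The original diagrams are not available. The submission order of ThreadPoolExecutor is documented in the JDK, though, and runs as follows:

1. While fewer than corePoolSize threads are running, a new task gets a new thread.
2. Once the core is full, the task is queued.
3. Only when the queue is also full does the pool add an extra thread, up to maximumPoolSize.
4. Beyond that, the task goes to the rejection handler.

The small JDK-only experiment below makes that order visible with core 1, max 2 and a queue of 1. `PoolGrowthDemo` is an illustrative name. It uses AbortPolicy so the rejection shows up as an exception. With the CallerRunsPolicy configured above, the fourth task would instead run on the submitting thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative experiment: core=1, max=2, queue capacity=1
final class PoolGrowthDemo {

    /** Submits 4 blocking tasks and records "poolSize/queueSize" after each, or "rejected". */
    static String trace() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());   // throw on rejection so it is visible
        Runnable blocker = () -> {
            try {
                release.await();                         // keep the worker busy while we observe
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder sb = new StringBuilder();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                sb.append(pool.getPoolSize()).append('/').append(pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                sb.append("rejected");
            }
            if (i < 4) {
                sb.append(' ');
            }
        }
        release.countDown();                             // let the blocked tasks finish
        pool.shutdown();
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace()); // 1/0 1/1 2/1 rejected
    }
}
```

The first task starts the core thread, the second waits in the queue, and the third forces a second thread because the queue is full. The fourth finds both limits reached and is rejected.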

3. ReadFileThread, a utility class for reading a file in segments


import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/**
 * Description: reads one segment (a range of lines) of a file.
 * Each instance gets its own InputStream positioned at the start of the file,
 * skips to start_index and returns lines start_index..end_index (1-based, inclusive).
 */
public class ReadFileThread implements Callable<List<String>> {

    private static final Logger logger = LogManager.getLogger(ReadFileThread.class);

    private final int start_index;    // first line to read (1-based)
    private final int end_index;      // last line to read (1-based, inclusive)
    private final InputStream is;     // input stream, closed by this task

    public ReadFileThread(int start_index, int end_index, InputStream is) {
        this.start_index = start_index;
        this.end_index = end_index;
        this.is = is;
    }

    /**
     * Reads lines start_index..end_index and returns them trimmed.
     *
     * @return the lines of this segment
     * @throws Exception if the stream cannot be read
     */
    @Override
    public List<String> call() throws Exception {
        long start = System.currentTimeMillis();
        List<String> resultList = new ArrayList<>();
        // try-with-resources closes the reader and the underlying stream, even on failure
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
            int loc = 1;
            // Skip the lines that belong to earlier segments
            while (loc < start_index && reader.readLine() != null) {
                loc++;
            }
            // Read this segment; stop early instead of throwing NullPointerException if the file is shorter
            String line;
            while (loc <= end_index && (line = reader.readLine()) != null) {
                resultList.add(line.trim());
                loc++;
            }
        }
        logger.info("Thread={} finished reading, elapsed={} ms, lines read={}",
                Thread.currentThread().getName(), (System.currentTimeMillis() - start), resultList.size());
        return resultList;
    }
}
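A quick cost estimate for this design helps here. Assume a file of L lines split evenly into N segments. Each ReadFileThread gets a fresh stream and must read and discard every line before start_index, so thread i scans lines 1 through iL/N. Summed over all threads, the lines scanned are:

$$\sum_{i=1}^{N} \frac{iL}{N} = \frac{L}{N}\cdot\frac{N(N+1)}{2} = \frac{(N+1)\,L}{2}$$

With file.thread.num=5, that is 3L lines decoded in total, plus one more pass of L lines in getLineNum. A single sequential read decodes only L lines.

Also note that the thread for the last segment scans the whole file on its own. The segmented read therefore cannot finish faster than one sequential pass. The parallelism helps mainly when the per-line work after the skip is expensive, and the skipped prefixes are pure overhead. This is one reason the approach is not worth it for small files.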

4. FileService, the service implementation class

import com.zy.website.code.ApiReturnCode;
import com.zy.website.exception.WebsiteBusinessException;
import com.zy.website.model.vo.FileThreadVO;
import com.zy.website.utils.multi.ReadFileThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;

import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

@Service("fileService")
public class FileService {
    // Logger
    private static final Logger logger = LogManager.getLogger(FileService.class);

    @Value("${file.thread.num}")
    private Integer threadNum; // number of reader threads

    @Resource(name = "asyncFilmService")
    private ThreadPoolTaskExecutor executor;  // dedicated thread pool

    /**
     * Reads the file in segments using multiple threads.
     * Note: if the file has fewer lines than threadNum, one task per line is used, which wastes threads.
     * Intended for files with one record per line.
     * @return all lines of the file, in file order
     */
    public List<String> uploadByThread(MultipartFile file) throws Exception {
        if (file.isEmpty()) {
            return null;
        }
        InputStream is = file.getInputStream();
        List<FileThreadVO<String>> threadVOS = new ArrayList<>(threadNum); // one record per segment
        List<Future<List<String>>> futures = new ArrayList<>(threadNum);  // pending results, same order as threadVOS
        // Assign line ranges to the threads
        int lines = getLineNum(is);     // total number of lines (consumes and closes `is`)
        int line;                       // lines assigned to each thread
        int start_line;                 // first line read by a thread
        int end_line;                   // last line read by a thread

        // Compute the ranges from the line count and thread count (a bit convoluted; could be simplified).
        // Tasks are only submitted inside the loops. Calling get() right after submit() would block
        // and make the segments run one after another instead of in parallel.
        if (lines < threadNum) {
            for (int i = 1; i <= lines; i++) {
                FileThreadVO<String> fileThreadVO = new FileThreadVO<>();
                start_line = end_line = i;
                InputStream stream = file.getInputStream(); // each task needs its own stream

                ReadFileThread readFileThread = new ReadFileThread(start_line, end_line, stream);
                fileThreadVO.setStart_line(start_line);
                fileThreadVO.setIs(stream);
                fileThreadVO.setEnd_line(end_line);
                futures.add(executor.submit(readFileThread));
                threadVOS.add(fileThreadVO);
            }
        } else {
            for (int i = 1, tempLine = 0; i <= threadNum; i++, tempLine = ++end_line) {
                InputStream stream = file.getInputStream();
                FileThreadVO<String> fileThreadVO = new FileThreadVO<>();
                int var1 = lines / threadNum;
                int var2 = lines % threadNum;
                // the last thread also takes the remainder
                line = (i == threadNum) ? var1 + var2 : var1;
                start_line = (i == 1) ? 1 : tempLine;
                end_line = (i == threadNum) ? lines : start_line + line - 1;

                ReadFileThread readFileThread = new ReadFileThread(start_line, end_line, stream);
                fileThreadVO.setStart_line(start_line);
                fileThreadVO.setIs(stream);
                fileThreadVO.setEnd_line(end_line);
                futures.add(executor.submit(readFileThread));
                threadVOS.add(fileThreadVO);
            }
        }
        // Wait for all segments and merge the results; futures are in segment order, so file order is kept
        List<String> resultCompleteList = new ArrayList<>(lines);
        for (int i = 0; i < futures.size(); i++) {
            List<String> result = futures.get(i).get();
            threadVOS.get(i).setResult(result);
            resultCompleteList.addAll(result);
        }

        // Integrity check: this project treats one line as one record, so only the line count is checked (bytes could be checked too)
        if (resultCompleteList.size() != lines) {
            logger.error(">>>>>====uploadByThread====>>>>>>File integrity check failed!");
            throw new WebsiteBusinessException("The file is incomplete!", ApiReturnCode.HTTP_ERROR.getCode()); // custom exception and error code
        }
        return resultCompleteList;
    }

    /**
     * Counts the lines of a file.
     * @param is stream over the file; it is closed by this method
     * @return number of lines
     * @throws IOException if the stream cannot be read
     */
    public int getLineNum(InputStream is) throws IOException {
        int line = 0;
        // Same charset as ReadFileThread; try-with-resources also closes `is`
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
            while (reader.readLine() != null) {
                line++;
            }
        }
        return line;
    }
}
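The range arithmetic in uploadByThread handles the case where there are fewer lines than threads in a separate branch. A single helper can cover both cases.

The sketch below keeps the original rule that the last segment takes the remainder, and never creates more segments than lines. `LineRanges` and `split` are illustrative names, not part of the original project.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative helper: split lines 1..total into contiguous segments
final class LineRanges {

    /**
     * Splits lines 1..total into at most maxSegments contiguous ranges.
     * Each element is {start, end}, 1-based and inclusive; the last range takes the remainder.
     */
    static List<int[]> split(int total, int maxSegments) {
        List<int[]> ranges = new ArrayList<>();
        int segments = Math.min(total, maxSegments);   // never more segments than lines
        if (segments <= 0) {
            return ranges;                             // empty file or no threads: nothing to read
        }
        int perSegment = total / segments;
        for (int i = 0; i < segments; i++) {
            int start = i * perSegment + 1;
            int end = (i == segments - 1) ? total : start + perSegment - 1;
            ranges.add(new int[] {start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] r : split(12, 5)) {
            System.out.println(r[0] + "-" + r[1]);     // 1-2, 3-4, 5-6, 7-8, 9-12
        }
    }
}
```

For 12 lines and 5 threads, the helper produces the same ranges as the original loop. For 3 lines and 5 threads, it produces one range per line. Piling the remainder onto the last segment can make that segment up to segments - 1 lines longer than the others. Giving one extra line to each of the first `total % segments` segments would balance them more evenly.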

5. Using it in a method

[Figure: screenshot of the method in use; image not available]


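This method only reads the file with multiple threads from the pool and does not write anything, since the main business does not need that yet. More to come in a later update.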

