Spring Boot large file processing
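Business background
A scheduled task calls a third-party API to fetch data, and that API performs fuzzy queries. The business team puts the large set of keywords to be queried into a TEXT file in advance, one keyword per line. The project uses a scheduled task to process this file: it reads the keywords, calls the API for each one, and loads the returned data into the local database.
- The business team uploads the file to a fixed path on the file server
- The scheduled task fires and downloads the file to the local service, where the project reads and loads it
- The API is called and the returned data is saved to the database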
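The three steps above can be sketched in plain Java as follows. This is a minimal, hypothetical sketch: `KeywordJobSketch`, `runJob`, and the `searchApi`/`saveToDb` functions are illustrative stand-ins for the real third-party client and DAO, which the article does not show. In the project, this body would run inside the scheduled task after the file has been downloaded.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class KeywordJobSketch {

    /**
     * Hypothetical body of the scheduled job: read one keyword per line, call the
     * fuzzy-search API for each keyword, and pass every returned record to the save step.
     * searchApi and saveToDb stand in for the real third-party client and DAO.
     * Returns the number of records saved.
     */
    public static int runJob(Reader keywordFile,
                             Function<String, List<String>> searchApi,
                             Consumer<String> saveToDb) {
        int saved = 0;
        try (BufferedReader reader = new BufferedReader(keywordFile)) {
            String line;
            while ((line = reader.readLine()) != null) {
                String keyword = line.trim();
                if (keyword.isEmpty()) {
                    continue; // ignore blank lines in the keyword file
                }
                for (String record : searchApi.apply(keyword)) {
                    saveToDb.accept(record);
                    saved++;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return saved;
    }

    public static void main(String[] args) {
        List<String> db = new ArrayList<>();
        // fake API: returns two hits per keyword
        Function<String, List<String>> fakeApi = k -> Arrays.asList(k + "-1", k + "-2");
        int saved = runJob(new StringReader("alpha\n\nbeta\n"), fakeApi, db::add);
        System.out.println(saved + " " + db); // 4 [alpha-1, alpha-2, beta-1, beta-2]
    }
}
```

Everything here runs on one thread, which is exactly the bottleneck described next.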
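Problems in the actual implementation
When there are many keywords to search, reading the file on a single thread is slow, so the data cannot be processed and saved quickly.
Solution:
- Add a dedicated thread pool to the Spring Boot project for batch jobs, separate from the threads that serve the core business, so that the two do not affect each other.
- Read the locally downloaded file with multiple threads (see the implementation below). This is not recommended for small files, where the threading overhead can make reading slower rather than faster.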
Project implementation
1. Spring Boot configuration with configurable thread counts: application.properties
# Thread pool configuration
async.film-job.core-pool-size=20
async.film-job.max-pool-size=100
async.film-job.keep-alive-seconds=10
async.film-job.queue-capacity=200
async.film-job.thread-name-prefix=async-Thread-film-service-
# Number of threads used to read the file
file.thread.num=5
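Entity class
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.InputStream;
import java.util.List;
// Describes one segment of the file: its stream, its line range and the lines read from it
@Data
@Accessors(chain = true)
public class FileThreadVO<T> {
private InputStream is;
private Integer start_line;
private Integer end_line;
private List<T> result;
}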
2. AsyncFilmServiceConfig, the thread pool configuration class:
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
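/**
* Thread pool configuration, used as @Async("asyncFilmService")
* @EnableAsync enables support for asynchronous methods, and @ConfigurationProperties binds a group of related settings to the fields of this class.
* The prefix attribute of @ConfigurationProperties is the key prefix used in application.properties (or application.yml); Lombok's @Setter generates the setters through which the configured values are injected.
*
**/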
@Setter
@ConfigurationProperties(prefix = "async.film-job")
@EnableAsync
@Configuration
public class AsyncFilmServiceConfig {
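/**
* Core pool size (number of threads kept by default)
*/
private int corePoolSize;
/**
* Maximum pool size
*/
private int maxPoolSize;
/**
* Time an idle thread may wait before it is terminated, in seconds
*/
private int keepAliveSeconds;
/**
* Capacity of the task queue
*/
private int queueCapacity;
/**
* Prefix for the names of the pool's threads
*/
private String threadNamePrefix;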
@Bean
public ThreadPoolTaskExecutor asyncFilmService() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds);
threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
threadPoolTaskExecutor.setThreadNamePrefix(threadNamePrefix);
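// Rejection policy: when the pool is at max size and the queue is full, the submitting thread runs the task itself
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// On shutdown, wait for running and queued tasks to complete (default false)
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
// Let core threads also exit after keepAliveSeconds of idleness (default false)
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);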
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
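ThreadPoolTaskExecutor is a Spring wrapper around the JDK's java.util.concurrent.ThreadPoolExecutor. As a rough sketch (not Spring's internal code), the values from application.properties correspond to a plain JDK pool like the one below. The class name `FilmPoolSketch` and the hand-written thread factory are assumptions for illustration. With a positive queue capacity, ThreadPoolTaskExecutor creates a bounded LinkedBlockingQueue, which the sketch mirrors.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FilmPoolSketch {

    /** Builds a plain-JDK pool with the same settings as the async.film-job.* properties. */
    public static ThreadPoolExecutor newFilmPool() {
        AtomicInteger counter = new AtomicInteger(1);
        // names threads "async-Thread-film-service-1", "-2", ... after the configured prefix
        ThreadFactory factory = r -> new Thread(r, "async-Thread-film-service-" + counter.getAndIncrement());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                20,                                          // core-pool-size
                100,                                         // max-pool-size
                10, TimeUnit.SECONDS,                        // keep-alive-seconds
                new LinkedBlockingQueue<>(200),              // queue-capacity (bounded)
                factory,
                new ThreadPoolExecutor.CallerRunsPolicy());  // caller runs the task when saturated
        pool.allowCoreThreadTimeOut(true);                   // idle core threads also exit after 10 s
        return pool;
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = newFilmPool();
        String name = pool.submit(() -> Thread.currentThread().getName()).get();
        System.out.println(name); // async-Thread-film-service-1
        pool.shutdown(); // graceful shutdown: already queued tasks still run
        pool.awaitTermination(30, TimeUnit.SECONDS);
    }
}
```

In the Spring project you would keep the @Bean version; the JDK form is only useful for seeing which knob maps to which constructor argument.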
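2.2 The @Async annotation
A note on @Async and the thread pool:
- @Async has a single String attribute, value, which names a bean of type Executor or TaskExecutor; the annotated method then runs on that thread pool, for example @Async("asyncFilmService")
Common reasons @Async does not take effect:
● @EnableAsync is missing. In Spring Boot it must be on the application class or on a @Configuration class (in this project it is on AsyncFilmServiceConfig)
● The async method is called from another method of the same class; such self-invocation bypasses the Spring proxy
● The class is not annotated with @Component (or a similar stereotype), so Spring never scans it
● The instance is created manually with new instead of being injected with @Autowired or @Resource
● The async method is not public, or is static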
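Spring implements @Async with a proxy around the bean (a JDK dynamic proxy or a CGLIB subclass), which is why self-invocation does not work. The sketch below uses java.lang.reflect.Proxy to show the effect in isolation. It is not Spring's actual interceptor, and `FilmTask`, `outer()` and `inner()` are hypothetical names; the counter marks the point where Spring would hand the call to the executor.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;

public class SelfInvocationDemo {

    /** Hypothetical service interface; imagine inner() carries @Async("asyncFilmService"). */
    public interface FilmTask {
        void outer();
        void inner();
    }

    public static class FilmTaskImpl implements FilmTask {
        @Override
        public void outer() {
            inner(); // self-invocation: calls this.inner() on the raw object, not on the proxy
        }

        @Override
        public void inner() {
        }
    }

    /** Wraps target in a JDK proxy that counts the calls to inner() passing through it. */
    public static FilmTask proxy(FilmTask target, AtomicInteger intercepted) {
        InvocationHandler handler = (p, method, args) -> {
            if (method.getName().equals("inner")) {
                intercepted.incrementAndGet(); // this is where Spring would submit the call to the executor
            }
            return method.invoke(target, args);
        };
        return (FilmTask) Proxy.newProxyInstance(
                FilmTask.class.getClassLoader(), new Class<?>[]{FilmTask.class}, handler);
    }

    public static void main(String[] args) {
        AtomicInteger intercepted = new AtomicInteger();
        FilmTask bean = proxy(new FilmTaskImpl(), intercepted);
        bean.outer(); // outer() goes through the proxy, but its inner() call does not
        System.out.println(intercepted.get()); // 0
        bean.inner(); // an external call to inner() is intercepted
        System.out.println(intercepted.get()); // 1
    }
}
```

The same reasoning explains the other bullets: an object created with new, or a class Spring never scans, has no proxy at all.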
2.3 How ThreadPoolExecutor handles a submitted task
[Figure: ThreadPoolExecutor task submission flow]
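ThreadPoolExecutor handles a new task in this order: start a core thread if fewer than corePoolSize threads are running; otherwise put the task in the queue; if the queue is full, start a non-core thread up to maximumPoolSize; otherwise apply the rejection policy. The small, hypothetical demo below makes this visible with core=1, max=2 and a queue of 1, using AbortPolicy so that the rejection shows up as an exception.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SubmitFlowDemo {

    /**
     * Submits 4 blocking tasks to a pool with core=1, max=2 and a queue of 1, and returns
     * {poolSize, queuedTasks, rejectedTasks} observed while all workers are still busy.
     */
    public static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy()); // throw on rejection so it can be counted
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until the pool has been inspected
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 1; i <= 4; i++) {
            try {
                // task 1: starts a core thread
                // task 2: the core thread is busy, so it waits in the queue
                // task 3: the queue is full, so a non-core thread starts (pool size reaches max=2)
                pool.execute(blocking);
            } catch (RejectedExecutionException e) {
                rejected++; // task 4: queue full and pool at max, so the rejection policy applies
            }
        }
        int[] snapshot = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return snapshot;
    }

    public static void main(String[] args) {
        int[] s = run();
        System.out.println("poolSize=" + s[0] + " queued=" + s[1] + " rejected=" + s[2]);
        // prints: poolSize=2 queued=1 rejected=1
    }
}
```

One consequence for the configuration in step 1: with core-pool-size=20 and queue-capacity=200, threads beyond the first 20 are only created once 200 tasks are already waiting, and with CallerRunsPolicy a saturated pool makes the scheduling thread do the work itself instead of throwing.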
3. ReadFileThread: a utility class that reads one segment of the file
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
/**
* Description: reads one segment (a range of lines) of a file
*/
public class ReadFileThread implements Callable<List<String>> {
private static Logger logger = LogManager.getLogger(ReadFileThread.class);
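private Integer start_index; //first line to read (1-based, inclusive)
private Integer end_index; //last line to read (1-based, inclusive)
private InputStream is; //input stream, closed by this task when it finishes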
public ReadFileThread(int start_index, int end_index, InputStream is) {
this.start_index = start_index;
this.end_index = end_index;
this.is = is;
}
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
@Override
public List<String> call() throws Exception {
long start = System.currentTimeMillis();
List<String> resultList = new ArrayList<>();
//try-with-resources closes the reader, and with it the input stream, even if reading fails
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, "utf-8"))) {
int loc = 1;
//skip the lines before start_index
while (loc < start_index && reader.readLine() != null) {
loc++;
}
//read start_index..end_index inclusive; stop early at end of file instead of throwing a NullPointerException
String line;
while (loc <= end_index && (line = reader.readLine()) != null) {
resultList.add(line.trim());
loc++;
}
}
logger.info("thread={} finished reading file, total time={} ms, records read={}",
Thread.currentThread().getName(), (System.currentTimeMillis() - start), resultList.size());
return resultList;
}
}
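The range logic of ReadFileThread can be checked without Spring or Log4j. The following is a dependency-free sketch of the same skip-then-read loop; the class `LineRangeReader` and its `readRange` method are hypothetical names introduced only for this check. Lines are 1-based and inclusive, and a range that runs past the end of the file simply returns fewer lines.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LineRangeReader {

    /** Returns lines start..end (1-based, inclusive), trimmed; stops early at end of stream. */
    public static List<String> readRange(InputStream is, int start, int end) {
        List<String> out = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
            int loc = 1;
            while (loc < start && reader.readLine() != null) {
                loc++; // skip lines before the segment
            }
            String line;
            while (loc <= end && (line = reader.readLine()) != null) {
                out.add(line.trim());
                loc++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] file = "a\nb\nc\nd\ne\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(readRange(new ByteArrayInputStream(file), 2, 4)); // [b, c, d]
        System.out.println(readRange(new ByteArrayInputStream(file), 4, 9)); // [d, e]
    }
}
```

Note that every segment still reads (and discards) all lines before its start, so later segments do more I/O than earlier ones.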
4. FileService: the service implementation
import com.zy.website.code.ApiReturnCode;
import com.zy.website.exception.WebsiteBusinessException;
import com.zy.website.model.vo.FileThreadVO;
import com.zy.website.utils.multi.ReadFileThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
@Service("fileService")
public class FileService{
//logger
private static Logger logger = LogManager.getLogger(FileService.class);
@Value("${file.thread.num}")
private Integer threadNum; //number of reader threads
@Resource(name = "asyncFilmService")
private ThreadPoolTaskExecutor executor; //thread pool
/**
* Reads the file in segments on several threads of the pool.
* If the file has fewer lines than threadNum, only one thread per line is used.
* Suitable for line-oriented data where each line is one record.
* @return all lines of the file, in file order
*/
public List<String> uploadByThread(MultipartFile file) throws Exception {
if (file.isEmpty()) {
return null;
}
//total number of lines; getLineNum closes the stream it is given
int lines = getLineNum(file.getInputStream());
if (lines == 0) {
return new ArrayList<>();
}
//never start more threads than there are lines
int threads = Math.min(threadNum, lines);
int perThread = lines / threads; //lines per thread; the last thread also takes the remainder
List<FileThreadVO<String>> threadVOS = new ArrayList<>(threads); //one record per segment
List<Future<List<String>>> futures = new ArrayList<>(threads);
int startLine = 1;
for (int i = 1; i <= threads; i++) {
int endLine = (i == threads) ? lines : startLine + perThread - 1;
//each segment needs its own stream: one InputStream cannot be shared by several readers
InputStream stream = file.getInputStream();
ReadFileThread readFileThread = new ReadFileThread(startLine, endLine, stream);
threadVOS.add(new FileThreadVO<String>().setStart_line(startLine).setEnd_line(endLine).setIs(stream));
//submit every segment before waiting; calling get() right after submit() would read the segments one after another
futures.add(executor.submit(readFileThread));
startLine = endLine + 1;
}
List<String> resultCompleteList = new ArrayList<>(lines); //merged results of all threads, in file order
for (int i = 0; i < futures.size(); i++) {
List<String> result = futures.get(i).get(); //blocks until this segment is done
threadVOS.get(i).setResult(result);
resultCompleteList.addAll(result);
}
//integrity check: this project treats each line as one record, so only the line count is checked (bytes could be checked too)
if (resultCompleteList.size() != lines) {
logger.error(">>>>>====uploadByThread====>>>>>>File integrity check failed!");
throw new WebsiteBusinessException("The file is incomplete!", ApiReturnCode.HTTP_ERROR.getCode());//custom exception and error code
}
return resultCompleteList;
}
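/**
* Counts the lines in the file. Closes the given stream.
* @param is input stream of the file
* @return number of lines
* @throws IOException if the stream cannot be read
*/
public int getLineNum(InputStream is) throws IOException {
int line = 0;
//decode with the same charset as ReadFileThread; try-with-resources also closes is
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"))) {
while (reader.readLine() != null) {
line++;
}
}
return line;
}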
}
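The segment planning and the submit-all-then-wait pattern in uploadByThread can also be sketched with the plain JDK. In this hypothetical sketch, `ParallelSegmentSketch` stands in for FileService, an Executors.newFixedThreadPool plays the role of the asyncFilmService bean, and the file content is held in a byte array so that each task can open its own stream. The segment reader uses BufferedReader.lines() with skip/limit rather than the hand-written loop in ReadFileThread.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class ParallelSegmentSketch {

    /** Splits lines 1..lines into at most threadNum ranges {start, end}; the last range also takes the remainder. */
    public static List<int[]> planSegments(int lines, int threadNum) {
        List<int[]> segments = new ArrayList<>();
        if (lines <= 0 || threadNum <= 0) {
            return segments;
        }
        int threads = Math.min(threadNum, lines); // never more threads than lines
        int perThread = lines / threads;
        int start = 1;
        for (int i = 1; i <= threads; i++) {
            int end = (i == threads) ? lines : start + perThread - 1;
            segments.add(new int[]{start, end});
            start = end + 1;
        }
        return segments;
    }

    /** Reads all segments on the pool, submitting every task before waiting on any Future. */
    public static List<String> readInParallel(byte[] content, int lines, int threadNum, ExecutorService pool) {
        List<Future<List<String>>> futures = new ArrayList<>();
        for (int[] seg : planSegments(lines, threadNum)) {
            futures.add(pool.submit(() -> {
                // each task opens its own stream over the same bytes
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                        new ByteArrayInputStream(content), StandardCharsets.UTF_8))) {
                    return reader.lines()
                            .skip(seg[0] - 1)           // skip lines before the segment
                            .limit(seg[1] - seg[0] + 1) // keep only this segment
                            .map(String::trim)
                            .collect(Collectors.toList());
                }
            }));
        }
        List<String> all = new ArrayList<>();
        for (Future<List<String>> future : futures) {
            try {
                all.addAll(future.get()); // submission order = file order
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a segment", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("reading a segment failed", e.getCause());
            }
        }
        return all;
    }

    public static void main(String[] args) {
        byte[] content = "k1\nk2\nk3\nk4\nk5\nk6\nk7\n".getBytes(StandardCharsets.UTF_8);
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            // 7 lines on 3 threads -> segments [1,2], [3,4], [5,7]
            System.out.println(readInParallel(content, 7, 3, pool)); // [k1, k2, k3, k4, k5, k6, k7]
        } finally {
            pool.shutdown();
        }
    }
}
```

Because all Futures are collected before any get() is called, the segments are read concurrently; reading the Futures back in submission order keeps the merged list in file order.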
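5. Usage in the business method
This method only reads the file with multiple threads from the pool; writing the data to the database is not implemented yet because the main business does not need it for now. To be continued...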