#文件接收
1、存在形式:web服務,可以跨平台部署
2、自定義配置
a、可設置文件保存路徑
b、可設置單個文件大小
c、可設置單次請求的文件總大小
#pom配置
<!-- 添加依賴 --> <dependencies> <!--部署成war包時開啟↓↓↓↓--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.tomcat.embed</groupId> <artifactId>tomcat-embed-jasper</artifactId> <scope>provided</scope> </dependency> <!--部署成war包時開啟↑↑↑↑--> </dependencies>
#配置文件
server.port=8000 server.servlet.context-path=/nb-file-server spring.servlet.multipart.max-request-size=1024MB spring.servlet.multipart.max-file-size=1024MB file.upload.path=C:\\test\\tmp3
#控制層
@RestController @RequestMapping("/file") public class FileController { @Value("${file.upload.path}") private String path; @PostMapping(value = "/upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) public boolean upload(@RequestPart("file") MultipartFile file) { if (file == null) { return false; } try { File parent = new File(path); if (!parent.exists()) { boolean mkdirs = parent.mkdirs(); if (!mkdirs) { return false; } } // 保存文件 file.transferTo(new File(parent.getAbsolutePath() + File.separator + file.getOriginalFilename())); return true; } catch (Exception e) { e.printStackTrace(); } return false; } }
#啟動類
@SpringBootApplication(exclude = DruidDataSourceAutoConfigure.class) public class FileServerApplication extends SpringBootServletInitializer { public static void main(String[] args) { SpringApplication.run(FileServerApplication.class, args); } @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) { return builder.sources(FileServerApplication.class); } }
#文件上傳
1、存在形式:web服務,可以跨平台部署
2、文件監控:使用apache下commons-io.jar包,繼承FileAlterationListenerAdaptor類定義一個監聽器,創建FileAlterationObserver觀察者,將監聽器注入到觀察者,當文件發生變化,觀察者會調用監聽器的方法。
FileAlterationListenerAdaptor和FileAlterationObserver都屬於org.apache.commons.io.monitor 包下的類,這個包的作用是監控指定目錄下的文件狀態,它使用觀察者設計模式設計這些類的關系。
a、可設置監聽路徑
b、可設置監聽間隔
c、可設置監聽指定格式,支持開啟和關閉配置,如果開啟了指定格式,最終文件只上傳指定格式的文件
3、掃描功能:為了解決歷史文件問題
a、可以設置掃描頻率,即每隔多久掃一次,只適合移動文件模式
b、支持開啟和關閉配置
c、掃描到的文件采用並行流(利用CPU的多核)的方式,將任務交給線程池去執行
4、文件上傳:
a、可以設置上傳時間段
b、可以設置只上傳修改日期(包含)之后的文件
c、上傳模式[0-復制文件 1-移動文件]
d、采用feign+okhttp完成,穩定高效,通過連接池來減小響應延遲,還有透明的GZIP壓縮,請求緩存等優勢。
e、使用線程池,異步多線程去執行,提高執行效率
#pom配置
<!-- 添加依賴 --> <dependencies> <!--部署成war包時開啟↓↓↓↓--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.tomcat.embed</groupId> <artifactId>tomcat-embed-jasper</artifactId> <scope>provided</scope> </dependency> <!--部署成war包時開啟↑↑↑↑--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <!-- feign底層采用的http請求方式 不加則默認使用JDK的HttpURLConnection --> <dependency> <groupId>io.github.openfeign</groupId> <artifactId>feign-okhttp</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> </dependency> </dependencies>
#配置文件
server.port=8001
feign.okhttp.enabled=true
logging.level.com.nubomed.apiservice.client=debug
#文件上傳目標服務器地址
endpoint.file.server=http://192.168.1.220:8000/nb-file-server
#監聽路徑
monitor.dir=C:\\test\\tmp
#監聽間隔,單位毫秒
monitor.interval=5000
#是否監聽指定文件格式
monitor.file.suffix.enable=false
#監聽文件格式
monitor.file.suffix=.txt
#文件修改日期,只上傳修改日期(包含)之后的文件
file.upload.date.after=2020-06-05
#上傳文件時間段
file.upload.time.slot=00:00-23:00
#0-復制文件 1-移動文件
file.operate.mode=1
#是否開啟主動掃描功能,[0-復制文件]模式下,不會執行文件上傳操作
scanner.enable=true
#掃描頻率,多少分鍾執行一次
scanner.rate=1
#配置類-線程池
@Configuration @EnableAsync public class ExecutorConfig { @Bean public Executor asyncServiceExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心線程數 executor.setCorePoolSize(5); //配置最大線程數 executor.setMaxPoolSize(10); //配置隊列大小 executor.setQueueCapacity(1000); //配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("nb-file-client-async-"); // rejection-policy:當pool已經達到max size的時候,如何處理新任務 // CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執行初始化 executor.initialize(); return executor; } }
#配置類-feign
@Configuration public class FeignConfig { @Bean Logger.Level feignLoggerLevel() { //記錄請求和響應的標頭,正文和元數據 return Logger.Level.FULL; } }
#文件監聽器
public class FileListener extends FileAlterationListenerAdaptor { private ListenerService listenerService; public FileListener(ListenerService listenerService) { this.listenerService = listenerService; } @Override public void onFileCreate(File file) { listenerService.handleFileCreate(file); } @Override public void onFileChange(File file) { listenerService.handleFileChange(file); } @Override public void onFileDelete(File file) { } @Override public void onDirectoryCreate(File directory) { } @Override public void onDirectoryChange(File directory) { } @Override public void onDirectoryDelete(File directory) { } @Override public void onStart(FileAlterationObserver observer) { } @Override public void onStop(FileAlterationObserver observer) { } }
#文件監聽工廠類
@Component public class FileListenerFactory { @Value("${monitor.dir}") private String monitorDir; @Value("${monitor.interval}") private long interval; @Value("${monitor.file.suffix.enable}") private boolean enable; @Value("${monitor.file.suffix}") private String suffix; @Autowired private ListenerService listenerService; public FileAlterationMonitor getMonitor() { FileAlterationObserver observer = null; // 創建過濾器 if (enable) { IOFileFilter directories = FileFilterUtils.and(FileFilterUtils.directoryFileFilter(), HiddenFileFilter.VISIBLE); IOFileFilter files = FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter(suffix)); IOFileFilter filter = FileFilterUtils.or(directories, files); //裝配過濾器 observer = new FileAlterationObserver(new File(monitorDir), filter); } else { observer = new FileAlterationObserver(new File(monitorDir)); } // 向監聽者添加監聽器,並注入業務服務 observer.addListener(new FileListener(listenerService)); // 返回監聽者 return new FileAlterationMonitor(interval, observer); } }
#文件監聽服務接口
public interface ListenerService { /** * 監聽文件創建 * * @param file 創建的文件 */ void handleFileCreate(File file); /** * 監聽文件修改 * * @param file 修改的文件 */ void handleFileChange(File file); /** * 執行文件掃描 */ void handleScanner(); }
#文件監聽服務接口實現
@Service @Slf4j public class ListenerServiceImpl implements ListenerService { @Resource private AsyncService asyncService; @Value("${file.operate.mode}") private Integer mode; @Value("${file.upload.time.slot}") private String timeSlot; @Value("${monitor.dir}") private String dir; @Value("${monitor.file.suffix.enable}") private boolean enableSuffix; @Value("${monitor.file.suffix}") private String suffix; @Override public void handleFileCreate(File file) { log.info("發現新的文件[{}]...", file.getName()); if (isHandleTimeSlot()) { //asyncService.handleFileUpload(file); } } @Override public void handleFileChange(File file) { log.info("發現文件[{}]修改了...", file.getName()); if (isHandleTimeSlot()) { asyncService.handleFileUpload(file); } } @Override public void handleScanner() { if (mode == 0) { log.info("[復制文件模式]不執行掃描操作!"); return; } log.info("開始掃描目錄[{}]文件...", dir); File file = new File(dir); File[] files = file.listFiles(); if (files == null || files.length == 0) { log.info("目錄[{}]下沒有發現文件!", dir); return; } log.info("已掃描到[{}]個文件", files.length); if (enableSuffix) { log.info("已開啟掃描[{}]格式文件", suffix); List<File> fileList = Arrays.stream(files) .filter(file1 -> file1.getName().contains(suffix)).collect(Collectors.toList()); log.info("[{}]格式文件有[{}]個", suffix, fileList.size()); fileList.parallelStream().forEach(file12 -> asyncService.handleFileUpload(file12)); return; } Arrays.stream(files).parallel().forEach(file13 -> asyncService.handleFileUpload(file13)); } private LocalDateTime parseStringToDateTime(String time) { String[] split = time.split(":"); return LocalDateTime.of(LocalDate.now(), LocalTime.of(Integer.valueOf(split[0]), Integer.valueOf(split[1]), 0)); } private boolean isHandleTimeSlot() { String[] split = timeSlot.split("-"); LocalDateTime startTime = parseStringToDateTime(split[0]); LocalDateTime endTime = parseStringToDateTime(split[1]); LocalDateTime now = LocalDateTime.now(); if (now.isBefore(startTime) || now.isAfter(endTime)) { log.info("文件上傳的時間段為[{}]", timeSlot); return false; } return true; } }
#文件上傳服務接口
public interface AsyncService { /** * 執行異步任務-上傳文件 * * @param file 目標文件 */ void handleFileUpload(File file); }
#文件上傳服務實現
@Service @Slf4j public class AsyncServiceImpl implements AsyncService { private static final Map<String, Boolean> PROCESS_MAP = new ConcurrentHashMap<>(); @Resource private FileUploadClient fileUploadClient; @Value("${file.operate.mode}") private Integer mode; @Override @Async("asyncServiceExecutor") public void handleFileUpload(File file) { log.info("當前線程[{}]", Thread.currentThread().getName()); if (PROCESS_MAP.get(file.getName()) != null && PROCESS_MAP.get(file.getName())) { log.info("文件[{}]正在被處理中...", file.getName()); return; } PROCESS_MAP.put(file.getName(), true); if (!file.exists()) { log.info("文件[{}]已被處理!", file.getName()); return; } long start = System.currentTimeMillis(); log.info("文件[{}]正在上傳...", file.getName()); boolean result = false; try { MultipartFile multipartFile = new MockMultipartFile("file", file.getName(), MediaType.MULTIPART_FORM_DATA_VALUE, new FileInputStream(file)); result = fileUploadClient.upload(multipartFile); if (result) { //移動文件 if (mode == 1) { log.info("開始刪除文件[{}]...", file.getName()); boolean delete = file.delete(); if (delete) { log.info("文件[{}]刪除成功!", file.getName()); } else { log.error("文件[{}]刪除失敗!", file.getName()); } } } } catch (Exception e) { e.printStackTrace(); } long end = System.currentTimeMillis(); long cost = end - start; if (result) { log.info("文件[{}]上傳成功!耗時[{}]ms", file.getName(), cost); } else { log.error("文件[{}]上傳失敗!耗時[{}]ms", file.getName(), cost); } PROCESS_MAP.remove(file.getName()); } }
#啟動類
@SpringBootApplication(exclude = DruidDataSourceAutoConfigure.class) @EnableFeignClients @EnableScheduling @Slf4j public class FileClientApplication extends SpringBootServletInitializer implements CommandLineRunner { @Autowired private FileListenerFactory fileListenerFactory; @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; @Autowired private ListenerService listenerService; @Value("${scanner.enable}") private boolean enable; @Value("${scanner.rate}") private String rate; public static void main(String[] args) { SpringApplication.run(FileClientApplication.class, args); } @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) { return builder.sources(FileClientApplication.class); } @Override public void run(String... args) throws Exception { try { // 創建監聽者 FileAlterationMonitor fileAlterationMonitor = fileListenerFactory.getMonitor(); fileAlterationMonitor.start(); if (enable) { log.info("啟動主動掃描功能,掃描頻率[{}]分鍾執行一次", rate); String cron = String.format("0/59 0/%s * * * ?", rate); threadPoolTaskScheduler.schedule(() -> listenerService.handleScanner(), new CronTrigger(cron)); } else { log.info("未啟動主動掃描功能,原因[scanner.enable={}]", enable); } } catch (Exception e) { e.printStackTrace(); } } }