文件上傳與文件接收服務


#文件接收

1、存在形式:web服務,可以跨平台部署

2、自定義配置

      a、可設置文件保存路徑

      b、可設置單個文件大小

      c、可設置單次請求的文件總大小

 

#pom配置

<!--  添加依賴  -->
<dependencies>
    <!--部署成war包時開啟↓↓↓↓-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-tomcat</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.tomcat.embed</groupId>
        <artifactId>tomcat-embed-jasper</artifactId>
        <scope>provided</scope>
    </dependency>
    <!--部署成war包時開啟↑↑↑↑-->
</dependencies>

 

#配置文件

server.port=8000
server.servlet.context-path=/nb-file-server
 
spring.servlet.multipart.max-request-size=1024MB
spring.servlet.multipart.max-file-size=1024MB
 
file.upload.path=C:\\test\\tmp3

 

#控制層

@RestController
@RequestMapping("/file")
public class FileController {
 
    @Value("${file.upload.path}")
    private String path;
 
    @PostMapping(value = "/upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public boolean upload(@RequestPart("file") MultipartFile file) {
        if (file == null) {
            return false;
        }
 
        try {
            File parent = new File(path);
            if (!parent.exists()) {
                boolean mkdirs = parent.mkdirs();
                if (!mkdirs) {
                    return false;
                }
            }
 
            // 保存文件
            file.transferTo(new File(parent.getAbsolutePath() + File.separator + file.getOriginalFilename()));
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
 
        return false;
    }
 
}

 

#啟動類

@SpringBootApplication(exclude = DruidDataSourceAutoConfigure.class)
public class FileServerApplication extends SpringBootServletInitializer {
 
    public static void main(String[] args) {
        SpringApplication.run(FileServerApplication.class, args);
    }
 
    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
        return builder.sources(FileServerApplication.class);
    }
 
}

 

#文件上傳

1、存在形式:web服務,可以跨平台部署

2、文件監控:使用apache下commons-io.jar包,繼承FileAlterationListenerAdaptor類定義一個監聽器,創建FileAlterationObserver觀察者,將監聽器注入到觀察者,當文件發生變化,觀察者會調用監聽器的方法。

FileAlterationListenerAdaptor和FileAlterationObserver都屬於org.apache.commons.io.monitor 包下的類,這個包的作用是監控指定目錄下的文件狀態,它使用觀察者設計模式設計這些類的關系。

     a、可設置監聽路徑

     b、可設置監聽間隔

     c、可設置監聽指定格式,支持開啟和關閉配置,如果開啟了指定格式,最終文件只上傳指定格式的文件

3、掃描功能:為了解決歷史文件問題

     a、可以設置掃描頻率,即每隔多久掃一次,只適合移動文件模式

     b、支持開啟和關閉配置

     c、掃描到的文件采用並行流(利用CPU的多核)的方式,將任務交給線程池去執行

4、文件上傳:

     a、可以設置上傳時間段

     b、可以設置只上傳修改日期(包含)之后的文件

     c、上傳模式[0-復制文件 1-移動文件]

     d、采用feign+okhttp完成,穩定高效,通過連接池來減小響應延遲,還有透明的GZIP壓縮,請求緩存等優勢。

     e、使用線程池,異步多線程去執行,提高執行效率

 

#pom配置

<!--  添加依賴  -->
<dependencies>
    <!--部署成war包時開啟↓↓↓↓-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-tomcat</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.tomcat.embed</groupId>
        <artifactId>tomcat-embed-jasper</artifactId>
        <scope>provided</scope>
    </dependency>
    <!--部署成war包時開啟↑↑↑↑-->
 
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>
    <!-- feign底層采用的http請求方式 不加則默認使用JDK的HttpURLConnection -->
    <dependency>
        <groupId>io.github.openfeign</groupId>
        <artifactId>feign-okhttp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
    </dependency>
</dependencies>

 

#配置文件

server.port=8001
feign.okhttp.enabled=true
logging.level.com.nubomed.apiservice.client=debug
#文件上傳目標服務器地址
endpoint.file.server=http://192.168.1.220:8000/nb-file-server

#監聽路徑
monitor.dir=C:\\test\\tmp
#監聽間隔,單位毫秒
monitor.interval=5000
#是否監聽指定文件格式
monitor.file.suffix.enable=false
#監聽文件格式
monitor.file.suffix=.txt

#文件修改日期,只上傳修改日期(包含)之后的文件
file.upload.date.after=2020-06-05
#上傳文件時間段
file.upload.time.slot=00:00-23:00
#0-復制文件 1-移動文件
file.operate.mode=1
#是否開啟主動掃描功能,[0-復制文件]模式下,不會執行文件上傳操作
scanner.enable=true
#掃描頻率,多少分鍾執行一次
scanner.rate=1

 

#配置類-線程池

@Configuration
@EnableAsync
public class ExecutorConfig {
 
    @Bean
    public Executor asyncServiceExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心線程數
        executor.setCorePoolSize(5);
        //配置最大線程數
        executor.setMaxPoolSize(10);
        //配置隊列大小
        executor.setQueueCapacity(1000);
        //配置線程池中的線程的名稱前綴
        executor.setThreadNamePrefix("nb-file-client-async-");
        // rejection-policy:當pool已經達到max size的時候,如何處理新任務
        // CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執行初始化
        executor.initialize();
        return executor;
    }
 
}

 

#配置類-feign

@Configuration
public class FeignConfig {
 
    @Bean
    Logger.Level feignLoggerLevel() {
        //記錄請求和響應的標頭,正文和元數據
        return Logger.Level.FULL;
    }
 
}

 

#文件監聽器

public class FileListener extends FileAlterationListenerAdaptor {
    private ListenerService listenerService;
 
    public FileListener(ListenerService listenerService) {
        this.listenerService = listenerService;
    }
 
    @Override
    public void onFileCreate(File file) {
        listenerService.handleFileCreate(file);
    }
 
    @Override
    public void onFileChange(File file) {
        listenerService.handleFileChange(file);
    }
 
    @Override
    public void onFileDelete(File file) {
    }
 
    @Override
    public void onDirectoryCreate(File directory) {
    }
 
    @Override
    public void onDirectoryChange(File directory) {
    }
 
    @Override
    public void onDirectoryDelete(File directory) {
    }
 
    @Override
    public void onStart(FileAlterationObserver observer) {
    }
 
    @Override
    public void onStop(FileAlterationObserver observer) {
    }
}

 

#文件監聽工廠類

@Component
public class FileListenerFactory {
 
    @Value("${monitor.dir}")
    private String monitorDir;
 
    @Value("${monitor.interval}")
    private long interval;
 
    @Value("${monitor.file.suffix.enable}")
    private boolean enable;
 
    @Value("${monitor.file.suffix}")
    private String suffix;
 
    @Autowired
    private ListenerService listenerService;
 
    public FileAlterationMonitor getMonitor() {
        FileAlterationObserver observer = null;
 
        // 創建過濾器
        if (enable) {
            IOFileFilter directories = FileFilterUtils.and(FileFilterUtils.directoryFileFilter(), HiddenFileFilter.VISIBLE);
            IOFileFilter files = FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter(suffix));
            IOFileFilter filter = FileFilterUtils.or(directories, files);
            //裝配過濾器
            observer = new FileAlterationObserver(new File(monitorDir), filter);
        } else {
            observer = new FileAlterationObserver(new File(monitorDir));
        }
 
        // 向監聽者添加監聽器,並注入業務服務
        observer.addListener(new FileListener(listenerService));
 
        // 返回監聽者
        return new FileAlterationMonitor(interval, observer);
    }
 
}

 

#文件監聽服務接口

public interface ListenerService {
 
    /**
     * 監聽文件創建
     *
     * @param file 創建的文件
     */
    void handleFileCreate(File file);
 
    /**
     * 監聽文件修改
     *
     * @param file 修改的文件
     */
    void handleFileChange(File file);
 
    /**
     * 執行文件掃描
     */
    void handleScanner();
}

 

#文件監聽服務接口實現

@Service
@Slf4j
public class ListenerServiceImpl implements ListenerService {
 
    @Resource
    private AsyncService asyncService;
 
    @Value("${file.operate.mode}")
    private Integer mode;
 
    @Value("${file.upload.time.slot}")
    private String timeSlot;
 
    @Value("${monitor.dir}")
    private String dir;
 
    @Value("${monitor.file.suffix.enable}")
    private boolean enableSuffix;
 
    @Value("${monitor.file.suffix}")
    private String suffix;
 
    @Override
    public void handleFileCreate(File file) {
        log.info("發現新的文件[{}]...", file.getName());
        if (isHandleTimeSlot()) {
            //asyncService.handleFileUpload(file);
        }
    }
 
    @Override
    public void handleFileChange(File file) {
        log.info("發現文件[{}]修改了...", file.getName());
        if (isHandleTimeSlot()) {
            asyncService.handleFileUpload(file);
        }
    }
 
    @Override
    public void handleScanner() {
        if (mode == 0) {
            log.info("[復制文件模式]不執行掃描操作!");
            return;
        }
 
        log.info("開始掃描目錄[{}]文件...", dir);
        File file = new File(dir);
        File[] files = file.listFiles();
        if (files == null || files.length == 0) {
            log.info("目錄[{}]下沒有發現文件!", dir);
            return;
        }
 
        log.info("已掃描到[{}]個文件", files.length);
 
        if (enableSuffix) {
            log.info("已開啟掃描[{}]格式文件", suffix);
            List<File> fileList = Arrays.stream(files)
                    .filter(file1 -> file1.getName().contains(suffix)).collect(Collectors.toList());
            log.info("[{}]格式文件有[{}]個", suffix, fileList.size());
            fileList.parallelStream().forEach(file12 -> asyncService.handleFileUpload(file12));
            return;
        }
 
        Arrays.stream(files).parallel().forEach(file13 -> asyncService.handleFileUpload(file13));
    }
 
    private LocalDateTime parseStringToDateTime(String time) {
        String[] split = time.split(":");
        return LocalDateTime.of(LocalDate.now(), LocalTime.of(Integer.valueOf(split[0]), Integer.valueOf(split[1]), 0));
    }
 
    private boolean isHandleTimeSlot() {
        String[] split = timeSlot.split("-");
        LocalDateTime startTime = parseStringToDateTime(split[0]);
        LocalDateTime endTime = parseStringToDateTime(split[1]);
        LocalDateTime now = LocalDateTime.now();
        if (now.isBefore(startTime) || now.isAfter(endTime)) {
            log.info("文件上傳的時間段為[{}]", timeSlot);
            return false;
        }
 
        return true;
    }
 
}

 

#文件上傳服務接口

public interface AsyncService {
 
    /**
     * 執行異步任務-上傳文件
     *
     * @param file 目標文件
     */
    void handleFileUpload(File file);
}

 

#文件上傳服務實現

@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {
 
    private static final Map<String, Boolean> PROCESS_MAP = new ConcurrentHashMap<>();
 
    @Resource
    private FileUploadClient fileUploadClient;
 
    @Value("${file.operate.mode}")
    private Integer mode;
 
    @Override
    @Async("asyncServiceExecutor")
    public void handleFileUpload(File file) {
        log.info("當前線程[{}]", Thread.currentThread().getName());
 
        if (PROCESS_MAP.get(file.getName()) != null && PROCESS_MAP.get(file.getName())) {
            log.info("文件[{}]正在被處理中...", file.getName());
            return;
        }
        PROCESS_MAP.put(file.getName(), true);
 
        if (!file.exists()) {
            log.info("文件[{}]已被處理!", file.getName());
            return;
        }
 
        long start = System.currentTimeMillis();
        log.info("文件[{}]正在上傳...", file.getName());
        boolean result = false;
 
        try {
            MultipartFile multipartFile = new MockMultipartFile("file", file.getName(),
                    MediaType.MULTIPART_FORM_DATA_VALUE, new FileInputStream(file));
            result = fileUploadClient.upload(multipartFile);
 
            if (result) {
                //移動文件
                if (mode == 1) {
                    log.info("開始刪除文件[{}]...", file.getName());
                    boolean delete = file.delete();
                    if (delete) {
                        log.info("文件[{}]刪除成功!", file.getName());
                    } else {
                        log.error("文件[{}]刪除失敗!", file.getName());
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
 
        long end = System.currentTimeMillis();
        long cost = end - start;
        if (result) {
            log.info("文件[{}]上傳成功!耗時[{}]ms", file.getName(), cost);
        } else {
            log.error("文件[{}]上傳失敗!耗時[{}]ms", file.getName(), cost);
        }
        PROCESS_MAP.remove(file.getName());
    }
 
}

 

#啟動類

@SpringBootApplication(exclude = DruidDataSourceAutoConfigure.class)
@EnableFeignClients
@EnableScheduling
@Slf4j
public class FileClientApplication extends SpringBootServletInitializer implements CommandLineRunner {
 
    @Autowired
    private FileListenerFactory fileListenerFactory;
 
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;
 
    @Autowired
    private ListenerService listenerService;
 
    @Value("${scanner.enable}")
    private boolean enable;
 
    @Value("${scanner.rate}")
    private String rate;
 
    public static void main(String[] args) {
        SpringApplication.run(FileClientApplication.class, args);
    }
 
    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
        return builder.sources(FileClientApplication.class);
    }
 
    @Override
    public void run(String... args) throws Exception {
        try {
            // 創建監聽者
            FileAlterationMonitor fileAlterationMonitor = fileListenerFactory.getMonitor();
            fileAlterationMonitor.start();
 
            if (enable) {
                log.info("啟動主動掃描功能,掃描頻率[{}]分鍾執行一次", rate);
                String cron = String.format("0/59 0/%s * * * ?", rate);
                threadPoolTaskScheduler.schedule(() -> listenerService.handleScanner(), new CronTrigger(cron));
            } else {
                log.info("未啟動主動掃描功能,原因[scanner.enable={}]", enable);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM