Java異步調用實現並發上傳下載SMB共享文件


Java異步調用實現並發上傳下載SMB共享文件

選擇異步

通常情況下,上傳(下載)多個SMB共享文件這類任務之間不存在依賴關系,可以考慮通過異步調用的方式來實現上傳(下載)的並發執行,來充分利用系統資源以提高計算機的處理能力。

來看一下以下載為例該程序最后的運行日志:

其中最直接的體現,同一組大概3個G的文件,異步執行下載SMB共享文件比非異步所用時間更少,提高了下載效率。下面展示完整的程序代碼。

所需依賴

<!-- 建議使用SMB的1.2.6版本比較穩定 -->
<dependency> 
  <groupId>jcifs</groupId>  
  <artifactId>jcifs</artifactId>  
  <version>1.2.6</version> 
</dependency>
<!-- 日志打印需要引入slf4j日志的依賴 -->
<dependency> 
  <groupId>org.slf4j</groupId>  
  <artifactId>slf4j-api</artifactId>  
  <version>1.7.25</version> 
</dependency>

服務類

共享文件操作類:

  • 使用@Async注解標注uploadSmbFileAsync方法和downloadSmbFileAsync方法為需要異步執行的方法。
  • 建議將jcifs.smb.client.dfs.disabled屬性設置為true,默認值為false,在非域環境中,此屬性可能很重要,其中基於域的DFS引用通常在JCIFS首次嘗試解析路徑時運行,這將導致超時,從而導致長啟動延遲。
package com.example.smb.service;
/**
 * @author: 博客「成猿手冊」
 * @description: 對共享文件的操作
 * @date: 2020/3/24
 */
@Component
public class SmbFileOperator {
    private final Logger logger = LoggerFactory.getLogger(SmbFileOperator.class);

    public SmbFileOperator() {
        System.setProperty("jcifs.smb.client.dfs.disabled", "true");
    }

    @Async
    public Future<String> uploadSmbFileAsync(File file, SmbFile smbFile) throws IOException {
        this.uploadSmbFile(file, smbFile);
        return new AsyncResult<>(smbFile.getPath());
    }

    @Async
    public Future<String> downloadSmbFileAsync(File file, SmbFile smbFile) throws IOException {
        this.downloadSmbFile(file, smbFile);
        return new AsyncResult<>(smbFile.getPath());
    }

    public void uploadSmbFile(File file, SmbFile smbfile) throws IOException {
        logger.info("SMB文件上傳開始:{} --> {};size:{}",
                file.getPath(), smbfile.getPath(), file.length());
        try {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            OutputStream out = new BufferedOutputStream(new SmbFileOutputStream(smbfile));
            FileCopyUtils.copy(in, out);
            logger.info("SMB文件上傳成功:{} --> {}", file.getPath(), smbfile.getPath());
        } catch (IOException e) {
            logger.error("SMB文件上傳失敗:{} --> {}", file.getPath(), smbfile.getPath(), e);
            throw e;
        }
    }

    public void downloadSmbFile(File file, SmbFile smbfile) throws IOException {
        logger.info("SMB文件下載開始:{} --> {};size:{}",
                file.getPath(), smbfile.getPath(), file.length());
        try {
            InputStream in = new BufferedInputStream(new SmbFileInputStream(smbfile));
            OutputStream out = new BufferedOutputStream(new FileOutputStream(file));
            FileCopyUtils.copy(in, out);
            logger.info("SMB文件下載成功:{} --> {}", smbfile.getPath(),file.getPath());
        } catch (IOException e) {
            logger.error("SMB文件下載失敗:{} --> {}", smbfile.getPath(),file.getPath(),e);
            throw e;
        }
    }
}

定義異步操作接口:

  • timeout:為等待的最長時間數;timeUnit 超時參數的時間單位(秒,分,小時等)。
package com.example.smb.service;
/**
 * @author: 博客「成猿手冊」
 * @description: 異步操作接口
 * @date: 2020/3/26
 */
public interface SmbExecuteAsync {
    void downloadSmb(SmbInfo smbFileInfo, long timeout, TimeUnit timeUnit) throws BaseException,IOException;

    void uploadSmb(SmbInfo smbFileInfo, long timeout, TimeUnit timeUnit) throws BaseException,IOException;
}

實現異步操作接口:

  • 使用ThreadLocal規避多線程訪問出現線程不安全的方法:每個線程通過ThreadLocalset()get方法確保對變量smbFileTaskThreadLocal進行訪問的時候訪問的都是該線程自己的變量。
  • generateDownloadSmbFileTask通過遞歸方式獲取文件夾下需要下載的文件完整路徑(上傳類似)。
package com.example.smb.service.impl;
/**
 * @author: 博客「成猿手冊」
 * @description: 異步操作接口實現
 * @date: 2020/3/26
 */
@Service("SmbExecuteAsync")
public class SmbExecuteAsyncImpl implements SmbExecuteAsync {
    @Autowired
    private SmbInstance smbInstance;

    @Autowired
    private SmbFileOperator smbFileOperator;

    private final Logger logger = LoggerFactory.getLogger(SmbExecuteAsyncImpl.class);
    private final ThreadLocal<List<SmbFileTask>> smbFileTaskThreadLocal = new ThreadLocal<>();

    @Override
    public void downloadSmb(SmbInfo smbInfo, long timeout, TimeUnit timeUnit) throws BaseException, IOException {
        File file = new File(smbInfo.getLocalDirPath());
        if (!file.exists()) {
            throw new BaseException("本地文件不存在:" + smbInfo.getLocalDirPath());
        }
        SmbFile smbFile = smbInstance.getSmbFileInstance(
                smbInfo.getIp(), smbInfo.getUserName(), smbInfo.getPassWord(), smbInfo.getSmbFilePath());
        if (!smbFile.exists()) {
            throw new BaseException("SMB共享文件不存在:" + smbFile.getPath());
        }
        smbFileTaskThreadLocal.set(new ArrayList<>());
        List<Future<String>> futures = new ArrayList<>();
        try {
            generateDownloadSmbFileTask(smbFile, smbInfo, smbInfo.getLocalDirPath(), 1);
            List<SmbFileTask> tasks = smbFileTaskThreadLocal.get();
            Date begindate = new Date();
            for (SmbFileTask smbFileTask : tasks) {
                //將線程儲存類ThreadLocal內的多個任務全都通過異步下載方法執行
                Future<String> future = smbFileOperator.downloadSmbFileAsync(smbFileTask.getFile(), smbFileTask.getSmbFile());
                futures.add(future);
            }
            for (Future<String> future : futures) {
                future.get(timeout, timeUnit);
            }
            Date enddate = new Date();
            logger.info("下載任務總耗時間" + (enddate.getTime() - begindate.getTime()) / 1000 + "秒");
        } catch (InterruptedException | ExecutionException e) {
            logger.error("SMB共享文件下載存在異常:{}", e);
            cancleFuture(futures);
            throw new BaseException("SMB共享文件下載存在異常,強制中斷:", e);
        } catch (TimeoutException e) {
            logger.error("SMB共享文件下載任務超時:{}", e);
            cancleFuture(futures);
            throw new BaseException("SMB共享文件下載存在異常,強制中斷:", e);
        } finally {
            smbFileTaskThreadLocal.remove();
        }
    }

    @Override
    public void uploadSmb(SmbInfo smbInfo, long timeout, TimeUnit timeUnit) throws BaseException, IOException{
        File file = new File(smbInfo.getLocalDirPath());
        if (!file.exists()) {
            throw new BaseException("本地文件不存在:" + smbInfo.getLocalDirPath());
        }
        SmbFile smbFile = smbInstance.getSmbFileInstance(
                smbInfo.getIp(), smbInfo.getUserName(), smbInfo.getPassWord(), smbInfo.getSmbFilePath());
        if (!smbFile.exists()) {
            smbFile.mkdir();
        }
        smbFileTaskThreadLocal.set(new ArrayList<>());
        List<Future<String>> futures = new ArrayList<>();
        try {
            NtlmPasswordAuthentication auth = new NtlmPasswordAuthentication(smbInfo.getIp(),smbInfo.getUserName(),smbInfo.getPassWord());
            generateUploadSmbFileTask(file, smbInfo, smbFile.getURL().toString() + file.getName(),auth);
            List<SmbFileTask> tasks = smbFileTaskThreadLocal.get();
            Date begindate = new Date();
            for (SmbFileTask smbFileTask : tasks) {
                //將線程儲存類ThreadLocal內的多個任務全都通過異步下載方法執行
                Future<String> future = smbFileOperator.uploadSmbFileAsync(smbFileTask.getFile(), smbFileTask.getSmbFile());
                futures.add(future);
            }
            for (Future<String> future : futures) {
                future.get(timeout, timeUnit);
            }
            Date enddate = new Date();
            logger.info("上傳任務總耗時間" + (enddate.getTime() - begindate.getTime()) / 1000 + "秒");
        } catch (InterruptedException | ExecutionException e) {
            logger.error("SMB共享文件上傳存在異常:{}", e);
            cancleFuture(futures);
            throw new BaseException("SMB共享文件上傳存在異常,強制中斷:", e);
        } catch (TimeoutException e) {
            logger.error("SMB共享文件上傳任務超時:{}", e);
            cancleFuture(futures);
            throw new BaseException("SMB共享文件上傳存在異常,強制中斷:", e);
        } finally {
            smbFileTaskThreadLocal.remove();
        }
    }

    private void generateUploadSmbFileTask(File file, SmbInfo smbInfo, String smbFatherDir, NtlmPasswordAuthentication auth)throws BaseException, IOException{
        if (file.isFile()) {
            SmbFile smbFile = new SmbFile(smbFatherDir,auth);
            smbFileTaskThreadLocal.get().add(new SmbFileTask(file, smbFile));
        } else if (file.isDirectory()) {
            SmbFile smbFile = new SmbFile(smbFatherDir,auth);
            if (!smbFile.exists() || !smbFile.isDirectory()) {
                smbFile.mkdir();
            }
            File[] files = file.listFiles();
            if (files != null && files.length > 0) {
                for (File f : files) {
                    //遞歸調用本方法 以獲取完整任務
                    generateUploadSmbFileTask(f, smbInfo,smbFatherDir+"/"+f.getName(),auth);
                }
            }
         }
    }

    private void generateDownloadSmbFileTask(SmbFile smbFile, SmbInfo smbInfo, String fatherDir, int depath) throws IOException,BaseException{
        if (depath > 10) {
            logger.warn("共享目錄遍歷已超過10層目錄" + fatherDir);
        }
        if (smbFile.isFile()) {
            //如果是文件則新建任務並添加至線程ThreadLocal中
            String localFilePath = String.format("%s/%s", fatherDir, smbFile.getName());
            File localFile = new File(localFilePath);
            smbFileTaskThreadLocal.get().add(new SmbFileTask(localFile, smbFile));
        } else if (smbFile.isDirectory()) {
            //如果是文件夾,則在本地新建
            String localDirPath = fatherDir + "/" + smbFile.getName();
            logger.info("SMB共享文件目錄開始下載:{} --> {}", smbFile.getPath(), localDirPath);
            boolean result = new File(localDirPath).mkdir();
            logger.info("文件夾創建成功:{},創建標識為:{}", localDirPath, result);
            //獲取該目錄下所有文件和目錄的絕對路徑
            SmbFile[] smbFiles = smbFile.listFiles();
            if (smbFiles != null && smbFiles.length > 0) {
                for (SmbFile s : smbFiles) {
                    //遞歸調用本方法 以獲取完整任務
                    generateDownloadSmbFileTask(s, smbInfo, localDirPath, depath + 1);
                }
            }
        }
    }

    private void cancleFuture(List<Future<String>> futures) {
        for (Future<String> future : futures) {
            boolean result = future.cancel(true);
            logger.info("SMB共享文件操作取消,取消結果為{}", result);
        }
    }
}

實體類

共享文件信息類:

package com.example.smb.entity;
import org.springframework.stereotype.Component;
/**
 * @author: 博客「成猿手冊」
 * @description: 記錄共享文件相關信息
 * @date: 2020/3/22
 */
@Component
public class SmbInfo {

    private String ip;
    private String userName;
    private String passWord;
    private String smbFilePath;
    private String localDirPath;

    public SmbInfo() {
    }

    public SmbInfo(String ip, String userName, String passWord, String smbFilePath, String localDirPath) {
        this.ip = ip;
        this.userName = userName;
        this.passWord = passWord;
        this.smbFilePath = smbFilePath;
        this.localDirPath = localDirPath;
    }
    
    //todo:此處省略get和set方法
}

SMB文件操作任務類:

package com.example.smb.entity;
/**
 * @author: 博客「成猿手冊」
 * @description: Smb任務類
 * @date: 2020/3/26
 */
@Component
public class SmbFileTask {
    private  File file;
    private  SmbFile smbFile;

    public SmbFileTask() {
    }

    public SmbFileTask(File file, SmbFile smbFile) throws BaseException{
        this.file = file;
        this.smbFile = smbFile;
        try{
            if (this.smbFile.isDirectory()){
                throw new BaseException("SmbFileTask不能為目錄類型"+smbFile.getPath());
            }
        } catch (SmbException | BaseException e) {
            throw new BaseException("SmbFileTask不能為目錄類型"+smbFile.getPath(),e);
        }
    }
	//todo:此處省略get和set方法
}

構建SmbFile文件實例:

package com.example.smb.entity;
/**
 * @author: 博客「成猿手冊」
 * @description: 構建SmbFile文件實例
 * @date: 2020/3/26
 */
@Component
public class SmbInstance {
    public SmbFile getSmbFileInstance(String ip, String username, String password, String smbfilepath) throws MalformedURLException {
        NtlmPasswordAuthentication authentication = null;
        //兩種訪問方式:有賬號密碼,無賬號密碼;
        if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
            authentication =
                    new NtlmPasswordAuthentication(ip, username, password);
        }
        String smburl = String.format("smb://%s/%s", ip, smbfilepath);
        return new SmbFile(smburl, authentication);
    }
}

調用方法

自己測試用的文件的目錄結構大概如圖所示:

這里采用了swagger調用controller接口的方式進行了相關測試,代碼示例如下:(也可以直接用單元測試)

package com.example.smb.controller;
/**
 * @author: 博客「成猿手冊」
 * @description: 控制層接口
 * @date: 2020/4/8
 */
@RestController
@RequestMapping("/smbExecute")
@Api(tags = {"SMB操作相關"},
        produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public class SmbController {
    @Autowired
    private SmbExecuteAsync smbExecute;

    @PostMapping(value = "/smbDownload")
    @ApiOperation(value = "SMB下載測試")
    public void smbDownload() throws IOException, BaseException {
        SmbInfo smbInfo =
                new SmbInfo("192.168.1.106", "username",
                        "password", "Test/成猿手冊的smb目標文件夾/", "D:\\LocalTest\\");
        smbExecute.downloadSmb(smbInfo, 1, TimeUnit.HOURS);
    }

    @PostMapping(value = "/smbUpload")
    @ApiOperation(value = "SMB上傳測試")
    public void uploadSmbTest() throws IOException, BaseException {
        SmbInfo smbInfo =
                new SmbInfo("192.168.1.106", "username",
                        "password", "Test/", "D:\\LocalTest\\成猿手冊的smb目標文件夾\\");
        smbExecute.uploadSmb(smbInfo, 1, TimeUnit.HOURS);
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM