用多線程處理FTP上傳


在進入正文前,先給大家分享一款比較好用的服務器連接工具:
IIS7服務器管理工具是一款windows全系下用於連接並操控基於windows和linux系統的VPS、VNC、FTP等遠程服務器、雲服務器的管理工具。
界面簡單明了,操作易上手,功能強大,支持批量導入服務器,並批量打開,多窗口化管理,除此之外,加載本地硬盤、硬盤映射、加載服務器的聲音,遠程聲卡讀取等功能也一應俱全,完全實現了各類場景使用,對於FTP連接界面,其中FTP文件的定時上傳,定時下載(也可以說定時上傳下載、定時備份)功能,對於經常使用FTP的小伙伴來說,也是非常適用的。
工具支持自動更新,壓縮包只有7.62M,方便簡潔,一步到位。
下載地址:http://fwqglgj.iis7.net/?tscc
簡單的使用步驟可以看下面的截圖,做了詳細標注:

 

#########################################################################################################################

正文

在開發中遇到總站發送命令請求分站將某資源通過FTP上傳過來,也就是總站提取分站的資源問題。並且總站實時可以獲取已經提取了文件的大小的比例。

  思路:1.首先分站要將文件大小告知總站

               2.總站收到文件大小后,根據指定路徑去判斷指定路徑文件夾(分站的文件存儲的位置)下的文件大小,然后和當前文件大小/總大小,就獲取了已經上傳的所占比。

  總站發送請求就不再贅述了,直接說分站接收到請求后的處理,為了防止惡意請求,我們這對每次請求都加了“鹽”,那么分站接收時需要判斷是否帶“鹽”了,如果沒有,則不予處理,返回錯誤。

  分站接收請求contorller

    @RequestMapping("/fileSize.htm")
    @ResponseBody
    public String getFileSize(HttpServletRequest request,HttpServletResponse response,String json) {
        log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "獲取資源文件大小(字節)------start");
        /*獲取參數--start*/
        String checkCode = "see-P2C";//校驗碼
        JSONObject jsonObject = new JSONObject(json);
        String tranno =jsonObject.getString("tranno"); //交易流水號
        String isCheckcode = jsonObject.getString("checkCode"); //校驗碼
        checkCode = tranno.substring(tranno.length()-6,tranno.length())+checkCode;
        String mD5CheckCode = MD5Util.encrypt(checkCode);   
        Long ids =jsonObject.getLong("sid"); //資源ID
        Map<String, Object>map = new HashMap<String, Object>();
        map.put("ids", ids);
        map.put("tranno", tranno);
        map.put("isCheckcode", isCheckcode);
        map.put("mD5CheckCode", mD5CheckCode);
        log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "獲取資源文件大小(字節)------end");
        return this.transRecordService.fileSizeHandle(map);
    }

  TransRecordServiceImpl.java中我們處理這些請求,首先是總站第一遍請求時,分站會判斷有沒有這個文件,如果沒有返回錯誤,如果有資源分站會告訴總站我的文件有多大、我會把文件放到你的FTP的哪個文件夾下(路徑)。

    /**
     * @desc 獲取文件總大小
     * @author zp
     */
    @Override
    public String fileSizeHandle(Map<String, Object> map) {
        // TODO Auto-generated method stub
        log.info(this.getClass().getSimpleName() + "." + Thread.currentThread().getStackTrace()[1].getMethodName() + "(獲取視頻總大小)----start");
        String tranno = map.get("tranno").toString();
        Long ids = (Long)map.get("ids");
        String mD5CheckCode = map.get("mD5CheckCode").toString();
        String isCheckcode = map.get("isCheckcode").toString();
        /*交易記錄表插入信息--start*/
        TranCodeModel tranCodeModel = new TranCodeModel();
        tranCodeModel.setTrandesc("總站提取分站視頻資源");
        tranCodeModel.setUrl("parentMsg/fileSizeHandle.htm");
        tranCodeModel.setProjectphase("2");
        tranCodeModel.setRspcode("00");
        tranCodeModel.setRspinfo("交易成功");
        tranCodeModel.setTranconent("提取資源(resourceid="+ids+")");
        tranCodeModel.setTranno(tranno);
        tranCodeModel.setCreatetime(DateUtil.getCurrentDateString("YYYY-MM-dd HH:mm:ss"));
        /*交易記錄表插入信息--end*/ 
        if(!mD5CheckCode.equals(isCheckcode)){ // 身份驗證失敗
            tranCodeModel.setRspcode("01");
            tranCodeModel.setRspinfo("身份驗證失敗");
            this.insertdata(tranCodeModel);
            return "{\"rspcode\":\"01\",\"rspinfo\":\"identifyError\"}";
        }
        if(this.vertifyTranNo(tranno)>0){ // 流水號重復
            tranCodeModel.setRspcode("01");
            tranCodeModel.setRspinfo("交易流水號重復");
            this.insertdata(tranCodeModel);
            return "{\"rspcode\":\"01\",\"rspinfo\":\"trannoRepeat\"}";
        }
        Map<String, Object> resourceMap = this.resourceService.findById(ids);
        if (Validator.isNull(resourceMap)||resourceMap==null) { // 資源被刪除
            tranCodeModel.setRspcode("01");
            tranCodeModel.setRspinfo("資源被刪除");
            this.insertdata(tranCodeModel);
            return "{\"rspcode\":\"01\",\"rspinfo\":\"delresource\"}";
        }
        if ("-1".equals(resourceMap.get("isextract").toString())) { // 提取異常
            tranCodeModel.setRspcode("01");
            tranCodeModel.setRspinfo("提取異常");
            this.insertdata(tranCodeModel);
            return "{\"rspcode\":\"01\",\"rspinfo\":\"errvideo\"}";
        }
        List<ResourceFileModel>rFiles = this.rFileService.findGroupByResourceId(ids);
        if (rFiles.size()==0) { // 視頻資源不存在
            tranCodeModel.setRspcode("01");
            tranCodeModel.setRspinfo("nofiles");
            this.insertdata(tranCodeModel);
            return "{\"rspcode\":\"01\",\"rspinfo\":\"novideo\"}";
        }
        Long filesize = (long)0;
        for (ResourceFileModel resourceFileModel : rFiles) {
            filesize += Long.parseLong(resourceFileModel.getSize());
        }
        String uuid = UUID.randomUUID().toString().replace("-", "");
        log.info(this.getClass().getSimpleName() + "." + Thread.currentThread().getStackTrace()[1].getMethodName() + "(獲取視頻總大小)----end"); 
        return "{\"rspcode\":\"00\",\"rspinfo\":\"success\",\"filesize\":\""+filesize+"\",\"uuid\":\""+uuid+"\"}"; 
    }

  總站接收到分站第一次成功返回信息之后,然后通知分站,你可以開始上傳了,總站也會將文件總大小進行保存,然后開始根據分站提供的文件路徑進行實時查詢文件大小,然后進行比對。

  分站接收到總站回信后,開始上傳文件(總站的回信也是加鹽的哦)。

  分站接收總站的第二次請求contorller

/**
     * @desc 提取資源
     * @author zp
     * @throws IOException 
     * @date 2018-3-16
     */
    @RequestMapping("/pickup.htm")
    @ResponseBody
    public String pickup(HttpServletRequest request,HttpServletResponse response,String json) throws IOException{
        log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "提取資源------start"); 
        
        /*獲取參數--start*/
        String checkCode = "see-P2C";//校驗碼
        JSONObject jsonObject = new JSONObject(json);
        String tranno =jsonObject.getString("tranno"); //交易流水號
        String isCheckcode = jsonObject.getString("checkCode"); //校驗碼
        checkCode = tranno.substring(tranno.length()-6,tranno.length())+checkCode;
        String mD5CheckCode = MD5Util.encrypt(checkCode);   
        Long ids =jsonObject.getLong("sid"); //資源ID
        String ftpuser = jsonObject.getString("ftpuser");// ftp帳號
        String ftppwd = jsonObject.getString("ftppwd");// ftp密碼
        String ftpport = jsonObject.getString("ftpport");// ftp端口
        String ftpIP = jsonObject.getString("ftpIP");// ftpIP
        String uuid = jsonObject.getString("uuid");// uuid
        Map<String, Object>map = new HashMap<String, Object>();
        map.put("ids", ids);
        map.put("tranno", tranno);
        map.put("isCheckcode", isCheckcode);
        map.put("mD5CheckCode", mD5CheckCode);
        map.put("ftpuser", ftpuser);
        map.put("ftppwd", ftppwd);
        map.put("ftpIP", ftpIP);
        map.put("ftpport", ftpport);
        map.put("uuid", uuid);
        return this.transRecordService.pickHandle(map);
    }

  TransRecordServiceImpl.java中,總站會提供自己的FTP的相關信息。然后開始上傳。里邊也會處理鹽是否正確,是否有該文件等操作。 

/**
     * @desc 處理總站提取請求
     * @author zp
     * @throws IOException 
     * @date 2018-3-19
     */
    @Override
    public String pickHandle(Map<String, Object>map) throws IOException {
        String tranno = map.get("tranno").toString();
        Long ids = (Long)map.get("ids");
        String mD5CheckCode = map.get("mD5CheckCode").toString();
        String isCheckcode = map.get("isCheckcode").toString();
        String ftpuser = map.get("ftpuser").toString();
        String ftppwd = map.get("ftppwd").toString();
        String ftpIP = map.get("ftpIP").toString();
        String ftpport = map.get("ftpport").toString();
        /*交易記錄表插入信息--start*/
        TranCodeModel tranCodeModel = new TranCodeModel();
        tranCodeModel.setTrandesc("總站提取分站視頻資源");
        tranCodeModel.setUrl("parentMsg/pickup.htm");
        tranCodeModel.setProjectphase("2");
        tranCodeModel.setRspcode("00");
        tranCodeModel.setRspinfo("交易成功");
        tranCodeModel.setTranconent("提取資源(resourceid="+ids+")");
        tranCodeModel.setTranno(tranno);
        tranCodeModel.setCreatetime(DateUtil.getCurrentDateString("YYYY-MM-dd HH:mm:ss"));
        /*交易記錄表插入信息--end*/ 
        if(!mD5CheckCode.equals(isCheckcode)){ // 身份驗證失敗
            tranCodeModel.setRspcode("01");
            tranCodeModel.setRspinfo("身份驗證失敗");
            this.insertdata(tranCodeModel);
            return "{\"rspcode\":\"01\",\"rspinfo\":\"identifyError\"}";
        }
        if(this.vertifyTranNo(tranno)>0){ // 流水號重復
            tranCodeModel.setRspcode("01");
            tranCodeModel.setRspinfo("交易流水號重復");
            this.insertdata(tranCodeModel);
            return "{\"rspcode\":\"01\",\"rspinfo\":\"trannoRepeat\"}";
        }
        // 開始上傳資源
        // 1.判斷該任務是否錄制完成
        Map<String, Object>  reMap = this.resourceService.findById(ids);
        if (Validator.isNull(reMap)) { // 不存在
            tranCodeModel.setRspcode("01");
            tranCodeModel.setRspinfo("資源已被刪除");
            this.insertdata(tranCodeModel);
            return "{\"rspcode\":\"01\",\"rspinfo\":\"delResouces\"}";
        }
        if (!"3".equals(reMap.get("isextract").toString())&&!"2".equals(reMap.get("isextract").toString())) { // 資源未被提取或已被刪除
            tranCodeModel.setRspcode("01");
            tranCodeModel.setRspinfo("資源還未被提取或已被刪除");
            this.insertdata(tranCodeModel);
            return "{\"rspcode\":\"01\",\"rspinfo\":\"nopick\"}";
        }
        // 2.通過資源ID查找所有的資源信息 
        List<ResourceFileModel>rFiles = this.rFileService.findGroupByResourceId(ids); 
        if (rFiles.size()==0) { // 視頻資源不存在
            tranCodeModel.setRspcode("01");
            tranCodeModel.setRspinfo("nofiles");
            this.insertdata(tranCodeModel);
            return "{\"rspcode\":\"01\",\"rspinfo\":\"novideo\"}";
        }
        // 3.開始上傳
        String filePath = "";
        String arr = "";
        String picktemp = PropertyUtil.getProperty("picktemp"); // 臨時路徑
        String uuid = map.get("uuid").toString();
        picktemp = picktemp+uuid+"\\";
        picktemp = picktemp.replaceAll("\\\\", "/");
        for (ResourceFileModel resourceFileModel : rFiles) {
            filePath = resourceFileModel.getPath();
            filePath = filePath.replaceAll("\\\\", "/"); 
            String path = filePath.replaceAll("\\\\", "/"); 
            String temp = filePath.substring(0,filePath.lastIndexOf("resources/")+10);
            String temp2 =  filePath.substring(filePath.lastIndexOf("resources/")+10,filePath.length());
            temp2 = temp2.substring(0,temp2.indexOf("/")+1);
            filePath = temp + temp2;   
            path = path .substring(filePath.length(),path.length()); 
            path = picktemp + path;
            path = path.replace("/picktemp/", "/resources/");
            resourceFileModel.setPath(path); 
            String url = path.substring(path.indexOf("ResourcesManager/"),path.length()); 
            resourceFileModel.setUrl(url); 
            arr += JSONObject.fromObject(resourceFileModel)+",";
        }
        if (arr.length()>0) {
            arr = arr.substring(0, arr.length()-1);
        }
        // 將文件拷貝到指定的臨時目錄 
        CopyDirectory copyfile = new CopyDirectory();
        copyfile.copyDirectiory(filePath, picktemp); // filePath 源文件  picktemp 目標地址 
        log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "提取資源------end");
        try {
            String[] ip  = ftpIP.split(":");
            ftpIP = ip[0];
            this.starFtp(picktemp, ftpIP, ftpuser, ftppwd, ftpport); 
            return "{\"rspcode\":\"00\",\"rspinfo\":\"success\",\"resource\":["+arr+"]}";  
        } catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
            return "{\"rspcode\":\"01\",\"rspinfo\":\"ftperror\"}"; 
        } 
    } 
     
    /**
     * @desc 開啟線程進行FTP上傳
     * @author zp
     * @date 2018-3-23
     */
    public void starFtp(String picktemp, String ftpIP,String ftpuser,String ftppwd,String ftpport) { 
        log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "准備開啟FTP上傳線程------start");
        //創建一個可重用固定線程數的線程池 
        ExecutorService pool = Executors.newFixedThreadPool(5); 
        //創建實現了Runnable接口對象,Thread對象當然也實現了Runnable接口
        CreateTreadPool t1 = new CreateTreadPool(picktemp, ftpIP, ftpuser, ftppwd, ftpport);
        //將線程放入池中進行執行
        pool.execute(t1); 
        System.out.println(Thread.currentThread().getName()+"進入線程"); 
        //關閉線程池
        pool.shutdown();  
        log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "准備開啟FTP上傳線程------end");
    } 
    

  開啟線程對FTP上傳進行操作,因為考慮到本次操作總站可能不關心上傳過程,所以成功與否就不實時通知總站了~~~~~

public class CreateTreadPool implements Runnable {
    private static Logger log = Logger.getLogger(CreateTreadPool.class);
    // public static ReentrantLock lock=new ReentrantLock();
    //定義信號量,只能5個線程同時訪問  
    final Semaphore semaphore = new Semaphore(5); 
    public static int c=0;
    public String picktemp; //路徑
    public String ftpIP; // ftpip
    public String ftpuser ; // ftp帳號
    public String ftppwd ; // ftp密碼
    public String ftpport; // ftp端口
    // 通過構造方法 獲取FTP相關信息。
    public CreateTreadPool(String picktemp, String ftpIP,String ftpuser,String ftppwd,String ftpport){
        this.picktemp = picktemp;
        this.ftpIP = ftpIP;
        this.ftpuser = ftpuser;
        this.ftppwd = ftppwd;
        this.ftpport = ftpport;
    }
    @Override
    public void run() {
        // TODO Auto-generated method stub
        System.out.println(Thread.currentThread().getName()+"---------上傳准備中---------"); 
        try {
            //獲取許可  
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName()+"獲得鎖");
            System.out.println(Thread.currentThread().getName()+"====>"+c); 
            boolean isSuccess = this.FTPUpload(picktemp, ftpIP, ftpuser, ftppwd, ftpport);
            if (isSuccess) {
                System.out.println(Thread.currentThread().getName()+"---------上傳成功---------");  
            }else {
                System.out.println(Thread.currentThread().getName()+"---------上傳失敗---------"); 
            } 
            c++;
        } catch (Exception e) { 
            System.out.println(Thread.currentThread().getName()+"---------上傳失敗---------");
            e.printStackTrace();
        }finally{
             System.out.println(Thread.currentThread().getName()+"------------釋放鎖");
             semaphore.release();
        }
    }
    /**
     * @desc FTP上傳
     * @author zp
     * @date 2018-3-23
     */
    public boolean FTPUpload(String filePath, String serverIp,String ftpuser,String ftppwd,String ftpport) {
        log.info(this.getClass().getSimpleName() + "." + Thread.currentThread().getStackTrace()[1].getMethodName() + "(給總站FTP上傳視頻)----start");
        FtpUtil ftpTool = new FtpUtil(ftpuser, ftppwd, serverIp, ftpport); // 創建鏈接
        boolean linkStatus = ftpTool.connectServer(ftpuser, ftppwd, serverIp, ftpport);
        File file = new File(filePath); // 實例化
        UploadListener listener = new UploadListener(ftpTool.client); // 創建監聽
        ftpTool.ftpUploadFolder(file, listener); // 上傳
        ftpTool.disconnect(); // 關閉
        RemoveFilesUtil removeFilesUtil = new RemoveFilesUtil();
        removeFilesUtil.DeleteFolder(filePath); // 移除本地臨時上傳的文件
        log.info(this.getClass().getSimpleName() + "." + Thread.currentThread().getStackTrace()[1].getMethodName() + "(給總站FTP上傳視頻)----end");
        return linkStatus;
    } 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM