【Python】pyftpdlib 模塊 _ 對FTP搭建及連接使用


安裝三方包

pip install pyftpdlib

 

1、搭建FTP

 搭建簡單的FTP服務器

# 在本地需要創建FTP的目錄執行命令
python -m pyftpdlib
i 指定IP地址(默認為本機的IP地址)

p 指定端口(默認為2121)

w 寫權限(默認為只讀)

d 指定目錄 (默認為當前目錄)

u 指定用戶名登錄

P 設置登錄密碼
其他參數

 

  運行結果

 

 

局域網搭建FTP環境

# -*- coding:utf-8 -*-
# pyftpdlib 搭建ftp服務端
from pyftpdlib.authorizers import DummyAuthorizer
from pyftpdlib.handlers  import FTPHandler
from pyftpdlib.servers import FTPServer

class ftp_tools:
    port = 2121
    user = "admin"
    pwd = "12345"

    # 創建ftp服務端
    def creat_ftp_server(self):
        # 實例化DummyAuthorizer來創建ftp用戶
        authorizer = DummyAuthorizer()
        # 參數:用戶名,密碼,目錄,權限[見拓展部分服務權限]
        authorizer.add_user(self.user, self.pwd, r'D:\FTP', perm='elradfmwMT')
        handler = FTPHandler
        handler.authorizer = authorizer
        # 參數:IP,端口,handler
        server = FTPServer(('0.0.0.0', self.port), handler)  # 設置為0.0.0.0為本機的IP地址
        server.serve_forever()


if __name__ == "__main__":
    ftp_tools().creat_ftp_server()

  運行結果:

 

2、連接FTP

連接FTP

    # 連接ftp
    def conn_ftp(self):
        ftp = FTP()
        # 連接ftp
        ftp.connect(host=self.host, port=self.port, timeout=10)
        # 登錄ftp
        ftp.login(user=self.user, passwd=self.pwd)
        # 關閉ftp
        ftp.close()

 

 

3、上傳和下載

從本地上傳文件到FTP

    def uploadfile(ftppath, localpath):
        ftp = FTP()
        # 連接ftp
        ftp.connect(host=self.host, port=self.port, timeout=10)
        # 登錄ftp
        ftp.login(user=self.user, passwd=self.pwd)
        # 設置的緩沖區大小
        bufsize = 1024
        fp = open(localpath, 'rb')
        # 二進制存儲器
        ftp.storbinary('STOR ' + ftppath, fp, bufsize)
        # 參數為0,關閉調試模式
        ftp.set_debuglevel(0)
        # 關閉文件並退出ftp
        fp.close()
        ftp.quit()    

 

 

從FTP下載文件到本地

    def downloadfile(ftppath, localpath):
        ftp = FTP()
        # 連接ftp
        ftp.connect(host=self.host, port=self.port, timeout=10)
        # 登錄ftp
        ftp.login(user=self.user, passwd=self.pwd)
        # 設置的緩沖區大小
        bufsize = 1024
        fp = open(localpath, 'wb')
        # 二進制文件
        ftp.retrbinary('RETR ' + ftppath, fp.write, bufsize)
        # 參數為0,關閉調試模式
        ftp.set_debuglevel(0)
      # 關閉文件並退出ftp
        fp.close()
     ftp.quit()

 

 封裝

# coding: utf-8
from ftplib import FTP
from loguru import logger as logs


class FTP_tools:
    """FTP 操作工具"""

    def __init__(self, pathType, section='ftp'):
        """
        構造函數

        :param pathType:路徑類型
        :param section:配置文件section名
        """
        
        self.section = section
        self.host = config.get(self.section, "ftp_host")
        self.port = config.get(self.section, "ftp_port")
        self.user = config.get(self.section, "ftp_user")
        self.password = config.get(self.section, "ftp_password")
        self.pathType = pathType

        # 根據功能判斷服務器及本地文件路徑
        if self.pathType == "path1":
            self.server_path = config.get(self.section, "ftp_cloudpath_path1")
            self.local_path = config.get(self.section, "ftp_localpath_path1")
        elif self.pathType == "path2":
            self.server_path = config.get(self.section, "ftp_cloudpath_path2")
            self.local_path = config.get(self.section, "ftp_localpath_path2")
        else:
            logs.error("pathType傳值錯誤,暫且僅支持image、loanfile文件的上傳下載")
            sys.exit()

        logs.info("======== FTP連接開始 ========")
        logs.debug("FTP連接服務器地址 - {}:{}".format(self.host, self.port))
        logs.debug("FTP連接服務器用戶 - {}/{}".format(self.user, self.password))

        # 實例化
        self.ftp = FTP()

    def __del__(self):
        """析構函數 對象資源被釋放時觸發"""
        
        # 關閉並退出ftp
        self.ftp.quit()
        logs.info("======= FTP 操作結束,連接關閉 =======")

    def ftp_upload_file(self, file, timeout: int = 10, bufsize: int = 1024):
        """
        FTP上傳文件,注意:不支持文件夾

        :param file:需要上傳的文件名,如:test.txt
        :param timeout: 超時時間(默認),必須是int類型
        :param bufsize: 緩沖區大小
        :return: 
        """
        
        serveFilePath = os.path.join(self.server_path, file)
        localFilePath = os.path.join(self.local_path, file)
        logs.debug("FTP服務器路徑:{}".format(serveFilePath))
        logs.debug("FTP本地路徑:{}".format(localFilePath))

        try:
            # 連接ftp
            self.ftp.connect(host=self.host, port=eval(self.port), timeout=timeout)
            # 登錄ftp
            self.ftp.login(user=self.user, passwd=self.password)
            with open(localFilePath, 'wb') as fp:
                # fp = open(localFilePath, 'rb')
                # 二進制存儲器
                self.ftp.storbinary('STOR ' + serveFilePath, fp, bufsize)
                # 參數為0,關閉調試模式
                self.ftp.set_debuglevel(0)
        except Exception as e:
            logs.error("FTP上傳操作異常,異常原因:{}".format(e))
        # finally:
        #     # 關閉文件
        #     fp.close()

    def ftp_download_file(self, file, timeout: int = 10, bufsize: int = 1024):
        """
        下載文件,注意:不支持文件夾

        :param file:需要下載的文件名,如:test.txt
        :param timeout: 超時時間(默認),必須是int類型
        :param bufsize: 設置的緩沖區大小
        :return: 
        """
        
        serverFilePath = os.path.join(self.server_path, file)
        localFilePath = os.path.join(self.local_path, "download/"+file)
        logs.debug("FTP服務器路徑:{}".format(serverFilePath))
        logs.debug("FTP本地路徑:{}".format(localFilePath))

        # 判斷下載路徑是否存在
        # logs.debug(os.path.dirname(localFilePath))
        if not os.path.exists(os.path.dirname(localFilePath)):
            os.makedirs(os.path.dirname(localFilePath))

        try:
            # 連接ftp
            self.ftp.connect(host=self.host, port=eval(self.port), timeout=timeout)
            # 登錄ftp
            self.ftp.login(user=self.user, passwd=self.password)
            with open(localFilePath, 'wb') as fp:
                # fp = open(localFilePath, 'wb')
                # 二進制文件
                self.ftp.retrbinary('RETR ' + serverFilePath, fp.write, bufsize)
                # 參數為0,關閉調試模式
                self.ftp.set_debuglevel(0)
        except Exception as e:
            logs.error("FTP下載操作異常,異常原因:{}".format(e))
        # finally:
        #     # 關閉文件
        #     fp.close()


if __name__ == '__main__':
    """run"""

    file = "test01.txt"
    FTP_tools("path1").ftp_download_file(file)
    # FTP_tools("path1").ftp_download_file(file)
ftp_tools.py  

 

拓展


 

perm 權限參數解釋

讀取權限:   "e" =更改目錄(CWD,CDUP命令)
  
  "l" =列表文件(LIST,NLST,STAT,MLSD,MLST,SIZE命令)

  "r" =從服務器檢索文件(RETR命令)
 寫入權限:   "a" =將數據追加到現有文件(APPE命令)

  "d" =刪除文件或目錄(DELE,RMD命令)

  "f" =重命名文件或目錄(RNFR,RNTO命令)

  "m" =創建目錄(MKD命令)

  "w" =將文件存儲到服務器(STOR,STOU命令)

  "M" =更改文件模式/權限(SITE CHMOD命令)

  "T" =更改文件修改時間(SITE MFMT命令)

 

ftp常用函數釋義

from ftplib import FTP                          #加載ftp模塊
ftp=FTP()               #實例化對象 ftp.set_debuglevel(2)               #打開調試級別2,顯示詳細信息 ftp.connect("IP","port")               #連接ftp的ip和端口 ftp.login("user","password")               #連接ftp的用戶名和密碼 print(ftp.getwelcome())               #打印出歡迎信息 ftp.cmd("xxx/xxx")               #進入遠程目錄 bufsize=1024                #設置的緩沖區大小 filename="xxxx.txt"                  #需要下載的文件 file_handle=open(filename,"wb").write             #以寫模式在本地打開文件 ftp.set_debuglevel(0)               #關閉調試模式 ftp.quit()                #退出ftp (發送命令並關閉連接,出現錯誤會拋異常) ftp.close()                            #關閉ftp (單方面強制關閉ftp)
ftp.cwd(pathname)               #設置FTP當前操作的路徑 ftp.dir()               #顯示目錄下所有目錄信息 ftp.nlst()                #獲取目錄下的文件 ftp.mkd(pathname)                #新建遠程目錄 ftp.pwd()               #返回當前所在位置 ftp.rmd(dirname)                #刪除遠程目錄 ftp.delete(filename)                #刪除遠程文件 ftp.rename(fromname, toname)                  #將fromname修改名稱為toname。 ftp.storbinaly("STOR filename.txt",file_handel,bufsize) #上傳目標文件 ftp.retrbinary("RETR filename.txt",file_handel,bufsize) #下載FTP文件

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM