安裝三方包
pip install pyftpdlib
1、搭建FTP
搭建簡單的FTP服務器
# 在本地需要創建FTP的目錄執行命令 python -m pyftpdlib

i 指定IP地址(默認為本機的IP地址)
p 指定端口(默認為2121)
w 寫權限(默認為只讀)
d 指定目錄 (默認為當前目錄)
u 指定用戶名登錄
P 設置登錄密碼
運行結果
局域網搭建FTP環境
# -*- coding:utf-8 -*- # pyftpdlib 搭建ftp服務端 from pyftpdlib.authorizers import DummyAuthorizer from pyftpdlib.handlers import FTPHandler from pyftpdlib.servers import FTPServer class ftp_tools: port = 2121 user = "admin" pwd = "12345" # 創建ftp服務端 def creat_ftp_server(self): # 實例化DummyAuthorizer來創建ftp用戶 authorizer = DummyAuthorizer() # 參數:用戶名,密碼,目錄,權限[見拓展部分服務權限] authorizer.add_user(self.user, self.pwd, r'D:\FTP', perm='elradfmwMT') handler = FTPHandler handler.authorizer = authorizer # 參數:IP,端口,handler server = FTPServer(('0.0.0.0', self.port), handler) # 設置為0.0.0.0為本機的IP地址 server.serve_forever() if __name__ == "__main__": ftp_tools().creat_ftp_server()
運行結果:
2、連接FTP
連接FTP
# 連接ftp def conn_ftp(self): ftp = FTP() # 連接ftp ftp.connect(host=self.host, port=self.port, timeout=10) # 登錄ftp ftp.login(user=self.user, passwd=self.pwd) # 關閉ftp ftp.close()
3、上傳和下載
從本地上傳文件到FTP
def uploadfile(ftppath, localpath): ftp = FTP() # 連接ftp ftp.connect(host=self.host, port=self.port, timeout=10) # 登錄ftp ftp.login(user=self.user, passwd=self.pwd) # 設置的緩沖區大小 bufsize = 1024 fp = open(localpath, 'rb') # 二進制存儲器 ftp.storbinary('STOR ' + ftppath, fp, bufsize) # 參數為0,關閉調試模式 ftp.set_debuglevel(0) # 關閉文件並退出ftp fp.close() ftp.quit()
從FTP下載文件到本地
def downloadfile(ftppath, localpath): ftp = FTP() # 連接ftp ftp.connect(host=self.host, port=self.port, timeout=10) # 登錄ftp ftp.login(user=self.user, passwd=self.pwd) # 設置的緩沖區大小 bufsize = 1024 fp = open(localpath, 'wb') # 二進制文件 ftp.retrbinary('RETR ' + ftppath, fp.write, bufsize) # 參數為0,關閉調試模式 ftp.set_debuglevel(0) # 關閉文件並退出ftp fp.close()
ftp.quit()
封裝

# coding: utf-8 from ftplib import FTP from loguru import logger as logs class FTP_tools: """FTP 操作工具""" def __init__(self, pathType, section='ftp'): """ 構造函數 :param pathType:路徑類型 :param section:配置文件section名 """ self.section = section self.host = config.get(self.section, "ftp_host") self.port = config.get(self.section, "ftp_port") self.user = config.get(self.section, "ftp_user") self.password = config.get(self.section, "ftp_password") self.pathType = pathType # 根據功能判斷服務器及本地文件路徑 if self.pathType == "path1": self.server_path = config.get(self.section, "ftp_cloudpath_path1") self.local_path = config.get(self.section, "ftp_localpath_path1") elif self.pathType == "path2": self.server_path = config.get(self.section, "ftp_cloudpath_path2") self.local_path = config.get(self.section, "ftp_localpath_path2") else: logs.error("pathType傳值錯誤,暫且僅支持image、loanfile文件的上傳下載") sys.exit() logs.info("======== FTP連接開始 ========") logs.debug("FTP連接服務器地址 - {}:{}".format(self.host, self.port)) logs.debug("FTP連接服務器用戶 - {}/{}".format(self.user, self.password)) # 實例化 self.ftp = FTP() def __del__(self): """析構函數 對象資源被釋放時觸發""" # 關閉並退出ftp self.ftp.quit() logs.info("======= FTP 操作結束,連接關閉 =======") def ftp_upload_file(self, file, timeout: int = 10, bufsize: int = 1024): """ FTP上傳文件,注意:不支持文件夾 :param file:需要上傳的文件名,如:test.txt :param timeout: 超時時間(默認),必須是int類型 :param bufsize: 緩沖區大小 :return: """ serveFilePath = os.path.join(self.server_path, file) localFilePath = os.path.join(self.local_path, file) logs.debug("FTP服務器路徑:{}".format(serveFilePath)) logs.debug("FTP本地路徑:{}".format(localFilePath)) try: # 連接ftp self.ftp.connect(host=self.host, port=eval(self.port), timeout=timeout) # 登錄ftp self.ftp.login(user=self.user, passwd=self.password) with open(localFilePath, 'wb') as fp: # fp = open(localFilePath, 'rb') # 二進制存儲器 self.ftp.storbinary('STOR ' + serveFilePath, fp, bufsize) # 參數為0,關閉調試模式 self.ftp.set_debuglevel(0) except Exception as e: logs.error("FTP上傳操作異常,異常原因:{}".format(e)) # finally: # # 關閉文件 # fp.close() def ftp_download_file(self, file, timeout: int = 10, bufsize: int = 1024): """ 下載文件,注意:不支持文件夾 :param file:需要下載的文件名,如:test.txt :param timeout: 超時時間(默認),必須是int類型 :param bufsize: 設置的緩沖區大小 :return: """ serverFilePath = os.path.join(self.server_path, file) localFilePath = os.path.join(self.local_path, "download/"+file) logs.debug("FTP服務器路徑:{}".format(serverFilePath)) logs.debug("FTP本地路徑:{}".format(localFilePath)) # 判斷下載路徑是否存在 # logs.debug(os.path.dirname(localFilePath)) if not os.path.exists(os.path.dirname(localFilePath)): os.makedirs(os.path.dirname(localFilePath)) try: # 連接ftp self.ftp.connect(host=self.host, port=eval(self.port), timeout=timeout) # 登錄ftp self.ftp.login(user=self.user, passwd=self.password) with open(localFilePath, 'wb') as fp: # fp = open(localFilePath, 'wb') # 二進制文件 self.ftp.retrbinary('RETR ' + serverFilePath, fp.write, bufsize) # 參數為0,關閉調試模式 self.ftp.set_debuglevel(0) except Exception as e: logs.error("FTP下載操作異常,異常原因:{}".format(e)) # finally: # # 關閉文件 # fp.close() if __name__ == '__main__': """run""" file = "test01.txt" FTP_tools("path1").ftp_download_file(file) # FTP_tools("path1").ftp_download_file(file)
拓展
perm 權限參數解釋
讀取權限: "e" =更改目錄(CWD,CDUP命令) "l" =列表文件(LIST,NLST,STAT,MLSD,MLST,SIZE命令) "r" =從服務器檢索文件(RETR命令) 寫入權限: "a" =將數據追加到現有文件(APPE命令) "d" =刪除文件或目錄(DELE,RMD命令) "f" =重命名文件或目錄(RNFR,RNTO命令) "m" =創建目錄(MKD命令) "w" =將文件存儲到服務器(STOR,STOU命令) "M" =更改文件模式/權限(SITE CHMOD命令) "T" =更改文件修改時間(SITE MFMT命令)
ftp常用函數釋義
from ftplib import FTP #加載ftp模塊
ftp=FTP() #實例化對象 ftp.set_debuglevel(2) #打開調試級別2,顯示詳細信息 ftp.connect("IP","port") #連接ftp的ip和端口 ftp.login("user","password") #連接ftp的用戶名和密碼 print(ftp.getwelcome()) #打印出歡迎信息 ftp.cmd("xxx/xxx") #進入遠程目錄 bufsize=1024 #設置的緩沖區大小 filename="xxxx.txt" #需要下載的文件 file_handle=open(filename,"wb").write #以寫模式在本地打開文件 ftp.set_debuglevel(0) #關閉調試模式 ftp.quit() #退出ftp (發送命令並關閉連接,出現錯誤會拋異常) ftp.close() #關閉ftp (單方面強制關閉ftp)
ftp.cwd(pathname) #設置FTP當前操作的路徑 ftp.dir() #顯示目錄下所有目錄信息 ftp.nlst() #獲取目錄下的文件 ftp.mkd(pathname) #新建遠程目錄 ftp.pwd() #返回當前所在位置 ftp.rmd(dirname) #刪除遠程目錄 ftp.delete(filename) #刪除遠程文件 ftp.rename(fromname, toname) #將fromname修改名稱為toname。 ftp.storbinaly("STOR filename.txt",file_handel,bufsize) #上傳目標文件 ftp.retrbinary("RETR filename.txt",file_handel,bufsize) #下載FTP文件