python3 FTP文件的上傳下載


心得:

  1.要實現多用戶同時登陸,需要使用多進程,不能使用多線程,因為多線程數據是共享的,在用戶切換目錄的時候,會出現問題,而多進程由於數據是隔離的,就沒有這個問題。

  2.切換目錄和刪除文件的時候,需要檢查要切換或刪除的目錄是否超出了用戶的家目錄。需要用到

    user_full_home = os.path.abspath(os.path.join(MyServer.base_dir, user_home)) file_abs_path = os.path.abspath(os.path.join(MyServer.base_dir, now_path, filename)) if os.path.exists(filename) and user_full_home in file_abs_path:
      pass

  3.以下代碼是有服務器壓力的問題的,進程啟動的越多,對服務器的壓力會越大,所以最好的辦法是啟用多線程,思路:客戶端雖然都有自己的家目錄,但是服務端可以通過路徑拼接的方式,而不是真正的os.chdir到具體的目錄里,可解決此問題。

FTP/Readme.md

### 新增功能
    - pwd 查看當前路徑
    - ls 查看當前目錄下的文件和文件夾
    - rm file 刪除文件
    - mkdir folder 新建文件夾
    - touch filename 新建文件
    - cd folder 進入指定目錄
    - cd .. 進入上一目錄
    - quit 退出
    

### 執行順序
    - 先執行FTP_Server/ftp_server.py
    - 再執行FTP_Server/ftp_client.py

### 測試賬號
    - 用戶名 lily  密碼 123456
    - 注冊或登錄后,可執行上傳文件和下載命令
        - put a.mp4  
            - 上傳a.mp4
            - a.mp4提前放到FTP_Client目錄下
        - put 1.jpg
            - 上傳1.jpg
            - 1.jpg提前放到FTP_Client目錄下
        - get a.mp4
            - 下載a.mp4
            - a.mp4會下載到FTP_Client/download目錄下
        - get 1.jpg
            - 下載1.jpg
            - 1.jpg會下載到FTP_Client/download目錄下


### 流程
- 注冊流程
![](reg.png)

- 登錄流程
![](login.png)

###FTP項目開發
- 功能: 
    - 多用戶同時登陸
    - 上傳/下載文件
    - 傳輸過程中現實進度條
    - 不同用戶使用自己的目錄
    - 每個用戶擁有1G的磁盤空間
    - 充分使用面向對象知識


- 思路
    - 先完成客戶端用戶注冊功能
        - 注冊時檢查用戶名s是否存在請求
        ```python
        {'action': '_user_isexists', 'username': 'aaa', 'result': 0}
        ```
        - 注冊時檢查用戶名是否存在響應(不存在)
        ```python
        {'action': '_user_isexists', 'username': 'aaa', 'result': 0}
        ```
        - 注冊時檢查用戶名是否存在響應(存在)
        ```python
        {'action': '_user_isexists', 'username': 'aaa', 'result': 1}
        ```
        - 注冊提交數據
        ```python
        {'action': '_register', 'result': 0, 'data': {'username': 'aaa', 'password': '123456', 'quotaSize': '50M', 'userHome': 'home/aaa'}}
        ```
        - 注冊成功返回響應
        ```python
        {'action': '_register', 'result': 1, 'data': {'username': 'aaa', 'quotaSize': '50M', 'userHome': 'home/aaa'}}
        ```
    - 再完成用戶登錄功能
        - 請求
        ```python
        {'action': '_login', 'username': 'lily', 'password': 'aaa', 'result': 0}
        ```
        - 響應
        ```python
        {'action': '_login', 'username': 'lily', 'password': '123456', 'result': 1, 'userHome': 'home/lily', 'quota_size': '50M'}
        ```
    - 用戶注冊和登錄時有自己的家目錄和磁盤空間大小
    - 注冊時,用戶名 密碼 默認磁盤空間50M 注冊或登錄成功后默認進入家目錄
    - 用戶信息保存在FTP_Server/db/user.json
        - 格式
        ```python
        {"username": "lily", "password": "1822219a532beacb9615e36dcaba3a6c", "quotaSize": "50M", "userHome": "home/lily"}
        ```
    - 實現客戶端上傳和下載文件的功能
        - 注冊或登錄成功后,自動進入用戶的家目錄
        - 上傳文件
            - put 文件名
                - 客戶端判斷文件是否存在
                - 客戶端獲取文件名和文件大小
                - 服務端判斷用戶磁盤是否夠用
                ```python
                {'action': '_file_upload', 'file_name': 'dydt.png', 'file_size': 39542, 'quota_size': '50M', 'user_home': 'home/lily'}
                ```
                - 一切檢查就緒,可以上傳文件
                ```python
                {"result": 1}  # 上傳完成
                ```
            - 將FTP_Client下的文件上傳到用戶的家目錄
        - 下載文件
            - get 文件名
                - 客戶端發送要下載的文件消息
                - 服務端收到消息,拿到文件名,判斷文件是否存在,存在則下載,不存在返回
            - 將用戶家目錄里的文件下載到FTP_Client/download/目錄下
    - 上傳文件的時候,需要判斷磁盤空間是否滿了
    - 上傳和下載文件的時候,有進度條

 

FTP/reg.png

 

FTP/login.png

 

FTP/FTP_Server/ftp_server.py

import sys
import os
import struct
import json
import hashlib


class MyServer():
    base_dir = os.path.dirname(os.path.abspath(__file__))
    sys.path.insert(0, base_dir)
    db_file = os.path.join(base_dir, "db/user.json")

    def __init__(self, conn):
        self.request = conn

    def talk(self):
        while 1:
            dic = self.recv_msg()
            print("start", dic)
            if dic.get('action', "not_find").upper() == "_QUIT":
                self.request.close()
                break
            elif hasattr(self, dic.get('action', "not_find")):
                fn = getattr(self, dic.get('action', "not_find"))
                if callable(fn):
                    fn(dic)
                else:
                    print(fn)
            else:
                self.printC("方法沒找到!", "red")

    def _register(self, dic):
        """
        用戶注冊
        :param dic:
        :return:
        """
        user_dic = dic.get("data")
        user_home = user_dic.get("userHome")
        if not os.path.exists(user_home):
            os.makedirs(user_home)
        password = user_dic.get("password")
        user_dic["password"] = MyServer.get_md5(password)
        user_dic_json = json.dumps(user_dic)
        with open(MyServer.db_file, "a", encoding="utf-8") as f:
            f.write(f"{user_dic_json}\n")
        dic["result"] = 1
        os.chdir(user_home)
        del dic["data"]["password"]
        print(dic)
        self.send_msg(dic)

    def _login(self, dic):
        username = dic.get("username")
        password = dic.get("password")
        password = MyServer.get_md5(password)
        with open(MyServer.db_file, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                user_dic = json.loads(line)
                if user_dic.get("username") == username and user_dic.get("password") == password:
                    dic["result"] = 1
                    dic["userHome"] = user_dic.get("userHome")
                    dic["quota_size"] = user_dic.get("quotaSize")
                    print("now_path:", os.getcwd())
                    os.chdir(dic["userHome"])
        print(dic)
        del dic["password"]
        self.send_msg(dic)

    def _user_isexists(self, dic):
        username = dic.get("username")
        with open(MyServer.db_file, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                user_dic = json.loads(line)
                if user_dic.get("username") == username:
                    dic["result"] = 1
        print(dic)
        self.send_msg(dic)

    def send_msg(self, dic):
        """
        發送信息
        :param dic:
        :return:
        """
        dic_json = json.dumps(dic)
        dic_json_bs = dic_json.encode("utf-8")
        self.request.send(struct.pack("i", len(dic_json_bs)))
        self.request.send(dic_json_bs)

    def recv_msg(self):
        """
        接收消息
        :return:
        """
        recv_msg_head = self.request.recv(4)
        recv_msg_len = struct.unpack("i", recv_msg_head)[0]
        print(recv_msg_len)
        recv_msg_content = self.request.recv(recv_msg_len).decode("utf-8")
        return json.loads(recv_msg_content)

    def _file_upload(self, dic):
        """
        文件上傳
        :param dic:
        :return:
        """
        file_name = dic.get("file_name")
        file_size = dic.get("file_size")
        if dic.get("quota_size").endswith("M"):
            quota_size = int(dic.get("quota_size")[:-1]) * 1024 * 1024
        else:
            quota_size = int(dic.get("quota_size")) * 1024 * 1024
        used_quota_size = MyServer.getPathSize("./")
        if quota_size > used_quota_size + file_size:
            self.send_msg({"space": 1})
            f = open(file_name, "wb")
            while file_size > 0:
                recv_msg = self.request.recv(1024)
                f.write(recv_msg)
                file_size -= len(recv_msg)
            else:
                self.send_msg({"result": 1})  # 上傳完成
        else:
            self.send_msg({"space": 0})  # 空間不足

    def _file_download(self, dic):
        file_name = dic.get("filename")
        if os.path.exists(file_name) and os.path.isfile(file_name):
            file_size = os.path.getsize(file_name)
            self.send_msg({"exists": 1, "file_size": file_size})
            with open(file_name, "rb") as f:
                for line in f:
                    self.request.send(line)
            print("文件下載完成!")
        else:
            self.send_msg({"exists": 0})

    def _get_pwd(self, dic):
        now_path = os.getcwd()
        dic["now_path"] = now_path.replace(MyServer.base_dir, "")
        self.send_msg(dic)

    def _get_list(self, dic):
        dic["file_list"] = os.listdir()
        self.send_msg(dic)

    def _remove_file(self, dic):
        filename = dic.get("filename")
        user_home = dic.get("user_home")
        now_path = dic.get("now_path")
        user_full_home = os.path.abspath(os.path.join(MyServer.base_dir, user_home))
        file_abs_path = os.path.abspath(os.path.join(MyServer.base_dir, now_path, filename))
        print("user_full_home", user_full_home)
        print("file_abs_path", file_abs_path)
        if os.path.exists(filename) and user_full_home in file_abs_path:
            if os.path.isfile(filename):
                os.remove(filename)
                dic["result"] = 1
            else:
                if MyServer.isempty(filename) == 0:
                    os.removedirs(filename)
                    dic["result"] = 1
                else:
                    dic["result"] = "notempty"
        else:
            dic["result"] = "notexists"
        self.send_msg(dic)

    def _make_dir(self, dic):
        folder = dic.get("folder")
        if os.path.exists(folder):
            dic["result"] = "isexists"
        else:
            os.mkdir(folder)
            dic["result"] = 1
        self.send_msg(dic)

    def _touch_file(self, dic):
        filename = dic.get("filename")
        if os.path.exists(filename):
            dic["result"] = "isexists"
        else:
            open(filename, "w", encoding="utf-8").close()
            dic["result"] = 1
        self.send_msg(dic)

    def _change_path(self, dic):
        file_path = dic.get("file_path")
        user_home = dic.get("user_home")
        now_path = dic.get("now_path")
        user_full_home = os.path.abspath(os.path.join(MyServer.base_dir, user_home))
        new_abs_path = os.path.abspath(os.path.join(MyServer.base_dir, now_path, file_path))
        if os.path.exists(file_path):
            if file_path.startswith(".."):
                if user_full_home in new_abs_path:
                    os.chdir(file_path)
                    dic["result"] = 1
                else:
                    dic["result"] = "path_notexists"
            elif file_path == "/":
                os.chdir(os.path.join(MyServer.base_dir, user_home))
                dic["result"] = 1
            else:
                os.chdir(file_path)
                dic["result"] = 1
            now_path = os.getcwd()
            dic["now_path"] = now_path.replace(MyServer.base_dir, "")
        else:
            dic["result"] = "path_notexists"
        print(dic)
        self.send_msg(dic)

    @staticmethod
    def getPathSize(path):
        """
        統計目錄的總大小
        """
        total_size = 0
        gen = os.walk(path)  # 拿到一個生成器
        for root, dirs, files in gen:
            for d in dirs:
                dir_path = os.path.join(root, d)
                total_size += os.path.getsize(dir_path)
                print(dir_path)
            for f in files:
                file_path = os.path.join(root, f)
                total_size += os.path.getsize(file_path)
                print(file_path)
        return total_size

    @staticmethod
    def isempty(path):
        """
        判斷目錄是否為空
        """
        total = 0
        gen = os.walk(path)  # 拿到一個生成器
        for root, dirs, files in gen:
            for d in dirs:
                total += 1
            for f in files:
                total += 1
        return total

    @staticmethod
    def get_md5(str1):
        md5_obj = hashlib.md5("我是鹽,你想咋滴".encode("utf-8"))
        md5_obj.update(str1.encode("utf-8"))
        return md5_obj.hexdigest()

    def printC(self, str1, color="black"):
        '''添加顏色輸出
        color: red:紅色 gre:綠色 yel:黃色
        '''
        col_type = 30
        if color == "red":
            col_type = 31
        elif color == "gre":
            col_type = 32
        elif color == "yel":
            col_type = 33
        print("\033[0;%sm%s\033[0m" % (col_type, str1))


if __name__ == '__main__':
    import socket
    from multiprocessing import Process

    '''一個進程里開多線程實現socket通訊'''
    server = socket.socket()
    ip_port = ("127.0.0.1", 8889)
    server.bind(ip_port)
    server.listen()
    while 1:  # 循環連接
        conn, addr = server.accept()
        my_server = MyServer(conn)
        t = Process(target=my_server.talk)
        t.start()
    server.close()

 

FTP/FTP_Client.py

import os

class Client:
    def __init__(self):
        self.sk = None
        self.username = None
        self.user_home = None
        self.quota_size = None
        self.now_path = None

    def run(self):
        """
        啟動入口
        :return:
        """
        dic = {"1": "login", "2": "register"}
        while 1:
            for k, v in dic.items():
                self.printC("%-3s%-10s" % (k, v), "yel")
            inp = input("請選擇操作序號:").strip()
            if hasattr(self, dic.get(inp, "not_find")):
                fn = getattr(self, dic.get(inp, "not_find"))
                if callable(fn):
                    self.connect_socket()
                    fn()
                    break
            else:
                self.printC("輸入錯誤,請重新輸入!", "red")

    def connect_socket(self):
        """
        創建socket連接
        :return:
        """
        self.sk = socket.socket()
        self.sk.connect(("127.0.0.1", 8889))

    def send_msg(self, dic):
        """
        發送信息
        :param dic:
        :return:
        """
        dic_json = json.dumps(dic)
        dic_json_bs = dic_json.encode("utf-8")
        self.sk.send(struct.pack("i", len(dic_json_bs)))
        self.sk.send(dic_json_bs)

    def recv_msg(self):
        """
        接收消息
        :return:
        """
        recv_msg_head = self.sk.recv(4)
        recv_msg_len = struct.unpack("i", recv_msg_head)[0]
        recv_msg_content = self.sk.recv(recv_msg_len).decode("utf-8")
        return json.loads(recv_msg_content)

    def login(self):
        """
        用戶登錄
        :return: 登錄成功進入welcome方法
        """
        for i in range(2, -1, -1):
            self.printC("用戶登錄".center(20, "*"), "yel")
            username = input("Username: ").strip()
            password = input("Password: ").strip()
            user_dic = {"action": "_login", "username": username, "password": password, "result": 0}
            self.send_msg(user_dic)
            recv_dic = self.recv_msg()
            if recv_dic.get("result"):
                self.printC(f"恭喜用戶{username}登錄成功!", "gre")
                self.username = username
                self.user_home = recv_dic.get("userHome")
                self.quota_size = recv_dic.get("quota_size")
                self.welcome()
                break
            else:
                self.printC(f"用戶名或密碼錯誤,你還有{i}次機會!", "red")

    def register(self):
        """
        用戶注冊
        :return: 注冊成功進入welcome方法
        """
        while 1:
            self.printC("新用戶注冊".center(20, "*"), "yel")
            username = input("Username: ").strip()
            if not username:
                self.printC("用戶名不能為空!", "red")
                continue
            if self._user_isexists(username):  # 判斷用戶是否已存在
                self.printC(f"用戶{username}已存在!", "red")
                continue
            password = input("Password: ").strip()
            if not password:
                self.printC("密碼不能為空!", "red")
                continue
            quota_size = input("Quota size: (Default 50M)").strip()
            if not quota_size:
                quota_size = "50M"
            user_home = f"home/{username}"
            user_dic = {"action": "_register", "result": 0,
                        "data": {"username": username, "password": password, "quotaSize": quota_size,
                                 "userHome": user_home}}
            self.send_msg(user_dic)
            res_dic = self.recv_msg()
            if res_dic.get("result"):
                self.printC(f"用戶{username}注冊成功!", "gre")
                self.username = username
                self.user_home = user_home
                self.quota_size = quota_size
                self.welcome()
                break
            else:
                self.printC(f"用戶{username}注冊失敗!", "red")

    def _user_isexists(self, username):
        send_dic = {"action": "_user_isexists", "username": username, "result": 0}
        self.send_msg(send_dic)
        return self.recv_msg().get("result")

    def welcome(self):
        self.printC("歡迎來到FTP系統".center(20, "*"), "yel")
        while 1:
            if self.now_path is None:
                self.now_path = self.user_home
            inp = input(f"{self.now_path} #  ").strip()
            if inp.startswith("put "):  # 上傳文件
                self.file_upload(inp)
            elif inp.startswith("get "):  # 下載文件
                self.file_download(inp)
            elif inp == "pwd":  # 查看當前目錄
                self.get_pwd()
            elif inp == "ls":  # 查看當前目錄下文件和文件夾
                self.get_list()
            elif inp.startswith("rm "):  # 刪除文件
                self.remove_file(inp)
            elif inp.startswith("mkdir "):  # 新建文件夾
                self.make_dir(inp)
            elif inp.startswith("touch "): # 新建文件
                self.touch_file(inp)
            elif inp.startswith("cd "):  # 進入指定的目錄
                self.change_path(inp)
            elif inp.upper() == "QUIT":  # 退出
                self.quit()
            else:
                self.printC("無效的命令!", "red")

    def file_upload(self, command):
        """
        上傳文件
        :param command:
        :return:
        """
        filename = command[4:].strip()
        if os.path.exists(filename) and os.path.isfile(filename):  # 判斷文件是否存在
            file_size = os.path.getsize(filename)  # 文件大小
            if int(self.quota_size[:-1]) * 1024 * 1024 > file_size:
                send_dic = {"action": "_file_upload", "file_name": filename, "file_size": file_size,
                            "quota_size": self.quota_size, "user_home": self.user_home}
                self.send_msg(send_dic)
                space_dic = self.recv_msg()
                if space_dic.get("space") == 0:
                    self.printC("上傳的文件太大了,空間不足!", "red")
                    return
                now_size = 0
                with open(filename, "rb") as f:
                    for line in f:
                        self.sk.send(line)
                        now_size += len(line)
                        self.process_bar(file_size, now_size)  # 進度條
                recv_dic = self.recv_msg()
                if recv_dic.get("result"):
                    self.printC(f"文件{filename}上傳完成!", "gre")
            else:
                self.printC(f"文件{filename}太大,你的空間不足!", "red")
        else:
            self.printC(f"文件{filename}不存在", "red")

    def file_download(self, command):
        print(command)
        filename = command[4:].strip()
        send_dic = {"action": "_file_download", "filename": filename, "result": 0}
        self.send_msg(send_dic)
        recv_dic = self.recv_msg()
        if recv_dic.get("exists"):
            file_size = recv_dic.get("file_size")
            f = open(f"download/{filename}", "wb")
            now_size = 0
            while file_size > now_size:
                content = self.sk.recv(1024)
                f.write(content)
                now_size += len(content)
                self.process_bar(file_size, now_size)
            else:
                self.printC(f"文件{filename}下載完成!", "gre")
        else:
            self.printC(f"文件{filename}不存在!", "red")

    def process_bar(self, total, now):
        """
        進度條
        :param total: 總大小
        :param now: 當前大小
        :return:
        """
        process = int(now * 100 / total)
        sys.stdout.write(f"\r[{process}%] " + process * ">")
        sys.stdout.flush()

    def get_pwd(self):
        dic = {"action": "_get_pwd"}
        self.send_msg(dic)
        recv_dic = self.recv_msg()
        now_path = recv_dic.get("now_path")
        self.printC(now_path, "gre")

    def get_list(self):
        dic = {"action": "_get_list"}
        self.send_msg(dic)
        recv_dic = self.recv_msg()
        for file in recv_dic["file_list"]:
            self.printC(file, "gre")

    def remove_file(self, command):
        filename = command[3:].strip()
        dic = {"action": "_remove_file", "filename": filename, "user_home": self.user_home, "now_path": self.now_path}
        self.send_msg(dic)
        recv_dic = self.recv_msg()
        if recv_dic.get("result") == 1:
            self.printC(f"文件{filename}刪除成功", "gre")
        elif recv_dic.get("result") == "notempty":
            self.printC(f"刪除失敗,文件{filename}不是空目錄", "red")
        else:
            self.printC(f"文件{filename}不存在", "red")

    def make_dir(self, command):
        folder = command[5:].strip()
        dic = {"action": "_make_dir", "folder": folder}
        self.send_msg(dic)
        recv_dic = self.recv_msg()
        if recv_dic.get("result") == 1:
            self.printC(f"文件夾{folder}創建成功!", "gre")
        else :
            self.printC(f"創建失敗,文件夾{folder}已存在!", "red")

    def touch_file(self, command):
        filename = command[5:].strip()
        dic = {"action": "_touch_file", "filename": filename}
        self.send_msg(dic)
        recv_dic = self.recv_msg()
        if recv_dic.get("result") == "isexists":
            self.printC(f"新建失敗,文件{filename}已存在!", "red")
        else:
            self.printC(f"文件{filename}創建成功!", "gre")

    def change_path(self, command):
        path = command[3:].strip()
        dic = {"action": "_change_path", "file_path": path, "user_home": self.user_home, "now_path": self.now_path}
        self.send_msg(dic)
        recv_dic = self.recv_msg()
        if recv_dic.get("result") == 1:
            self.now_path = recv_dic.get("now_path")
            self.now_path = self.now_path.replace(os.sep, "/").lstrip("/")
        else:
            self.printC("目錄不存在", "red")

    def quit(self):
        dic = {"action": "_quit"}
        self.send_msg(dic)
        self.sk.close()
        exit(0)

    def printC(self, str1, color="black"):
        '''添加顏色輸出
        color: red:紅色 gre:綠色 yel:黃色
        '''
        col_type = 30
        if color == "red":
            col_type = 31
        elif color == "gre":
            col_type = 32
        elif color == "yel":
            col_type = 33
        print("\033[0;%sm%s\033[0m" % (col_type, str1))


if __name__ == '__main__':
    import socket
    import json
    import struct
    import sys

    Client().run()

 

FTP/FTP_Server/db/user.json

{"username": "lily", "password": "1822219a532beacb9615e36dcaba3a6c", "quotaSize": "50M", "userHome": "home/lily"}
{"username": "lucy", "password": "1822219a532beacb9615e36dcaba3a6c", "quotaSize": "100M", "userHome": "home/lucy"}
{"username": "user1", "password": "1822219a532beacb9615e36dcaba3a6c", "quotaSize": "50M", "userHome": "home/user1"}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM