python作業簡單FTP(第七周)


作業需求:

1. 用戶登陸

2. 上傳/下載文件

3. 不同用戶家目錄不同

4. 查看當前目錄下文件

5. 充分使用面向對象知識

 

思路分析:

1.用戶登陸保存文件對比用戶名密碼。

2.上傳用json序列化文件名,文件路徑,文件大小傳給服務器端,根據得到的字段內容操作上傳動作。

3.下載代碼和上傳基本可以互換,因為文件名都一樣所以傳一個文件大小即可。

4.查看當前目錄下文件,調用cd命令,既然能分解get 和put動作就可以看cd動作。

5.添加了LINUX和Windows不同系統的命令,路徑,判斷,命令有cd,ls,mkdir,等。

6.添加了少部分的異常處理。

7.添加了多線程的SocketServer可以實現多人對話。

8.未完成日志,md5校驗的功能。

 

README:

作者:yaobin 版本:簡單Ftp示例版本 v0.1 開發環境: python3.6 程序介紹: 1. 用戶登陸 2. 上傳/下載文件 3. 不同用戶家目錄不同 4. 查看當前目錄下文件 5. 充分使用面向對象知識 使用說明: 1.可以在Linux和Windows都可以運行 2.Linux調用了cd,mkdir,ls,rm,命令 3.Windows調用了cd,md,dir,del,命令 On Linux,Windows Client: Python3 ./Ftp_Client.py put 上傳 get 下載 mkdir 創建目錄 ls 查看文件信息 rm 刪除文件 drm 刪除目錄 Server:Python3 ./Ftp_Server.py 文件目錄結構: ├─簡單Ftp #程序執行目錄
│ README │ __init__.py │ ├─bin │ Ftp_Client.py #客戶端程序
│      Ftp_Server.py    #服務器端程序__init__.py │ ├─conf │ │ setting.py #配置文件
│  │  __init__.py │ │ │ └─__pycache__ │ setting.cpython-36.pyc │ __init__.cpython-36.pyc │ ├─core │ │ auth.py #用戶驗證邏輯交互
│  │  commands.py       #命令邏輯交互
│  │  ftp_client.py     #sock_客戶端邏輯交互
│  │  ftp_server.py     #sock_服務端邏輯交互
│  │  logger.py         #日志邏輯交互---未完成
│  │  main.py           #客戶端程序
│  │  __init__.py │ │ │ └─__pycache__ │ auth.cpython-36.pyc │ commands.cpython-36.pyc │ ftp_client.cpython-36.pyc │ ftp_server.cpython-36.pyc │ main.cpython-36.pyc │ __init__.cpython-36.pyc │ ├─db │ │ __init__.py │ │ │ ├─colin #用戶目錄
│ │ │ colin.bak │ │ │ colin.dat #用戶賬號密碼文件
│ │ │ colin.dir │ │ │ __init__.py │ │ │ │ │ └─colin #用戶ftp家目錄
│  │      │  __init__.py │ │ │ │ │ └─aaa │ ├─pub #ftp程序模擬pub目錄
│ │ FTP作業.7z │ │ socket通信client.py │ │ __init__.py │ │ 選課系統.png │ │ │ └─user_path #用戶路徑文件,判斷用戶進入系統后在哪里處理
│ path │ ├─logs #日志未完成
│ access.log │ transmission.log │ __init__.py │ ├─src │ │ auth_class.py #用戶驗證類
│  │  linux_cmd_class.py    #linux命令類
│  │  server_class.py       #server_socket類
│  │  windows_cmd_class.py  #server命令類
│  │  __init__.py │ │ │ └─__pycache__ │ auth_class.cpython-36.pyc │ linux_cmd_class.cpython-36.pyc │ server_class.cpython-36.pyc │ windows_cmd_class.cpython-36.pyc │ __init__.cpython-36.pyc │ └─test #測試
 args_test.py __init__.py
View Code

FTP程序代碼:

bin/Ftp_Client.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao
import os,sys BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) #添加環境變量
from core.main import Admin if __name__ == '__main__': Admin.run('')
View Code
/bin/Ftp_Server.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao
import os,sys BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) #添加環境變量
from core import ftp_server if __name__ == '__main__': ftp_server.socketserver.ThreadingTCPServer(('127.0.0.1', 62000), ftp_server.Ftp_server).serve_forever()
View Code
conf/setting.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao
import logging import os,sys,platform,json BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) #添加環境變量
os_res = platform.system() if  os_res == 'Windows': data_path = os.path.join(BASE_DIR + '\db') ftp_path = os.path.join(BASE_DIR + '\db\pub') path_file = os.path.join(BASE_DIR + '\db\\user_path\path') else: data_path = os.path.join(BASE_DIR + '/db') ftp_path = os.path.join(BASE_DIR + '\db\pub') path_file = os.path.join(BASE_DIR + '/db/user_path/path') with open(path_file, 'r', encoding='utf-8')as f: file = f.readlines() file_object=file[0]
View Code
core/auth.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao
import os,sys,shelve BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) #添加環境變量
from conf import setting from src.auth_class import Auth from core.commands import Commands class Auth_ftp(object): def __init__(self,username,user_passwd): self.user_data = {} self.username = username   #用戶名
        self.username_passwd = user_passwd  #密碼
        os_res = setting.platform.system() if os_res == 'Windows':    #用戶密碼文件
            self.passwd_data_path = os.path.join(setting.data_path + '\\'+ username +  '\\'+ username + '.' + 'dat') #Windows用戶密碼文件路徑
            self.passwd_data = os.path.join(setting.data_path + '\\' + username+ '\\' + username)  # Windows用戶密碼文件
            self.file_object = os.path.join(setting.data_path + '\\' + self.username) else: self.passwd_data_path = \ os.path.join(setting.data_path + '/' + username  + '/' + username + '.' + 'dat')  # Linux用戶密碼文件路徑
            self.passwd_data = \ os.path.join(setting.data_path + '/' + username  + '/' + username)  # Linux用戶密碼文件
            self.file_object = os.path.join(setting.data_path + '/' + username) user_obj = (self.username,self.username_passwd,self.passwd_data) # '''用戶名作為key,把用戶名,密碼,目錄,寫到字典中方便調用'''
        self.user_data[self.username] = user_obj '''驗證用戶是否存在'''
    def auth_user_passwd(self): '''判斷文件路徑是否存在然后返回用戶是否存在''' os_res = os.path.exists(self.passwd_data_path) if os_res !=False: user_file = shelve.open(self.passwd_data) if self.user_data[self.username][0] in user_file\ and  user_file[self.username][1] == self.username_passwd: print("Welcome,%s,您的身份驗證成功"%self.username) user_file.close() else: return False else: return True def add_user_passwd(self): res = os.path.exists(self.file_object) if res != True: Commands(self.file_object).mkdir() #用戶賬號密碼文件
            Commands(self.passwd_data).mkdir()  #用戶上傳下載目錄
            user_file = shelve.open(self.passwd_data) user_file.update(self.user_data) print("用戶創建成功") else: print("賬號信息出現問題,請聯系管理員....")
View Code
core/commands.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao
import os,sys,platform BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) #添加環境變量
from conf import setting from src.linux_cmd_class import L_commands from src.windows_cmd_class import W_commands class Commands(object): def __init__(self,file_object): self.file_object=file_object def cd1(self): if setting.os_res == 'Windows': res = W_commands(self.file_object).cd1() return res elif setting.os_res == 'Linux': res = L_commands(self.file_object).cd1() return res else: print('不支持此操作系統') '''進入目錄'''
    def cd(self): if setting.os_res =='Windows': res= W_commands(self.file_object).cd() return res elif setting.os_res == 'Linux': res=L_commands(self.file_object).cd() return res else: print('不支持此操作系統') '''創建目錄'''
    def mkdir(self): if setting.os_res =='Windows': res= W_commands(self.file_object).mkdir() return res elif setting.os_res == 'Linux': res=L_commands(self.file_object).mkdir() return res else: print('不支持此操作系統') '''刪除文件'''
    def rm(self): if setting.os_res == 'Windows': res=W_commands(self.file_object).rm() return res elif setting.os_res == 'Linux': res=L_commands(self.file_object).rm() return res else: print('不支持此操作系統') '''刪除目錄'''
    def drm(self): if setting.os_res == 'Windows': res=W_commands(self.file_object).drm() return res elif setting.os_res == 'Linux': res=L_commands(self.file_object).drm() return res else: print('不支持此操作系統')
View Code
core/ftp_client.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao
import os, sys import platform, socket import time,hashlib,json BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) # 添加環境變量
from conf import setting from core.commands import Commands from conf import setting # from src.server_class import Server_class
'''定義client連接的協議和方式后面需要補充'''
'''創建連接輸入IP地址和密碼'''


class Ftp_client(object): def link(self): try: os.chdir(setting.file_object) #直接進入FTP指定的用戶目錄
        self.sending_msg_list = [] self.ip_addr = '127.0.0.1' self.ip_port = 62000 self.client = socket.socket() self.client.connect((self.ip_addr, self.ip_port)) while True: self.sending_msg = None self.data = self.client.recv(1024) print("[+]Server>>>recv: %s" % self.data.decode()) self.menu() sending_msg = input('請輸入命令>>>:') self.sending_msg_list = sending_msg.split() if len(sending_msg)  == 0: data_header = { 'test1': { 'action': '', 'file_name': '', 'size': 0 } } self.client.send(json.dumps(data_header).encode()) elif len(sending_msg) >= 2 : if setting.os_res == 'Windows': try : new_path = self.sending_msg_list[1].encode('utf-8') self.res_new = self.sending_msg_list[1].strip().split('\\')  # 截取獲得文件名
                        self.file_name1 = self.res_new[-1] except IndexError: pass
                elif setting.os_res == 'Linux': try: self.res_new = self.sending_msg_list[1].strip().split('/') self.file_name1 = self.res_new[-1] except IndexError: pass
                if self.sending_msg_list[0] == "put": try: self.put(self.sending_msg_list[1]) except IndexError: self.client.send('put'.encode()) if self.sending_msg_list[0] == "get": try: self.get(self.file_name1) except IndexError and ValueError: self.client.send('get'.encode()) elif self.sending_msg_list[0] == "exit": break
                else:#cd ls rm drm mkdir 命令等
                    try: data_header = { 'test1': { 'action': self.sending_msg_list[0], 'file_name': self.file_name1, 'size': 0 } } self.client.send(json.dumps(data_header).encode()) except AttributeError: data_header = { 'test1': { 'action': self.sending_msg_list[0], 'file_name': '', 'size': 0 } } self.client.send(json.dumps(data_header).encode()) except ConnectionResetError and ConnectionAbortedError: print("[+]Server is Down ....Try to Reconnect......") self.link() def get(self,file_name): data_header = { 'test1': { 'action': 'get', 'file_name': file_name, 'size': 0 } } #這里沒做同名文件判斷,下一次補充
        self.client.send(json.dumps(data_header).encode())  #發送get請求
        print(os.getcwd()) self.data = self.client.recv(1024)     #拿到size
        self.client.send(b'come on') file_size = int(self.data.decode()) def file_tr(): file_object = open(file_name, 'wb') received_size = 0 while received_size < file_size: recv_data = self.client.recv(1024) file_object.write(recv_data) received_size += len(recv_data)  # 規定多少但不一定收到那么多
                print(file_size, received_size) else: print('[+]Client:File Recv Successful') file_object.close() if os.path.exists(file_name) == False:  # 判斷本地目錄文件是否存在
 file_tr() else: print('文件已經存在將要覆蓋') file_tr() def put(self,file_name): if os.path.exists(file_name)== True: #判斷文件路徑# 是否存不存在
            if os.path.isfile(file_name): file_obj = open(file_name, "rb") data_header = { 'test1': { 'action': 'put', 'file_name': self.file_name1, 'size': os.path.getsize(self.sending_msg_list[1].encode()) } } self.client.send(json.dumps(data_header).encode()) for line in file_obj: self.client.send(line) file_obj.close() print("[+]-----send file down-----") else: print('[+]file is no valid.') self.client.send('cmd'.encode()) else: print('[+] File Not Found') data_header = { 'test1': { 'action': 'aaaa', 'file_name': '', 'size': 0 } } self.client.send(json.dumps(data_header).encode()) def menu(self): menu = ''' ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; 幫 助 請注意windows和linux的路徑稍有不同 查看文件 ls(file) eg: ls /tmp/Python3.6.3/README 進入目錄 cd eg: cd /tmp/python 創建目錄 mkdir(dir) eg: mkdir /tmp/python 刪除文件 rm eg: rm /tmp/python/README 刪除目錄 drm eg: drm /tmp/python 上 傳 put eg: put /tmp/python/README 下 載 get eg: get /tmp/python/README 登 出 exit 默認上傳文件夾 用戶目錄下用用戶名的命名的文件夾 默認下載文件夾 db/pub ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; '''
        print(menu)
View Code
core/ftp_server.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
import os,sys,time
import subprocess,subprocess,json
import socketserver,socket,traceback

BASE_DIR  = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)

from core.main import Admin
from core.auth import Auth_ftp
from core.commands import Commands
from conf import setting
from src.server_class  import Server_class


class Ftp_server(socketserver.BaseRequestHandler):
    #都懵比了很多代碼重復了、等着重新回爐下~~~
    def parsecmd(self,data):

        data = json.loads(data.decode())
        file_action = data['test1']['action']
        file_path = data['test1']['file_name']
        file_size = int(data['test1']['size'])
        file_obj = setting.file_object
        print('from ip : %s information : %s' % (self.client_address[0], self.data.decode()))

        try:
            if file_action == 'put':
                os.chdir(file_obj)  # 默認用用戶的目錄作為上傳目錄
                def file_tr():
                    self.request.send(b'recv data')
                    file_object = open(file_path, 'wb')
                    received_size = 0
                    while received_size < file_size:
                        recv_data = self.request.recv(1024)
                        file_object.write(recv_data)
                        received_size += len(recv_data)  # 規定多少但不一定收到那么多
                        print(file_size, received_size)
                    else:
                        print('[+]File Recv Successful')
                        file_object.close()

                if os.path.exists(file_path) == False:  # 判斷文件是否存在
                    file_tr()
                else:
                    self.request.send(b'[+] this file[%s] will cover ....' % file_path.encode())
                    file_tr()

            elif file_action == 'get':
                os.chdir(setting.ftp_path)  # get對象工作的目錄pub
                # # get文件首先需要其他目錄,或者更改名字 ,然后進行getget的地址應該是自己家目錄的地址
                # # 下載文件時候客戶端在用戶目錄下面去下載
                #判斷size不同os的路徑不同
                if setting.os_res == 'Windows':
                    file_size = os.path.getsize(setting.ftp_path+'\\'+file_path)
                if setting.os_res == 'Linux':
                    file_size = os.path.getsize(setting.ftp_path + '/' + file_path)
                try:
                    if os.path.exists(file_path) == True:  # 判斷文件路徑# 是否存在
                        if os.path.isfile(file_path):
                            file_obj = open(file_path, "rb")
                            #直接copy 客戶端put的代碼就可以,這里self.conn.send 改為request.send
                            self.request.send(str(file_size).encode())
                            self.request.recv(1024)
                            for line in file_obj:
                                self.request.send(line)
                            file_obj.close()
                            self.request.send(b"-b:bash:[+]Server[%s]---send file down-----"%file_path.encode())
                        else:
                            print('[+]file is no valid.')
                            self.request.send(b"-b:bash:[+]Server[%s]---file is no valid.-----" % file_path.encode())
                            #這里不寫上下倆條的話客戶端就會卡住、處於一直等待中
                    else:
                        self.request.send(b"-b:bash:[+]Server[%s]---file is no valid.-----" % file_path.encode())
                 # except json.decoder.JSONDecodeError:
                 #     self.request.send(b'-b:bash:[+]what are you doing???')
                except:
                    pass

            elif  file_action== 'cd':
                if os.path.exists(file_path) == True:
                    res = Commands(file_path).cd()
                    self.request.send(b'-bash: [%s] [%s]: ' % (file_action.encode(), file_path.encode()))
                else:
                    self.request.send(b'-bash:Directory Exitis')

            elif file_action == 'mkdir':
                    os.chdir(file_obj)
                    # 文件夾mkdir命令處理,如果是windows默認應該是gbk生成的bytes類型'''
                    if os.path.exists(file_path) == True:# 進入要傳遞目錄
                        self.request.send(b'-bash: directory exitis ')
                    else:
                        res = Commands(file_path).mkdir()
                        self.request.send(b'-bash: [%s] [%s]:' % (file_action.encode(), file_path.encode()))

            elif file_action == 'ls':  #測試利用json返回命令結果、反正挺好玩的!!!!用subporcess 用的挺high的
                res = os.listdir()
                res = json.dumps(res)
                self.request.send(res.encode())
                # '''文件刪除命令處理, 如果是windows默認應該是gbk生成的bytes類型'''

            elif file_action == 'rm':
                os.chdir(file_obj)
                if os.path.isfile(file_path) == True:
                    res = Commands(file_path).rm()
                    self.request.send(b'-bash: [%s] [%s]:' % (file_action.encode(), file_path.encode()))
                else:
                    self.request.send(b'-bash: [%s]: Not file  ' % file_path.encode())
                    # 目錄刪除命令處理, 如果是windows默認應該是gbk生成的bytes類型'''

            elif file_action == 'drm':
                os.chdir(file_obj)  # 進入要傳遞目錄
                if os.path.isdir(file_path) == True:  # 判斷是否是目錄
                    Commands(file_path).drm()
                    self.request.send(b'-bash: %s: Delete OK '%file_path.encode())
                else:
                    self.request.send(b'-bash: [%s]: No such File or Directory ' %file_path.encode())
            else:
                self.request.send(b'-bash:Command or File Not Found ')

        except Exception and UnicodeDecodeError:
            traceback.print_exc()


    def handle(self):
        print("[+] Server is running on port:62000", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
        while True:
            try:  # 調整一下socket.socket()的位置每次重新連接都生成新的socket實例,避免因為意外而導致socket斷開連接
                print("[+] Connect success -> %s at ", self.client_address, time.strftime("%Y%m%d %H:%M:%S", time.localtime()))
                self.request.send(b'\033[34;1mWelcome,-bash version 0.0.1-release \033[0m ')
                while True:
                    self.data = self.request.recv(1024)
                    data = self.data
                    self.parsecmd(data)
                    if not self.data:
                        print("[+]Error: Client is lost")
                        break
            except socket.error:
                print("[+]Error get connect error")
                break
            continue

#socketserver.ThreadingTCPServer(('127.0.0.1', 62000), Ftp_server).serve_forever()
View Code
core/main.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao
import os,sys,json BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) #添加環境變量
from conf import setting from core.auth import Auth_ftp from core.commands import Commands from core.ftp_client import Ftp_client class Admin(object): def run(self): exit_flag = False print('歡迎來到簡單FTP程序,登陸程序后server會在本地自動啟動') menu = u''' \033[32;1m 1.登陸 2.注冊 3.退出\033[0m '''
        while not exit_flag: print(menu) user_option = input('請輸入您的操作,輸入q退出>>>:').strip() '''登陸'''
            if user_option == '1': user_name = input('請輸入用戶名>>>:').strip() user_passwd = input('請輸入您的密碼>>>:').strip() file_object = (Auth_ftp(user_name, user_passwd).passwd_data)  #傳入路徑變量
                res = Auth_ftp(user_name,user_passwd).auth_user_passwd() if res == None: with open(setting.path_file, 'w',encoding='utf-8') as f: f.write(file_object);f.close() Ftp_client().link() elif res == False: print('%s用戶密碼不正確' % user_name) else: print('請先注冊') elif user_option == '2': user_name = input('請輸入用戶名>>>:').strip() user_passwd = input('請輸入您的密碼>>>:').strip() user_res = Auth_ftp(user_name, user_passwd).auth_user_passwd() if user_res == None: print("%s用戶不需要注冊"%user_name) #不需要注冊但是很是應該能進入ftp進行操作
                    file_object = (Auth_ftp(user_name, user_passwd).passwd_data) with open(setting.path_file, 'w',encoding='utf-8') as f: f.write(file_object);f.close() Ftp_client().link() elif user_res == False: print("%已注冊用戶,密碼不正確" % user_name) elif user_res == True: Auth_ftp(user_name, user_passwd).add_user_passwd() #創建用戶名密碼文件等
                    file_object = (Auth_ftp(user_name, user_passwd).passwd_data) with open(setting.path_file, 'w',encoding='utf-8') as f: f.write(file_object);f.close() Ftp_client().link() else: sys.exit('異常退出') elif user_option == 'q' or user_option == '3': sys.exit() else: print('輸入的選項不正確,請重新輸入') #Admin().run()
View Code
src/auth_class.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao
class Auth(object): '''用戶驗證類類為用戶、密碼家目錄'''
    def __init__(self,name,passwd): self.name = name self.passwd = passwd
View Code
src/linux_cmd.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
import os,sys,subprocess
class L_commands(object):
    def __init__(self,file_object):
        self.cwd = os.getcwd()  # 獲取當前目錄
        self.file_object = [file_object] #列表文件 #取0得到文件名/或目錄路徑
        self.directory_object=[file_object] #目錄文件
        self.file = self.file_object[0]
        self.directory = self.file_object[0]

    def cd1(self):
        os.chdir(self.directory_object[0])

    '''進入目錄'''
    def cd(self):
        res = subprocess.Popen(['cd %s' % self.directory], shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
        self.msg = res.stdout.read()
        if len(self.msg) == 0:
            self.msg = res.stderr.read()
        return self.msg

    '''創建目錄'''
    def mkdir(self):
        res = subprocess.call(['mkdir -p %s' % self.directory], shell=True, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
        self.msg = res.stdout.read()
        if len(self.msg) == 0:
            self.msg = res.stderr.read()
        return self.msg


    '''刪除文件'''
    def rm(self):
        res = subprocess.call(['rm -f %s' % self.directory], shell=True, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
        self.msg = res.stdout.read()
        if len(self.msg) == 0:
            self.msg = res.stderr.read()
        return self.msg

    '''刪除目錄'''
    def drm(self):
        res = subprocess.call(['rm -rf %s' % self.directory], shell=True, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
        self.msg = res.stdout.read()
        if len(self.msg) == 0:
            self.msg = res.stderr.read()
        return self.msg
View Code
src/windows_cmd.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
import os,sys
import os,sys,subprocess
class W_commands(object):
    def __init__(self,file_object):
        self.cwd = os.getcwd()  # 獲取當前目錄
        self.file_object = [file_object] #列表文件 #取0得到文件名/或目錄路徑
        self.directory_object=[file_object] #目錄文件
        self.file = self.file_object[0]
        self.directory = self.file_object[0]

    def cd1(self):
        os.chdir(self.directory_object[0])

    '''進入目錄'''
    def cd(self):
        res= subprocess.Popen(['cd,%s' % self.directory], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.msg = res.stdout.read()
        if len(self.msg) == 0:
            self.msg = res.stderr.read()
        return self.msg

    '''創建目錄'''
    def mkdir(self):
        res = subprocess.Popen(['md,%s' % self.directory], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.msg = res.stdout.read()
        if len(self.msg) == 0:
            self.msg = res.stderr.read()
        return self.msg

    '''刪除文件'''
    def rm(self):
        res = subprocess.Popen(['del,%s' % self.directory], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.msg = res.stdout.read()
        if len(self.msg) == 0:
            self.msg = res.stderr.read()
        return self.msg

    '''刪除目錄'''
    def drm(self):
        res = subprocess.Popen(['rd,%s' % self.directory], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.msg = res.stdout.read()
        if len(self.msg) == 0:
            self.msg = res.stderr.read()
        return self.msg
View Code
src/server_class.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao
'''ftpserver類'''
import os,socket class Server_class(object): def __init__(self,ip_addr,port): self.ip_addr = ip_addr self.port = port self.server = socket.socket()
View Code

 

程序測試樣圖:

用戶驗證

系統命令

 

上傳和下載

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM