python作业Select版本FTP(第十周)


SELECT版FTP:

使用SELECT或SELECTORS模块实现并发简单版FTP

允许多用户并发上传下载文件

 

思路解析:

1. 运用IO多路复用的知识,使用标准库中封装好的SELECTORS模块编写程序

2. 使用IO多路复用的SELECT模块编写程序

3. 最后编写多并发程序,模拟用户并发上传下载文件;并发时为避免多个连接重复写入同一个文件,使用random随机生成新文件名

 

程序核心代码

README

作者:yaobin
版本: Selectors Ftp 示例版本 v0.1
开发环境: python3.6

程序介绍
1. 使用SELECT或SELECTORS模块实现并发简单版FTP
2. 允许多用户并发上传下载文件

文件目录结构
├─bin
│      __init__.py
│      client.py  #客户端主程序
│      server.py  #服务端主程序
│
├─conf
│      setting.py  #配置文件
│      __init__.py
│
├─core
│  │  client_main.py        #客户端交互程序
│  │  selectors_client.py   #selectors客户端主程序
│  │  selectors_server.py   #selectors服务端主程序
│  │  select_client.py      #select客户端主程序
│  │  select_server.py      #select服务端主程序
│  │  server_main.py        #server端主程序
│  │  __init__.py
│  │
│  └─__pycache__          #pyc文件目录
│          client_main.cpython-36.pyc
│
├─db
│  │  __init__.py
│  │
│  ├─Client_DownLoad
│  ├─Server_DownLoad
│  ├─Server_Upload
│  └─test
│          test.log
│          test.py
│          __init__.py
│
└─logs
        __init__.py
View Code

conf
setting.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
"""Main configuration: project root and the upload/download directories."""
import os
import sys
import platform

# Project root: the directory that contains conf/, core/, db/ ...
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)

# The previous code tested ``platform == 'Windows'`` (a module compared with a
# string, which is always False) and concatenated path pieces by hand, giving
# doubled separators and invalid escape sequences such as "\d".
# os.path.join selects the correct separator for the running OS, so no
# per-platform branch is needed.
_DB_DIR = os.path.join(BASE_DIR, "db")

download_path = os.path.join(_DB_DIR, "Server_DownLoad")          # files the server offers
upload_path = os.path.join(_DB_DIR, "Server_Upload")              # files the server receives
client_download_path = os.path.join(_DB_DIR, "Client_DownLoad")   # files the client saves
View Code

core
client_main.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
"""Interactive entry menu for the FTP clients."""
import os
import sys

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)

from core import selectors_client
from core import select_client


class client_ftp(object):
    """Menu-driven launcher for the two client implementations."""

    def start(self):
        """Show the menu in a loop and run whichever client the user picks.

        Both clients connect to localhost:10000. The method only leaves
        through ``sys.exit`` when the user chooses to quit.
        """
        print('欢迎进入Select Ftp')
        msg = ''' 1.selectors模块客户端上传下载测试 2.select客户端上传下载测试 3.exit '''
        # Factories are wrapped in lambdas so the class lookup happens only
        # when the corresponding menu entry is actually chosen.
        factories = {
            '1': lambda: selectors_client.selectors_client(),
            '2': lambda: select_client.select_client(),
        }
        quit_words = ('3', 'q', 'exit')
        while True:
            print(msg)
            user_choice = input('请选择操作>>>:')
            if user_choice in factories:
                client = factories[user_choice]()
                client.connect("localhost", 10000)
                client.start()
                continue
            if user_choice in quit_words:
                sys.exit('程序退出')
            print('非法操作,请重新输入')
View Code

select_client.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
"""Interactive client for the select-based FTP server (text command protocol)."""
import sys
import os
import time
import platform
import random
import socket
import json

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)

from conf import setting


class select_client(object):
    """FTP client that sends ``"<action> <filename>"`` commands.

    ``put`` streams the file and finishes with a literal end marker;
    ``get`` receives a JSON header (``status``/``filename``/``size``)
    followed by the raw file bytes.
    """

    RECV_SIZE = 1024
    DONE_MARKER = b'put done(status:200)'

    def __init__(self):
        """Create the (not yet connected) TCP socket."""
        self.client = socket.socket()

    @staticmethod
    def _now():
        """Return the timestamp prefix used in console messages."""
        return time.strftime("%Y-%m-%d %X", time.localtime())

    def start(self):
        """Run the interactive prompt until the user types ``exit``."""
        print('login time : %s' % self._now())
        while True:
            try:
                self.sending_msg = input('[root@select_ftp_client]# ')
                self.sending_msg_list = self.sending_msg.split()
                # Check for an empty line BEFORE indexing; the old order
                # raised IndexError on every bare <Enter>.
                if not self.sending_msg_list:
                    continue
                self.action = self.sending_msg_list[0]
                if len(self.sending_msg_list) == 1:
                    if self.action == "exit":
                        print('logout')
                        break
                    print(self._now(), '-bash : %s command not found' % self.action)
                    continue
                # os.path.basename works on every platform; the old
                # Windows/Linux-only split left file_path unset elsewhere.
                self.file_path = self.sending_msg_list[1].strip()
                self.file_name = os.path.basename(self.file_path)
                if self.action == "put":
                    self.put(self.action, self.file_name)
                elif self.action == "get":
                    self.get(self.action, self.file_name)
                else:
                    print(self._now(), '[+]client:-bash: %s:'
                          % self.sending_msg_list[0], 'command not found')
            # ``except A and B and C and D`` evaluates to D alone, so network
            # errors were never caught. The Connection*Error classes are
            # subclasses of OSError; ValueError/LookupError cover a malformed
            # server header.
            except (OSError, ValueError, LookupError) as e:
                print(self._now(), '[+]client: -bash :', e, 'Restart client')

    def put(self, action, file_name):
        """Upload ``self.file_path`` to the server.

        :param action: command verb sent to the server (``"put"``)
        :param file_name: name the server should store the file under
        :return: None
        """
        if not os.path.isfile(self.file_path):
            print(self._now(), '[+]client:-bash: %s:'
                  % self.file_name, 'file not found')
            return
        # Reject empty files before talking to the server, otherwise the
        # server is left waiting for an upload that never comes.
        if os.stat(self.file_path).st_size == 0:
            print(self._now(), '[+]client:-bash: %s:'
                  % self.file_name, 'file not allow null')
            return

        cmd = action + " " + file_name
        self.client.sendall(cmd.encode())
        if not self.client.recv(self.RECV_SIZE):  # server acknowledgement
            raise ConnectionResetError('server closed the connection')

        trans_size = 0
        with open(self.file_path, 'rb') as f:
            # Fixed-size chunks: iterating "lines" of a binary file may pull
            # the whole file into memory. sendall guards against short writes.
            for chunk in iter(lambda: f.read(self.RECV_SIZE), b''):
                self.client.sendall(chunk)
                trans_size += len(chunk)
        # The protocol has no framing; the pause keeps the end marker from
        # being merged with the last data chunk on the server side.
        time.sleep(0.5)
        print("\n文件上传完成。 文件大小:[%s]字节" % trans_size)
        self.client.sendall(self.DONE_MARKER)

    def get(self, action, file_name):
        """Download ``file_name`` into ``setting.client_download_path``.

        The file is saved as ``<name>.<random int>`` so concurrent test
        clients do not overwrite each other.

        :param action: command verb sent to the server (``"get"``)
        :param file_name: name of the file requested from the server
        :return: None
        :raises ConnectionResetError: if the server closes mid-transfer
        """
        cmd = action + " " + file_name
        self.client.sendall(cmd.encode())
        data = self.client.recv(self.RECV_SIZE)
        if not data:
            raise ConnectionResetError('server closed the connection')

        # The server sends the JSON header and the first data chunk
        # back-to-back, so both may arrive in one recv(). latin-1 maps bytes
        # to characters 1:1, so raw_decode's end index is a byte offset.
        file_msg, header_end = json.JSONDecoder().raw_decode(data.decode('latin-1'))
        leftover = data[header_end:]

        file_status = file_msg['status']
        if file_status == 550:
            print(self._now(), '[+]client:-bash: %s:'
                  % file_name, 'file not found')
        elif file_status == 200:
            file_size = file_msg['size']
            local_name = os.path.basename(file_msg['filename']) + '.' + str(random.randint(1, 100000))
            # Build an absolute path instead of os.chdir(): changing the
            # working directory broke later relative ``put`` paths.
            local_path = os.path.join(setting.client_download_path, local_name)
            receive_size = 0
            with open(local_path, 'wb') as file_object:
                if leftover:
                    file_object.write(leftover)
                    receive_size += len(leftover)
                while receive_size < file_size:
                    data = self.client.recv(min(self.RECV_SIZE, file_size - receive_size))
                    if not data:
                        # recv() returns b'' once the peer has closed; the
                        # old loop spun forever in that case.
                        raise ConnectionResetError('server closed the connection')
                    file_object.write(data)
                    receive_size += len(data)
            print(self._now(), "[+]client: -bash :File get done File size is :", file_size)

    def connect(self, ip, port):
        """Connect the client socket to the server.

        :param ip: server IP address or host name
        :param port: server TCP port
        :return: None
        """
        self.client.connect((ip, port))
View Code

select_server.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
"""FTP server built directly on select.select()."""
import os
import json
import sys
import random
import time
import select
import socket
import queue

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)

from conf import setting


class select_ftp(object):
    """Single-threaded FTP server multiplexing clients with select().

    State kept per client socket:
      * ``msg_dic[conn]``      - queue holding the next chunk to send
      * ``file_flag[conn]``    - ``[size, bytes_sent, path]`` of a running download
      * ``file_up_flag[conn]`` - target path of a running upload
    """

    # Only these methods may be invoked by a client command. The old
    # hasattr()/getattr() dispatch let a client call ANY method (clean,
    # start, __init__ ...) simply by naming it.
    COMMANDS = ('put', 'get')
    CHUNK_SIZE = 1024
    UPLOAD_DONE = 'done(status:200)'

    def __init__(self, ip, port):
        """Prepare the listening socket and the bookkeeping tables.

        :param ip: address to listen on
        :param port: TCP port to listen on
        """
        self.server = socket.socket()
        self.host = ip
        self.port = port
        self.msg_dic = {}
        self.inputs = [self.server, ]
        self.outputs = []
        self.file_flag = {}
        self.file_up_flag = {}

    @staticmethod
    def _now():
        """Return the timestamp prefix used in log lines."""
        return time.strftime("%Y-%m-%d %X", time.localtime())

    def start(self):
        """Bind, listen and run the select() loop forever."""
        self.server.bind((self.host, self.port))
        self.server.listen(1000)
        self.server.setblocking(False)
        while True:
            readable, writeable, exceptional = select.select(
                self.inputs, self.outputs, self.inputs)
            for r in readable:
                self.readable(r)
            for w in writeable:
                # A socket may have been cleaned up while handling `readable`.
                if w in self.outputs:
                    self.writeable(w)
            for e in exceptional:
                self.clean(e)

    def readable(self, ser):
        """Handle a readable socket: accept a client or process its data.

        :param ser: the listening socket or a client connection
        :return: None
        """
        if ser is self.server:
            conn, addr = self.server.accept()
            print(self._now(), ': echoing: newlink ', addr)
            self.inputs.append(conn)
            self.msg_dic[conn] = queue.Queue()
            return
        try:
            data = ser.recv(self.CHUNK_SIZE)
            if not data:
                # b'' means the peer closed; the old code crashed here with
                # IndexError on ``cmd.split()[0]``.
                print(self._now(), ": client lost", ser)
                self.clean(ser)
                return
            try:
                parts = data.decode().split()
            except UnicodeDecodeError:
                parts = []  # binary payload: can only be upload data
            # NOTE: the protocol has no framing, so an uploaded chunk that
            # happens to read "<put|get> <word>" is still taken as a command.
            if len(parts) == 2 and parts[0] in self.COMMANDS:
                print(self._now(), ': echoing: newlink ', data.decode())
                getattr(self, parts[0])(ser, parts[1])
            else:
                self.upload(ser, data)
        except ConnectionError:
            print(self._now(), ": client lost", ser)
            self.clean(ser)

    def writeable(self, conn):
        """Send the queued chunk to a client and schedule the next one.

        :param conn: client connection reported writable by select()
        :return: None
        """
        pending = self.msg_dic.get(conn)
        if pending is None:
            if conn in self.outputs:
                self.outputs.remove(conn)
            return
        try:
            # get_nowait(): a blocking get() on an empty queue would freeze
            # the whole event loop.
            data_to_client = pending.get_nowait()
        except queue.Empty:
            if conn in self.outputs:
                self.outputs.remove(conn)
            return
        try:
            sent = conn.send(data_to_client)
        except OSError:
            print(self._now(), ': error client lost')
            self.clean(conn)
            return

        if conn in self.outputs:
            self.outputs.remove(conn)
        state = self.file_flag.get(conn)
        if state is None:
            return
        # Advance by what was REALLY sent; send() may accept fewer bytes than
        # offered, and load() re-reads from this offset.
        state[1] += sent
        size, trans_size, filename = state
        if trans_size < size:
            self.load(conn, filename, size)
        else:
            del self.file_flag[conn]

    def clean(self, conn):
        """Forget everything about a connection and close it.

        :param conn: client connection
        :return: None
        """
        if conn in self.outputs:
            self.outputs.remove(conn)
        if conn in self.inputs:
            self.inputs.remove(conn)
        self.msg_dic.pop(conn, None)
        self.file_flag.pop(conn, None)
        self.file_up_flag.pop(conn, None)
        try:
            conn.close()  # previously leaked: sockets were never closed
        except OSError:
            pass

    def put(self, conn, filename):
        """Start (or finish) an upload requested by a client.

        :param conn: client connection
        :param filename: target file name, or the end-of-upload marker
        :return: None
        """
        if filename == self.UPLOAD_DONE:
            self.file_up_flag.pop(conn, None)
            return
        # basename() keeps a hostile client from escaping the upload
        # directory with names like "../../x".
        safe_name = os.path.basename(filename)
        if safe_name in ('', '.', '..'):
            conn.send(b'550')
            return
        # Absolute path instead of os.chdir(): the working directory is
        # process-wide and was switched under other clients' transfers.
        target = os.path.join(setting.upload_path, safe_name)
        if os.path.isfile(target):
            # Keep the previous file under a random suffix.
            try:
                self.rename(target, target + '.' + str(random.randint(1, 100000)))
            except FileExistsError:
                os.remove(target)
        print(self._now(), ': server recv download data')
        conn.send(b'200')
        self.file_up_flag[conn] = target

    def upload(self, conn, data):
        """Append received bytes to the client's upload target, if any.

        :param conn: client connection
        :param data: raw bytes received from the client
        :return: None
        """
        target = self.file_up_flag.get(conn)
        if target is None:
            return
        with open(target, 'ab') as file_object:
            file_object.write(data)

    def get(self, conn, filename):
        """Answer a download request with a JSON header, then start sending.

        :param conn: client connection
        :param filename: name of the requested file
        :return: None
        """
        path = os.path.join(setting.download_path, os.path.basename(filename))
        msg_dic = {  # download header sent to the client
            "action": "get",
            "filename": filename,
            "size": None,
            "status": 550,
        }
        found = os.path.isfile(path)
        if found:
            size = os.stat(path).st_size
            msg_dic['size'] = size
            msg_dic['status'] = 200
        conn.send(json.dumps(msg_dic).encode())
        if found:
            self.file_flag.pop(conn, None)  # never resume a stale offset
            self.load(conn, path, size)

    def load(self, conn, filename, size):
        """Queue the next chunk of a download and mark the socket writable.

        :param conn: client connection
        :param filename: path of the file being sent
        :param size: total file size in bytes
        :return: None
        """
        state = self.file_flag.setdefault(conn, [size, 0, filename])
        with open(filename, "rb") as f:
            f.seek(state[1])
            # Fixed-size read: readline() on a binary file without newlines
            # would load the entire file at once.
            data = f.read(self.CHUNK_SIZE)
        if not data:
            truncated = state[1] < state[0]
            del self.file_flag[conn]
            if truncated:
                # File shrank mid-transfer; drop the client rather than
                # leaving it waiting for bytes that will never arrive.
                self.clean(conn)
            return
        self.msg_dic[conn].put(data)
        if conn not in self.outputs:
            self.outputs.append(conn)

    def rename(self, old_name, new_name):
        """Rename a file, replacing ``new_name`` if it already exists.

        :param old_name: current path
        :param new_name: new path
        :return: None
        """
        if os.path.exists(new_name):
            os.remove(new_name)
        os.rename(old_name, new_name)
View Code

selectors_client.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
"""Interactive client for the selectors-based FTP server (JSON header protocol)."""
import os
import sys
import json
import time
import random
import socket
import platform

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)

from conf import setting


class selectors_client(object):
    """FTP client that announces each transfer with a JSON header.

    Header shape: ``{"client": {"action", "file_name", "size"}}``.
    """

    RECV_SIZE = 1024

    def __init__(self):
        """Create the (not yet connected) TCP socket."""
        self.client = socket.socket()
        self._address = None  # remembered by connect() for reconnects

    @staticmethod
    def _now():
        """Return the timestamp prefix used in console messages."""
        return time.strftime("%Y-%m-%d %X", time.localtime())

    def _reconnect(self):
        """Replace the socket with a fresh connection to the last address."""
        if self._address is None:
            return
        try:
            self.client.close()
            self.client = socket.socket()
            self.client.connect(self._address)
        except OSError as e:
            print(self._now(), '[+]client: -bash :', e)

    def start(self):
        """Run the interactive prompt until the user types ``exit``."""
        print('login time : %s' % self._now())
        while True:
            try:
                self.sending_msg = input('[root@select_ftp_client]# ')
                self.sending_msg_list = self.sending_msg.split()
                # Check for an empty line BEFORE indexing; the old order
                # raised IndexError on every bare <Enter>.
                if not self.sending_msg_list:
                    continue
                self.action = self.sending_msg_list[0]
                if len(self.sending_msg_list) == 1:
                    if self.action == "exit":
                        print('logout')
                        break
                    print(self._now(), '-bash : %s command not found' % self.action)
                    continue
                # os.path.basename works on every platform; the old
                # Windows/Linux-only split left file_path unset elsewhere.
                self.file_path = self.sending_msg_list[1].strip()
                self.file_name = os.path.basename(self.file_path)
                if self.action == "put":
                    self.put()
                elif self.action == "get":
                    self.get()
                else:
                    print(self._now(), '[+]client:-bash: %s:'
                          % self.sending_msg_list[0], 'command not found')
            # ``except A and B and C and D`` evaluates to D alone, so network
            # errors were never caught. ValueError covers an unparsable
            # size reply from the server.
            except (OSError, ValueError, IndexError) as e:
                print(self._now(), '[+]client: -bash :', e, 'Restart client')
                # The old handler started a NEW, never-connected client
                # recursively; reconnect this one instead.
                self._reconnect()

    def put(self):
        """Upload ``self.file_path`` to the server.

        :return: None
        """
        if not os.path.isfile(self.file_path):
            print(self._now(), '[+]client: -bash :%s : No such file'
                  % self.file_name)
            return
        self.file_size = os.path.getsize(self.file_path)
        data_header = {"client": {
            "action": "put",
            "file_name": self.file_name,
            "size": self.file_size}}
        self.client.sendall(json.dumps(data_header).encode())
        # Wait for the server's acknowledgement before streaming the file.
        print(self._now(), '[+]server: -bash : %s '
              % self.client.recv(1024).decode())
        with open(self.file_path, 'rb') as file_object:
            # Fixed-size chunks + sendall: line iteration can load a whole
            # binary file, and send() may write only part of the buffer.
            for chunk in iter(lambda: file_object.read(self.RECV_SIZE), b''):
                self.client.sendall(chunk)
        print(self.client.recv(1024).decode())

    def get(self):
        """Download ``self.file_name`` into ``setting.client_download_path``.

        The file is saved as ``<name>.<random int>`` so concurrent test
        clients do not overwrite each other.

        :return: None
        :raises ConnectionResetError: if the server closes mid-transfer
        """
        data_header = {"client": {
            "action": "get",
            "file_name": self.file_name,
            "size": 0}}
        self.client.sendall(json.dumps(data_header).encode())
        # NOTE(review): the server sends the size and the file bytes
        # back-to-back with no delimiter; if both arrive in this one recv()
        # the int() below fails. Fixing that needs a protocol change on
        # both ends.
        self.data = self.client.recv(1024)
        if not self.data:
            raise ConnectionResetError('server closed the connection')
        if self.data.decode() == '404':
            print(self._now(), '[+]server: -bash : %s : No such file' % (self.file_path))
            return

        print(self._now(), "[+]server: -bash : File ready to get File size is :", self.data.decode())
        file_size = int(self.data.decode())
        local_name = self.file_name + '.' + str(random.randint(1, 100000))
        # Absolute path instead of os.chdir(): changing the working
        # directory broke later relative ``put`` paths.
        local_path = os.path.join(setting.client_download_path, local_name)
        received_size = 0
        with open(local_path, 'wb') as file_object:
            while received_size < file_size:
                # Never ask for more than what is still missing.
                recv_data = self.client.recv(min(self.RECV_SIZE, file_size - received_size))
                if not recv_data:
                    # b'' means the peer closed; the old loop spun forever.
                    raise ConnectionResetError('server closed the connection')
                received_size += len(recv_data)
                file_object.write(recv_data)
        time.sleep(0.1)
        print(self._now(), "[+]client: -bash :File get done File size is :", file_size)

    def connect(self, ip, port):
        """Connect the client socket to the server.

        :param ip: server IP address or host name
        :param port: server TCP port
        :return: None
        """
        self._address = (ip, port)
        self.client.connect(self._address)
View Code

selectors_server.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
"""FTP server built on the selectors module."""
import os
import sys
import json
import selectors
import socket
import time
import errno
import random

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)

from conf import setting

sel = selectors.DefaultSelector()


class selectors_ftp(object):
    """selectors-based FTP server.

    Each request starts with a JSON header
    ``{"client": {"action", "file_name", "size"}}``; the transfer itself is
    then carried out synchronously inside the read callback.
    """

    CHUNK_SIZE = 1024

    def __init__(self):
        """Create the listening socket."""
        self.sock = socket.socket()
        self._peers = {}  # conn -> remote address, for log lines

    @staticmethod
    def _now():
        """Return the timestamp prefix used in log lines."""
        return time.strftime("%Y-%m-%d %X", time.localtime())

    def _close(self, conn, mask):
        """Unregister and close a client connection."""
        print(self._now(), ': closing:', conn, mask)
        self._peers.pop(conn, None)
        sel.unregister(conn)
        conn.close()

    def upload(self, conn, mask):
        """Receive ``self.file_size`` bytes from ``conn`` into the upload dir.

        The file is stored as ``<file_name>.<random int>`` so concurrent
        uploads of the same name do not collide.

        :param conn: client connection the request arrived on
        :param mask: selector event mask (unused)
        :return: None
        """
        # basename() keeps a hostile client inside the upload directory;
        # an absolute path replaces the process-wide os.chdir().
        stored_name = os.path.basename(self.file_name) + '.' + str(random.randint(1, 100000))
        target = os.path.join(setting.upload_path, stored_name)
        # The transfer runs to completion inside this call anyway, so switch
        # to blocking mode instead of busy-looping on EAGAIN.
        conn.setblocking(True)
        try:
            conn.send(b'Server receive upload %s request' % self.file_name.encode())
            received_size = 0
            with open(target, 'wb') as file_object:
                while received_size < self.file_size:
                    recv_data = conn.recv(min(self.CHUNK_SIZE, self.file_size - received_size))
                    if not recv_data:
                        break  # peer closed early; the old loop spun forever
                    received_size += len(recv_data)
                    file_object.write(recv_data)
        finally:
            conn.setblocking(False)

    def download(self, conn, mask):
        """Send ``self.file_name`` from the download dir, or ``404``.

        The reply is the decimal file size followed by the raw file bytes.

        :param conn: client connection the request arrived on
        :param mask: selector event mask (unused)
        :return: None
        """
        path = os.path.join(setting.download_path, os.path.basename(self.file_name))
        if not os.path.isfile(path):
            conn.send(b'404')
            return
        # Blocking mode for the transfer: on a non-blocking socket sendall()
        # can fail midway with EAGAIN, after which the old retry loop
        # re-sent the size header and the whole file from the start.
        conn.setblocking(True)
        try:
            conn.send(str(os.path.getsize(path)).encode())
            with open(path, "rb") as file_obj:
                for chunk in iter(lambda: file_obj.read(self.CHUNK_SIZE), b''):
                    conn.sendall(chunk)
        finally:
            conn.setblocking(False)

    def accept(self, sock, mask):
        """Accept a new client and register it for read events.

        :param sock: listening socket
        :param mask: selector event mask
        :return: None
        """
        conn, addr = sock.accept()
        # Kept for backward compatibility only; handlers must use their own
        # ``conn`` argument, since these always point at the LATEST client.
        self.conn, self.addr = conn, addr
        self._peers[conn] = addr
        print(self._now(), ': accepted', conn, 'from', addr, mask)
        conn.setblocking(False)
        sel.register(conn, selectors.EVENT_READ, self.read)

    def read(self, conn, mask):
        """Read one request header from ``conn`` and run the transfer.

        :param conn: client connection reported readable
        :param mask: selector event mask
        :return: None
        """
        try:
            self.data = conn.recv(1024)
        except BlockingIOError:
            return  # spurious wake-up, nothing to read yet
        except ConnectionError:
            self.data = b''
        if not self.data:
            self._close(conn, mask)
            return

        try:
            self.data_receive = json.loads(self.data.decode())
            request = self.data_receive['client']
            self.action = request['action']
            self.file_name = str(request['file_name'])
            self.file_size = int(request['size'])
        except (ValueError, KeyError, TypeError):
            # Malformed header: drop this client instead of crashing the server.
            self._close(conn, mask)
            return

        addr = self._peers.get(conn)
        print(self._now(), ': echoing', repr(self.data), 'to', conn, mask)
        try:
            # Use ``conn`` (the socket this event fired on), not self.conn:
            # with several clients self.conn is whoever connected last.
            if self.action == 'put':
                self.upload(conn, mask)
                conn.send(b'[+]server: -bash : Server receive upload %s done ' % self.file_name.encode())
                print(self._now(), ': client :', addr, ': upload %s done' % self.file_name)
            elif self.action == 'get':
                self.download(conn, mask)
                print(self._now(), ': client :', addr, ': download %s done' % self.file_name)
        except OSError:
            # Peer vanished or a file error occurred mid-transfer.
            self._close(conn, mask)

    def register(self, sock):
        """Register the listening socket and dispatch events forever.

        :param sock: listening socket
        :return: None (never returns)
        """
        sel.register(sock, selectors.EVENT_READ, self.accept)
        while True:
            events = sel.select()
            for key, mask in events:
                callback = key.data
                callback(key.fileobj, mask)

    def start(self, ip, port):
        """Bind, listen and enter the event loop.

        :param ip: address to listen on
        :param port: TCP port to listen on
        :return: None (never returns)
        """
        self.sock.bind((ip, port))
        self.sock.listen(500)
        self.sock.setblocking(False)
        self.register(self.sock)
View Code

server_main.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
"""Interactive entry menu for the FTP servers."""
import os
import sys
import time

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)

from core import selectors_server
from core import select_server


class server_ftp(object):
    """Menu-driven launcher for the two server implementations."""

    def start(self):
        """Show the menu in a loop and start whichever server the user picks.

        Both servers listen on localhost:10000. The method only leaves
        through ``sys.exit`` when the user chooses to quit.
        """
        print('欢迎进入Select Ftp')
        # Entry 2 used to read "select 客户端" (client) although it starts
        # the select *server*; the label now matches what it does.
        msg = ''' 1.selectors 服务端 2.select 服务端 3.exit 退 出 '''
        while True:
            print(msg)
            user_choice = input('请选择操作>>>:')
            if user_choice == '1':
                server = selectors_server.selectors_ftp()
                print(time.strftime("%Y-%m-%d %X", time.localtime()), '[+]selectors server ftp already work ')
                server.start("localhost", 10000)
            elif user_choice == '2':
                server = select_server.select_ftp("localhost", 10000)
                print(time.strftime("%Y-%m-%d %X", time.localtime()), '[+]select server ftp already work ')
                server.start()
            elif user_choice in ('3', 'q', 'exit'):
                sys.exit('程序退出')
            else:
                print('非法的操作,请重新输入')
View Code

 

程序测试样图

不太清楚Windows有没有类似ulimit的配置文件;Windows默认对连接数有限制,而Linux可以通过ulimit修改该限制。

Windows : Win10

500链接测试命令返回效果

Linux  (VMware) Centos6.7

ulimit文件

1W链接测试命令返回效果

5W链接测试命令返回效果


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM