flask上傳文件及上傳zip文件實例
from flask import Flask, render_template, redirect, session, request from flask_session import Session import redis import pymysql import os from werkzeug.utils import secure_filename import functools import shutil import datetime import time base_dir = os.path.abspath(os.path.dirname(__file__)) # 獲取當前項目路徑 print(base_dir) ALLOWED_EXTENSIONS = set(['zip'])#允許文件上傳的格式 app = Flask(__name__) # 設置上傳文件的路徑 app.config['UPLOAD_FOLDER'] = base_dir def allowed_file(filename): # 判斷上傳文件的格式 return '.' in filename and \ filename.rsplit('.', 1)[1] in ALLOWED_EXTENSIONS # session 存儲到redis中 app.config['SESSION_TYPE'] = 'redis' # 建立redis連接 app.config['SESSION_REDIS'] = redis.Redis(host='127.0.0.1', port=6379) # session存儲到redis注冊到flak中 Session(app) # 建立pymysql連接 db = pymysql.connect(host="localhost", user="root", db="day118", port=3306) # 使用cursor()方法創建一個游標對象 cursor = db.cursor(cursor=pymysql.cursors.DictCursor) # 登錄裝器 def auth(func): @functools.wraps(func) def inner(*args, **kwargs): if not session['user']: return redirect('login') ret = func(*args, **kwargs) return ret return inner @app.route('/login', methods=['POST', 'GET']) def login(): if request.method == 'GET': return render_template('login.html') user = request.form.get('user') pwd = request.form.get('pwd') sql = "select * from user where user='" + user + "'and pwd='" + pwd + "'" cursor.execute(sql) # 使用fetchall()獲取全部數據 data = cursor.fetchall() print(data) if data: session['user'] = user return redirect('index') return render_template('login.html', error='用戶名或密碼錯誤') @app.route('/index') def index(): cursor.execute("SELECT * FROM user") user=session.get('user') # 使用fetchall()獲取全部數據 data = cursor.fetchall() return render_template('index.html', data=data,user=user) @app.route('/detail/<nid>') def detail(nid): cursor.execute( "select detail.id,detail.user_id ,user.name,detail.data,detail.lens from detail inner join user on detail.user_id = user.id WHERE detail.user_id='" + nid + "'") # 使用fetchall()獲取全部數據 data = cursor.fetchall() print(data) if data: return render_template('detail.html', data=data) return redirect('index') @app.route('/upload',methods=['POST','GET']) @auth def upload(): if request.method=='POST': file = request.files.get('files') print(file.filename) print(file.filename.rsplit('.')[-1]) if file.filename.rsplit('.')[-1] not in ['zip',]: path=os.path.join(base_dir,file.filename) with open(path,'w')as f: for line in file: f.write(line.decode('utf-8')) with open(path,'rb')as f: count=0 while True: line = f.readline() if line: count+=1 else: break print(count) sql='select id from user where user.user="'+session.get('user')+'"' cursor.execute(sql) data = cursor.fetchall() print(data[0]) ctime=datetime.datetime.now() cursor.execute("INSERT into detail(user_id,lens) VALUES('"+str(data[0]['id'])+"','"+str(count)+"')") db.commit() # 處理壓縮文件 if file and allowed_file(file.filename): filename = secure_filename(file.filename) file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename)) # 壓縮文件保存在項目路徑下 local_dir = os.path.join(base_dir, '11') # 新創建一個路徑,用來放壓縮后的文件 hh = os.path.join(base_dir, filename) # 這個是找到壓縮文件路徑-------C:/Code/haha.zip print(hh) print(local_dir) shutil.unpack_archive(filename=hh, extract_dir=local_dir)# 把文件保存在剛剛設定好的路徑下 os.remove(hh) # 最后把壓縮文件刪除 filename = filename.split('.')[0] print(filename) # 此處為驗證信息 host_path = os.path.join(local_dir, filename+'.py') # host.txt的路徑 print(host_path) with open(host_path, 'r',encoding='utf-8') as f: # 把host文件打開 key, values = [i.replace('\n', '').split(',') for i in f.readlines()] # 列表推倒式,生成一個由鍵組成的列表,一個由值組成的列表 hostvalue = dict(zip(key, values)) # 把兩個列表組成字典 print(hostvalue) ip = hostvalue['host_os_ip'] # 開始讀取里面的信息 systemname = hostvalue['host_database_bussines'] databasename = hostvalue['host_database_instance'] uploadtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) print(ip, systemname, databasename, uploadtime) return render_template('upload.html') if __name__ == '__main__': app.run()
第二版
from flask import Blueprint, render_template, Flask, request, redirect,session import os import uuid from ..utils import helper ind = Blueprint('ind', __name__) @ind.before_request def process_request(): if not session.get("user_info"): return redirect("/login") return None @ind.route('/home') def home(): return render_template('home.html') @ind.route('/user_list') def user_list(): # import pymysql # conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8') # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # cursor.execute("SELECT id,user,nickname FROM userinfo") # data_list = cursor.fetchall() # cursor.close() # conn.close() data_list = helper.fetch_all("SELECT id,user,name FROM user",[]) return render_template('user_list.html',data_list=data_list) @ind.route('/detail/<int:nid>') def detail(nid): # import pymysql # conn = Config.POOL.connection() # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # cursor.execute("SELECT id,line,ctime FROM record where user_id=%s",(nid,)) # record_list = cursor.fetchall() # cursor.close() # conn.close() record_list = helper.fetch_all("SELECT id,lens,data FROM detail where user_id=%s",(nid,)) return render_template('detail.html',record_list=record_list) @ind.route('/upload',methods=['GET','POST']) def upload(): if request.method == "GET": return render_template('upload.html') from werkzeug.datastructures import FileStorage file_obj = request.files.get('code') # 1. 檢查上傳文件后綴名 name_ext = file_obj.filename.rsplit('.',maxsplit=1) if len(name_ext) != 2: return "請上傳zip壓縮文件" if name_ext[1] != 'zip': return "請上傳zip壓縮文件" """ # 2. 接收用戶上傳文件,並寫入到服務器本地. file_path = os.path.join("files",file_obj.filename) # 從file_obj.stream中讀取內容,寫入到文件 file_obj.save(file_path) # 3. 解壓zip文件 import shutil # 通過open打開壓縮文件,讀取內容再進行解壓。 shutil._unpack_zipfile(file_path,'xsadfasdfasdf') """ # 2+3, 接收用戶上傳文件,並解壓到指定目錄 import shutil target_path = os.path.join('files',str(uuid.uuid4())) shutil._unpack_zipfile(file_obj.stream,target_path) # 4. 遍歷某目錄下的所有文件 # for item in os.listdir(target_path): # print(item) total_num = 0 for base_path,folder_list,file_list in os.walk(target_path): for file_name in file_list: file_path = os.path.join(base_path,file_name) file_ext = file_path.rsplit('.',maxsplit=1) if len(file_ext) != 2: continue if file_ext[1] != 'py': continue file_num = 0 with open(file_path,'rb') as f: for line in f: line = line.strip() if not line: continue if line.startswith(b'#'): continue file_num += 1 total_num += file_num # 獲取當前時間 import datetime ctime = datetime.date.today() print(total_num,ctime,session['user_info']['id']) # import pymysql # conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8') # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # cursor.execute("select id from record where ctime=%s and user_id=%s",(ctime,session['user_info']['id'])) # data = cursor.fetchone() # cursor.close() # conn.close() data = helper.fetch_one("select id from record where ctime=%s and user_id=%s",(ctime,session['user_info']['id'])) if data: return "今天已經上傳" # import pymysql # conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8') # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # cursor.execute("insert into record(line,ctime,user_id)values(%s,%s,%s)",(total_num,ctime,session['user_info']['id'])) # conn.commit() # cursor.close() # conn.close() helper.insert("insert into record(line,ctime,user_id)values(%s,%s,%s)",(total_num,ctime,session['user_info']['id'])) return "上傳成功"
數據庫連接池使用
from DBUtils.PooledDB import PooledDB, SharedDBConnection import pymysql class Config(object): SALT = b"gadsg" SECRET_KEY = 'asdf123sdfsdfsdf' MAX_CONTENT_LENGTH = 1024 * 1024 * 7 POOL = PooledDB( creator=pymysql, # 使用鏈接數據庫的模塊 maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數 mincached=2, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建 maxcached=5, # 鏈接池中最多閑置的鏈接,0和None不限制 maxshared=3, # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。 blocking=True, # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯 maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='', database='day118', charset='utf8' )
連接連接池
import pymysql from settings import Config def connect(): conn = Config.POOL.connection() cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) return conn,cursor def connect_close(conn,cursor): cursor.close() conn.close() def fetch_all(sql,args): conn,cursor = connect() cursor.execute(sql, args) record_list = cursor.fetchall() connect_close(conn,cursor) return record_list def fetch_one(sql, args): conn, cursor = connect() cursor.execute(sql, args) result = cursor.fetchone() connect_close(conn, cursor) return result def insert(sql, args): conn, cursor = connect() row = cursor.execute(sql, args) conn.commit() connect_close(conn, cursor) return row
