環境準備
安裝flask、pymysql和xlrd(xlrd 2.0起只支持.xls,讀取.xlsx需安裝1.2.0版本)
pip install flask pymysql xlrd==1.2.0
項目結構如圖
1.新建配置文件conf.py
#!/usr/bin/python
# -*- coding:utf-8 -*-
import pymysql,os
# ----------------------------- Global parameters -----------------------------
# Directory that contains this configuration module (symlinks resolved).
proDir = os.path.dirname(os.path.realpath(__file__))
print(proDir)
# Folder next to this module that holds the Excel workbooks.
xlsPath = os.path.join(proDir, 'testFile')

# -------------------------------- DB settings --------------------------------
# Keyword arguments that the other scripts unpack into pymysql.connect(**config).
config = dict(
    host='127.0.0.1',
    port=3306,
    user='root',
    passwd='pwd',
    charset='utf8mb4',
    # Rows are returned as dicts keyed by column name.
    cursorclass=pymysql.cursors.DictCursor,
)
2.新建目錄testFile,將excel表格放到此目錄下
3.原始數據處理,excel表數據導入mysql庫
新建readexcel.py,讀取excel數據,返回格式為[(1,2,3),(3,4,5)]
import xlrd
from conf import xlsPath
class ExcelUtil():
    """Read one worksheet of an Excel workbook into a list of row tuples.

    The first row of the sheet is kept as the header (``keys``); every
    following row is returned by :meth:`dict_data`, e.g.
    ``[(1, 2, 3), (3, 4, 5)]``.

    NOTE: xlrd 2.0 and later only read ``.xls`` files; opening ``.xlsx``
    workbooks requires an older xlrd release.
    """

    def __init__(self, excelPath, sheetIndex=0):
        """Open the workbook and select a sheet.

        Args:
            excelPath: path of the workbook to open.
            sheetIndex: zero-based index of the sheet to read.
        """
        self.data = xlrd.open_workbook(excelPath)
        self.table = self.data.sheet_by_index(sheetIndex)
        # Total number of rows in the sheet (header included).
        self.rowNum = self.table.nrows
        # Total number of columns in the sheet.
        self.colNum = self.table.ncols
        # The first row supplies the column keys; an empty sheet has none,
        # and asking for row 0 there would raise IndexError.
        self.keys = self.table.row_values(0) if self.rowNum else []

    def dict_data(self):
        """Return every row after the header as a tuple of cell values.

        Despite the name, the result is a list of tuples, not dicts.
        A sheet with only a header row (or no rows at all) yields ``[]``.
        """
        return [
            tuple(self.table.row_values(row_index))
            for row_index in range(1, self.rowNum)
        ]
if __name__ == "__main__":
    # Manual check: print the data rows of the first sheet of the sample workbook.
    sample_path = '/'.join([xlsPath, 'testsalary.xlsx'])
    rows = ExcelUtil(sample_path, 0).dict_data()
    print(rows)
4.操作數據庫connDB.py,將excel數據批量入庫
#!/usr/bin/python
# -*- coding:utf-8 -*-
import pymysql
from conf import config
from common.readexcel import ExcelUtil, xlsPath
# Read the spreadsheet rows once, at import time; conn_db() inserts them.
sheetIndex = 0
filepath = '/'.join((xlsPath, 'testsalary.xlsx'))
data = ExcelUtil(excelPath=filepath, sheetIndex=sheetIndex).dict_data()
def conn_db():
    """Rebuild the ``test`` database from the spreadsheet rows and read them back.

    WARNING: this drops any existing database named ``test`` before
    recreating it, creates the ``user`` table, bulk-inserts the module-level
    ``data`` rows and finally selects everything from the table.

    Returns:
        The rows fetched by the final ``SELECT``, or ``None`` when any step
        fails (the traceback is printed instead of being raised).
    """
    import traceback

    conn = pymysql.connect(**config)
    conn.autocommit(1)
    cursor = conn.cursor()
    try:
        # Create the database from scratch.
        DB_NAME = 'test'
        cursor.execute('DROP DATABASE IF EXISTS %s' % DB_NAME)
        cursor.execute('CREATE DATABASE IF NOT EXISTS %s ' % DB_NAME)
        conn.select_db(DB_NAME)
        # Create the table; one varchar column per spreadsheet column.
        TABLE_NAME = 'user'
        cursor.execute(
            'CREATE TABLE %s(company varchar(30),Account varchar(30) primary key,'
            'name varchar(30), Duties varchar(30), Jobwages varchar(30),'
            'Rankwages varchar(30),'
            'workyears varchar(30),70percent varchar(30),'
            '30percent varchar(30),'
            'totoal_wages varchar(30),housing_fund varchar(30),'
            'Medical_insurance varchar(30),Pension varchar(30),'
            'Career_Annuities varchar(30),Taxes varchar(30),'
            'total_Deduction varchar(30),Actual_wages varchar(30))' % TABLE_NAME)
        # Bulk insert: one placeholder per table column (17 in total). The
        # row values themselves are bound by the driver, not interpolated.
        placeholders = ','.join(['%s'] * 17)
        cursor.executemany(
            'INSERT INTO %s VALUES(%s)' % (TABLE_NAME, placeholders), data)
        # Read the rows back and report how many were stored.
        cursor.execute('SELECT * FROM %s' % TABLE_NAME)
        print('total records:', cursor.rowcount)
        return cursor.fetchall()
    except Exception:
        # Best-effort script: report the failure and return None. Catching
        # Exception (not a bare except) lets Ctrl-C / SystemExit propagate.
        traceback.print_exc()
        # Roll back on error. With autocommit enabled this has little to
        # undo, but it is kept so the connection is left in a clean state.
        conn.rollback()
        return None
    finally:
        # Always release the cursor and the connection.
        cursor.close()
        conn.close()
if __name__ == "__main__":
    # Run the import and show the rows read back (None if it failed).
    rows = conn_db()
    print(rows)
5.數據準備好,開始寫接口,新建api.py
from flask import Flask, request
import json
import pymysql
from conf import config
# Flask application object; the route below registers on it and the
# __main__ block runs it.
app = Flask(__name__)
# Only GET requests are accepted.
@app.route("/select/salary/", methods=["GET"])
def check():
    """Look up one salary record by the ``Account`` query parameter.

    Returns:
        A JSON string with the keys ``code``, ``msg`` and ``result``.
        ``result`` holds the matching record on success and stays ``False``
        when the parameter is missing or no record is found.
    """
    # Default response body.
    return_dict = {'code': '200', 'msg': '處理成功', 'result': False}

    # request.args is never None (a request without a query string yields an
    # empty mapping), so test the required parameter itself.
    Account = request.args.get('Account')
    if not Account:
        # Report the error through the same keys as the default response.
        return_dict['code'] = '504'
        return_dict['msg'] = '請求參數為空'
        return json.dumps(return_dict, ensure_ascii=False)

    record = sql_result(Account)
    if not record:
        # An empty or None lookup result means no row matched the account.
        return_dict['code'] = '404'
        return_dict['msg'] = '未查詢到數據'
        return json.dumps(return_dict, ensure_ascii=False)

    return_dict['result'] = record
    return json.dumps(return_dict, ensure_ascii=False)
# Helper: database lookup.
def sql_result(Account):
    """Fetch the row of ``test.user`` whose ``Account`` column equals *Account*.

    Args:
        Account: account identifier taken from the request. It is passed to
            the driver as a bound parameter, never interpolated into the SQL
            text, so it cannot inject SQL and is compared as a string.

    Returns:
        The first matching row, in the form produced by the cursor class
        configured in ``config``, or ``None`` when no row matches.
    """
    conn = pymysql.connect(**config)
    try:
        conn.select_db('test')
        with conn.cursor() as cursor:
            cursor.execute(
                'SELECT * FROM test.user WHERE Account = %s', (Account,))
            return cursor.fetchone()
    finally:
        # Close the connection even when the query raises.
        conn.close()
if __name__ == "__main__":
    # Serve the API on the local loopback interface, port 5000.
    app.run(port=5000, host='127.0.0.1')
6.瀏覽器訪問
http://127.0.0.1:5000/select/salary/?Account=62268200113006149