# -*- coding: UTF-8 -*-
import ast
import json
import os
import sys
import time
import xlrd
import xlwt
import pandas as pd
from xlutils.copy import copy
from collections import OrderedDict
from aa_demo.base.logger import *
from jsonpath_ng import parse
class xlsx_json_tools:
""" Execl 操作工具 """
def __init__(self, dir_path_name='', read_file_name='', new_file_name='', sheet_name='', datas={}):
"""
Execl 操作工具
:param dir_path_name: 需要保存到的文件夹名称 路径示例:项目名/模块名/data/dir_path_name/xxx.xslx
:param read_file_name: 读取文件名
:param new_file_name: 存储文件名
:param sheet_name:execl sheet页名称, 如果为空 取 read_file_name 文件名
:param datas: 传入空字典{},即datas初始值为{} 作用与方法recur_data()
"""
self.read_file_name = read_file_name
self.datas = datas
# 获取根目录
proj_path = os.path.abspath(os.path.dirname(__file__))[:os.path.abspath(os.path.dirname(__file__)).find('base')]
# xlsx 文件路径
self.xlsx_path = os.path.join(proj_path, 'data\\{}\\{}.xlsx'.format("xlsx" if dir_path_name == '' else dir_path_name, self.read_file_name))
self.json_path = os.path.join(proj_path, 'data\\{}\\{}.json'.format("json" if dir_path_name == '' else dir_path_name, self.read_file_name))
self.new_xlsx_path = os.path.join(proj_path, 'data\\{}\\{}.xlsx'.format("xlsx" if dir_path_name == '' else dir_path_name, self.read_file_name if new_file_name == '' else new_file_name))
self.new_json_path = os.path.join(proj_path, 'data\\{}\\{}.json'.format("json" if dir_path_name == '' else dir_path_name, self.read_file_name if new_file_name == '' else new_file_name))
self.sheet_name = self.read_file_name if sheet_name == "" else sheet_name
# 判断目录是否存在
self.xlsx_path if os.path.exists(os.path.dirname(self.xlsx_path)) else os.makedirs(os.path.dirname(self.xlsx_path))
self.json_path if os.path.exists(os.path.dirname(self.json_path)) else os.makedirs(os.path.dirname(self.json_path))
# logs.debug("xlsx path : {}".format(self.xlsx_path))
# logs.debug("json path : {}".format(self.json_path))
def recur_data(self, data: dict, result="", is_json_layer='on'):
"""
多维/嵌套字典数据无限遍历,获取所有key层和value(key以层级key值拼接)
:param data: 需要遍历获取所有层级key/value值的json串
:param result: 层级名称
:param is_json_layer: 是否逐层获取,on 逐层获取key-value 否则不逐层获取
:return:
"""
if is_json_layer == 'on':
# 使用isinstance检测数据类型:字典类型
if isinstance(data, dict):
for k, v in data.items():
if result == "":
self.recur_data(v, k, is_json_layer)
else:
self.recur_data(v, result+"_{}".format(k), is_json_layer)
# 列表或元组类型
elif isinstance(data, (list, tuple)):
for i in range(len(data)):
# 自我调用实现无限遍历, 并判断是否逐层key拼接命名
self.recur_data(data[i], result, is_json_layer)
else:
# print(result + "=" + str(data))
# self.datas[result] = str(data)
self.datas.setdefault(result, []).append(str(data))
else:
for key, value in data.items():
# self.datas.update({key: [value]})
self.datas.setdefault(key, []).append(str(value))
def read_json_data(self, json_path='', layer_by_layer="on"):
"""
读取json内容
:param json_path: json文件 路径+文件名
:param layer_by_layer: 逐层获取key-value
:return:
"""
if json_path == '':
json_path = self.json_path
# 读取json文件内容
json_con = json.load(open(json_path, "r", encoding="utf-8"), object_pairs_hook=OrderedDict)
# logs.debug("json_content:\n{}".format(json_con))
# 区分是否为execl转换的数据
if "execl_content" in json_con.keys():
self.recur_data(data=json_con.get("execl_content"), is_json_layer=layer_by_layer)
else:
self.recur_data(data=json_con, is_json_layer=layer_by_layer)
json_content = self.datas
logs.debug("获取到的json内容:{}".format(json_content))
return json_content
def write_json_data(self, content):
try:
json.dump(content, open(self.new_json_path, "w+", encoding="utf-8"), ensure_ascii=False, sort_keys=True, indent=4)
logs.info("写入.json文件成功, 写入文件:{}".format(self.new_json_path))
except Exception as e:
logs.error("写入异常原因:{}".format(e))
def read_execl_data(self, read_mode='row', read_num: int = 1) -> list:
"""
读取execl数据
:param read_mode: 读取数据方式: row 按行读取, col 按列读取
:param read_num: 读取数据条数 Tips:当读取条数超过execl条数,则取全部条数
:return: [数据行数, 读取到的数据集合]
"""
try:
# 打开文件
wb = xlrd.open_workbook(self.xlsx_path)
# 根据sheet名称获取sheet页内容
sheet = wb.sheet_by_name(self.sheet_name)
# 获取行数、列数
rowNums = sheet.nrows
colNums = sheet.ncols
# logs.debug("行数:{}, 列数:{}".format(rowNums, colNums))
all_contents = []
if read_mode.lower() == "row":
# 获取第一行内容
# contents = sheet.row_values(0)
# logs.debug("首行内容:{}".format(contents))
# 获取case条数
runNums = read_num+1 if rowNums > read_num+1 > 1 else rowNums
# logs.debug("run = {}".format(runNums-1))
# 获取数据
for i in range(0, runNums):
contents = sheet.row_values(i)
all_contents.append(contents)
# logs.debug("所有行List = {}".format(all_contents))
elif read_mode.lower() == "col":
# 获取第一列内容
# contents = sheet.col_values(0)
# logs.debug("首列内容:{}".format(contents))
# 获取case条数
runNums = read_num+1 if colNums > read_num+1 > 1 else colNums
# logs.debug("run = {}".format(runNums-1))
# 获取数据
for i in range(0, runNums):
contents = sheet.col_values(i)
all_contents.append(contents)
# logs.debug("所有列List = {}".format(all_contents))
else:
sys.exit("仅支持[row, col], 请检查参数:read_mode")
except Exception as e:
runNums = 0
all_contents = None
logs.error("读取数据出现异常, Exception Casue: {}".format(e))
finally:
logs.debug("获取到的execl内容:{}".format(all_contents))
return runNums, all_contents
def copy_execl_data(self, read_mode='row', params_name: list = None, params_values: list = None) -> list:
"""
将数据写入execl Tips:文件不存在则新建,存在则追加(如存在请确保参数名及参数与原数据一一对应,否则会出现覆盖或错位填充)
:param read_mode: 写入方式 [row 按行写入, col 按列写入]
:param params_name: 参数名
:param params_values: 参数值 [[], []]
:return:
"""
sys.exit("Execl 写入文件失败,请检查传入参数是否符合要求") if params_name is None or params_values is None or len(params_name) != len(params_values[0]) else ""
try:
# 打开Execl
wb = xlrd.open_workbook(self.new_xlsx_path)
except FileNotFoundError:
# 新建工作簿
wb = xlwt.Workbook('utf-8')
# 创建工作表
wb.add_sheet(self.sheet_name)
# 保存到新建工作簿
wb.save(self.new_xlsx_path)
# 打开文件
wb = xlrd.open_workbook(self.new_xlsx_path)
except Exception as e:
sys.exit(e)
finally:
sheet_names = wb.sheet_names() # 获取文件 sheet 列表
if self.sheet_name in sheet_names:
sheet = wb.sheet_by_name(self.sheet_name)
nrows, ncols = sheet.nrows, sheet.ncols
else:
nrows, ncols = 0, 0
# logs.debug("行={}, 列={}".format(nrows, ncols))
# 向 execl 写入内容, 如果sheetname不存在,则创建副本
cp_wb = copy(wb)
cp_ws = cp_wb.add_sheet(self.sheet_name) if self.sheet_name not in sheet_names else cp_wb.get_sheet(self.sheet_name)
# 根据写入方式,copy 直接修改单元格内容
if read_mode.lower() == "row":
# 循环写入keys到表格第一行
for i in range(len(params_name)):
cp_ws.write(0, i, params_name[i], self.__title_style())
# 循环写入value 到表格第二行
for x in range(len(params_values)):
values = params_values[x]
for y in range(len(values)):
# values 可能为dict 所以需要转换为str
if nrows == 0:
cp_ws.write(x+1, y, str(values[y]))
else:
cp_ws.write(x+nrows, y, str(values[y]))
elif read_mode.lower() == "col":
# 循环写入keys到表格第一行
for i in range(len(params_name)):
cp_ws.write(i, 0, params_name[i], self.__title_style())
# 循环写入value 到表格第二行
for x in range(len(params_values)):
values = params_values[x]
for y in range(len(values)):
# values 可能为dict 所以需要转换为str
if ncols == 0:
cp_ws.write(y, x+1, str(values[y]))
else:
cp_ws.write(y, x+ncols, str(values[y]))
else:
sys.exit("仅支持[row, col], 请检查参数:read_mode")
try:
# 保存文件
cp_wb.save(self.new_xlsx_path)
logs.info('Execl 文件写入成功, 存储文件名为:{} \n'.format(self.new_xlsx_path))
except FileExistsError:
logs.error('Execl 文件写入败, 请关闭名称为{}文件'.format(self.new_xlsx_path))
except PermissionError:
logs.error('Execl 文件写入失败, 请关闭名称为{}文件'.format(self.new_xlsx_path))
except Exception as e:
logs.error("Execl 文件写入失败,失败原因:{}".format(e))
def write_execl_data(self, params_name: list = None, params_values: list = None) -> list:
"""
将数据写入execl文件中 (文件存在则覆盖原数据, 不存在则新建)
:param params_name:
:param params_values:
:return:
"""
sys.exit("Execl 生成文件失败,请检查传入参数是否符合要求") if params_name is None or params_values is None or len(params_name) != len(params_values[0]) else ""
# list转dataframe,data传入list格式数据,如:[[],[],[]]
# 写入数据
df = pd.DataFrame(params_values, columns=params_name)
# 保存到本地excel
try:
df.to_excel(self.new_xlsx_path, index=False, sheet_name=self.sheet_name)
logs.info('Execl 生成文件成功, 存储文件名为:{} \n'.format(self.new_xlsx_path))
except FileExistsError:
logs.error('Execl 生成文件失败, 请关闭名称为{}文件'.format(self.new_xlsx_path))
except PermissionError:
logs.error('Execl 生成文件失败, 请关闭名称为{}文件'.format(self.new_xlsx_path))
except Exception as e:
logs.error("Execl 生成文件失败,失败原因:{}".format(e))
def read_execl_to_json(self, read_mode='row', read_num=1, is_save_json='off') -> list:
"""
读取execl数据并转换为json
:param read_mode: 读取数据方式: row 按行读取, col 按列读取
:param read_num: 读取数据条数 Tips:当读取条数超过execl条数,则取全部条数
:param is_save_json: 是否保存到json文件;[on 保存,off 不保存]
:return: [{}, {}]
"""
try:
rd = self.read_execl_data(read_mode=read_mode, read_num=read_num)
runNums = rd[0]
all_contents = rd[1]
# 拆分json结构 keys
r = []
all_data = {}
for j in range(1, runNums):
key_name = all_contents[0]
key_value = all_contents[j]
# 将两个list(表头和case行) 拼成dict
s = {}
for x in range(0, len(key_value)):
if key_value[x] != "删除":
s[key_name[x]] = ast.literal_eval(key_value[x]) if str(key_value[x]).startswith("[") or str(key_value[x]).startswith("{") else key_value[x]
# 将数据存储到r列表中
r.extend([json.dumps(s, ensure_ascii=False)])
# 将全部数据存储到all_data中
all_data.setdefault("execl_content", []).append(ast.literal_eval(json.dumps(s, ensure_ascii=False)))
except Exception as e:
r = None
all_data = None
logs.error("读取数据出现异常, Exception Casue: {}".format(e))
return r
finally:
logs.debug(all_data)
logs.debug(type(all_data))
# 判断是否需要存储到文件中
if is_save_json == "on":
# json.dump(all_data, open(self.new_json_path, "w+", encoding="utf-8"), ensure_ascii=False, sort_keys=True, indent=4)
# logs.info("写入.json文件成功, 写入文件:{}".format(self.json_path))
self.write_json_data(all_data)
# logs.debug("写入.json文件内容:{}".format(all_data))
return r
def read_json_to_execl(self, to_execl_mode="on", read_json_layer='on') -> list:
"""
读取json数据保存到execl
:param to_execl_mode: on 无文件则生成,有则覆盖, off 无文件则生成,有则追加
:param read_json_layer: on json逐层获取key-value, off 仅获取一层key-value
:return:
"""
# 读取json数据
json_dic = self.read_json_data(layer_by_layer=read_json_layer)
# logs.debug("获取到的json内容:{}".format(json_dic))
# 获取key-value
keys_list = list(json_dic.keys())
values_list = list(json_dic.values())
data_line = len(values_list[0]) if values_list != [] else None
line_list = []
# logs.debug('所有层级keys:{}'.format(keys_list))
# logs.debug('所有层级values:{}'.format(values_list))
# logs.debug("json共包含{}行数据信息".format(data_line))
if data_line is not None:
for i in range(data_line):
line = []
for j in range(len(keys_list)):
# logs.debug("{}:{}".format(keys_list[j], values_list[j]))
line.append(values_list[j][i] if values_list[j] is not None else "") # 单行
line_list.extend([line]) # 多行
# logs.debug("获取单行信息line={}".format(line))
# logs.debug("获取多行信息line_list={}".format(line_list))
# logs.debug("写入xlsx文件的内容:{}".format([keys_list, line_list]))
if to_execl_mode == "on":
self.write_execl_data(keys_list, line_list)
else:
self.copy_execl_data(read_mode="row", params_name=keys_list, params_values=line_list)
return [keys_list, line_list]
@staticmethod
def __forward(letter, jump):
"""字母表上的循环迭代器"""
if letter.islower():
start_character = ord('a')
else:
start_character = ord('A')
start = ord(letter) - start_character
offset = ((start + jump) % 26) + start_character
result = chr(offset)
return result
@staticmethod
def __title_style():
"""xlsx表格标题样式"""
# 新建一个样式表
style = xlwt.XFStyle()
# 新建一个字体格式对象
font = xlwt.Font()
font.name = '微软雅黑'
font.bold = True
font.height = 180 # 360
# 把字体放入样式表
style.font = font
# 新建一个边框样式
borders = xlwt.Borders()
# 边框都是细线
borders.top = xlwt.Borders.THIN
borders.bottom = xlwt.Borders.THIN
borders.left = xlwt.Borders.THIN
borders.right = xlwt.Borders.THIN
# 把边框给样式表
style.borders = borders
# 新建一个对齐
alingnment = xlwt.Alignment()
# 垂直和水平对齐
alingnment.horz = xlwt.Alignment.HORZ_CENTER
alingnment.vert = xlwt.Alignment.VERT_CENTER
# 把对齐给样式表
style.alignment = alingnment
# 设置背景颜色
pattern = xlwt.Pattern()
# 背景颜色
pattern.pattern = xlwt.Pattern.SOLID_PATTERN
pattern.pattern_fore_colour = 48
# 把背景颜色给样式表
style.pattern = pattern
return style
if __name__ == "__main__":
"""debug"""
params_name = ["name", "age", "addr"]
# params_values = [["zhangsan", 18, "shanghai"], ["lisi", 22, "beijing"], ["wugo", 30, "harbin"]]
# xlsx_json_tools(dir_path_name="demo", read_file_name="demo", sheet_name="demo").read_execl_data(read_mode="row", read_num=0)
# xlsx_json_tools(dir_path_name="demo", read_file_name="demo", sheet_name="demo").read_execl_data(read_mode="col", read_num=0)
# xlsx_json_tools(dir_path_name="demo", read_file_name="demo", sheet_name="demo").write_execl_data(read_mode="row", params_name=params_name, params_values=params_values)
# xlsx_json_tools(dir_path_name="demo", read_file_name="demo", sheet_name="demo").copy_execl_data(read_mode="row", params_name=params_name, params_values=params_values)
# xlsx_json_tools(dir_path_name="demo", read_file_name="need", sheet_name="demo").copy_execl_data(read_mode="col", params_name=params_name, params_values=params_values)
# xlsx_json_tools(dir_path_name="demo", read_file_name="need", sheet_name="demo").write_execl_data(params_name=params_name, params_values=params_values)
# """ json <--> xlsx 转换 """
# xlsx_json_tools(dir_path_name="demo", read_file_name="demo").read_execl_data(read_mode="row", read_num=0)
xlsx_json_tools(dir_path_name="demo", read_file_name="demo", new_file_name='demo1', sheet_name="demo").read_execl_to_json(read_mode="row", read_num=0, is_save_json="on")
# xlsx_json_tools(dir_path_name="demo", read_file_name="demo", new_file_name='demo2', sheet_names="demo").read_execl_to_json(read_mode="col", read_num=0, is_save_json="on")
# xlsx_json_tools(dir_path_name="demo", read_file_name="demo").read_json_data(layer_by_layer="on.")
xlsx_json_tools(dir_path_name="demo", read_file_name="demo", new_file_name='demo2').read_json_to_execl(to_execl_mode='on', read_json_layer='on.')