本節目錄
- 煙草掃碼數據統計
- 獎學金統計
實戰一、煙草掃碼數據統計
1. 需求分析
根據掃碼信息在數據庫文件中匹配相應規格詳細信息,並進行個數統計
條碼庫.xls
掃碼.xlsx
一個條碼對應多個規格名稱.xlsx
2. 代碼實現
# -*- coding: utf-8 -*- """ Datetime: 2020/08/05 Author: ZhangYafei Description: 掃碼數據統計 pip install -i https://pypi.tuna.tsinghua.edu.cn/simple xlrd xlwt pandas python scan_code_stat.py -d 條碼庫文件路徑 -f 掃碼文件路徑 -o 統計結果輸出文件路徑 或者 python scan_code_stat.py 默認為 -d 條碼庫 -f 掃碼 -o 統計結果.xlsx """ from functools import wraps from collections import defaultdict import pandas as pd import os import time from optparse import OptionParser def get_option(): opt_parser = OptionParser() opt_parser.add_option('-d', '--infile1', action='store', type="string", dest='code_db_file', default='條碼庫') opt_parser.add_option('-f', '--infile2', action='store', type="string", dest='code_file', default='掃碼') opt_parser.add_option("-o", "--outfile", action="store", dest="output_file", default='統計結果.xlsx', help='輸出文件路徑') option, args = opt_parser.parse_args() return option.code_db_file, option.code_file, option.output_file def timeit(func): """ 裝飾器: 判斷函數執行時間 :param func: :return: """ @wraps(func) def inner(*args, **kwargs): start = time.time() ret = func(*args, **kwargs) end = time.time() - start if end < 60: print(f'花費時間: {round(end, 2)}秒') else: min, sec = divmod(end, 60) print(f'花費時間 {round(min)}分 {round(sec, 2)}秒') return ret return inner def read_file(file_name: str, converters: dict = None): if file_name.endswith('xls') or file_name.endswith('xlsx'): return pd.read_excel(file_name, converters=converters) if converters else pd.read_excel(file_name) if os.path.exists(f'{file_name}.xls'): return pd.read_excel(f'{file_name}.xls', converters=converters) if converters else pd.read_excel(f'{file_name}.xls') elif os.path.exists(f'{file_name}.xlsx'): return pd.read_excel(f'{file_name}.xlsx', converters=converters) if converters else pd.read_excel(f'{file_name}.xls') @timeit def main(): code_db_file, code_file, output_file = get_option() # 1. 讀取條碼庫並處理為 規格名稱 掃碼 類型 三列 print('正在讀取條碼庫數據---') code_db = read_file(code_db_file, converters={'條碼': str, '盒碼': str}) new_code_db = code_db.dropna(subset=['條碼'])[['規格名稱', '條碼']].copy().rename(columns={'條碼': '掃碼'}) new_code_db['類型'] = '條碼' new_code_db2 = code_db.dropna(subset=['盒碼'])[['規格名稱', '盒碼']].copy().rename(columns={'盒碼': '掃碼'}) new_code_db2['類型'] = '盒碼' new_code_db = new_code_db.append(new_code_db2) new_code_db['掃碼'] = new_code_db['掃碼'].str.strip() new_code_db = new_code_db[(new_code_db['掃碼'] != '(null)') & (new_code_db['掃碼'] != '無')] code_type_duplicated = set(new_code_db.loc[new_code_db.duplicated(subset=['掃碼', '類型']), '掃碼'].to_list()) name_dict = defaultdict(set) code_name_dict = defaultdict(list) code_name_dict2 = {} new_code_db2 = new_code_db.set_index(keys=['掃碼', '規格名稱']) for code, name in new_code_db2.loc[code_type_duplicated, :].index: name_dict[code].add(name) def build_dict(row): if pd.notna(row['規格名稱']): code_name_dict2[row['掃碼']] = row['規格名稱'] print('正在讀取 一個條碼對應多個規格名稱.xlsx ---') total_duplicated_code_df = read_file('一個條碼對應多個規格名稱.xlsx') if len(total_duplicated_code_df[total_duplicated_code_df['規格名稱'].notna()]) > 0: total_duplicated_code_df.apply(build_dict, axis=1) # data = [] for code in name_dict: if len(name_dict[code]) > 1: code_name_dict[code] = list(name_dict[code]) # data.append({'掃碼': code, '規格名稱[多]': ';;'.join(name_dict[code]), '規格名稱': ''}) # total_duplicated_code_df = pd.DataFrame(data=data) # total_duplicated_code_df.to_excel('一個條碼對應多個規格名稱.xlsx', index=False) # 2. 將條碼和盒碼相同的只保留條碼 name_duplicated = new_code_db.loc[(new_code_db['掃碼'].duplicated()) & (~new_code_db.duplicated(subset=['掃碼', '類型'])), '掃碼'] new_code_db = new_code_db[~((new_code_db['掃碼'].isin(name_duplicated)) & (new_code_db['類型'] == '盒碼'))] duplicated_tiao_he_code_list = new_code_db.loc[new_code_db['掃碼'].isin(name_duplicated), '掃碼'] # 3. 讀取掃碼數據並與條碼庫合並(左連接) print('正在讀取掃碼數據---') scan_code = read_file(file_name=code_file, converters={'掃碼': str}) scan_code_count = scan_code['掃碼'].value_counts() scan_code_count = scan_code_count.reset_index().rename(columns={'index': '掃碼', '掃碼': '數量'}) scan_code_match_data = pd.merge(scan_code_count, new_code_db, on='掃碼') scan_code_match_data.drop_duplicates(inplace=True) if len(scan_code_match_data.index) == 0: print('數據匹配結果為空,請重新檢查您的數據') return duplicated_code_list = scan_code_match_data['掃碼'][scan_code_match_data['掃碼'].duplicated()] print(f'{code_file}文件中有 【{len(duplicated_code_list)}】 項掃碼匹配到數據庫中的規格名稱存在重復項 需手動選擇匹配') # 4. 對於重復的掃碼 手動匹配重復的規格名稱 for index, code in enumerate(duplicated_code_list): if code in code_name_dict2: select_name = code_name_dict2[code] else: names = code_name_dict[code] names_str = '' for n, name in enumerate(names): names_str += f'【{n}】 {name}\t' while True: select_num = int(input(f'掃碼{index+1} {code}\t請選擇對應規格名稱的序號\n{names_str}\n請輸入(數字【0-{len(names)-1}】: ')) if select_num >= len(names): print(f'輸入序號超出指定范圍(0-{len(names)-1})\t請重新輸入') else: select_name = names[select_num] break print(f'{code} 將匹配的規格名稱是: {select_name}') scan_code_match_data = scan_code_match_data[~((scan_code_match_data['掃碼'] == code) & (scan_code_match_data['規格名稱'] != select_name))] scan_code_match_data.set_index(keys=['類型', '規格名稱', '掃碼'], inplace=True) code_list = scan_code_match_data.index.get_level_values(2).to_list() code_types = scan_code_match_data.index.get_level_values(0) # 5. 計算結果文件所需格式,並導出 res_df = pd.DataFrame(columns=['規格名稱', '盒包數量', '條包數量'], index=code_list) if '條碼' in code_types: for code_type, name, code in scan_code_match_data.loc[code_types == '條碼', '數量'].index: res_df.loc[code, '規格名稱'] = name res_df.loc[code, '條包數量'] = scan_code_match_data.loc[('條碼', name, code), '數量'] if '盒碼' in code_types: for code_type, name, code in scan_code_match_data.loc[code_types == '盒碼', '數量'].index: res_df.loc[code, '規格名稱'] = name res_df.loc[code, '盒包數量'] = scan_code_match_data.loc[('盒碼', name, code), '數量'] res_df.dropna(subset=['條包數量', '盒包數量'], inplace=True, how='all') res_df.fillna(0, inplace=True) # 6. 導出文件 並給重復的規格名稱添加顏色 duplicated_name_list = res_df.loc[res_df['規格名稱'].duplicated(), '規格名稱'].to_list() writer = pd.ExcelWriter(output_file, engine='xlsxwriter') res_df.to_excel(excel_writer=writer, index_label='掃碼') workbook = writer.book worksheet = writer.sheets['Sheet1'] format1 = workbook.add_format({'bold': True, 'bg_color': '#FFD700', 'font_color': '#DC143C'}) format2 = workbook.add_format({'bold': True, 'bg_color': '#90EE90', 'font_color': '#1E90FF'}) format3 = workbook.add_format({'bold': True, 'bg_color': 'red', 'font_color': 'white'}) format4 = workbook.add_format({'align': 'center'}) worksheet.set_column(0, len(res_df.index), cell_format=format4) for name in duplicated_name_list: worksheet.conditional_format(1, 1, len(res_df.index), 1, {'type': 'text', 'criteria': 'containing', 'value': name, 'format': format1}) for code in duplicated_tiao_he_code_list: worksheet.conditional_format(1, 0, len(res_df.index), 0, {'type': 'text', 'criteria': 'containing', 'value': code, 'format': format2}) for code in duplicated_code_list: worksheet.conditional_format(1, 0, len(res_df.index), 0, {'type': 'text', 'criteria': 'containing', 'value': code, 'format': format3}) writer.save() print('\n################** 開始打印運行日志 **##################') print(f'數據已處理完成! 並保存到文件: {output_file}') print('【注:1.條碼和盒碼相同的標注為藍色背景 2.規格名稱相同,掃碼不同的標注為黃色背景 3.一個掃碼對應多個規格名稱標注為紅色背景】') res_check(code_file, output_file) print('################** 打印運行日志結束 **##################') def res_check(code_file, output_file): print('正在檢測統計結果數據完整性---') scan_code_data = read_file(file_name=code_file, converters={'掃碼': str}) scan_code_data.drop_duplicates(subset=['掃碼'], inplace=True) scan_code_data.dropna(subset=['掃碼'], inplace=True) res_data = read_file(output_file, converters={'掃碼': str}) scan_db = set(scan_code_data['掃碼']) res_db = set(res_data['掃碼']) code_list = scan_db - res_db if code_list: print(f'掃碼文件中共有條碼【{len(scan_db)}】\t結果文件中匹配【{len(res_db)}】\t有【{len(code_list)}】條未匹配') print('未匹配的條碼為:') for index, code in enumerate(code_list): print(f'\t條碼{index+1}\t{code}') else: print(f'掃碼文件中共有條碼【{len(scan_db)}】\t結果文件中匹配【{len(res_db)}】\t所有掃碼已全部匹配') if __name__ == '__main__': main()
- 運行
python scan_code_stat.py -d 條碼庫文件路徑 -f 掃碼文件路徑 -o 統計結果輸出文件路徑 或者 python scan_code_stat.py 默認為 -d 條碼庫 -f 掃碼 -o 統計結果.xlsx
打印輸出
正在讀取條碼庫數據--- 正在讀取 一個條碼對應多個規格名稱.xlsx --- 正在讀取掃碼數據--- 掃碼文件中有 【2】 項掃碼匹配到數據庫中的規格名稱存在重復項 需手動選擇匹配 掃碼1 6901028102940 請選擇對應規格名稱的序號 【0】 貴煙(小國酒香) 【1】 貴煙(國酒香軟黃10mg爆珠) 請輸入(數字【0-1】: 1 6901028102940 將匹配的規格名稱是: 貴煙(國酒香軟黃10mg爆珠) 掃碼2 8801116005581 請選擇對應規格名稱的序號 【0】 ESSE(CHANGE 4mg) 【1】 愛喜(幻變) 請輸入(數字【0-1】: 1 8801116005581 將匹配的規格名稱是: 愛喜(幻變) 數據已處理完成! 並保存到文件: 統計結果.xlsx 【注:1.條碼和盒碼相同的標注為藍色背景 2.規格名稱相同,掃碼不同的標注為黃色背景 3.一個掃碼對應多個規格名稱標注為紅色背景】 花費時間: 11.73秒
3. 效果
- 統計結果.xlsx
實戰二、獎學金統計
-
數據介紹
-
有身份證號.xlsx
-
無身份證號.xlsx
-
-
需求
分別統計有身份證號和無身份證號兩個文件中每個人獲得獎學金的類型和總金額
2. 代碼實現
# -*- coding: utf-8 -*- """ Datetime: 2020/01/15 Author: Zhang Yafei Description: """ import pandas as pd class SchorshipStat(object): def __init__(self, file, idcard_file): self.df = pd.read_excel(file) self.df_idcard = pd.read_excel(idcard_file) @staticmethod def stat_type(row): return '|'.join(row['項目']) @staticmethod def stat_amount_sum(row): return row['金額'].sum() def stat_scholarship_type_total(self, is_save=False, is_print=True): """ 獎學金統計:身份證號 姓名 獎學金類型 獎學金金額 :param is_save: 是否保存到文件 :param is_print: 是否打印 :return: None """ group_df = self.df.groupby(by=['姓名']) group_df_idcard = self.df_idcard.groupby(by=['身份證號碼', '姓名']) df_result = self.grouped_stat(group_df) df_idcard_result = self.grouped_stat(group_df_idcard) if is_print: print(df_result) print(df_idcard_result) if is_save: with pd.ExcelWriter(path='res/獎學金統計.xlsx') as writer: df_result.to_excel(excel_writer=writer, sheet_name='無身份證號') df_idcard_result.to_excel(excel_writer=writer, sheet_name='有身份證號') def grouped_stat(self, group_dataframe): """ 分組統計函數 :param group_dataframe: :return: result_df """ scholor_type = group_dataframe.apply(self.stat_type) scholor_amount_sum = group_dataframe.apply(self.stat_amount_sum) result_df = pd.concat(objs=[scholor_type, scholor_amount_sum], axis=1) result_df.rename(columns={0: '獎學金類型', 1: '獎學金總額'}, inplace=True) return result_df if __name__ == '__main__': schorship = SchorshipStat(file='data/無身份證號.xlsx', idcard_file='data/有身份證號.xlsx') schorship.stat_scholarship_type_total(is_print=True, is_save=True)
3. 運行效果