使用datacompy比较两个列表


使用datacompy比较两个列表

需求: 判断DB的数据与EXCEL的数据是否完全一致

该需求用到的知识有点多

  • pandas读取SQL
  • pandas读取EXCEL
  • datacompy比较列表
  • pandas写EXCEL

开发前准备

pip install pymysql
pip install pandas
pip install sqlalchemy
pip install datacompy
pip install openpyxl

开发代码

  • 连接DB并获取数据
    def do_db(self):
        engine = create_engine("mysql+pymysql://root:" + self.passwd + "@" + self.host + ":" + self.port + "/" + self.db)
        sql = "select lot_no as '批号' from ZM_TBL_DOMESTIC_GINNED_COTTON where DELETE_FLAG = '0'"

        self.df1 = pd.read_sql_query(sql, engine)
        print(self.df1)
  • 读取EXCEL
    def do_excel(self):
        self.df2 = pd.read_excel(self.file_name, usecols=[0], sheet_name="Sheet1", keep_default_na=False, converters={'批号': str})
        self.df2 = self.df2.drop_duplicates() 
        print(self.df2)

PS:这里需要注意的是,使用datacompy比较的两个列表中不能又重复的数据,所以要使用self.df2.drop_duplicates()去重

  • 比较列表,并将差异存入EXCEL
    def dict_compare(self):
        self.do_db()
        self.do_excel()

        compare = datacompy.Compare(self.df1, self.df2, join_columns=['批号'])

        # print(compare.matches())  # 最后判断是否相等,返回 bool
        # print(compare.report())  # 打印报告详情,返回 string
        # print(compare.report(sample_count=5000))  # 打印报告详情,返回 string

        df1_unq_rows = compare.df1_unq_rows
        df2_unq_rows = compare.df2_unq_rows

        writer = pd.ExcelWriter(self.file_name, engine='openpyxl')
        writer.book = load_workbook(self.file_name)
        df1_unq_rows.to_excel(writer, sheet_name='EXCEL缺少的数据')
        df2_unq_rows.to_excel(writer, sheet_name="DB缺少的数据")
        writer.save()
        writer.close()

查看datacompy文档

完整代码

#!/usr/bin/python3
# -*- encoding: utf-8 -*-
'''
@File        :检查.py
@Time        :2020/10/26 10:39:06
@Author      :He
@Software    :vsCode
'''


import pymysql
import time
import datetime
import uuid
import os
from sqlalchemy import create_engine
import pandas as pd
import datacompy
from openpyxl import load_workbook


class mysql_class:
    def __init__(self):

        self.host = 'IP'
        self.port = '端口'
        self.passwd = '密码'
        self.user = 'root'
        self.db = ''
        self.file_name = 'EXCEL.xlsx'

    def do_db(self):
        engine = create_engine("mysql+pymysql://root:" + self.passwd + "@" + self.host + ":" + self.port + "/" + self.db)
        sql = "select lot_no as '批号' from ZM_TBL_DOMESTIC_GINNED_COTTON where DELETE_FLAG = '0'"

        self.df1 = pd.read_sql_query(sql, engine)
        print(self.df1)

    def do_excel(self):
        self.df2 = pd.read_excel(self.file_name, usecols=[0], sheet_name="Sheet1", keep_default_na=False, converters={'批号': str})
        self.df2 = self.df2.drop_duplicates()
        print(self.df2)

    def getCurrentTime(self):
        return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

    def dict_compare(self):
        self.do_db()
        self.do_excel()

        compare = datacompy.Compare(self.df1, self.df2, join_columns=['批号'])

        df1_unq_rows = compare.df1_unq_rows
        df2_unq_rows = compare.df2_unq_rows

        writer = pd.ExcelWriter(self.file_name, engine='openpyxl')
        writer.book = load_workbook(self.file_name)
        df1_unq_rows.to_excel(writer, sheet_name='EXCEL缺少的数据')
        df2_unq_rows.to_excel(writer, sheet_name="DB缺少的数据")
        writer.save()
        writer.close()


if __name__ == "__main__":
    os.chdir(os.path.abspath(os.path.dirname(__file__)))

    starttime = datetime.datetime.now()
    print(starttime)

    mysql_class = mysql_class()
    mysql_class.dict_compare()

    endtime = datetime.datetime.now()
    print(endtime)

    print('\n数据处理成功!所用时间为:' + str((endtime - starttime).seconds))



免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM