使用datacompy比較兩個列表


使用datacompy比較兩個列表

需求: 判斷DB的數據與EXCEL的數據是否完全一致

該需求用到的知識有點多

  • pandas讀取SQL
  • pandas讀取EXCEL
  • datacompy比較列表
  • pandas寫EXCEL

開發前准備

pip install pymysql
pip install pandas
pip install sqlalchemy
pip install datacompy
pip install openpyxl

開發代碼

  • 連接DB並獲取數據
    def do_db(self):
        engine = create_engine("mysql+pymysql://root:" + self.passwd + "@" + self.host + ":" + self.port + "/" + self.db)
        sql = "select lot_no as '批號' from ZM_TBL_DOMESTIC_GINNED_COTTON where DELETE_FLAG = '0'"

        self.df1 = pd.read_sql_query(sql, engine)
        print(self.df1)
  • 讀取EXCEL
    def do_excel(self):
        self.df2 = pd.read_excel(self.file_name, usecols=[0], sheet_name="Sheet1", keep_default_na=False, converters={'批號': str})
        self.df2 = self.df2.drop_duplicates() 
        print(self.df2)

PS:這里需要注意的是,使用datacompy比較的兩個列表中不能又重復的數據,所以要使用self.df2.drop_duplicates()去重

  • 比較列表,並將差異存入EXCEL
    def dict_compare(self):
        self.do_db()
        self.do_excel()

        compare = datacompy.Compare(self.df1, self.df2, join_columns=['批號'])

        # print(compare.matches())  # 最后判斷是否相等,返回 bool
        # print(compare.report())  # 打印報告詳情,返回 string
        # print(compare.report(sample_count=5000))  # 打印報告詳情,返回 string

        df1_unq_rows = compare.df1_unq_rows
        df2_unq_rows = compare.df2_unq_rows

        writer = pd.ExcelWriter(self.file_name, engine='openpyxl')
        writer.book = load_workbook(self.file_name)
        df1_unq_rows.to_excel(writer, sheet_name='EXCEL缺少的數據')
        df2_unq_rows.to_excel(writer, sheet_name="DB缺少的數據")
        writer.save()
        writer.close()

查看datacompy文檔

完整代碼

#!/usr/bin/python3
# -*- encoding: utf-8 -*-
'''
@File        :檢查.py
@Time        :2020/10/26 10:39:06
@Author      :He
@Software    :vsCode
'''


import pymysql
import time
import datetime
import uuid
import os
from sqlalchemy import create_engine
import pandas as pd
import datacompy
from openpyxl import load_workbook


class mysql_class:
    def __init__(self):

        self.host = 'IP'
        self.port = '端口'
        self.passwd = '密碼'
        self.user = 'root'
        self.db = ''
        self.file_name = 'EXCEL.xlsx'

    def do_db(self):
        engine = create_engine("mysql+pymysql://root:" + self.passwd + "@" + self.host + ":" + self.port + "/" + self.db)
        sql = "select lot_no as '批號' from ZM_TBL_DOMESTIC_GINNED_COTTON where DELETE_FLAG = '0'"

        self.df1 = pd.read_sql_query(sql, engine)
        print(self.df1)

    def do_excel(self):
        self.df2 = pd.read_excel(self.file_name, usecols=[0], sheet_name="Sheet1", keep_default_na=False, converters={'批號': str})
        self.df2 = self.df2.drop_duplicates()
        print(self.df2)

    def getCurrentTime(self):
        return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

    def dict_compare(self):
        self.do_db()
        self.do_excel()

        compare = datacompy.Compare(self.df1, self.df2, join_columns=['批號'])

        df1_unq_rows = compare.df1_unq_rows
        df2_unq_rows = compare.df2_unq_rows

        writer = pd.ExcelWriter(self.file_name, engine='openpyxl')
        writer.book = load_workbook(self.file_name)
        df1_unq_rows.to_excel(writer, sheet_name='EXCEL缺少的數據')
        df2_unq_rows.to_excel(writer, sheet_name="DB缺少的數據")
        writer.save()
        writer.close()


if __name__ == "__main__":
    os.chdir(os.path.abspath(os.path.dirname(__file__)))

    starttime = datetime.datetime.now()
    print(starttime)

    mysql_class = mysql_class()
    mysql_class.dict_compare()

    endtime = datetime.datetime.now()
    print(endtime)

    print('\n數據處理成功!所用時間為:' + str((endtime - starttime).seconds))



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM