[Repost] A Python script for comparing table data between databases


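I have recently been working on a heterogeneous database replication project, and the customer asked for a data comparison tool, so I wrote a Python script that compares tables across heterogeneous databases. It only compares row counts, not individual records. The SQL it runs is the most basic kind, select count(*), and tables are checked one at a time with no concurrency across tables, so comparing large tables can take a while.

The basic idea: the tables to compare are listed in a check table. The script first reads that table to get the list, then for each listed table starts two processes that run the count on the source and on the target at the same time, and finally writes the counts to an Excel file.

The check table is the key piece. As long as its contents are correct, there should be very few problems.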
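The per-table count step described above can be sketched roughly as follows. This is a minimal illustration in Python 3, not the script from this article: it assumes two throwaway SQLite files standing in for the source and target databases, and it uses threads from `concurrent.futures` where the article's script uses processes. The helper names `make_db`, `get_count` and `compare` are hypothetical.

```python
import os
import sqlite3
import tempfile
from concurrent.futures import ThreadPoolExecutor


def make_db(path, rows):
    """Create a stand-in database with a table t1 holding `rows` rows."""
    conn = sqlite3.connect(path)
    conn.execute("create table t1 (id integer)")
    conn.executemany("insert into t1 values (?)", [(i,) for i in range(rows)])
    conn.commit()
    conn.close()


def get_count(db_path, sql):
    """Run one count query on its own connection; return the count or an error string."""
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute(sql).fetchone()[0]
    except sqlite3.Error as exc:
        return "Error : " + str(exc)
    finally:
        conn.close()


def compare(source_path, target_path, table):
    """Count the same table on both sides at the same time."""
    sql = "select count(*) from " + table
    with ThreadPoolExecutor(max_workers=2) as pool:
        fut_s = pool.submit(get_count, source_path, sql)
        fut_t = pool.submit(get_count, target_path, sql)
        return fut_s.result(), fut_t.result()


workdir = tempfile.mkdtemp()
src = os.path.join(workdir, "source.db")
tgt = os.path.join(workdir, "target.db")
make_db(src, 5)
make_db(tgt, 3)
print(compare(src, tgt, "t1"))  # (5, 3)
```

A failed query comes back as a string starting with "Error", so the caller can tell a real count from a failure without the whole run stopping.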

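The supported databases are currently Oracle, MySQL, PostgreSQL and SQL Server. The setup has three parts.

1. Database configuration file

First, create a configuration file named check.ini in the same directory as the Python code. Here is an example: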

 

[DATA]
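# Source database; ORACLE here must match the name of one of the sections below
source=ORACLE
# Target database; POSTGRESQL here must match the name of one of the sections below
target=POSTGRESQL
# Database that holds the check table; must match one of the sections below
check_node=ORACLE
# Owner of the check table: the user for Oracle, the database name for MySQL, PostgreSQL or SQL Server
check_owner=suq
# Name of the check table, case-sensitive
check_table=check_table

# MySQL connection settings. The section name must be upper case and must start with MYSQL; to compare several MySQL instances, use MYSQL1, MYSQL2 and so on
# The sections below must likewise start with the name shown in the example, because the program uses the start of the section name to decide which database type it is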
[MYSQL]
db_host=192.168.56.25
db_port=3306
db_user=root
db_pwd=root
db_dbname=major


[ORACLE]
db_host=192.168.56.30
db_port=1521
db_user=dsg
db_pwd=dsg
db_sid=bre1



[POSTGRESQL]
db_host=192.168.56.50
db_port=5432
db_user=postgres
db_pwd=postgres
db_dbname=msgdb



[MSSQL]
db_host=192.168.56.101
db_port=1433
db_user=sa
db_pwd=sa
db_dbname=master
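
How section names such as MYSQL1 or MYSQL2 map to a database type can be sketched as below. This is a small Python 3 illustration using the standard `configparser` module; the `db_type` helper and the trimmed-down sample configuration are assumptions made for the example, not part of the script in this article.

```python
import configparser

# Prefixes that the script in this article recognises in section names.
KNOWN_PREFIXES = ("MYSQL", "ORACLE", "POSTGRESQL", "MSSQL")


def db_type(section_name):
    """Return the database type implied by a section name such as 'MYSQL2'."""
    for prefix in KNOWN_PREFIXES:
        if section_name.startswith(prefix):
            return prefix
    raise ValueError("unsupported section name: %r" % section_name)


# A shortened, hypothetical check.ini with a numbered MySQL section.
SAMPLE = """
[DATA]
source=ORACLE
target=MYSQL2
check_node=ORACLE
check_owner=suq
check_table=check_table

[ORACLE]
db_host=192.168.56.30
db_port=1521
db_user=dsg
db_pwd=dsg
db_sid=bre1

[MYSQL2]
db_host=192.168.56.25
db_port=3306
db_user=root
db_pwd=root
db_dbname=major
"""

cfg = configparser.ConfigParser()
cfg.read_string(SAMPLE)
source = cfg.get("DATA", "source")
target = cfg.get("DATA", "target")
print(source, db_type(source))
print(target, db_type(target), cfg.getint(target, "db_port"))
```

Raising an error for an unknown prefix makes a typo in a section name show up immediately instead of surfacing later as a missing connection.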

 

2. Create the check table

In my example above, it is check_table under the suq user.

The table structure is:

SQL> desc check_table
 Name                                      Null?    Type
 ----------------------------------------- -------- ----------------------------
 SOWNER                                             VARCHAR2(30)
 SNAME                                              VARCHAR2(30)
 TOWNER                                             VARCHAR2(30)
 TNAME                                              VARCHAR2(30)

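The columns hold the source owner, the source table name, the target owner and the target table name. For databases where the owner is not a user, put the database name in the owner column.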

 

Here is my test data in the table:

 

SQL> select * from check_table;

SOWNER                         SNAME                          TOWNER                         TNAME
------------------------------ ------------------------------ ------------------------------ ------------------------------
suq                            "t1"                           suq                            t1
suq                            "t2"                           suq                            t2
suq                            "t3"                           suq                            t3
suq                            "t4"                           suq                            t4

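Take particular care with this data: it must be exactly right, or the script may report errors at run time. The usual cause is that databases differ in how they handle identifier case. After entering the rows, I recommend running a statement by hand against the database, for example

 

select count(*) from suq."t1"

to check that the SQL is valid.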
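
To make that manual check easier, the statements the script will run can be printed ahead of time. The sketch below (Python 3) builds them the same way the main program does, by joining owner and table name with a dot; the function name `count_statements` and the hard-coded sample rows are assumptions for illustration.

```python
def count_statements(row):
    """Build the source and target count statements for one check-table row."""
    sowner, sname, towner, tname = row
    sql_s = "select count(*) from " + sowner + "." + sname
    sql_t = "select count(*) from " + towner + "." + tname
    return sql_s, sql_t


# Sample rows in the order (SOWNER, SNAME, TOWNER, TNAME).
rows = [
    ("suq", '"t1"', "suq", "t1"),
    ("suq", '"t2"', "suq", "t2"),
]

for row in rows:
    sql_s, sql_t = count_statements(row)
    print(sql_s)
    print(sql_t)
```

Any double quotes stored in the SNAME or TNAME column are carried into the statement unchanged, which is why the stored value has to match exactly how the table was created.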

3. The main program

Note the Python modules I use to connect to each database; the Excel file is written with the XlsxWriter module:

 

C:\Users\think>pip list
cx-Oracle (5.2.1)
MySQL-python (1.2.4)
psycopg2 (2.6.2)
pymssql (2.1.3)
XlsxWriter (0.8.5)

Here is the Python code (written for Python 2):

 

#coding:utf-8
import cx_Oracle as ora
import MySQLdb as my
import psycopg2 as post
import pymssql as ms
import ConfigParser as conf
import multiprocessing as  mul
import xlsxwriter 
import time



def connect(cfg,db):
    if db[0:5] == 'MYSQL':
        db_host=cfg.get(db,'db_host')
        db_port=cfg.get(db,'db_port')
        db_user=cfg.get(db,'db_user')
        db_pwd=cfg.get(db,'db_pwd')
        db_dbname=cfg.get(db,'db_dbname')
        conn = my.connect(host=db_host,port=int(db_port),user=db_user,passwd=db_pwd,db=db_dbname)
        return conn
    elif db[0:6] == 'ORACLE':
        db_host=cfg.get(db,'db_host')
        db_port=cfg.get(db,'db_port')
        db_user=cfg.get(db,'db_user')
        db_pwd=cfg.get(db,'db_pwd')
        db_sid=cfg.get(db,'db_sid')
        conn = ora.connect(db_user,db_pwd,db_host+':'+db_port+'/'+db_sid)
        return conn
    elif db[0:10] == 'POSTGRESQL':
        db_host=cfg.get(db,'db_host')
        db_port=cfg.get(db,'db_port')
        db_user=cfg.get(db,'db_user')
        db_pwd=cfg.get(db,'db_pwd')
        db_dbname=cfg.get(db,'db_dbname')
        conn = post.connect(host=db_host,port=db_port,user=db_user,password=db_pwd,database=db_dbname)
        return conn
    elif db[0:5] == 'MSSQL':
        db_host=cfg.get(db,'db_host')
        db_port=cfg.get(db,'db_port')
        db_user=cfg.get(db,'db_user')
        db_pwd=cfg.get(db,'db_pwd')
        db_dbname=cfg.get(db,'db_dbname')
        conn = ms.connect(host=db_host,port=db_port,user=db_user,password=db_pwd,database=db_dbname)
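        return conn
    else:
        # Fail loudly instead of silently returning None for an unknown section name
        raise ValueError('Unsupported section name: ' + db)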


def check(cfg,db,check_owner,check_table):
    # Read every row of the check table: (sowner, sname, towner, tname)
    conn=connect(cfg,db)
    cursor=conn.cursor()
    sql='select * from '+check_owner+'.'+check_table
    cursor.execute(sql)
    table_list=list(cursor.fetchall())
    conn.close()
    return table_list



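def getcount(cfg,db,sql,q):
    # Runs in a child process: always put a count or an error string on the queue,
    # so the parent never blocks forever if the connection itself fails
    try:
        conn = connect(cfg,db)
        try:
            cursor=conn.cursor()
            cursor.execute(sql)
            countval = cursor.fetchall()[0][0]
        finally:
            conn.close()
    except Exception as e:
        countval="Error : "+str(e)
    q.put(countval)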


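def isdigit(num):
    # True when the value can be read as an integer (a count), False for an error string
    try:
        int(num)
        return True
    except (TypeError, ValueError):
        return False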



def comp(cfg,source,target,tablelist):
    ###excel start
    xlsxname='check_'+str(time.strftime("%Y%m%d%H%M", time.localtime()))+'.xlsx'
    workbook=xlsxwriter.Workbook(xlsxname)
    top=workbook.add_format({'border':6,'align':'center','bg_color':'cccccc','font_size':13,'bold':True})
    format_data_normal=workbook.add_format({'align':'center','font_size':13})
    format_data_warn=workbook.add_format({'align':'center','font_size':13,'bg_color':'ff0000'})
    format_data_err=workbook.add_format({'align':'center','font_size':13,'bg_color':'ffff00'})
    worksheet = workbook.add_worksheet('sheet1')
    worksheet.set_column('A:A',12)
    worksheet.set_column('B:B',40)
    worksheet.set_column('C:C',12)
    worksheet.set_column('D:D',12)
    worksheet.set_column('E:E',40)
    worksheet.set_column('F:F',12)
    worksheet.set_column('G:G',12)
    title=[u'Source owner',u'Source table',u'Source count',u'Target owner',u'Target table',u'Target count',u'Difference']
    worksheet.write_row('A1',title,top)
    ###excel stop
    length=len(tablelist)
    for i in range(length):
        check_result=[]
        sowner=tablelist[i][0]
        sname=tablelist[i][1]
        towner=tablelist[i][2]
        tname=tablelist[i][3]
        sql_s='select count(*) from '+sowner+'.'+sname
        sql_t='select count(*) from '+towner+'.'+tname
        #sql_t='select count(*) from '+towner+'.'+'\"'+tname+'\"'
        q1=mul.Queue()
        q2=mul.Queue()
        p1=mul.Process(target = getcount,args = (cfg,source,sql_s,q1))
        p2=mul.Process(target = getcount,args = (cfg,target,sql_t,q2))
        p1.start()
        p2.start()
        count_s=q1.get()
        count_t=q2.get()
        p1.join()
        p2.join()
        check_result.append(sowner)
        check_result.append(sname)
        check_result.append(count_s)
        check_result.append(towner)
        check_result.append(tname)
        check_result.append(count_t)
        print '%s %s %s %s %s %s'  %(sowner,sname,count_s,towner,tname,count_t)
        #print check_result
        if isdigit(count_s) and isdigit(count_t):
            check_result.append(count_s-count_t)
            if count_s == count_t:
                worksheet.write_row('A'+str(2+i),check_result,format_data_normal)
            else:
                worksheet.write_row('A'+str(2+i),check_result,format_data_warn)
        else:
            check_result.append("Error")
            worksheet.write_row('A'+str(2+i),check_result,format_data_err)
    workbook.close()




    

if __name__ == "__main__":
    print "AT time {0}".format(time.ctime())
    print "Begin compare ..."
    cfg=conf.ConfigParser()
    cfg.read('check.ini')
    source=cfg.get('DATA','source')
    target=cfg.get('DATA','target')
    check_node=cfg.get('DATA','check_node')
    check_owner=cfg.get('DATA','check_owner')
    check_table=cfg.get('DATA','check_table')

    tablelist=check(cfg,check_node,check_owner,check_table)
    comp(cfg,source,target,tablelist)
    print "AT time {0}".format(time.ctime())
    print "compare complete!"
    raw_input("Press <ENTER>")

 

 

 

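Running this code reads check.ini to get the connection details of the source and target databases and the location of the check table. It first loads the tables to compare into a list, then loops over that list running a count for each table, and writes the results to an Excel file named check_XXXX.xlsx, where XXXX is a timestamp. If a SQL statement fails, the row is highlighted in yellow; if the source and target counts differ, the row is highlighted in red.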
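
The highlighting rule just described can be written as a small pure function. This is a Python 3 sketch under the assumption that a failed count arrives as an error string, as it does in the script above; the names `classify` and `ROW_COLOURS` are hypothetical, and the colour codes are the ones used in the script's Excel formats.

```python
# Background colours used for each row status (None means no fill).
ROW_COLOURS = {"normal": None, "mismatch": "ff0000", "error": "ffff00"}


def is_count(value):
    """True when the value can be read as an integer, False for an error string."""
    try:
        int(value)
        return True
    except (TypeError, ValueError):
        return False


def classify(count_s, count_t):
    """Return (status, difference) for one pair of source and target counts."""
    if is_count(count_s) and is_count(count_t):
        diff = count_s - count_t
        status = "normal" if diff == 0 else "mismatch"
        return status, diff
    # At least one side failed, so no difference can be computed.
    return "error", "Error"


for pair in [(10, 10), (10, 7), ("Error : table not found", 5)]:
    status, diff = classify(*pair)
    print(pair, status, diff, ROW_COLOURS[status])
```

Keeping the decision separate from the Excel writing makes it easy to test without a database or a workbook.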

Below is a result from one of my comparisons between Oracle and PostgreSQL:



--------------

# -*- coding:utf-8 -*-
import os


def read_names(path):
    # Read a whitespace-separated name list into one flat list
    names = []
    with open(path, 'r') as namelist:
        for line in namelist:
            names.extend(line.split())
    return names


# Read the namelist file generated yesterday and build yesterdaynamelist
yesterdaynamelist = read_names('D:\\python\\Project\\AtuoEmail\\Date\\riqi.txt')

# Write yesterdaynamelist out as single-column text ("w" creates or overwrites the file)
with open("D:\\python\\Project\\AtuoEmail\\Date\\yesterdaytxt.txt", "w", encoding="utf-8") as yesterdaytxt:
    for name in yesterdaynamelist:
        yesterdaytxt.write(name)
        yesterdaytxt.write("\n")

# Call a batch file to generate today's up-to-date list of domain (AD) users
os.system('D:\\python\\Project\\AtuoEmail\\TodayADUser.bat')

# Read riqi.txt again, which the batch file is expected to have refreshed
todaynamelist = read_names('D:\\python\\Project\\AtuoEmail\\Date\\riqi.txt')

# Build the difference list (names in today's list that were not in yesterday's) and export it
differentnamelist = list(set(todaynamelist).difference(set(yesterdaynamelist)))
with open("D:\\python\\Project\\AtuoEmail\\Date\\differentnamelist.txt", "a", encoding="utf-8") as difffile:
    for name in differentnamelist:
        difffile.write(name)
        difffile.write("@dafy.com,")

 

