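I have recently been working on a heterogeneous database replication project. The customer asked for a data comparison tool, so I wrote a Python script that compares heterogeneous databases. The script only compares row counts; it cannot compare individual records. The SQL it uses is the most basic select count(*), and tables are processed one at a time rather than concurrently, so comparing large tables may take a while.
The basic idea is to list the tables to be compared in a single table. The script first reads that table to get the list of tables. For each one it then starts two processes that run the count on the source and on the target at the same time, and it writes the counts to an Excel file.
The most important piece is that table. As long as its contents are correct, there should be few problems.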
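The flow described above can be sketched end to end with the standard-library sqlite3 module standing in for the real databases. Everything here (the in-memory databases and the helpers make_db, count_rows, and compare_counts) is a hypothetical stand-in for illustration, not the script itself, and it is written for Python 3. SQLite has no owner prefix, so only table names are used.

```python
import sqlite3


def make_db(tables):
    """Create an in-memory SQLite database; `tables` maps table name -> row count."""
    conn = sqlite3.connect(":memory:")
    for name, rows in tables.items():
        conn.execute('CREATE TABLE "{0}" (id INTEGER)'.format(name))
        conn.executemany('INSERT INTO "{0}" VALUES (?)'.format(name),
                         [(i,) for i in range(rows)])
    return conn


def count_rows(conn, table):
    """Run the basic SELECT COUNT(*) that the comparison relies on."""
    return conn.execute('SELECT COUNT(*) FROM "{0}"'.format(table)).fetchone()[0]


def compare_counts(source, target, check_rows):
    """For each (source table, target table) pair, return both counts and their difference."""
    results = []
    for sname, tname in check_rows:
        count_s = count_rows(source, sname)
        count_t = count_rows(target, tname)
        results.append((sname, count_s, tname, count_t, count_s - count_t))
    return results


# Stand-in source and target databases; t2 is missing two rows on the target.
source = make_db({"t1": 3, "t2": 5})
target = make_db({"t1": 3, "t2": 3})

# Stand-in for the comparison table: one row per table pair to check.
check_rows = [("t1", "t1"), ("t2", "t2")]

results = compare_counts(source, target, check_rows)
for row in results:
    print(row)
```

A non-zero last column marks a table whose source and target counts differ.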
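The databases currently supported are Oracle, MySQL, PostgreSQL, and SQL Server. The program has three parts:
1. Database configuration file
First, create a configuration file named check.ini in the same directory as the Python code. Here is an example: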
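[DATA]
# Source database; the value ORACLE must match a section name below
source=ORACLE
# Target database; the value POSTGRESQL must match a section name below
target=POSTGRESQL
# Database that holds the comparison table; must match a section name below
check_node=ORACLE
# Owner of the comparison table: the user for Oracle, the database name for MySQL, PG, and MSSQL
check_owner=suq
# Name of the comparison table, case-sensitive
check_table=check_table

# MySQL connection settings. The section name must be upper case and must start with MYSQL;
# to compare several MySQL instances, use MYSQL1, MYSQL2, and so on.
# The other sections must likewise start with the matching prefix, because the program
# decides which kind of database it is from the start of the section name.
[MYSQL]
db_host=192.168.56.25
db_port=3306
db_user=root
db_pwd=root
db_dbname=major

[ORACLE]
db_host=192.168.56.30
db_port=1521
db_user=dsg
db_pwd=dsg
db_sid=bre1

[POSTGRESQL]
db_host=192.168.56.50
db_port=5432
db_user=postgres
db_pwd=postgres
db_dbname=msgdb

[MSSQL]
db_host=192.168.56.101
db_port=1433
db_user=sa
db_pwd=sa
db_dbname=master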
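Because the database type is derived from the start of the section name, it can help to check the file before running the comparison. The following is a minimal sketch, assuming Python 3 and its standard configparser module (the script itself uses the Python 2 ConfigParser module). The helper names db_type and resolve_roles are hypothetical and do not appear in the script.

```python
import configparser

# A shortened copy of the example configuration, inlined so the sketch is self-contained.
SAMPLE = """
[DATA]
source=ORACLE
target=POSTGRESQL
check_node=ORACLE
check_owner=suq
check_table=check_table

[ORACLE]
db_host=192.168.56.30
db_port=1521
db_user=dsg
db_pwd=dsg
db_sid=bre1

[POSTGRESQL]
db_host=192.168.56.50
db_port=5432
db_user=postgres
db_pwd=postgres
db_dbname=msgdb
"""

# Section-name prefixes the script recognises.
KNOWN_PREFIXES = ("MYSQL", "ORACLE", "POSTGRESQL", "MSSQL")


def db_type(section):
    """Return the database type implied by the section-name prefix."""
    for prefix in KNOWN_PREFIXES:
        if section.startswith(prefix):
            return prefix
    raise ValueError("Unsupported section name: " + section)


def resolve_roles(cfg):
    """Check that source, target and check_node point at existing, supported sections."""
    roles = {}
    for key in ("source", "target", "check_node"):
        section = cfg.get("DATA", key)
        if not cfg.has_section(section):
            raise ValueError("[DATA] %s refers to missing section %s" % (key, section))
        roles[key] = db_type(section)
    return roles


cfg = configparser.ConfigParser()
cfg.read_string(SAMPLE)
roles = resolve_roles(cfg)
print(roles)
```

A section named MYSQL2 would resolve to MYSQL in the same way, which is what allows several instances of one database type in a single file.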
2. Create a comparison table
In my example above, it is the check_table table under the suq user.
The table structure is:
- SQL> desc check_table
- Name Null? Type
- ----------------------------------------- -------- ----------------------------
- SOWNER VARCHAR2(30)
- SNAME VARCHAR2(30)
- TOWNER VARCHAR2(30)
- TNAME VARCHAR2(30)
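The columns hold the source user name, the source table name, the target user name, and the target table name. For databases where tables are not grouped by user, the user column holds the database name instead.
Here is my test data in the table: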
- SQL> select * from check_table;
- SOWNER SNAME TOWNER TNAME
- ------------------------------ ------------------------------ ------------------------------ ------------------------------
- suq "t1" suq t1
- suq "t2" suq t2
- suq "t3" suq t3
- suq "t4" suq t4
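Take particular care with this data: it must be exactly right, otherwise the run may fail with errors. The usual cause of trouble is that different databases treat identifier case differently. I therefore recommend checking each entry by hand in the database after you enter it, for example
select count(*) from suq."t1"
to confirm that SQL of this form works.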
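To make the manual check easier, the statements can be generated ahead of time from the rows of the comparison table. This is a small sketch written for Python 3. The helper count_sql is a hypothetical name, and the rows are hard-coded here rather than read from a database. It concatenates owner and name verbatim, as the script does, so any quoting has to be present in the table data already.

```python
def count_sql(owner, name):
    """Build the count statement by plain concatenation, with no quoting added."""
    return "select count(*) from " + owner + "." + name


# Rows as they would come back from the comparison table:
# (source owner, source table, target owner, target table).
check_rows = [
    ("suq", '"t1"', "suq", "t1"),
    ("suq", '"t2"', "suq", "t2"),
]

statements = []
for sowner, sname, towner, tname in check_rows:
    statements.append(("source", count_sql(sowner, sname)))
    statements.append(("target", count_sql(towner, tname)))

# Print each statement so it can be pasted into the matching database client.
for side, sql in statements:
    print(side + ": " + sql)
```

The quotes in the source column matter because Oracle folds unquoted identifiers to upper case while PostgreSQL folds them to lower case, so a lower-case table name has to be quoted on the Oracle side but not on the PostgreSQL side.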
3. The main program
Note that I connect to the databases with the following Python modules, and write the Excel file with the XlsxWriter module:
- C:\Users\think>pip list
- cx-Oracle (5.2.1)
- MySQL-python (1.2.4)
- psycopg2 (2.6.2)
- pymssql (2.1.3)
- XlsxWriter (0.8.5)
Here is the Python code:
#coding:utf-8
# Python 2 script: compares row counts between a source and a target database.
import cx_Oracle as ora
import MySQLdb as my
import psycopg2 as post
import pymssql as ms
import ConfigParser as conf
import multiprocessing as mul
import xlsxwriter
import time


def connect(cfg, db):
    # The start of the section name decides which driver is used.
    db_host = cfg.get(db, 'db_host')
    db_port = cfg.get(db, 'db_port')
    db_user = cfg.get(db, 'db_user')
    db_pwd = cfg.get(db, 'db_pwd')
    if db[0:5] == 'MYSQL':
        db_dbname = cfg.get(db, 'db_dbname')
        return my.connect(host=db_host, port=int(db_port), user=db_user, passwd=db_pwd, db=db_dbname)
    elif db[0:6] == 'ORACLE':
        db_sid = cfg.get(db, 'db_sid')
        return ora.connect(db_user, db_pwd, db_host + ':' + db_port + '/' + db_sid)
    elif db[0:10] == 'POSTGRESQL':
        db_dbname = cfg.get(db, 'db_dbname')
        return post.connect(host=db_host, port=db_port, user=db_user, password=db_pwd, database=db_dbname)
    elif db[0:5] == 'MSSQL':
        db_dbname = cfg.get(db, 'db_dbname')
        return ms.connect(host=db_host, port=db_port, user=db_user, password=db_pwd, database=db_dbname)
    # Fail loudly instead of silently returning None for an unknown section name.
    raise ValueError('Unsupported database section: ' + db)


def check(cfg, db, check_owner, check_table):
    # Read the comparison table and return its rows as a list.
    conn = connect(cfg, db)
    cursor = conn.cursor()
    sql = 'select * from ' + check_owner + '.' + check_table
    cursor.execute(sql)
    table_list = list(cursor.fetchall())
    conn.close()
    return table_list


def getcount(cfg, db, sql, q):
    # Runs in a child process. Always put something on the queue, including
    # when the connection itself fails, so the parent never blocks on q.get().
    try:
        conn = connect(cfg, db)
        cursor = conn.cursor()
        cursor.execute(sql)
        countval = cursor.fetchall()[0][0]
        conn.close()
    except Exception as e:
        countval = "Error : " + str(e)
    q.put(countval)


def isdigit(num):
    # True if the value is a count, False if it is an error message.
    try:
        int(num)
        return True
    except (TypeError, ValueError):
        return False


def comp(cfg, source, target, tablelist):
    ### excel start
    xlsxname = 'check_' + str(time.strftime("%Y%m%d%H%M", time.localtime())) + '.xlsx'
    workbook = xlsxwriter.Workbook(xlsxname)
    top = workbook.add_format({'border': 6, 'align': 'center', 'bg_color': '#CCCCCC', 'font_size': 13, 'bold': True})
    format_data_normal = workbook.add_format({'align': 'center', 'font_size': 13})
    # Red: counts differ. Yellow: the SQL raised an error.
    format_data_warn = workbook.add_format({'align': 'center', 'font_size': 13, 'bg_color': '#FF0000'})
    format_data_err = workbook.add_format({'align': 'center', 'font_size': 13, 'bg_color': '#FFFF00'})
    worksheet = workbook.add_worksheet('sheet1')
    worksheet.set_column('A:A', 12)
    worksheet.set_column('B:B', 40)
    worksheet.set_column('C:C', 12)
    worksheet.set_column('D:D', 12)
    worksheet.set_column('E:E', 40)
    worksheet.set_column('F:F', 12)
    worksheet.set_column('G:G', 12)
    title = [u'Source user', u'Source table', u'Source rows', u'Target user', u'Target table', u'Target rows', u'Difference']
    worksheet.write_row('A1', title, top)
    ### excel stop
    length = len(tablelist)
    for i in range(length):
        check_result = []
        sowner = tablelist[i][0]
        sname = tablelist[i][1]
        towner = tablelist[i][2]
        tname = tablelist[i][3]
        sql_s = 'select count(*) from ' + sowner + '.' + sname
        sql_t = 'select count(*) from ' + towner + '.' + tname
        #sql_t='select count(*) from '+towner+'.'+'\"'+tname+'\"'
        # Count the source and the target at the same time, one process each.
        q1 = mul.Queue()
        q2 = mul.Queue()
        p1 = mul.Process(target=getcount, args=(cfg, source, sql_s, q1))
        p2 = mul.Process(target=getcount, args=(cfg, target, sql_t, q2))
        p1.start()
        p2.start()
        count_s = q1.get()
        count_t = q2.get()
        # The original wrote p1.join without parentheses, which never waited.
        p1.join()
        p2.join()
        check_result.append(sowner)
        check_result.append(sname)
        check_result.append(count_s)
        check_result.append(towner)
        check_result.append(tname)
        check_result.append(count_t)
        print '%s %s %s %s %s %s' % (sowner, sname, count_s, towner, tname, count_t)
        if isdigit(count_s) and isdigit(count_t):
            check_result.append(count_s - count_t)
            if count_s == count_t:
                worksheet.write_row('A' + str(2 + i), check_result, format_data_normal)
            else:
                worksheet.write_row('A' + str(2 + i), check_result, format_data_warn)
        else:
            check_result.append("Error")
            worksheet.write_row('A' + str(2 + i), check_result, format_data_err)
    workbook.close()


if __name__ == "__main__":
    print "AT time {0}".format(time.ctime())
    print "Begin compare ..."
    cfg = conf.ConfigParser()
    cfg.read('check.ini')
    source = cfg.get('DATA', 'source')
    target = cfg.get('DATA', 'target')
    check_node = cfg.get('DATA', 'check_node')
    check_owner = cfg.get('DATA', 'check_owner')
    check_table = cfg.get('DATA', 'check_table')
    tablelist = check(cfg, check_node, check_owner, check_table)
    comp(cfg, source, target, tablelist)
    print "AT time {0}".format(time.ctime())
    print "compare complete!"
    raw_input("Press <ENTER>")
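The script counts the source and the target at the same time by starting one process per side and passing results back through a queue. As an alternative illustration of the same idea, the sketch below uses threads from the Python 3 standard library instead of processes. This is a swapped-in technique, not what the script does, and the names safe_count, count_both, and the two fake count functions are hypothetical. Threads are usually sufficient here because each worker spends its time waiting on the database.

```python
from concurrent.futures import ThreadPoolExecutor


def safe_count(count_fn):
    """Call count_fn and return its result, or an 'Error : ...' string on failure."""
    try:
        return count_fn()
    except Exception as exc:
        return "Error : " + str(exc)


def count_both(count_source, count_target):
    """Run the source and target counts concurrently and return both results."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        future_s = pool.submit(safe_count, count_source)
        future_t = pool.submit(safe_count, count_target)
        return future_s.result(), future_t.result()


# Stand-ins for real queries: one succeeds, one fails as a missing table would.
def fake_source_count():
    return 100


def fake_target_count():
    raise RuntimeError("table does not exist")


pair = count_both(fake_source_count, fake_target_count)
print(pair)
```

Catching the exception inside the worker and returning it as a string mirrors the script's approach, so a failure on one side still produces a row in the report rather than stopping the run.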
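When run, the code reads check.ini to get the connection details of the source and target databases and the location of the comparison table. It first loads the tables to compare into a list, then loops over that list, counts each table on both sides, and writes the results to Excel. The file is named check_XXXX.xlsx, where XXXX is the time in YYYYMMDDHHMM form. Rows where the SQL raised an error are highlighted in yellow, and rows where the source and target counts differ are highlighted in red.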
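The rule for colouring a row can be isolated from the Excel code. The sketch below is written for Python 3 and uses hypothetical helper names (is_number, classify, report_name). It returns a status label rather than a cell format, on the assumption that the caller maps "match" to the normal format, "mismatch" to red, and "error" to yellow.

```python
import time


def is_number(value):
    """True if the value is a count, False if it is an error message."""
    try:
        int(value)
        return True
    except (TypeError, ValueError):
        return False


def classify(count_s, count_t):
    """Return (difference, status) for one pair of counts."""
    if not (is_number(count_s) and is_number(count_t)):
        return "Error", "error"  # at least one side failed: yellow row
    diff = int(count_s) - int(count_t)
    if diff == 0:
        return diff, "match"  # plain row
    return diff, "mismatch"  # counts differ: red row


def report_name(now=None):
    """Build the report file name from a struct_time (defaults to the current local time)."""
    return "check_" + time.strftime("%Y%m%d%H%M", now or time.localtime()) + ".xlsx"


print(classify(10, 10))
print(classify(10, 7))
print(classify("Error : no such table", 7))
print(report_name())
```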
Below is one result from comparing Oracle and PG:
--------------

# -*- coding:utf-8 -*-
import os

# Read the name list generated yesterday and build yesterdaynamelist
yesterdaynamelist = []
with open('D:\\python\\Project\\AtuoEmail\\Date\\riqi.txt', 'r') as namelist:
    for line in namelist:
        yesterdaynamelist.extend(line.split())

# Write yesterdaynamelist out as a single-column text file ("w" creates or overwrites it)
with open("D:\\python\\Project\\AtuoEmail\\Date\\yesterdaytxt.txt", "w", encoding="utf-8") as yesterdaytxt:
    for name in yesterdaynamelist:
        yesterdaytxt.write(name)
        yesterdaytxt.write("\n")

# Run the batch file that generates today's list of domain users
os.system('D:\\python\\Project\\AtuoEmail\\TodayADUser.bat')

# Read the regenerated file and build todaynamelist
todaynamelist = []
with open('D:\\python\\Project\\AtuoEmail\\Date\\riqi.txt', 'r') as namelist:
    for line in namelist:
        todaynamelist.extend(line.split())

# Build the difference list (names in today's list that were not in yesterday's)
# and append it to the output file as a comma-separated run of addresses
differentnamelist = list(set(todaynamelist).difference(set(yesterdaynamelist)))
if differentnamelist:
    with open("D:\\python\\Project\\AtuoEmail\\Date\\differentnamelist.txt", "a", encoding="utf-8") as difffile:
        for name in differentnamelist:
            difffile.write(name)
            difffile.write("@dafy.com,")