寫這個肯定是工作需要了,不啰嗦,直接說事
我現在有兩台主機,一台是公司主機,一台是客戶主機,要求把公司主機上的三個表同步到客戶主機上的數據庫
注意是同步,首先就得考慮用linux定時任務或者主從復制,主從復制因為我沒有權限在主機上設置,所以只能選擇通過腳本,做定時任務
涉及的三個表創建語句
# 創建表`schedule_building`
create_sql_schedule_building = """
DROP table IF EXISTS schedule_building ;
CREATE TABLE `schedule_building` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`uuid` varchar(32) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
`proj_id` int(11) DEFAULT NULL,
`team_id` int(11) DEFAULT NULL,
`Building` varchar(32) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
`status` tinyint(4) DEFAULT '1',
`cid` int(11) DEFAULT NULL,
`cusrname` varchar(50) CHARACTER SET utf8 DEFAULT NULL COMMENT '建立人',
`ctime` datetime DEFAULT NULL,
`uid` int(11) DEFAULT NULL,
`uusrname` varchar(50) CHARACTER SET utf8 DEFAULT NULL COMMENT '修改人',
`utime` datetime DEFAULT NULL,
`random_no` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=91 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
"""
# 創建表`schedule_floor`
create_sql_schedule_floor = """
DROP table IF EXISTS schedule_floor ;
CREATE TABLE `schedule_floor` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`m_id` int(11) DEFAULT NULL,
`sort` int(11) DEFAULT NULL,
`cname` varchar(100) CHARACTER SET utf8 DEFAULT NULL,
`ctime` datetime DEFAULT NULL,
`cid` int(11) DEFAULT NULL,
`utime` datetime DEFAULT NULL,
`uid` int(11) DEFAULT NULL,
`status` tinyint(4) DEFAULT '1',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3249 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ;
"""
# 創建表`schedule_room`
create_sql_schedule_room = """
DROP table IF EXISTS schedule_room ;
CREATE TABLE `schedule_room` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`m_id` int(11) DEFAULT NULL,
`ilevel` int(11) DEFAULT NULL,
`parent_id` int(11) DEFAULT NULL,
`cname` varchar(50) DEFAULT NULL,
`mark` varchar(50) DEFAULT NULL,
`status` tinyint(4) DEFAULT '1',
`sort` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=926 DEFAULT CHARSET=utf8;
"""
核心代碼
from mysql_base import DataBaseParent_local, DataBaseParent_remote, DataBaseParent_test
import MySQLdb
db1 = DataBaseParent_local()
db2 = DataBaseParent_remote()
db3 = DataBaseParent_test()
def read(tb_name):
sql = "SELECT * FROM {0};".format(tb_name)
rows, length = db1.select(sql)
data = []
for row in rows:
data.append(row)
return data
def write_building():
schedule_building = read("schedule_building")
sql_schedule_building_2 = "delete from schedule_building ;"
sql_schedule_building_3 = "insert into schedule_building values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);"
db3.insert_many(sql_schedule_building_3, sql_schedule_building_2, schedule_building)
def write_floor():
schedule_floor = read("schedule_floor")
sql_schedule_floor_2 = "delete from schedule_floor ;"
sql_schedule_floor_1 = "insert into schedule_floor values(%s,%s,%s,%s,%s,%s,%s,%s,%s);"
db3.insert_many(sql_schedule_floor_1, sql_schedule_floor_2, schedule_floor)
def write_room():
schedule_room = read("schedule_room")
sql_schedule_room_2 = "delete from schedule_room ;"
sql_schedule_room_1 = "insert into schedule_room values(%s,%s,%s,%s,%s,%s,%s,%s);"
db3.insert_many(sql_schedule_room_1, sql_schedule_room_2, schedule_room)
if __name__ == '__main__':
write_floor()
write_building()
write_room()
數據庫共享基類
#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""DB共享類庫"""
# 使用此類,先實例化一個DataBaseParent_local對象,然后對象調用相應方法
# from django.db import connection
import MySQLdb
db1 = MySQLdb.connect("www.shdfshajd.cn", "db_user", "qazeDC!@12", "xcx", charset='utf8')
db3 = MySQLdb.connect("localhost", "root", "root", "apollo", charset='utf8')
class DataBaseParent_local:
def __init__(self):
self.cursor = "Initial Status"
self.cursor = db1.cursor()
if self.cursor == "Initial Status":
raise Exception("Can't connect to Database server!")
# 返回元組套元組數據
def select(self, sqlstr):
# result = (('apollo', 'male', '164.jpeg'), ('apollo', 'male', ''))
cur = db1.cursor()
cur.execute(sqlstr)
List = cur.fetchall()
iTotal_length = len(List)
self.description = cur.description
cur.close()
return List, iTotal_length
# 返回列表套字典數據
def select_include_name(self, sqlstr):
# result = [{'name':'apollo','age':28},{'name':'jack','age':27}]
cur = db1.cursor()
cur.execute(sqlstr)
index = cur.description
List = cur.fetchall()
iTotal_length = len(List)
result = []
for res in List:
row = {}
for i in range(len(index) - 1):
row[index[i][0]] = res[i]
result.append(row)
cur.close()
return result, iTotal_length
# 返回指定頁碼數據(元組套元組)
def select_for_grid(self, sqlstr, pageNo=1, select_size=5):
# List: (('apollo','male','28'),('jack','male','27'))
# iTotal_length: 查詢結果元組的長度
# select_size:分頁每頁顯示
# pageNo:頁碼
List, iTotal_length = self.select(sqlstr)
# 確定頁碼
if iTotal_length % select_size == 0:
iTotal_Page = iTotal_length / select_size
else:
iTotal_Page = iTotal_length / select_size + 1
start, end = (pageNo - 1) * select_size, pageNo * select_size
if end >= iTotal_length: end = iTotal_length
if iTotal_length == 0 or start > iTotal_length or start < 0:
return [], iTotal_length, iTotal_Page, pageNo, select_size
# 假設有10條數據,select_size=5,對應結果如下:
# List[start:end]:(('apollo','male','28'),('jack','male','27')) 10,2,
# iTotal_length:10
# iTotal_Page:2
# pageNo:1
# select_size:5
return List[start:end], iTotal_length, iTotal_Page, pageNo, select_size
# 執行sql語句
def executesql(self, sqlstr):
cur = db1.cursor()
r = cur.execute(sqlstr)
db1.commit()
cur.close()
return r
# 插入數據
def insert(self, sql, param):
cur = self.cursor
n = cur.execute(sql, param)
db1.commit()
cur.close()
return n
def release(self):
return 0
class DataBaseParent_test:
def __init__(self):
self.cursor = "Initial Status"
self.cursor = db3.cursor()
if self.cursor == "Initial Status":
raise Exception("Can't connect to Database server!")
# 返回元組套元組數據
def select(self, sqlstr):
# result = (('apollo', 'male', '164.jpeg'), ('apollo', 'male', ''))
cur = db3.cursor()
cur.execute(sqlstr)
List = cur.fetchall()
iTotal_length = len(List)
self.description = cur.description
cur.close()
return List, iTotal_length
# 返回列表套字典數據
def select_include_name(self, sqlstr):
# result = [{'name':'apollo','age':28},{'name':'jack','age':27}]
cur = db3.cursor()
cur.execute(sqlstr)
index = cur.description
List = cur.fetchall()
iTotal_length = len(List)
result = []
for res in List:
row = {}
for i in range(len(index) - 1):
row[index[i][0]] = res[i]
result.append(row)
cur.close()
return result, iTotal_length
# 返回指定頁碼數據(元組套元組)
def select_for_grid(self, sqlstr, pageNo=1, select_size=5):
# List: (('apollo','male','28'),('jack','male','27'))
# iTotal_length: 查詢結果元組的長度
# select_size:分頁每頁顯示
# pageNo:頁碼
List, iTotal_length = self.select(sqlstr)
# 確定頁碼
if iTotal_length % select_size == 0:
iTotal_Page = iTotal_length / select_size
else:
iTotal_Page = iTotal_length / select_size + 1
start, end = (pageNo - 1) * select_size, pageNo * select_size
if end >= iTotal_length: end = iTotal_length
if iTotal_length == 0 or start > iTotal_length or start < 0:
return [], iTotal_length, iTotal_Page, pageNo, select_size
# 假設有10條數據,select_size=5,對應結果如下:
# List[start:end]:(('apollo','male','28'),('jack','male','27')) 10,2,
# iTotal_length:10
# iTotal_Page:2
# pageNo:1
# select_size:5
return List[start:end], iTotal_length, iTotal_Page, pageNo, select_size
# 執行sql語句
def executesql(self, sqlstr):
cur = db3.cursor()
r = cur.execute(sqlstr)
db1.commit()
cur.close()
return r
# 插入數據
def insert(self, sql, param):
cur = self.cursor
n = cur.execute(sql, param)
db3.commit()
cur.close()
return n
def release(self):
return 0
def insert_many(self, sql, sql1, args):
cur = self.cursor
cur.execute(sql1)
res = cur.executemany(sql, args)
db3.commit()
# cur.close()
return res
