Mysql指定部分數據同步


一、需求背景
朋友的公司需要每天定時從源端定時同步一部分數據到目標端,庫中存在company_id列的表,只將指定的company_id列導入到目標端數據庫;存在company_id列的表,將表中所有的數據導入到目標端。
 
二、實現思路
1 遠端與目標端的ip地址、賬號、密碼、端口號等信息保存在配置文件中,由我朋友自己填寫;
2 通過information_schema.`COLUMNS`獲取需要導出的表數據;
3 通過mysqldump導出表
4 通過通過mysql -e 的方式導入數據
5 預先在目標端建立相對應的schema
 
三、代碼實現
3.1 配置文件db.conf
[db]
source_host=源端IP
source_port=源端端口
source_user=源端用戶名
source_passwd=源端密碼
dest_host=目標端用戶名
dest_port=目標端端口
dest_user=目標端用戶名
dest_passwd=目標端密碼

 

3.2 實現代碼
# -*- coding: utf-8 -*-
# @Time    : 2019-09-11 10:25
# @Author  : Huangwenjun
# @Email   : 350920551@qq.com
# @File    : rsyncdata.py
# @Software: PyCharm
# @exe_location:
import pymysql
import configparser
import os
class RsyncData():
    def __init__(self):
        self.get_item()
        self.mysql_host = self.item['source_host']
        self.mysql_port = int(self.item['source_port'])
        self.mysql_user = self.item['source_user']
        self.mysql_pwd = self.item['source_passwd']
        self.dest_host=self.item['dest_host']
        self.dest_port=int(self.item['dest_port'])
        self.dest_user = self.item['dest_user']
        self.dest_pwd = self.item['dest_passwd']
        self.conn_source()

    def get_item(self):
        """
        讀取配置文件信息
        """
        cf = configparser.ConfigParser()
        cf.read('db.conf', encoding='utf8')
        config = {}
        for db in cf.sections():
            items = {}
            for item in cf.items(db):
                items[item[0]] = item[1]
            config[db] = items
        self.item = config['db']

    def conn_source(self):
        """
        連接源端數據庫
        """
        self.source_conn = pymysql.connect(host=self.dest_host, port=self.dest_port, user=self.dest_user,
                                                 passwd=self.dest_pwd, charset='utf8mb4')

    def execute_sql(self, conn, sql):
        """
        執行sql
        """
        curs = conn.cursor()
        curs.execute(sql)
        if sql.startswith('select'):
            rows = curs.fetchall()
            curs.close()
            return rows
        else:
            curs.execute('commit')
            curs.close()

    def close(self):
        """
        關閉數據庫連接
        """
        self.source_conn.close()

    def backup_table(self,table_schema,table_name,is_com):
        """
        備份表
        :param table_schema: 數據庫schema
        :param table_name: 表名稱
        :param is_com: 是否包含company_id
        """
        if is_com>=1:
            backup_cmd="mysqldump -u%s -p%s -h %s -P %s --single-transaction %s %s --where='company_id=1'>%s.sql"%(self.mysql_user,self.mysql_pwd,self.mysql_host,self.mysql_port,table_schema,table_name,table_name)
        else:
            backup_cmd = "mysqldump -u%s -p%s -h %s -P %s --single-transaction %s %s>%s.sql" % (
            self.mysql_user, self.mysql_pwd, self.mysql_host,self.mysql_port, table_schema, table_name, table_name)
        os.system(backup_cmd)

    def mysql_import(self,table_schema,table_name):
        """
        數據導入
        :param table_schema: 數據庫schema
        :param table_name: 表名稱
        """
        import_cmd='mysql -u%s -p%s -h %s -P %s -D %s -e"source %s.sql"'%(self.dest_user,self.dest_pwd,self.dest_host,self.dest_port,table_schema,table_name)
        print(import_cmd)
        os.system(import_cmd)


    def run(self):
        """
        執行入口
        :return:
        """
        #從源端獲取庫 表名稱
        sql="select table_schema,table_name,sum(case when column_name='company_id' then 1 else 0 end) company_id_count from information_schema.`COLUMNS` where TABLE_SCHEMA not in ('mysql','information_schema','performance_schema','sys') group by table_schema,table_name"
        rows=self.execute_sql(self.source_conn,sql)
        for row in rows:
            table_schema=row[0]
            table_name=row[1]
            is_comid=row[2]
            #數據備份
            self.backup_table(table_schema,table_name,is_comid)
            #數據導入
            self.mysql_import(table_schema,table_name)

if __name__ == '__main__':
    rsync_data=RsyncData()
    rsync_data.run()
    rsync_data.close()
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM