Python--增量循環刪除MySQL表數據


需求場景:

有一業務數據庫,使用MySQL 5.5版本,每天會寫入大量數據,需要不定期將多表中“指定時期前”的數據進行刪除。在SQL SERVER中很容易實現,寫幾個WHILE循環就搞定;雖然MySQL中也存在類似功能,怎奈自己不精通,於是采用Python來實現。

 

話不多說,上腳本:

# coding: utf-8
import MySQLdb
import time
import os

# Delete config
# VIEW_OR_RUN = "VIEW": only generate the small-batch delete scripts, do not
# execute them.
# VIEW_OR_RUN = "RUN": generate the small-batch delete scripts and execute
# them directly.
# The DELETE_* names below are working state: delete_table_data() and
# check_config() overwrite them at runtime, and clean_env() resets them.
VIEW_OR_RUN = "VIEW"
DELETE_DATABASE_NAME = ""
DELETE_TABLE_NAME = ""
DELETE_TABLE_KEY = ""
DELETE_CONDITION = ""
# Width of the key range covered by one DELETE statement.
DELETE_ROWS_PER_BATCH = 10000
# Pause after each executed batch, in seconds (passed to time.sleep and to
# SELECT SLEEP in the generated script).
SLEEP_SECOND_PER_BATCH = 0.5

# MySQL Connection Config
# NOTE(review): host and credentials are hard-coded in source; consider
# loading them from the environment or a protected config file.
Default_MySQL_Host = '192.168.166.169'
Default_MySQL_Port = 3358
Default_MySQL_User = "mysql_admin"
Default_MySQL_Password = 'mysql@Admin@Pwd'

Default_MySQL_Charset = "utf8"
Default_MySQL_Connect_TimeOut = 120
# Only passed to the driver when Default_MySQL_Host is 'localhost'.
Default_MySQL_Socket = "/export/data/mysql/tmp/mysql.sock"

# Common config:
# DATETIME_FORMAT is not referenced in the visible part of this script.
DATETIME_FORMAT = '%Y-%m-%d %X'
# Log file that receives every message printed by print_*_message().
EXEC_DETAIL_FILE = 'exec_detail.txt'
# File that accumulates the generated DELETE statements.
EXEC_SCRIPT_FILE = 'delete_scripts.sql'


def highlight(s):
    """
    Wrap ``s`` between the ANSI sequences ESC[30;2m and ESC[1m.
    :param s: value to embed (formatted with %s)
    :return: the decorated string
    """
    parts = {"esc": chr(27), "text": s}
    return "%(esc)s[30;2m%(text)s%(esc)s[1m" % parts


def print_warning_message(message):
    """
    Print a message in red and append it to the execution-detail file.
    :param message: message content (converted with str() first)
    :return: None
    """
    text = str(message)
    escape = chr(27)
    # ESC[31;1m switches to red, ESC[0m resets the terminal attributes.
    colored = "%s[31;1m%s%s[0m" % (escape, text, escape)
    print(highlight('') + colored)
    # The uncolored text is what goes into the log file.
    write_file(EXEC_DETAIL_FILE, text)


def print_info_message(message):
    """
    Print an informational message in green and append it to the
    execution-detail file.
    :param message: message content (converted with str() first)
    :return: None
    """
    text = str(message)
    escape = chr(27)
    # ESC[32;2m switches to green, ESC[0m resets the terminal attributes.
    colored = "%s[32;2m%s%s[0m" % (escape, text, escape)
    print(highlight('') + colored)
    # The uncolored text is what goes into the log file.
    write_file(EXEC_DETAIL_FILE, text)


def write_file(file_path, message):
    """
    Append ``message`` followed by a line break to the file at ``file_path``.
    The directory that contains the file must already exist.
    :param file_path: path of the file to append to
    :param message: text to write (a string or an iterable of strings)
    :return: None
    """
    # The context manager closes the handle even when a write raises.
    with open(file_path, 'a') as file_handle:
        file_handle.writelines(message)
        # Terminate the entry with a line feed. A bare carriage return
        # (chr(13)) is not a line break on Unix, so entries would run
        # together in the log and in the generated SQL script.
        file_handle.write("\n")


def get_user_choose_option(input_options, input_message):
    """
    Keep prompting until the user enters one of the allowed options.
    :param input_options: accepted answers
    :param input_message: prompt shown (and logged) before every read
    :return: the matching entry from input_options
    """
    while True:
        print_info_message(input_message)
        answer = raw_input("").strip()
        matched = [option for option in input_options if answer == option]
        if matched:
            # When several entries match, the last one wins.
            return matched[-1]


def get_mysql_connection():
    """
    Open a database connection from the module-level default settings.
    :return: a new MySQLdb connection to DELETE_DATABASE_NAME
    """
    connect_args = dict(
        host=Default_MySQL_Host,
        port=Default_MySQL_Port,
        user=Default_MySQL_User,
        passwd=Default_MySQL_Password,
        connect_timeout=Default_MySQL_Connect_TimeOut,
        charset=Default_MySQL_Charset,
        db=DELETE_DATABASE_NAME,
    )
    # The unix socket path is only supplied for a 'localhost' host.
    if Default_MySQL_Host.lower() == 'localhost':
        connect_args['unix_socket'] = Default_MySQL_Socket
    return MySQLdb.connect(**connect_args)


def mysql_exec(sql_script, sql_param=None):
    """
    Execute one statement on a fresh connection and commit it.
    :param sql_script: SQL text to execute
    :param sql_param: optional parameters passed to cursor.execute
    :return: number of rows affected by the statement
    :raises Exception: any failure, re-raised with the original error text
    """
    # Pre-bind both names so the cleanup code never touches an unbound
    # variable when connecting or creating the cursor fails.
    conn = None
    cursor = None
    try:
        conn = get_mysql_connection()
        print_info_message("在服務器{0}上執行腳本:{1}".format(
                conn.get_host_info(), sql_script))
        cursor = conn.cursor()
        if sql_param is not None:
            cursor.execute(sql_script, sql_param)
        else:
            cursor.execute(sql_script)
        affect_rows = cursor.rowcount
        conn.commit()
        return affect_rows
    except Exception as ex:
        if conn is not None:
            conn.rollback()
        # Callers expect a plain Exception carrying the message text.
        raise Exception(str(ex))
    finally:
        # Release the cursor and the connection on success and on failure.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()


def mysql_exec_many(sql_script_list):
    """
    Execute several statements on one connection inside one transaction.
    :param sql_script_list: list of (sql_script, sql_param) pairs
    :return: list with one "affected rows" message per executed statement
    :raises Exception: any failure, re-raised with the original error text
    """
    # Pre-bind both names so the cleanup code never touches an unbound
    # variable (connection failure, or an empty sql_script_list).
    conn = None
    cursor = None
    try:
        conn = get_mysql_connection()
        # One cursor serves every statement, so exactly one needs closing.
        cursor = conn.cursor()
        exec_result_list = []
        for sql_script, sql_param in sql_script_list:
            print_info_message("在服務器{0}上執行腳本:{1}".format(
                    conn.get_host_info(), sql_script))
            if sql_param is not None:
                cursor.execute(sql_script, sql_param)
            else:
                cursor.execute(sql_script)
            affect_rows = cursor.rowcount
            exec_result_list.append("影響行數:{0}".format(affect_rows))
        conn.commit()
        return exec_result_list
    except Exception as ex:
        if conn is not None:
            conn.rollback()
        # Callers expect a plain Exception carrying the message text.
        raise Exception(str(ex))
    finally:
        # Release the cursor and the connection on success and on failure.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()


def mysql_query(sql_script, sql_param=None):
    """
    Run a query on a fresh connection and return all of its rows.
    :param sql_script: SQL text to execute
    :param sql_param: optional parameters passed to cursor.execute
    :return: the result of cursor.fetchall()
    :raises Exception: any failure, re-raised with the original error text
    """
    # Pre-bind both names so the cleanup code never touches an unbound
    # variable when connecting or creating the cursor fails.
    conn = None
    cursor = None
    try:
        conn = get_mysql_connection()
        print_info_message("在服務器{0}上執行腳本:{1}".format(
                conn.get_host_info(), sql_script))
        cursor = conn.cursor()
        if sql_param is not None:
            cursor.execute(sql_script, sql_param)
        else:
            cursor.execute(sql_script)
        return cursor.fetchall()
    except Exception as ex:
        # Callers expect a plain Exception carrying the message text.
        raise Exception(str(ex))
    finally:
        # Release the cursor and the connection on success and on failure.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()


def get_column_info_list(table_name):
    """
    Describe a table and summarize its columns.
    :param table_name: table to run DESC against
    :return: list of (column_name, column_key, column_type) tuples
    """
    desc_sql = "\nDESC {0}\n".format(table_name)
    desc_rows = mysql_query(sql_script=desc_sql, sql_param=None)
    # The code takes index 0 as name, 1 as type and 3 as key of each row.
    return [(row[0], row[3], row[1]) for row in desc_rows]


def get_id_range():
    """
    Query the key range and row count of the rows matching DELETE_CONDITION.
    :return: (max_id, min_id, total_count); (0, 0, 0) when either bound
             comes back as NULL
    """
    sql_lines = [
        "",
        "SELECT",
        "MAX({key}) AS MAX_ID,",
        "MIN({key}) AS MIN_ID,",
        "COUNT(1) AS Total_Count",
        "FROM {table}",
        "WHERE {condition};",
        "",
    ]
    range_sql = "\n".join(sql_lines).format(table=DELETE_TABLE_NAME,
                                            condition=DELETE_CONDITION,
                                            key=DELETE_TABLE_KEY)
    rows = mysql_query(sql_script=range_sql, sql_param=None)
    max_id, min_id, total_count = rows[0]
    # Pitfall: total_count may be non-zero while max_id / min_id are None,
    # so the NULL check is done on the bounds themselves.
    if max_id is None or min_id is None:
        return 0, 0, 0
    return max_id, min_id, total_count


def delete_data(current_min_id, current_max_id):
    """
    Build the DELETE for one key range, run it in RUN mode, and append it
    to the generated script file.
    :param current_min_id: lower bound of the key range (inclusive)
    :param current_max_id: upper bound of the key range (inclusive)
    :return: None
    """
    delete_sql = "\n".join([
        "",
        "DELETE FROM {table}",
        "WHERE {key} <= {upper}",
        "and {key} >= {lower}",
        "AND {condition};",
        "",
    ]).format(table=DELETE_TABLE_NAME,
              upper=current_max_id,
              lower=current_min_id,
              condition=DELETE_CONDITION,
              key=DELETE_TABLE_KEY)

    if VIEW_OR_RUN != 'RUN':
        print_info_message("生成刪除腳本(未執行)")
        print_info_message(delete_sql)
    else:
        affected = mysql_exec(delete_sql)
        print_info_message("影響行數:{0}".format(affected))
        # Pause between executed batches.
        time.sleep(SLEEP_SECOND_PER_BATCH)

    # The statement is recorded in the script file in both modes.
    header = "\nUSE {0};\n".format(DELETE_DATABASE_NAME)
    footer = """
COMMIT;
SELECT SLEEP('{0}');
##=====================================================##
""".format(SLEEP_SECOND_PER_BATCH)
    write_file(file_path=EXEC_SCRIPT_FILE, message=header + delete_sql + footer)


def loop_delete_data():
    """
    Delete the matching rows batch by batch, walking the key range upward
    in steps of DELETE_ROWS_PER_BATCH and reporting progress after each step.
    :return: None
    """
    max_id, min_id, total_count = get_id_range()
    # Decide on the row count: min_id == max_id also holds when exactly one
    # row matches, and that row must still be deleted.
    if total_count == 0:
        print_info_message("無數據需要結轉")
        return
    id_span = max_id - min_id
    current_min_id = min_id
    while current_min_id <= max_id:
        print_info_message("*" * 70)
        current_max_id = current_min_id + DELETE_ROWS_PER_BATCH
        delete_data(current_min_id, current_max_id)
        # The last batch usually overshoots max_id; cap what is reported so
        # progress never exceeds the real range.
        reached_id = min(current_max_id, max_id)
        if id_span > 0:
            current_percent = (reached_id - min_id) * 100.0 / id_span
        else:
            # Single-key range: avoid dividing by zero, the batch is all.
            current_percent = 100.0
        left_rows = max_id - reached_id
        current_percent_str = "%.2f" % current_percent
        info = "當前進度{0}/{1},剩余{2},進度為{3}%"
        info = info.format(reached_id,
                           max_id,
                           left_rows,
                           current_percent_str)
        print_info_message(info)
        # The next range starts at the (inclusive) end of this one.
        current_min_id = current_max_id
    print_info_message("*" * 70)
    print_info_message("執行完成")


def check_config():
    """
    Validate the delete settings and detect the table's primary key.
    The table must have exactly one primary-key column whose type contains
    'int'; on success its name is stored in DELETE_TABLE_KEY.
    :return: True when the configuration is usable, otherwise False
    """
    global DELETE_TABLE_KEY
    try:
        if str(DELETE_DATABASE_NAME).strip() == "":
            print_warning_message("數據庫名不能為空")
            return False
        if str(DELETE_TABLE_NAME).strip() == "":
            print_warning_message("表名不能為空")
            return False
        if str(DELETE_CONDITION).strip() == "":
            print_warning_message("刪除條件不能為空")
            return False

        primary_key_count = 0
        key_column_name = None
        columns = get_column_info_list(DELETE_TABLE_NAME)
        for column_name, column_key, column_type in columns:
            if column_key.lower() != 'pri':
                continue
            primary_key_count += 1
            if 'int' not in str(column_type).lower():
                print_warning_message("主鍵不為int或bigint")
                return False
            key_column_name = column_name

        if primary_key_count == 0:
            print_warning_message("未找到主鍵,不瞞足遷移條件")
            return False

        if primary_key_count > 1:
            print_warning_message("要刪除的表使用復合主鍵,不滿足遷移條件")
            return False

        # Publish the key column only after every check has passed.
        DELETE_TABLE_KEY = key_column_name
        return True
    except Exception as ex:
        # Format the exception itself: ex.message is deprecated, is empty
        # for multi-argument exceptions and does not exist on Python 3.
        print_warning_message("執行出現異常,異常為{0}".format(ex))
        return False


def clean_env():
    """
    Reset the delete settings to their defaults and remove the output
    files left by a previous run.
    :return: None
    """
    global DELETE_DATABASE_NAME, DELETE_TABLE_NAME
    global DELETE_TABLE_KEY, DELETE_CONDITION
    global VIEW_OR_RUN
    DELETE_DATABASE_NAME = DELETE_TABLE_NAME = ""
    DELETE_TABLE_KEY = DELETE_CONDITION = ""
    VIEW_OR_RUN = "VIEW"

    # Script file first, then the detail log.
    for stale_file in (EXEC_SCRIPT_FILE, EXEC_DETAIL_FILE):
        if os.path.exists(stale_file):
            os.remove(stale_file)


def user_confirm():
    """
    Show what is about to happen and ask the user for a yes/no answer.
    The summary is printed as a warning in RUN mode and as info otherwise.
    :return: False when the user answers "no", otherwise True
    """
    is_run = VIEW_OR_RUN == 'RUN'
    if is_run:
        headline = "您將在服務器{0}上{1}中刪除表{2}中數據"
        announce = print_warning_message
    else:
        headline = "將生成在服務器{0}上{1}中刪除表{2}中數據的腳本"
        announce = print_info_message
    # Both modes share the same tail that echoes the delete statement.
    template = "\n" + headline + "\n\n刪除數據條件為:\nDELETE FROM {3}\nWHERE {4}\n"
    announce(template.format(Default_MySQL_Host,
                             DELETE_DATABASE_NAME,
                             DELETE_TABLE_NAME,
                             DELETE_TABLE_NAME,
                             DELETE_CONDITION))

    prompt = "\n請輸入yes繼續或輸入no退出,yes/no?\n"
    answer = get_user_choose_option(input_options=['yes', 'no'],
                                    input_message=prompt)
    return answer != "no"


def delete_table_data(database_name, table_name, delete_condition, is_run, is_need_confirm):
    """
    Configure the module-level delete settings, validate them, optionally
    ask for confirmation, and then run the batched delete.
    :param database_name: database that holds the table
    :param table_name: table to delete from
    :param delete_condition: SQL condition selecting the rows to delete
    :param is_run: truthy to execute the deletes, falsy to only generate them
    :param is_need_confirm: truthy to ask the user before proceeding
    :return: None
    """
    global DELETE_DATABASE_NAME, DELETE_TABLE_NAME
    global DELETE_TABLE_KEY, DELETE_CONDITION
    global VIEW_OR_RUN
    DELETE_DATABASE_NAME = database_name
    DELETE_TABLE_NAME = table_name
    DELETE_CONDITION = delete_condition
    # The key column is filled in by check_config().
    DELETE_TABLE_KEY = ''
    VIEW_OR_RUN = "RUN" if is_run else "VIEW"

    if not check_config():
        return
    # Without a confirmation step the run is treated as approved.
    approved = user_confirm() if is_need_confirm else True
    if approved:
        loop_delete_data()


def main():
    """
    Script entry point: reset the environment, then delete the rows of
    db001.tb2001 matching the date condition, with confirmation.
    :return: None
    """
    clean_env()
    job = {
        "database_name": "db001",
        "table_name": "tb2001",
        "delete_condition": "dt<'2017-09-01'",
        "is_run": True,
        "is_need_confirm": True,
    }
    delete_table_data(**job)


# Run main() only when the file is executed as a script, not on import.
if __name__ == '__main__':
    main()

 

執行效果:

實現原理:

由於表存在自增ID,於是給我們增量循環刪除的機會,查找出滿足刪除條件的最大值ID和最小值ID,然后按ID 依次遞增,每次小范圍內(如10000條)進行刪除。

實現優點:

實現“小斧子砍大柴”的效果,事務小,對線上影響較小,打印出當前處理到的“ID”,可以隨時關閉,稍微修改下代碼便可以從該ID開始,方便。

實現不足:

為防止主從延遲太高,采用每次刪除後SLEEP固定時間(腳本中SLEEP_SECOND_PER_BATCH默認為0.5秒)的方式,相對比較糙,最好的方式應該是周期掃描這條復制鏈路,根據延遲調整SLEEP的周期,反正都腳本化,再智能化點又何妨!

 

本文重點依舊是妹子,不能讓諸位看官白跑一趟,是不!!!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM