需求場景:
有一業務數據庫,使用MySQL 5.5版本,每天會寫入大量數據,需要不定期將多表中“指定時期前“的數據進行刪除,在SQL SERVER中很容易實現,寫幾個WHILE循環就搞定,雖然MySQL中也存在類似功能,怎奈自己不精通,於是采用Python來實現
話不多少,上腳本:
# coding: utf-8 import MySQLdb import time import os # delete config # 如果VIEW_OR_RUN = "VIEW",僅生成小批量刪除的腳本但不執行 # 如果VIEW_OR_RUN = "RUN",生成小批量刪除的腳本並直接調用執行 VIEW_OR_RUN = "VIEW" DELETE_DATABASE_NAME = "" DELETE_TABLE_NAME = "" DELETE_TABLE_KEY = "" DELETE_CONDITION = "" DELETE_ROWS_PER_BATCH = 10000 SLEEP_SECOND_PER_BATCH = 0.5 # MySQL Connection Config Default_MySQL_Host = '192.168.166.169' Default_MySQL_Port = 3358 Default_MySQL_User = "mysql_admin" Default_MySQL_Password = 'mysql@Admin@Pwd' Default_MySQL_Charset = "utf8" Default_MySQL_Connect_TimeOut = 120 Default_MySQL_Socket = "/export/data/mysql/tmp/mysql.sock" # Common config: DATETIME_FORMAT = '%Y-%m-%d %X' EXEC_DETAIL_FILE = 'exec_detail.txt' EXEC_SCRIPT_FILE = 'delete_scripts.sql' def highlight(s): return "%s[30;2m%s%s[1m" % (chr(27), s, chr(27)) def print_warning_message(message): """ 以紅色字體顯示消息內容 :param message: 消息內容 :return: 無返回值 """ message = str(message) print(highlight('') + "%s[31;1m%s%s[0m" % (chr(27), message, chr(27))) global EXEC_DETAIL_FILE write_file(EXEC_DETAIL_FILE, message) def print_info_message(message): """ 以綠色字體輸出提醒性的消息 :param message: 消息內容 :return: 無返回值 """ message = str(message) print(highlight('') + "%s[32;2m%s%s[0m" % (chr(27), message, chr(27))) global EXEC_DETAIL_FILE write_file(EXEC_DETAIL_FILE, message) def write_file(file_path, message): """ 將傳入的message追加寫入到file_path指定的文件中 請先創建文件所在的目錄 :param file_path: 要寫入的文件路徑 :param message: 要寫入的信息 :return: """ file_handle = open(file_path, 'a') file_handle.writelines(message) # 追加一個換行以方便瀏覽 file_handle.writelines(chr(13)) file_handle.close() def get_user_choose_option(input_options, input_message): while_flag = True choose_option = None while while_flag: print_info_message(input_message) str_input = raw_input("") for input_option in input_options: if str_input.strip() == input_option: choose_option = input_option while_flag = False return choose_option def get_mysql_connection(): """ 根據默認配置返回數據庫連接 :return: 數據庫連接 """ if Default_MySQL_Host.lower() == 'localhost': conn = MySQLdb.connect( host=Default_MySQL_Host, port=Default_MySQL_Port, user=Default_MySQL_User, passwd=Default_MySQL_Password, connect_timeout=Default_MySQL_Connect_TimeOut, charset=Default_MySQL_Charset, db=DELETE_DATABASE_NAME, unix_socket=Default_MySQL_Socket ) else: conn = MySQLdb.connect( host=Default_MySQL_Host, port=Default_MySQL_Port, user=Default_MySQL_User, passwd=Default_MySQL_Password, connect_timeout=Default_MySQL_Connect_TimeOut, charset=Default_MySQL_Charset, db=DELETE_DATABASE_NAME ) return conn def mysql_exec(sql_script, sql_param=None): """ 執行傳入的腳本,返回影響行數 :param sql_script: :param sql_param: :return: 腳本最后一條語句執行影響行數 """ try: conn = get_mysql_connection() print_info_message("在服務器{0}上執行腳本:{1}".format( conn.get_host_info(), sql_script)) cursor = conn.cursor() if sql_param is not None: cursor.execute(sql_script, sql_param) else: cursor.execute(sql_script) affect_rows = cursor.rowcount conn.commit() cursor.close() conn.close() return affect_rows except Exception as ex: cursor.close() conn.rollback() raise Exception(str(ex)) def mysql_exec_many(sql_script_list): """ 執行傳入的腳本,返回影響行數 :param sql_script_list: 要執行的腳本List,List中每個元素為sql_script, sql_param對 :return: 返回執行每個腳本影響的行數列表 """ try: conn = get_mysql_connection() exec_result_list = [] for sql_script, sql_param in sql_script_list: print_info_message("在服務器{0}上執行腳本:{1}".format( conn.get_host_info(), sql_script)) cursor = conn.cursor() if sql_param is not None: cursor.execute(sql_script, sql_param) else: cursor.execute(sql_script) affect_rows = cursor.rowcount exec_result_list.append("影響行數:{0}".format(affect_rows)) conn.commit() cursor.close() conn.close() return exec_result_list except Exception as ex: cursor.close() conn.rollback() raise Exception(str(ex)) def mysql_query(sql_script, sql_param=None): """ 執行傳入的SQL腳本,並返回查詢結果 :param sql_script: :param sql_param: :return: 返回SQL查詢結果 """ try: conn = get_mysql_connection() print_info_message("在服務器{0}上執行腳本:{1}".format( conn.get_host_info(), sql_script)) cursor = conn.cursor() if sql_param is not None: cursor.execute(sql_script, sql_param) else: cursor.execute(sql_script) exec_result = cursor.fetchall() cursor.close() conn.close() return exec_result except Exception as ex: cursor.close() conn.close() raise Exception(str(ex)) def get_column_info_list(table_name): sql_script = """ DESC {0} """.format(table_name) column_info_list = [] query_result = mysql_query(sql_script=sql_script, sql_param=None) for row in query_result: column_name = row[0] column_type = row[1] column_key = row[3] column_info = column_name, column_key, column_type column_info_list.append(column_info) return column_info_list def get_id_range(): """ 按照傳入的表獲取要刪除數據最大ID、最小ID、刪除總行數 :return: 返回要刪除數據最大ID、最小ID、刪除總行數 """ global DELETE_TABLE_NAME global DELETE_CONDITION sql_script = """ SELECT MAX({2}) AS MAX_ID, MIN({2}) AS MIN_ID, COUNT(1) AS Total_Count FROM {0} WHERE {1}; """.format(DELETE_TABLE_NAME, DELETE_CONDITION, DELETE_TABLE_KEY) query_result = mysql_query(sql_script=sql_script, sql_param=None) max_id, min_id, total_count = query_result[0] # 此處有一坑,可能出現total_count不為0 但是max_id 和min_id 為None的情況 # 因此判斷max_id和min_id 是否為NULL if (max_id is None) or (min_id is None): max_id, min_id, total_count = 0, 0, 0 return max_id, min_id, total_count def delete_data(current_min_id, current_max_id): sql_script = """ DELETE FROM {0} WHERE {4} <= {1} and {4} >= {2} AND {3}; """.format(DELETE_TABLE_NAME, current_max_id, current_min_id, DELETE_CONDITION, DELETE_TABLE_KEY ) global EXEC_SCRIPT_FILE global VIEW_OR_RUN if VIEW_OR_RUN == 'RUN': row_count = mysql_exec(sql_script) print_info_message("影響行數:{0}".format(row_count)) time.sleep(SLEEP_SECOND_PER_BATCH) else: print_info_message("生成刪除腳本(未執行)") print_info_message(sql_script) tmp_script = """ USE {0}; """.format(DELETE_DATABASE_NAME) + sql_script + """ COMMIT; SELECT SLEEP('{0}'); ##=====================================================## """.format(SLEEP_SECOND_PER_BATCH) write_file(file_path=EXEC_SCRIPT_FILE, message=tmp_script) def loop_delete_data(): max_id, min_id, total_count = get_id_range() if min_id == max_id: print_info_message("無數據需要結轉") return current_min_id = min_id global DELETE_ROWS_PER_BATCH while current_min_id <= max_id: print_info_message("*" * 70) current_max_id = current_min_id + DELETE_ROWS_PER_BATCH delete_data(current_min_id, current_max_id) current_percent = (current_max_id - min_id) * 100.0 / (max_id - min_id) left_rows = max_id - current_max_id if left_rows < 0: left_rows = 0 current_percent_str = "%.2f" % current_percent info = "當前進度{0}/{1},剩余{2},進度為{3}%" info = info.format(current_max_id, max_id, left_rows, current_percent_str) print_info_message(info) current_min_id = current_max_id print_info_message("*" * 70) print_info_message("執行完成") def check_config(): try: global DELETE_DATABASE_NAME global DELETE_TABLE_NAME global DELETE_TABLE_KEY global DELETE_CONDITION global VIEW_OR_RUN if str(DELETE_DATABASE_NAME).strip() == "": print_warning_message("數據庫名不能為空") return False if str(DELETE_TABLE_NAME).strip() == "": print_warning_message("表名不能為空") return False if str(DELETE_CONDITION).strip() == "": print_warning_message("刪除條件不能為空") return False source_columns_info_list = get_column_info_list(DELETE_TABLE_NAME) column_count = len(source_columns_info_list) primary_key_count = 0 for column_id in range(column_count): source_column_name, source_column_key, source_column_type = source_columns_info_list[column_id] if source_column_key.lower() == 'pri': primary_key_count += 1 if not ('int' in str(source_column_type).lower()): print_warning_message("主鍵不為int或bigint") return False else: global DELETE_TABLE_KEY DELETE_TABLE_KEY = source_column_name if primary_key_count == 0: print_warning_message("未找到主鍵,不瞞足遷移條件") return False if primary_key_count > 1: print_warning_message("要刪除的表使用復合主鍵,不滿足遷移條件") return False return True except Exception as ex: print_warning_message("執行出現異常,異常為{0}".format(ex.message)) return False def clean_env(): global DELETE_DATABASE_NAME global DELETE_TABLE_NAME global DELETE_TABLE_KEY global DELETE_CONDITION global VIEW_OR_RUN DELETE_DATABASE_NAME = "" DELETE_TABLE_NAME = "" DELETE_TABLE_KEY = "" DELETE_CONDITION = "" VIEW_OR_RUN = "VIEW" if os.path.exists(EXEC_SCRIPT_FILE): os.remove(EXEC_SCRIPT_FILE) if os.path.exists(EXEC_DETAIL_FILE): os.remove(EXEC_DETAIL_FILE) def user_confirm(): if VIEW_OR_RUN == 'RUN': info = """ 您將在服務器{0}上{1}中刪除表{2}中數據 刪除數據條件為: DELETE FROM {3} WHERE {4} """ else: info = """ 將生成在服務器{0}上{1}中刪除表{2}中數據的腳本 刪除數據條件為: DELETE FROM {3} WHERE {4} """ info = info.format(Default_MySQL_Host, DELETE_DATABASE_NAME, DELETE_TABLE_NAME, DELETE_TABLE_NAME, DELETE_CONDITION) if VIEW_OR_RUN == "RUN": print_warning_message(info) else: print_info_message(info) input_options = ['yes', 'no'] input_message = """ 請輸入yes繼續或輸入no退出,yes/no? """ user_option = get_user_choose_option(input_options=input_options, input_message=input_message) if user_option == "no": return False else: return True def delete_table_data(database_name, table_name, delete_condition, is_run, is_need_confirm): global DELETE_DATABASE_NAME global DELETE_TABLE_NAME global DELETE_TABLE_KEY global DELETE_CONDITION global VIEW_OR_RUN DELETE_DATABASE_NAME = database_name DELETE_TABLE_NAME = table_name DELETE_CONDITION = delete_condition DELETE_TABLE_KEY = '' if is_run: VIEW_OR_RUN = "RUN" else: VIEW_OR_RUN = "VIEW" check_result = check_config() if not check_result: return if is_need_confirm: confirm_result = user_confirm() else: confirm_result = True if confirm_result: loop_delete_data() def main(): clean_env() delete_table_data(database_name="db001", table_name="tb2001", delete_condition="dt<'2017-09-01'", is_run=True, is_need_confirm=True) if __name__ == '__main__': main()
執行效果:

實現原理:
由於表存在自增ID,於是給我們增量循環刪除的機會,查找出滿足刪除條件的最大值ID和最小值ID,然后按ID 依次遞增,每次小范圍內(如10000條)進行刪除。
實現優點:
實現“小斧子砍大柴”的效果,事務小,對線上影響較小,打印出當前處理到的“ID”,可以隨時關閉,稍微修改下代碼便可以從該ID開始,方便。
實現不足:
為防止主從延遲太高,采用每次刪除SLEEP1秒的方式,相對比較糙,最好的方式應該是周期掃描這條復制鏈路,根據延遲調整SLEEP的周期,反正都腳本化,再智能化點又何妨!
本文重點依舊是妹子,不能讓諸位看官白跑一趟,是不!!!

