數據治理需要面臨和解決的最重要的問題是,企業這么多的數據庫,每個數據庫這么多的表,每個表這么多的字段,如何進行信息資產的分類分級。
通過以下方法,可以自動的取企業所有數據庫、所有表、所有字段,根據字段的值,利用正則表達式等方式判斷此字段是否屬於用戶敏感信息,如姓名、手機、地址、身份證等。
最終形成數據的風險地圖,庫、表、字段、敏感類型和等級,可以為統一加解密、統一日志等提供服務。
#coding:utf-8
'''
author:Eleven
Python3
'''
import pymysql
import re
# 通過正則匹配出個人敏感信息,如姓名、手機號碼、地址、身份證號碼、銀行卡號
def check_secret(value):
phone_pattern = '^((13[0-9])|(14[5,7,9])|(15[^4])|(18[0-9])|(17[0,1,3,5,6,7,8]))\\d{8}$' # 匹配手機號碼
if re.match(phone_pattern, value):
return ('secret_phone') # 標記字段是否涉密,以及涉密類型(如姓名、手機號碼、地址、身份證號碼、銀行卡號)
else:
return ('no_secret')
class DB(object):
def __init__(self,ip,username,password):
self.ip = ip
self.username = username
self.password = password
self.db = pymysql.connect(self.ip,self.username,self.password)
self.cursor = self.db.cursor()
# 通過schemata獲取所有數據庫名稱
def get_database(self):
self.cursor.execute("SELECT schema_name from information_schema.schemata ")
database_list = self.cursor.fetchall()
result = []
for line in database_list:
if line[0] not in ['information_schema','mysql','performance_schema','test','scan_result']: #排除默認的數據庫
result.append(line[0])
return result
# 獲取表名
def get_table(self,database):
self.cursor.execute("select table_name from information_schema.tables where table_schema= '%s' " % database)
table_list = self.cursor.fetchall()
result = []
for line in table_list:
result.append(line[0])
return result
# 獲取字段名
def get_column(self,database,table):
self.cursor.execute("select column_name from information_schema.columns where table_schema='%s' and table_name='%s'" % (database,table))
column_list = self.cursor.fetchall()
result = []
for line in column_list:
result.append(line[0])
return result
# 獲取字段內容
def get_content(self,database,table,column):
self.cursor.execute("select %s from %s.%s LIMIT 0,1"%(column,database,table))
content = self.cursor.fetchall()
if content:
return content[0][0]
def __del__(self):
self.db.close()
if __name__ == '__main__':
with open('ip.txt','r') as f:
db = DB(f.read(),'root','root')
databases = db.get_database()
for database in databases:
tables = db.get_table(database)
for table in tables:
columns = db.get_column(database,table)
for column in columns:
data = db.get_content(database,table,column)
data = str(data) # 轉成字符串,否則正則報錯
print(database,table,column,data,check_secret(data)) # 輸出結果
最終輸出的數據標識、標示結果,如下數據庫:

