一、SQLite3 數據庫
SQLite3 可使用 sqlite3 模塊與 Python 進行集成,一般 python 2.5 以上版本默認自帶了sqlite3模塊,因此不需要用戶另外下載。

所以,首先要創建一個數據庫的連接對象,即connection對象,語法如下:
sqlite3.connect(database [,timeout,其他可選參數])
function: 此API打開與SQLite數據庫文件的連接。如果成功打開數據庫,則返回一個連接對象。
database: 數據庫文件的路徑,或 “:memory:” ,后者表示在RAM中創建臨時數據庫。
timeout: 指定連接在引發異常之前等待鎖定消失的時間,默認為5.0(秒)
有了connection對象,就能創建游標對象了,即cursor對象,如下:
connection.cursor([cursorClass])
function: 創建一個游標,返回游標對象,該游標將在Python的整個數據庫編程中使用。
| 方法 | 說明 |
| connect.cursor() | 上述,返回游標對象 |
| connect.execute(sql [,parameters]) | 創建中間游標對象執行一個sql命令 |
| connect.executemany(sql [,parameters]) | 創建中間游標對象執行一個sql命令 |
| connect.executescript(sql_script) | 創建中間游標對象, 以腳本的形式執行sql命令 |
| connect.total_changes() | 返回自打開數據庫以來,已增刪改的行的總數 |
| connect.commit() | 提交當前事務,不使用時為放棄所做的修改,即不保存 |
| connect.rollback() | 回滾自上次調用commit()以來所做的修改,即撤銷 |
| connect.close() | 斷開數據庫連接 |
| 方法 | 說明 |
| cursor.execute(sql [,parameters]) | 執行一個sql命令 |
| cursor.executemany(sql,seq_of_parameters) | 對 seq_of_parameters 中的所有參數或映射執行一個sql命令 |
| cursor.executescript(sql_script) | 以腳本的形式一次執行多個sql命令 |
| cursor.fetchone() | 獲取查詢結果集中的下一行,返回一個單一的序列,當沒有更多可用的數據時,則返回 None。 |
| cursor.fetchmany([size=cursor.arraysize]) | 獲取查詢結果集中的下一行組,返回一個列表。當沒有更多的可用的行時,則返回一個空的列表。size指定特定行數。 |
| cursor.fetchall() | 獲取查詢結果集中所有(剩余)的行,返回一個列表。當沒有可用的行時,則返回一個空的列 |
二、輸入代碼
import sqlite3
from pandas import DataFrame
import re
from pandas import DataFrame
import re
class SQL_method:
'''
function: 可以實現對數據庫的基本操作
'''
def __init__(self, dbName, tableName, data, columns, COLUMNS, Read_All=True):
'''
function: 初始化參數
dbName: 數據庫文件名
tableName: 數據庫中表的名稱
data: 從csv文件中讀取且經過處理的數據
columns: 用於創建數據庫,為表的第一行
COLUMNS: 用於數據的格式化輸出,為輸出的表頭
Read_All: 創建表之后是否讀取出所有數據
'''
self.dbName = dbName
self.tableName = tableName
self.data = data
self.columns = columns
self.COLUMNS = COLUMNS
self.Read_All = Read_All
'''
function: 可以實現對數據庫的基本操作
'''
def __init__(self, dbName, tableName, data, columns, COLUMNS, Read_All=True):
'''
function: 初始化參數
dbName: 數據庫文件名
tableName: 數據庫中表的名稱
data: 從csv文件中讀取且經過處理的數據
columns: 用於創建數據庫,為表的第一行
COLUMNS: 用於數據的格式化輸出,為輸出的表頭
Read_All: 創建表之后是否讀取出所有數據
'''
self.dbName = dbName
self.tableName = tableName
self.data = data
self.columns = columns
self.COLUMNS = COLUMNS
self.Read_All = Read_All
def creatTable(self):
'''
function: 創建數據庫文件及相關的表
'''
# 連接數據庫
connect = sqlite3.connect(self.dbName)
# 創建表
connect.execute("CREATE TABLE {}({})".format(self.tableName, self.columns))
# 提交事務
connect.commit()
# 斷開連接
connect.close()
'''
function: 創建數據庫文件及相關的表
'''
# 連接數據庫
connect = sqlite3.connect(self.dbName)
# 創建表
connect.execute("CREATE TABLE {}({})".format(self.tableName, self.columns))
# 提交事務
connect.commit()
# 斷開連接
connect.close()
def destroyTable(self):
'''
function: 刪除數據庫文件中的表
'''
# 連接數據庫
connect = sqlite3.connect(self.dbName)
# 刪除表
connect.execute("DROP TABLE {}".format(self.tableName))
# 提交事務
connect.commit()
# 斷開連接
connect.close()
'''
function: 刪除數據庫文件中的表
'''
# 連接數據庫
connect = sqlite3.connect(self.dbName)
# 刪除表
connect.execute("DROP TABLE {}".format(self.tableName))
# 提交事務
connect.commit()
# 斷開連接
connect.close()
def insertDataS(self):
'''
function: 向數據庫文件中的表插入多條數據
'''
# 連接數據庫
connect = sqlite3.connect(self.dbName)
# 插入多條數據
connect.executemany("INSERT INTO {} VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)".format(self.tableName), self.data)
#for i in range(len(self.data)):
# connect.execute("INSERT INTO university VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)", data[i])
# 提交事務
connect.commit()
# 斷開連接
connect.close()
'''
function: 向數據庫文件中的表插入多條數據
'''
# 連接數據庫
connect = sqlite3.connect(self.dbName)
# 插入多條數據
connect.executemany("INSERT INTO {} VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)".format(self.tableName), self.data)
#for i in range(len(self.data)):
# connect.execute("INSERT INTO university VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)", data[i])
# 提交事務
connect.commit()
# 斷開連接
connect.close()
def getAllData(self):
'''
function: 得到數據庫文件中的所有數據
'''
# 連接數據庫
connect = sqlite3.connect(self.dbName)
# 創建游標對象
cursor = connect.cursor()
# 讀取數據
cursor.execute("SELECT * FROM {}".format(self.tableName))
dataList = cursor.fetchall()
# 斷開連接
connect.close()
return dataList
'''
function: 得到數據庫文件中的所有數據
'''
# 連接數據庫
connect = sqlite3.connect(self.dbName)
# 創建游標對象
cursor = connect.cursor()
# 讀取數據
cursor.execute("SELECT * FROM {}".format(self.tableName))
dataList = cursor.fetchall()
# 斷開連接
connect.close()
return dataList
def searchData(self, conditions, IfPrint=True):
'''
function: 查找特定的數據
'''
# 連接數據庫
connect = sqlite3.connect(self.dbName)
# 創建游標
cursor = connect.cursor()
# 查找數據
cursor.execute("SELECT * FROM {} WHERE {}".format(self.tableName, conditions))
data = cursor.fetchall()
# 關閉游標
cursor.close()
# 斷開數據庫連接
connect.close()
if IfPrint:
self.printData(data)
return data
'''
function: 查找特定的數據
'''
# 連接數據庫
connect = sqlite3.connect(self.dbName)
# 創建游標
cursor = connect.cursor()
# 查找數據
cursor.execute("SELECT * FROM {} WHERE {}".format(self.tableName, conditions))
data = cursor.fetchall()
# 關閉游標
cursor.close()
# 斷開數據庫連接
connect.close()
if IfPrint:
self.printData(data)
return data
def deleteData(self, conditions):
'''
function: 刪除數據庫中的數據
'''
# 連接數據庫
connect = sqlite3.connect(self.dbName)
# 插入多條數據
connect.execute("DELETE FROM {} WHERE {}".format(self.tableName, conditions))
# 提交事務
connect.commit()
# 斷開連接
connect.close()
'''
function: 刪除數據庫中的數據
'''
# 連接數據庫
connect = sqlite3.connect(self.dbName)
# 插入多條數據
connect.execute("DELETE FROM {} WHERE {}".format(self.tableName, conditions))
# 提交事務
connect.commit()
# 斷開連接
connect.close()
def printData(self, data):
print("{1:{0}^3}{2:{0}<11}{3:{0}<4}{4:{0}<4}{5:{0}<5}{6:{0}<5}{7:{0}^5}{8:{0}^5}{9:{0}^5}{10:{0}^5}{11:{0}^5}{12:{0}^6}{13:{0}^5}".format(chr(12288), *self.COLUMNS))
for i in range(len(data)):
print("{1:{0}<4.0f}{2:{0}<10}{3:{0}<5}{4:{0}<6}{5:{0}<7}{6:{0}<8}{7:{0}<7.0f}{8:{0}<8}{9:{0}<7.0f}{10:{0}<6.0f}{11:{0}<9.0f}{12:{0}<6.0f}{13:{0}<6.0f}".format(chr(12288), *data[i]))
print("{1:{0}^3}{2:{0}<11}{3:{0}<4}{4:{0}<4}{5:{0}<5}{6:{0}<5}{7:{0}^5}{8:{0}^5}{9:{0}^5}{10:{0}^5}{11:{0}^5}{12:{0}^6}{13:{0}^5}".format(chr(12288), *self.COLUMNS))
for i in range(len(data)):
print("{1:{0}<4.0f}{2:{0}<10}{3:{0}<5}{4:{0}<6}{5:{0}<7}{6:{0}<8}{7:{0}<7.0f}{8:{0}<8}{9:{0}<7.0f}{10:{0}<6.0f}{11:{0}<9.0f}{12:{0}<6.0f}{13:{0}<6.0f}".format(chr(12288), *data[i]))
def run(self):
try:
# 創建數據庫文件
self.creatTable()
print(">>> 數據庫創建成功!")
# 保存數據到數據庫
self.insertDataS()
print(">>> 表創建、數據插入成功!")
except:
print(">>> 數據庫已創建!")
# 讀取所有數據
if self.Read_All:
self.printData(self.getAllData())
try:
# 創建數據庫文件
self.creatTable()
print(">>> 數據庫創建成功!")
# 保存數據到數據庫
self.insertDataS()
print(">>> 表創建、數據插入成功!")
except:
print(">>> 數據庫已創建!")
# 讀取所有數據
if self.Read_All:
self.printData(self.getAllData())
def get_data(fileName):
'''
function: 讀取獲得大學排名的數據 並 將結果返回
'''
data = []
# 打開文件
f = open(fileName, 'r', encoding='utf-8')
# 按行讀取文件
for line in f.readlines():
# 替換掉其中的換行符和百分號 替換百分號是為了方便之后的排序和運算
line = line.replace('\n', '')
line = line.replace('%','')
# 將字符串按照 ',' 分割為列表
line = line.split(',')
for i in range(len(line)):
# 使用 異常處理 避開 出現中文無法轉換 的錯誤
try:
# 將空值填充為 0
if line[i] == '':
line[i] = '0'
# 將數字轉換為數值
line[i] = eval(line[i])
except:
continue
data.append(tuple(line))
# EN_columns、CH_columns 分別為 用於數據庫創建、數據的格式化輸出
EN_columns = "Rank real, University text, Province text, Grade real, SourseQuality real, TrainingResult real, ResearchScale real, \
ReserchQuality real, TopResult real, TopTalent real, TechnologyService real, Cooperation real, TransformationResults real"
CH_columns = ["排名", "學校名稱", "省市", "總分", "生涯質量", "培養結果(%)", "科研規模", "科研質量", "頂尖成果", "頂尖人才", "科技服務", "產學研合作", "成果轉化"]
return data[1:], EN_columns, CH_columns
'''
function: 讀取獲得大學排名的數據 並 將結果返回
'''
data = []
# 打開文件
f = open(fileName, 'r', encoding='utf-8')
# 按行讀取文件
for line in f.readlines():
# 替換掉其中的換行符和百分號 替換百分號是為了方便之后的排序和運算
line = line.replace('\n', '')
line = line.replace('%','')
# 將字符串按照 ',' 分割為列表
line = line.split(',')
for i in range(len(line)):
# 使用 異常處理 避開 出現中文無法轉換 的錯誤
try:
# 將空值填充為 0
if line[i] == '':
line[i] = '0'
# 將數字轉換為數值
line[i] = eval(line[i])
except:
continue
data.append(tuple(line))
# EN_columns、CH_columns 分別為 用於數據庫創建、數據的格式化輸出
EN_columns = "Rank real, University text, Province text, Grade real, SourseQuality real, TrainingResult real, ResearchScale real, \
ReserchQuality real, TopResult real, TopTalent real, TechnologyService real, Cooperation real, TransformationResults real"
CH_columns = ["排名", "學校名稱", "省市", "總分", "生涯質量", "培養結果(%)", "科研規模", "科研質量", "頂尖成果", "頂尖人才", "科技服務", "產學研合作", "成果轉化"]
return data[1:], EN_columns, CH_columns
if __name__ == "__main__":
# =================== 設置和得到基本數據 ===================
fileName = "D:\\123.csv"
data, EN_columns, CH_columns = get_data(fileName)
dbName = "university.db"
tableName = "university"
# =================== 設置和得到基本數據 ===================
fileName = "D:\\123.csv"
data, EN_columns, CH_columns = get_data(fileName)
dbName = "university.db"
tableName = "university"
# ================= 創建一個SQL_method對象 ==================
SQL = SQL_method(dbName, tableName, data, EN_columns, CH_columns, False)
SQL = SQL_method(dbName, tableName, data, EN_columns, CH_columns, False)
# =================== 創建數據庫並保存數據 ===================
SQL.run()
# =================== 在數據庫中查找數據項 ===================
# 查找記錄並輸出結果
print(">>> 查找數據項(University = '廣東工業大學') :")
SQL.searchData("University = '廣東工業大學'", True)
SQL.run()
# =================== 在數據庫中查找數據項 ===================
# 查找記錄並輸出結果
print(">>> 查找數據項(University = '廣東工業大學') :")
SQL.searchData("University = '廣東工業大學'", True)
# ================= 在數據庫中篩選數據項並排序 ==================
# 將選取廣東省的數據 並 對科研規模大小排序
print("\n>>> 篩選數據項並按照科研規模排序(Province = '廣東省') :")
SQL.searchData("Province = '廣東省' ORDER BY ResearchScale", True)
# 將選取廣東省的數據 並 對科研規模大小排序
print("\n>>> 篩選數據項並按照科研規模排序(Province = '廣東省') :")
SQL.searchData("Province = '廣東省' ORDER BY ResearchScale", True)
# =============== 對數據庫中的數據進行重新排序操作 ================
# 定義權值
Weight = [0.3, 0.15, 0.1, 0.1, 0.1, 0.1, 0.05, 0.05, 0.05]
value, sum = [], 0
# 獲取 Province = '廣東省' 的所有數據
sample = SQL.searchData("Province = '廣東省'", False)
# 按照權值求出各個大學的總得分
for i in range(len(sample)):
for j in range(len(Weight)):
sum += sample[i][4+j] * Weight[j]
value.append(sum)
sum = 0
# 將結果通過 pandas 的 DataFrame 方法組成一個二維序列
university = [university[1] for university in sample]
uv, tmp = [], []
for i in range(len(university)):
tmp.append(university[i])
tmp.append(value[i])
uv.append(tmp)
tmp = []
df = DataFrame(uv, columns=list(("大學", "總分")))
df = df.sort_values('總分')
df.index = [i for i in range(1, len(uv)+1)]
# 輸出結果
print("\n>>> 篩選【廣東省】的大學並通過權值運算后重排名的結果:\n", df)
# ===================== 在數據庫中刪除數據項 =====================
SQL.deleteData("Province = '北京市'")
SQL.deleteData("Province = '廣東省'")
SQL.deleteData("Province = '山東省'")
SQL.deleteData("Province = '山西省'")
SQL.deleteData("Province = '江西省'")
SQL.deleteData("Province = '河南省'")
print("\n>>> 數據刪除成功!")
SQL.printData(SQL.getAllData())
# 定義權值
Weight = [0.3, 0.15, 0.1, 0.1, 0.1, 0.1, 0.05, 0.05, 0.05]
value, sum = [], 0
# 獲取 Province = '廣東省' 的所有數據
sample = SQL.searchData("Province = '廣東省'", False)
# 按照權值求出各個大學的總得分
for i in range(len(sample)):
for j in range(len(Weight)):
sum += sample[i][4+j] * Weight[j]
value.append(sum)
sum = 0
# 將結果通過 pandas 的 DataFrame 方法組成一個二維序列
university = [university[1] for university in sample]
uv, tmp = [], []
for i in range(len(university)):
tmp.append(university[i])
tmp.append(value[i])
uv.append(tmp)
tmp = []
df = DataFrame(uv, columns=list(("大學", "總分")))
df = df.sort_values('總分')
df.index = [i for i in range(1, len(uv)+1)]
# 輸出結果
print("\n>>> 篩選【廣東省】的大學並通過權值運算后重排名的結果:\n", df)
# ===================== 在數據庫中刪除數據項 =====================
SQL.deleteData("Province = '北京市'")
SQL.deleteData("Province = '廣東省'")
SQL.deleteData("Province = '山東省'")
SQL.deleteData("Province = '山西省'")
SQL.deleteData("Province = '江西省'")
SQL.deleteData("Province = '河南省'")
print("\n>>> 數據刪除成功!")
SQL.printData(SQL.getAllData())
# ====================== 在數據庫中刪除表 ========================
SQL.destroyTable()
print(">>> 表刪除成功!")
PrintTableList(data, 10) # 輸出前10行數據
saveAsCsv("D:\\123.csv", data)
SQL.destroyTable()
print(">>> 表刪除成功!")
PrintTableList(data, 10) # 輸出前10行數據
saveAsCsv("D:\\123.csv", data)
三、運行結果

