python 操作MYSQL數據庫主要有兩種方式:
使用原生模塊:pymysql
ORM框架:SQLAchemy
一、pymysql
1.1下載安裝模塊
第一種:cmd下:執行命令下載安裝:pip3 install pymysql 第二種:IDE下pycharm python環境路徑下添加模塊
1.2使用操作
#導入模塊
import pymysql
#建立連接通道,建立連接填入(連接數據庫的IP地址,端口號,用戶名,密碼,要操作的數據庫,字符編碼)
conn = pymysql.connect(
host="",
port="",
user='',
password='',
database=""
charset="",
)
# 創建游標,操作設置為字典類型,返回結果為字典格式!不寫默認是元組格式!
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
#操作數據庫的sql語句
sql=""
# 向數據庫發送數據,在方法內部進行拼接!!!
#向數據庫發送操作單條操作指令
# 格式化輸入的值可以單個按順序傳入 或是寫成列表 (注意 順序和位置)
r = cursor.execute(sql,v1,v2……)
r = cursor.execute(sql,args)
#r 代表接收返回受影響的行數(數字)及執行這一條sql語句,數據庫中有多少行受到了影響。
#sql 指上邊寫的sql語句
#args 指要給sql語句中傳的參數
sql 語句可以不傳值 及為空 []
sql 語句可以傳一個值 及 [v1,]
sql 語句可以傳多值 及 [v1,v2,v3……]
#向數據庫發送操作多條數據指令 args=[(v1,s1),(v2,s2),(v3,s3)]
r = cursor.executemany(sql,[('egon','sb'),('laoyao','BS')])
#數據庫有四種操作:增刪改查!
# 執行查操作的時候就得接收從數據庫返回的數據!
#執行增刪改操作的時候,就需要像數據庫提交數據!
#查操作:(接收的數據格式由創建的游標樣式決定!)
#接收數據有三種方式:
res = cursor.fetchone() #接收返回的第一行數據
ret = cursor.fetchmany(n) #接收返回的n行數據
req = cursor.fetchall() #接收返回的說有數據
#注:在fetch數據時按照順序進行,可以使用cursor.scroll(num,mode)來移動游標位置,如:
cursor.scroll(1,mode='relative') # 相對當前位置移動
cursor.scroll(2,mode='absolute') # 相對絕對位置移動
#增刪改操作:
#寫完發送操作語句之后,就需要把更改的數據提交,不然數據庫無法完成新建或是修改操作
conn.commit() #提交
#注:此處有個獲取新建數據自增ID的操作(只能拿到最后那一行的id數)
#執行增加語句,並提交之后,可以獲取到
new_id=cursor.lastrowid
print(new_id)
#操作完成之后,就需要關閉連接
cursor.close() #關閉游標
conn.close() #關閉連接
操作總結:
1、重中之重,一定要注意sql注入的問題!!!
#格式化寫入sql語句,就會造成sql注入的情況!!! import pymysql user = input("username:") pwd = input("password:") conn = pymysql.connect(host="localhost",user='root',password='',database="db666") cursor = conn.cursor() sql = "select * from userinfo where username='%s' and password='%s'" %(user,pwd,) # select * from userinfo where username='uu' or 1=1 -- ' and password='%s' cursor.execute(sql) result = cursor.fetchone() cursor.close() conn.close() if result: print('登錄成功') else: print('登錄失敗')
import pymysql user = input("username:") pwd = input("password:") conn = pymysql.connect(host="localhost",user='root',password='',database="db666") cursor = conn.cursor() sql = "select * from userinfo where username=%s and password=%s" # sql = "select * from userinfo where username=%(u)s and password=%(p)s" #傳入數據類型舉例 cursor.execute(sql,user,pwd) #直接傳值 # cursor.execute(sql,[user,pwd]) #列表形式 # cursor.execute(sql,{'u':user,'p':pwd}) #字典格式 result = cursor.fetchone() cursor.close() conn.close() if result: print('登錄成功') else: print('登錄失敗')
import pymysql # 增加,刪,該 # conn = pymysql.connect(host="localhost",user='root',password='',database="db666") # cursor = conn.cursor() # sql = "insert into userinfo(username,password) values('root','123123')" # 受影響的行數 # r = cursor.execute(sql) # # ****** # conn.commit() # cursor.close() # conn.close() # conn = pymysql.connect(host="localhost",user='root',password='',database="db666") # cursor = conn.cursor() # # sql = "insert into userinfo(username,password) values(%s,%s)" # # cursor.execute(sql,(user,pwd,)) #插入多條信息 # sql = "insert into userinfo(username,password) values(%s,%s)" # # 受影響的行數 # r = cursor.executemany(sql,[('egon','sa'),('laoyao','BS')]) # # ****** # conn.commit() # cursor.close() # conn.close() # 查 # conn = pymysql.connect(host="localhost",user='root',password='',database="db666") # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # sql = "select * from userinfo" # cursor.execute(sql) # cursor.scroll(1,mode='relative') # 相對當前位置移動 # cursor.scroll(2,mode='absolute') # 相對絕對位置移動 # result = cursor.fetchone() # print(result) # result = cursor.fetchone() # print(result) # result = cursor.fetchone() # print(result) # result = cursor.fetchall() # print(result) # result = cursor.fetchmany(4) # print(result) # cursor.close() # conn.close() # 新插入數據的自增ID: cursor.lastrowid # import pymysql # # conn = pymysql.connect(host="localhost",user='root',password='',database="db666") # cursor = conn.cursor() # sql = "insert into userinfo(username,password) values('asdfasdf','123123')" # cursor.execute(sql) # conn.commit() # print(cursor.lastrowid) # cursor.close() # conn.close()
二、SQLAchemy
2.1下載安裝模塊
pip3 install SQLAlchemy
IDE下pycharm python環境路徑下添加模塊
2.2原理
SQLAlchemy是python編程語言下的一款ORM框架,該框架建立在數據庫API之上,使用關系對象映射進行數據庫操作,簡言之便是:將對象轉換成SQL,然后使用數據API執行SQL並獲取執行結果。

利用模塊,按照對應規則,自動生成sql語句!
作用:提供簡單的規則,自動轉換成sql語句,最終還是執行sql語句,獲取結果!
ORM操作流程:
創建一個類,類對應數據庫的表,類能實例一個對象,這個對象對應表里的數據行
關系對象映射關系:
代碼 數據庫
類 ---> 表
對象 ---> 行
DB first :手動創建數據庫和表,通過ORM框架 根據數據庫,通過類生成一個一個表
code first :手動創建類和數據庫,通過ORM框架 利用類創建表
SQLAlchemy本身無法操作數據庫,其必須以來pymsql等第三方插件,Dialect用於和數據API進行交流,根據配置文件的不同調用不同的數據庫API,從而實現對數據庫的操作。
遠程連接數據庫引擎類型: MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多詳見:http://docs.sqlalchemy.org/en/latest/dialects/index.html
SQLAchemy 只負責把類轉換成sql語句,連接數據庫還是需要插件配合着數據庫模塊連接的。
連接的數據庫不同,轉換成的sql語句也不同。
提前必須有連接不同數據庫的模塊或是軟件,SQLAchemy再去配置
規則:導入模塊,生成一個基類,然后再用創建類的方法去創建表,sql語句中的語法,全部轉換成了方法
雖然沒有使用__init__方法,但是在執行定義的時候,會copy到__init__中
找到當前所有繼承base的類,然后創建對應的表
注意:利用SQLAchemy 創建表之前,需要先手動創建一個數據庫!
2.3操作
導入模塊:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,CHAR,VARCHAR
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
創建基類:
Base = declarative_base()
通過pymysql與mysql數據庫建立遠程連接 和設置最大連接數:
engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/day63?charset=utf8", max_overflow=5)
創建單表:
class 類名(Base):
__tablename__="表名" #創建表名
列名=Column(數據類型,是否為空,主鍵,自增,索引,唯一索引)
__table_args__(
UniqueConstraint("列名1","列名2","聯合唯一索引名"),
index("索引名","列名1","列名2"),
) #創建聯合唯一索引
數據類型:Integer 整型;String 字符串類型(CHAR,VARCHAR也可以);
是否為空:nullable=True,
是否為主鍵:primary_key=True,
是否自增:autoincrement=True
索引:index=True
唯一索引:unique=True
例:
class Users(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(VARCHAR(32), nullable=True, index=True)
email = Column(VARCHAR(16), unique=True)
__table_args__ = (
UniqueConstraint('id', 'name', name='uix_id_name'),
Index('ix_n_ex','name', 'email',),
)
創建有外鍵關系的多表:
1、先創建一個繼承Base基類 存放數據的普通表
2、創建一個繼承Base基類 與其有外鍵關系的表
3、語法:外鍵名("表名.列名")ForeignKey("usertype.id")
例:
class UserType(Base):
__tablename__ = "usertype"
id = Column(Integer,primary_key=True,autoincrement=True)
title = Column(String(32),nullable=True,index=True)
class Users(Base):
__tablename__ = "users"
id = Column(Integer,primary_key=True,autoincrement=True)
name = Column(String(32),nullable=True,index=True)
email = Column(String(16),unique=True)
u_type_id = Column(Integer,ForeignKey("usertype.id"))
#外鍵名 = Column(數據類型,ForeignKey("表名.列名"))
生成表或是刪除表(可以把操作寫成一個函數!):
#找到當前所有繼承base的類,然后創建所有的表
def create_table():
Base.metadata.create_all(engine)
#找到當前所有繼承base的類,然后刪除所有的表
def del_table():
Base.metadata.drop_all(engine)
操作表:
萬年不變的 數據行 的增刪改查
#首先,先建立鏈接通道
Session = sessionmaker(bind=engine)
session = Session()
#其次,操作表 注意:操作內填如的內容,一定並必須是表達式!
#增
對哪張表更改,就用其對應的類進行實例化,生成的對象就代表着數據行
#增加單個 session.add()
obj = UserType(title = "黑金用戶")
session.add(obj)
#增加多個 session.add_all()
objs =[
UserType(title = "會員用戶"),
UserType(title = "超級用戶"),
UserType(title = "鉑金用戶"),
UserType(title = "黑金用戶"),
]
session.add_all(objs)
#查 session.query(類名).all() #直接獲取整個類(表)下所有的對象(數據行)
#直接操作,獲取的是像數據庫發送執行的sql語句
res = session.query(UserType) #SQL語句
print(res)
#獲取所有對應類(表)的對象(數據行) 列表類型
res_list = session.query(UserType).all()
print(res_list)
#查詢操作,獲取表中某列的值!是對接收到的整個列表進行循環遍歷查找
#查詢整個表內的信息 ------->等效於數據庫中: select xxx from usertype
res_list = session.query(UserType).all()
for sss in res_list:
print(sss.id,sss.title)
#注意點:.filter()方法是過濾的意思,相當於sql語句中的where
#條件查找表內信息 -------->等效於數據庫中: select xxx usertype where 條件
res_list = session.query(UserType).filter(UserType.id >2)
for sss in res_list:
print(sss.id,sss.title)
#注意點:執行刪除和更改操作時,都是先把數據行找到(查操作),再進行刪或改操作!
#刪 找到對應的數據行,刪除即可 .delete()
#先找后刪,等效於------> delete from usertype where usertype.id > 4
session.query(UserType).filter(UserType.id > 4).delete()
#改 先查后改 注意傳值的格式!
#這里有個參數 synchronize_session 沒別的招,看源碼解釋!!!
#對表進行批量更改!
session.query(UserType).filter(UserType.id>0).update({"title":"黑金"})
#動態獲取原表的數據(char類型),對表進行批量更改
session.query(UserType).filter(UserType.id>0).update({UserType.title:UserType.title+"SX"},synchronize_session=False)
#動態獲取原表的數據(int類型),對表進行批量更改
session.query(UserType).filter(UserType.id>0).update({"title":UserType.id+1},synchronize_session="evaluate")
#查找其他操作:
# 分組,排序,連表,通配符,子查詢,limit,union,where,原生SQL、
# 條件
#過濾,又叫條件判斷
ret = session.query(Users).filter_by(name='alex').all()
#兩個表達式同時存在,逗號分開,不寫關系默認是 and
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
#between and
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
#in判斷 語法:in_
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
# not in 判斷 語法:表達式最前加 ~
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
#子查詢
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
#邏輯判斷: and_ or_ 操作
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
or_(
Users.id < 2,
and_(Users.name == 'eric', Users.id > 3),
Users.extra != ""
)).all()
# 通配符 .like()的方法調用
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()
# 限制
ret = session.query(Users)[1:2]
#分頁 .limit(n) 取n個數據
res_list = session.query(UserType).limit(2).all()
for sss in res_list:
print(sss.id,sss.title)
# 排序 查表.order_by(列名.desc()/列名.asc()) [.desc() 由大到小;.asc() 由小到大]
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
# 分組 聚合函數func.方法(列名) 和 .group_by(列名) 方法
from sqlalchemy.sql import func
ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).all()
ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
# 連表
ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() #直連
ret = session.query(Person).join(Favor).all()
ret = session.query(Person).join(Favor, isouter=True).all()
#子查詢 三種樣式
#查詢的信息作為另一張表的條件
# 1.select * from b where id in (select id from tb2)
#最為一張新表進行二次篩選
# 2. select * from (select * from tb) as B
#查詢語句.subquery() 查詢結果作為一個子查詢(新表)好在進行下一步的查詢。不加.subquery()的話會報錯,不再往下查詢
# q1 = session.query(UserType).filter(UserType.id > 0).subquery()
# result = session.query(q1).all()
# print(result)
#作為一個列內的數據,在另一張表中顯示 ****** .as_scalar()方法
# 3. select id ,(select * from users where users.user_type_id=usertype.id) from usertype;
# session.query(UserType,Users)
# result = session.query(UserType.id,session.query(Users).as_scalar())
# print(result) #查看對應的sql語句
# result = session.query(UserType.id,session.query(Users).filter(Users.user_type_id==UserType.id).as_scalar())
# print(result) #查看對應的sql語句
# 組合(上下連表) .union() 和 .union_all() 注意是:先把信息找到再操作!
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()
- 便利的功能 relationship() 與生成表結構無關,僅用於查詢方便 釋放了連表操作的繁瑣查找,直接通過方法定位!
使用規范:哪個類中有外鍵列,就在外鍵列下添加。
語法:自定義名=relationship("外鍵有聯系的類名",backref="任意命名")
# 問題1. 獲取用戶信息以及與其關聯的用戶類型名稱(FK,Relationship=>正向操作)
#初始方法:連表操作
user_list = session.query(Users,UserType).join(UserType,isouter=True)
print(user_list)
for row in user_list:
print(row[0].id,row[0].name,row[0].email,row[0].user_type_id,row[1].title)
user_list = session.query(Users.name,UserType.title).join(UserType,isouter=True).all()
for row in user_list:
print(row[0],row[1],row.name,row.title)
#(FK,Relationship=>正向操作) 先查用戶信息表,通過命名的 自定義名 正向獲取用戶類型
user_list = session.query(Users)
for row in user_list:
print(row.name,row.id,row.user_type.title)
# 問題2. 獲取用戶類型 (FK,Relationship=>反向操作)
#連表操作:
type_list = session.query(UserType)
for row in type_list:
print(row.id,row.title,session.query(Users).filter(Users.user_type_id == row.id).all())
#反向操作:先查類型表,再通過backref 自定義的變量 反向查找用戶信息
type_list = session.query(UserType)
for row in type_list:
print(row.id,row.title,row.xxoo)
PS:正向操作與反向操作,是相對於外鍵來相對判斷的!
例如:A表與B表,A表中建立了與B表聯系的外鍵,A表通過外鍵獲取B表中的信息,叫正向操作;反之,叫反向操作!
最后,操作及語法寫完后,都需要提交給數據庫去執行,不再使用也需要斷開連接!
session.commit() #提交
session.close() #關閉連接
#!/usr/bin/env python # _*_ coding:utf-8 _*_ from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine Base = declarative_base() engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/day63?charset=utf8",max_overflow=5) class UserType(Base): __tablename__ = "usertype" id = Column(Integer,primary_key=True,autoincrement=True) title = Column(String(32),nullable=True,index=True) class Users(Base): __tablename__ = "users" id = Column(Integer,primary_key=True,autoincrement=True) name = Column(String(32),nullable=True,index=True) email = Column(String(16),unique=True) u_type_id = Column(Integer,ForeignKey("usertype.id")) u_type = relationship("UserType",backref="sss") def create_table(): Base.metadata.create_all(engine) def del_table(): Base.metadata.drop_all(engine) #類 --> 表 #對象 --> 行 #建立鏈接通道 Session = sessionmaker(bind=engine) session = Session() #操作內填入的內容,必須是表達式 ########## 增加 ################### #增加單個 # obj = UserType(title = "黑金用戶") # session.add(obj) #增加多個 # objs =[ # UserType(title = "會員用戶"), # UserType(title = "超級用戶"), # UserType(title = "鉑金用戶"), # UserType(title = "黑金用戶"), # ] # session.add_all(objs) ############ 查詢 ################ # res = session.query(UserType) #SQL語句 # print(res) # res_list = session.query(UserType).all() #獲取所有對應類(表)的對象(數據行) 列表類型 # print(res_list) #select xxx from usertype # res_list = session.query(UserType).limit(2).all() # for sss in res_list: # print(sss.id,sss.title) # # #select xxx usertype where *** # res_list = session.query(UserType).filter(UserType.id >2) # for sss in res_list: # print(sss.id,sss.title) ############### 刪除 ################### # delete from usertype where usertype.id > 4 # session.query(UserType).filter(UserType.id > 4).delete() ################ 更改 ######################## #這里有個參數 synchronize_session 沒別的招,看源碼解釋!!! # session.query(UserType).filter(UserType.id>0).update({"title":"黑金"}) #對表進行批量更改 # session.query(UserType).filter(UserType.id>0).update({UserType.title:UserType.title+"SX"},synchronize_session=False) #動態獲取原先的數據,對表進行批量更改 # session.query(UserType).filter(UserType.id>0).update({"title":UserType.id+1},synchronize_session="evaluate") #對表進行批量更改 ############# 查詢其他操作 ################# # 分組,排序,連表,通配符,limit,union,where,原生SQL# #條件 and or # ret = session.query(Users).filter(Users.id > 1, Users.name == "sesc").all() # for row in ret: # print(row.email) # #正向操作 # res = session.query(Users) # for row in res: # print(row.id,row.name,row.u_type.title) # # #反向操作 # res = session.query(UserType) # for row in res: # for a in row.sss: # print(row.id,row.title,a.name) session.commit() session.close()
