Python操作MySQL:pymysql和SQLAlchemy


本篇對於Python操作MySQL主要使用兩種方式:

  • 原生模塊 pymysql
  • ORM框架 SQLAlchemy

 

pymysql

pymysql是Python中操作MySQL的模塊,其使用方法和MySQLdb幾乎相同。

下載安裝

pip3 install pymysql

使用操作

1、執行SQL

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Run a data-modifying statement with pymysql and commit it."""
import pymysql

# Open the connection
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
try:
    # Create a cursor
    cursor = conn.cursor()
    try:
        # Execute SQL; the return value is the number of affected rows
        effect_row = cursor.execute("update hosts set host = '1.1.1.2'")

        # Same, with a driver-side parameter
        #effect_row = cursor.execute("update hosts set host = '1.1.1.2' where nid > %s", (1,))

        # Same statement executed once per parameter tuple
        #effect_row = cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)])

        # Commit, otherwise inserted or modified data is not saved
        conn.commit()
    finally:
        # Close the cursor even if execute()/commit() raised
        cursor.close()
finally:
    # Close the connection even if something above raised
    conn.close()

示例:

import pymysql


conn = pymysql.connect(host="10.37.129.3",port=3306,user="egon",passwd="123456",db="homework",charset="utf8")

cursor = conn.cursor()


# Style 1: a literal statement with no parameters
sql = "select * from course where cid=1"
effect_row = cursor.execute(sql)


# Style 2: Python string formatting before the driver sees the statement.
# Only acceptable for trusted constants; never build SQL this way from user input.
sql = "select * from course where cid='%s'" %(1,)
effect_row = cursor.execute(sql)


# Style 3: driver-side parameter, passed as a scalar or as a list.
# The placeholder is NOT wrapped in quotes: the driver escapes and quotes each
# value itself, so a quoted placeholder would double-quote string values.
sql = "select * from course where cid=%s"

effect_row = cursor.execute(sql,1)
effect_row = cursor.execute(sql,[1])


# Style 4: named placeholder filled from a dict (again without quotes)

sql = "select * from course where cid=%(u)s"
effect_row = cursor.execute(sql,{"u":1})


# Only the result set of the most recent execute() is available here
row_1 = cursor.fetchone()

cursor.close()
conn.close()

print(row_1)

2、獲取新創建數據自增ID

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Insert rows and read back the auto-increment ID reported by the cursor."""
import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
cursor = conn.cursor()
cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)])
conn.commit()

# Read the auto-increment ID while the cursor is still open; relying on
# cursor state after close() is not guaranteed by the DB-API.
new_id = cursor.lastrowid

cursor.close()
conn.close()

3、獲取查詢數據

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Fetch rows from the ``hosts`` table with the default cursor."""
import pymysql

# Connection settings gathered in one mapping and unpacked into connect()
db_settings = dict(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
conn = pymysql.connect(**db_settings)
cursor = conn.cursor()

select_hosts = "select * from hosts"
cursor.execute(select_hosts)

# First row of the result set
row_1 = cursor.fetchone()

# Next n rows
# row_2 = cursor.fetchmany(3)
# All remaining rows
# row_3 = cursor.fetchall()

conn.commit()
cursor.close()
conn.close()

注:在fetch數據時按照順序進行,可以使用cursor.scroll(num,mode)來移動游標位置,如:

  • cursor.scroll(1,mode='relative')  # 相對當前位置移動
  • cursor.scroll(2,mode='absolute') # 移動到絕對位置(從結果集起始處計算)

4、fetch數據類型

  默認獲取的數據是元組類型,如果想要獲取字典類型的數據,可將游標設置為DictCursor,即:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Call stored procedure ``p1`` and fetch its first row through a DictCursor."""
import pymysql

db_settings = dict(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
conn = pymysql.connect(**db_settings)

# Ask for a dictionary-typed cursor instead of the default one
dict_cursor_class = pymysql.cursors.DictCursor
cursor = conn.cursor(cursor=dict_cursor_class)

call_statement = "call p1()"
r = cursor.execute(call_statement)

result = cursor.fetchone()

conn.commit()
cursor.close()
conn.close()

 5、插入演示

# Insert demo: one multi-row INSERT via execute(), then a parameterized
# INSERT via executemany(); both are committed together at the end.
import pymysql


conn = pymysql.connect(host="10.37.129.3",port=3306,user="egon",passwd="123456",db="student_info",charset="utf8")

cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)


# Insert a single row
# sql = "insert into student_info(sname,gender,class_id) VALUES('alex1','女',2)"

# Insert several rows with one multi-VALUES statement
sql = "insert into student_info(sname,gender,class_id) VALUES('alex1','女',2),('alex2','女',2),('alex3','女',2)"

r = cursor.execute(sql)

# Alternative: one parameterized statement executed once per tuple

sql = "insert into userinfo(username,password) values(%s,%s)"
# r is the number of affected rows; it overwrites the count from execute() above
r = cursor.executemany(sql,[('egon','sb'),('laoyao','BS')])




conn.commit()
cursor.close()
conn.close()

# Prints only the executemany() count, since r was reassigned
print(r)

 

 

6、補充

# SQL injection demo.
# WARNING: this snippet is intentionally vulnerable; it shows what goes wrong
# when user input is formatted straight into the SQL text.
import pymysql

user = input("username:")
pwd = input("password:")

conn = pymysql.connect(host="localhost",user='root',password='',database="db3")   # connect to the MySQL server
cursor = conn.cursor()
sql = "select * from userinfo where username='%s' and pwd='%s'" %(user,pwd,) # do NOT fill placeholders yourself with % formatting; this is what exposes the database
# Entering   uu' or 1=1 --    as the username produces:
#   select * from userinfo where username='uu' or 1=1 -- ' and pwd='...'
# which the server reads as: (1) select * from userinfo where username='uu',
# (2) or 1=1, (3) "-- " starting a comment, so (4) the trailing
# ' and pwd='...' is ignored. The WHERE clause is always true and the program
# prints the success message without a valid password.
cursor.execute(sql)         # run the statement
result = cursor.fetchone()  # first row of the result, if any
# Close the cursor and the connection
cursor.close()
conn.close()

if result:
    print('登錄成功')
else:
    print('登錄失敗')

 

# Safe login check: the values are handed to the driver as parameters
# instead of being formatted into the SQL text.
import pymysql
user = input("username:")
pwd = input("password:")

conn = pymysql.connect(host="localhost",user='root',password='',database="db3")
cursor = conn.cursor()
sql = "select * from userinfo where username=%s and pwd=%s"
# cursor.execute(sql,(user,pwd))
cursor.execute(sql,[user,pwd])    # all parameters go in ONE sequence argument; the tuple form (user,pwd) above is equivalent to this list
# cursor.execute(sql,{'u':user,'p':pwd})    # dict form; needs named placeholders %(u)s / %(p)s in sql. The row type comes from the cursor class, not from the parameter style
result = cursor.fetchone()                  # fetch one row
cursor.close()
conn.close()
if result:
    print('登錄成功')
else:
    print('登錄失敗')

print(result)

  

import pymysql

# 增加,刪,改
#增
# conn = pymysql.connect(host="localhost",user='root',password='',database="db3")
# cursor = conn.cursor()
# sql = "insert into userinfo(username,pwd) values('root','123123')"
# 受影響的行數
# r = cursor.execute(sql)
# #  ******
# conn.commit()     #對數據庫有改變均要執行conn.commit()命令,提交給數據庫。所以增刪改均需有這條命令,查不需要。
# cursor.close()
# conn.close()

# conn = pymysql.connect(host="localhost",user='root',password='',database="db3")
# cursor = conn.cursor()
# # sql = "insert into userinfo(username,pwd) values(%s,%s)"
# # cursor.execute(sql,(user,pwd,))
#
# sql = "insert into userinfo(username,password) values(%s,%s)"
# # 受影響的行數
# r = cursor.executemany(sql,[('egon','sb'),('laoyao','BS')])
# #  ******
# conn.commit()
# cursor.close()
# conn.close()




# 查
# conn = pymysql.connect(host="localhost",user='root',password='',database="db666")
# cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# sql = "select * from userinfo"
# cursor.execute(sql)

# cursor.scroll(1,mode='relative')  # 相對當前位置移動
# cursor.scroll(2,mode='absolute') # 相對絕對位置移動
# result = cursor.fetchone()
# print(result)
# result = cursor.fetchone()
# print(result)
# result = cursor.fetchone()
# print(result)
# result = cursor.fetchall()
# print(result)


# result = cursor.fetchmany(4)
# print(result)
# cursor.close()
# conn.close()




# 新插入數據的自增ID: cursor.lastrowid
# import pymysql
#
# conn = pymysql.connect(host="localhost",user='root',password='',database="db3")
# cursor = conn.cursor()
# sql = "insert into userinfo(username,pwd) values('asdfasdf','123123')"
# cursor.execute(sql)
# conn.commit()
# print(cursor.lastrowid)       #lastrowid最后一個自增id
# cursor.close()
# conn.close()

  

7、作業

作業:
        參考表結構:
            用戶類型

            用戶信息

            權限

            用戶類型&權限
        功能:

            # 登陸、注冊、找回密碼
            # 用戶管理
            # 用戶類型
            # 權限管理
            # 分配權限

        特別的:程序僅一個可執行文件

 

-- Schema and seed data for the permissions exercise.
create database wuSir default character set utf8 collate utf8_general_ci;

use wuSir;


-- Permissions; each permission name is unique.
create table auth_info(
    aid int not null auto_increment primary key,
    auth_name varchar(32),
    unique(auth_name)
    )engine=innodb default charset=utf8;


-- Users.
-- NOTE(review): both ENUM members were empty strings in the source (the
-- characters were apparently lost); duplicate members are rejected in strict
-- SQL mode. "男"/"女" is assumed here -- confirm the intended values.
create table user_info(
    uid int not null auto_increment primary key,
    name varchar(32),
    passwd varchar(32),
    sex ENUM("男","女")
    )engine=innodb default charset=utf8;

-- Link table: which user (id) holds which permission (auth_id).
create table user_auth(
    id int,
    auth_id int,
    constraint auth_info foreign key(auth_id) references auth_info(aid),
    constraint user_info foreign key(id) references user_info(uid)
    )engine=innodb default charset=utf8;



insert into auth_info(auth_name) values("訂單管理"),("用戶管理"),("菜單管理"),("權限分配"),("Bug管理");

-- NOTE(review): the sex values below were also empty in the source; "男" is assumed.
insert into user_info(name,passwd,sex) values("alex",123,"男"),("egon",123,"男");

insert into user_auth(id,auth_id) values(1,1),(2,1),(2,2),(2,3);
創建庫表 SQL

 

# Log a user in against user_info and print the names of the permissions
# linked to that user through user_auth.
import pymysql


user = input("please input name: ").strip()
passwd = input("please input passwd: ").strip()


conn = pymysql.connect(host="10.37.129.3",port=3306,user="egon",passwd="123456",db="wuSir",charset="utf8")

cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)

try:
    sql = "select * from user_info where name=%s and passwd=%s"

    cursor.execute(sql,(user,passwd,))
    result = cursor.fetchone()

    # fetchone() returns None when no row matches, so test it before indexing.
    # The exact Python-side comparison is kept from the original; presumably
    # it guards against collation-insensitive matches -- confirm.
    if result and result["name"] == user and result["passwd"] == passwd:
        uid = result["uid"]
        sql = "select auth_name from auth_info where aid in (select auth_id from user_auth where id =%s)"

        cursor.execute(sql,(uid,))
        for row in cursor.fetchall():
            print(row["auth_name"])

    else:
        print("error")
finally:
    # Release the cursor and connection even if a query raised
    cursor.close()
    conn.close()
Python 代碼

 

 

SQLAlchemy

SQLAlchemy是Python編程語言下的一款ORM框架,該框架建立在數據庫API之上,使用對象關系映射(ORM)進行數據庫操作,簡言之便是:將對象轉換成SQL,然后使用數據庫API執行SQL並獲取執行結果。

安裝:

pip3 install SQLAlchemy

 

SQLAlchemy本身無法操作數據庫,其必須依賴pymysql等第三方數據庫驅動,Dialect用於和數據庫API進行交流,根據配置的不同調用不同的數據庫API,從而實現對數據庫的操作,如:

MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
   
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
   
MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
   
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
   
更多詳見:http://docs.sqlalchemy.org/en/latest/dialects/index.html

一、內部處理

使用 Engine/ConnectionPooling/Dialect 進行數據庫操作,Engine使用ConnectionPooling連接數據庫,然后再通過Dialect執行SQL語句。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Creates an Engine; the commented examples show raw SQL run through it.
# NOTE(review): `engine.execute()` is the legacy (pre-2.0) calling style --
# confirm it exists in the installed SQLAlchemy version before uncommenting.
from sqlalchemy import create_engine


engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)

# Execute a literal INSERT
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES ('1.1.1.22', 3)"
# )

# Auto-increment ID of the newly inserted row
# cur.lastrowid

# Execute an INSERT once per parameter tuple
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES(%s, %s)",[('1.1.1.22', 3),('1.1.1.221', 3),]
# )


# Execute an INSERT with named parameters passed as keyword arguments
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES (%(host)s, %(color_id)s)",
#     host='1.1.1.99', color_id=3
# )

# Execute a SELECT
# cur = engine.execute('select * from hosts')
# Fetch the first row
# cur.fetchone()
# Fetch the next 3 rows
# cur.fetchmany(3)
# Fetch all remaining rows
# cur.fetchall()

二、ORM功能使用

使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有組件對數據進行操作。根據類創建對象,對象轉換成SQL,執行SQL。

1、創建表

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Declares the example schema (single table, one-to-many, many-to-many) and
# two helpers that create or drop every declared table.
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
 
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)
 
# Base class every mapped table class below inherits from
Base = declarative_base()
 
# Single table
class Users(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    extra = Column(String(16))
 
    # Composite unique constraint on (id, name) and composite index on (name, extra)
    __table_args__ = (
    UniqueConstraint('id', 'name', name='uix_id_name'),
        Index('ix_id_name', 'name', 'extra'),
    )
 
 
# One-to-many: each Person row references one Favor row through favor_id
class Favor(Base):
    __tablename__ = 'favor'
    nid = Column(Integer, primary_key=True)
    caption = Column(String(50), default='red', unique=True)
 
 
class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    favor_id = Column(Integer, ForeignKey("favor.nid"))
 
 
# Many-to-many: Server and Group are linked through ServerToGroup
class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)
    port = Column(Integer, default=22)
 
 
class Server(Base):
    __tablename__ = 'server'
 
    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)
 
 
# Association table holding one (server_id, group_id) pair per row
class ServerToGroup(Base):
    __tablename__ = 'servertogroup'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('server.id'))
    group_id = Column(Integer, ForeignKey('group.id'))
 
 
def init_db():
    """Create all tables declared on ``Base`` using the module-level engine."""
    Base.metadata.create_all(engine)
 
 
def drop_db():
    """Drop all tables declared on ``Base`` using the module-level engine."""
    Base.metadata.drop_all(engine)

注:設置外鍵的另一種方式 ForeignKeyConstraint(['other_id'], ['othertable.other_id'])

2、操作表

表結構 + 數據庫連接

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Table declarations with relationships and __repr__ helpers, plus the
# Session used by the CRUD examples that follow.
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)

# Base class every mapped table class below inherits from
Base = declarative_base()

# Single table
class Users(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    extra = Column(String(16))

    # Composite unique constraint on (id, name) and composite index on (name, extra)
    __table_args__ = (
    UniqueConstraint('id', 'name', name='uix_id_name'),
        Index('ix_id_name', 'name', 'extra'),
    )

    def __repr__(self):
        """Return ``"<id>-<name>"`` for readable query output."""
        return "%s-%s" %(self.id, self.name)

# One-to-many
class Favor(Base):
    __tablename__ = 'favor'
    nid = Column(Integer, primary_key=True)
    caption = Column(String(50), default='red', unique=True)

    def __repr__(self):
        """Return ``"<nid>-<caption>"`` for readable query output."""
        return "%s-%s" %(self.nid, self.caption)

class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    favor_id = Column(Integer, ForeignKey("favor.nid"))
    # Does not affect the generated table structure; only makes querying easier
    favor = relationship("Favor", backref='pers')

# Many-to-many
class ServerToGroup(Base):
    __tablename__ = 'servertogroup'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('server.id'))
    group_id = Column(Integer, ForeignKey('group.id'))
    # Both relationships register a backref named 's2g' on the target class
    group = relationship("Group", backref='s2g')
    server = relationship("Server", backref='s2g')

class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)
    port = Column(Integer, default=22)
    # group = relationship('Group',secondary=ServerToGroup,backref='host_list')


class Server(Base):
    __tablename__ = 'server'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)




def init_db():
    """Create all tables declared on ``Base`` using the module-level engine."""
    Base.metadata.create_all(engine)


def drop_db():
    """Drop all tables declared on ``Base`` using the module-level engine."""
    Base.metadata.drop_all(engine)


# Session factory bound to the engine, and the session the examples use
Session = sessionmaker(bind=engine)
session = Session()

增 

# CRUD examples; they rely on `session` and `Users` from the snippet above.
from sqlalchemy import text  # needed by the raw-SQL filters in the query section

# ---- Create ----
obj = Users(name="alex0", extra='sb')
session.add(obj)
session.add_all([
    Users(name="alex1", extra='sb'),
    Users(name="alex2", extra='sb'),
])
session.commit()

# ---- Delete ----
session.query(Users).filter(Users.id > 2).delete()
session.commit()

# ---- Update ----
# Set a column to a constant
session.query(Users).filter(Users.id > 2).update({"name" : "099"})
# Build the new value from the current one (string concatenation)
session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False)
# Numeric increment template. Left commented out because the Users model in
# this article declares no `num` column, so `Users.num` raises AttributeError.
# session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate")
session.commit()

# ---- Query ----
ret = session.query(Users).all()
ret = session.query(Users.name, Users.extra).all()
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter_by(name='alex').first()

# Raw SQL fragment with bound parameters (`Users`, not the undefined `User`)
ret = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()

# Whole statement as raw SQL
ret = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()

其他

# Query cookbook; relies on `session`, `Users`, `Favor` and `Person` defined earlier.

# Conditions
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
# `~` negates the condition (NOT IN)
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
# IN with a subquery
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
# Nested AND inside OR
ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.extra != ""
    )).all()


# Wildcards (LIKE / NOT LIKE)
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()

# Limit (slice the query)
ret = session.query(Users)[1:2]

# Ordering
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# Grouping
from sqlalchemy.sql import func

ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).all()

# Grouping with a HAVING filter on an aggregate
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# Joins

# Two entities matched through a filter condition
ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()

# Explicit join between Person and Favor
ret = session.query(Person).join(Favor).all()

# isouter=True requests an outer join
ret = session.query(Person).join(Favor, isouter=True).all()


# Set operations
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()

 示例:

1、查詢語法

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

Base = declarative_base()


# 創建單表
class UserType(Base):
    # Lookup table of user types; referenced by Users.user_type_id
    __tablename__ = 'usertype'
    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(32), nullable=True, index=True)

class Users(Base):
    # User rows; each references one usertype row through user_type_id
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True, autoincrement=True)  # primary key, auto-increment
    name = Column(String(32), nullable=True)                    # NOTE(review): original comment said "not null", but nullable=True allows NULL -- confirm intent
    email = Column(String(16), unique=True)                     # unique
    user_type_id = Column(Integer,ForeignKey("usertype.id"))    # foreign key

    __table_args__ = (
        UniqueConstraint('name', 'email', name='uix_id_name'),  # composite unique constraint on (name, email), despite the name 'uix_id_name'
        Index('ix_n_ex','name', 'email',),
    )


# Engine and session for the query examples; each commented query below is
# followed by the output it produced.
engine = create_engine("mysql+pymysql://egon:123456@10.37.129.3:3306/day63?charset=utf8", max_overflow=5)

Session = sessionmaker(bind=engine)
session = Session()


# Without .all() the value is the Query itself, which prints as its SQL
# ret = session.query(Users)
    #SELECT users.id AS users_id, users.name AS users_name, users.email AS users_email, users.user_type_id AS users_user_type_id FROM users

# .all() returns a list of mapped objects
# ret = session.query(Users).all()
    #[<__main__.Users object at 0x1037620f0>, <__main__.Users object at 0x1037621d0>, <__main__.Users object at 0x103762160>]

# Selecting columns returns a list of row tuples
# ret = session.query(Users.name, Users.email).all()
#     [('alex', '163'), ('egon', '173'), ('wuSir', '183')]

# ret = session.query(Users).filter_by(name='alex').all()
#     [<__main__.Users object at 0x103759198>]

# .first() returns a single object rather than a list
# ret = session.query(Users).filter_by(name='alex').first()
#     <__main__.Users object at 0x103758240>

#user_list = session.query(UserType.id,UserType.title).filter(UserType.id>=1).all()
#   [(1, '普通用戶'), (2, '黃金用戶')]    

session.commit()
session.close()

2、插入語法

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

Base = declarative_base()


# 創建單表
class UserType(Base):
    # Lookup table of user types; referenced by Users.user_type_id
    __tablename__ = 'usertype'
    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(32), nullable=True, index=True)

class Users(Base):
    # User rows; each references one usertype row through user_type_id
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True, autoincrement=True)  # primary key, auto-increment
    name = Column(String(32), nullable=True)                    # NOTE(review): original comment said "not null", but nullable=True allows NULL -- confirm intent
    email = Column(String(16), unique=True)                     # unique
    user_type_id = Column(Integer,ForeignKey("usertype.id"))    # foreign key

    __table_args__ = (
        UniqueConstraint('name', 'email', name='uix_id_name'),  # composite unique constraint on (name, email), despite the name 'uix_id_name'
        Index('ix_n_ex','name', 'email',),
    )


# Engine and session for the insert examples.
engine = create_engine("mysql+pymysql://egon:123456@10.37.129.3:3306/day63?charset=utf8", max_overflow=5)


Session = sessionmaker(bind=engine)
session = Session()


# Option 1: add objects one at a time
# obj = UserType(title="普通用戶")
# obj1 = UserType(title="黃金用戶")
# session.add(obj)
# session.add(obj1)


# Option 2: add a list of objects in one call.
# The rows below use user_type_id 1 and 2, a foreign key to usertype.id, so
# presumably the usertype rows from option 1 must already exist -- verify.

objs= [
  Users(name="alex",email="163",user_type_id=1),
  Users(name="egon",email="173",user_type_id=1),
  Users(name="tom",email="183",user_type_id=2)
]

session.add_all(objs)


session.commit()
session.close()

3、刪除 修改 語法

#刪除
# session.query(Users).filter(Users.id>1).delete()


#修改

#session.query(Users).filter(Users.id > 2).update({"name" : "099"})
#session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False)
#session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate")
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,CHAR,VARCHAR
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

# 創建對象的基類:
Base = declarative_base()


# 創建單表
"""
1   白金
2   黑金
obj.xx ==> [obj,obj...]
"""
# 創建表單usertype
class UserType(Base):
    # Table name:
    __tablename__ = 'usertype'
    # Table structure:
    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(VARCHAR(32), nullable=True, index=True)


"""
1   方少偉   1
2   成套     1
3   小白     2
# 正向
ut = relationship(backref='xx')
obj.ut ==> 1   白金
"""

class Users(Base):
    # User rows; each references one usertype row through user_type_id
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(VARCHAR(32), nullable=True, index=True)
    email = Column(VARCHAR(16), unique=True)
    user_type_id = Column(Integer,ForeignKey("usertype.id"))

    user_type = relationship("UserType",backref='xxoo')     # one-to-many link; also registers the backref 'xxoo' on UserType
    # __table_args__ = (
    #     UniqueConstraint('id', 'name', name='uix_id_name'),
    #     Index('ix_n_ex','name', 'email',),
    # )


def create_db():
    """Create every table declared on ``Base`` in the ``s4day62db`` database."""
    db_url = "mysql+pymysql://root:@127.0.0.1:3306/s4day62db?charset=utf8"
    # Build a dedicated engine and hand it straight to create_all()
    Base.metadata.create_all(create_engine(db_url, max_overflow=5))

def drop_db():
    """Drop every table declared on ``Base`` from the ``s4day62db`` database."""
    db_url = "mysql+pymysql://root:@127.0.0.1:3306/s4day62db?charset=utf8"
    # Build a dedicated engine and hand it straight to drop_all()
    Base.metadata.drop_all(create_engine(db_url, max_overflow=5))

# Initialise the database connection (create_engine() sets it up)
engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/db5?charset=utf8", max_overflow=5)
# URL format: 'dialect+driver://user:password@host:port/database'
# Here: MySQL through the pymysql driver, user root with an empty password
# (nothing after "root:"), database db5. max_overflow is the number of extra
# connections the pool may open beyond its base size, not the total maximum.


# Create the tables declared on Base
Base.metadata.create_all(engine)
# Create the Session class:
Session = sessionmaker(bind=engine)
# Create a session object:
session = Session()

# 類 -> 表
# 對象 -> 行
# ###### 增加 ######
## 創建新obj1對象
# obj1 = UserType(title='普通用戶')
# # 添加到session:
# session.add(obj1)

# objs =[
#   UserType(title='超級用戶'),
#   UserType(title='白金用戶'),
#   UserType(title='黑金用戶'),
# ]
# session.add_all(objs)

# ###### 查 ######
# print(session.query(UserType))
## 創建Query查詢,調用one()返回一行,如果調用all()則返回所有行:
# user_type_list = session.query(UserType).all()  #session.query(UserType)相當於迭代器,不加 .all()的話,for循環結果也是一樣的
# for row in user_type_list:
#     print(row.id,row.title)



# #select UserType.id,UserType.title  UserType where UserType.id > 2                #sql語句
# user_type_list = session.query(UserType.id,UserType.title).filter(UserType.id > 2)  #使用框架sqlalchemy來實現,filter過濾
# for row in user_type_list:
#     print(row.id,row.title)

# print(user_type_list) #打印結果:SELECT usertype.id AS usertype_id, usertype.title AS usertype_title FROM usertype WHERE usertype.id > %(id_1)s




# 分組,排序,連表,通配符,子查詢,limit,union,where,原生SQL
# ret = session.query(Users, UserType)
# select * from user,usertype;
#
# ret = session.query(Users, UserType).filter(Users.usertype_id==UserType.id)
# select * from user,usertype whre user.usertype_id = usertype.id

# result = session.query(Users).join(UserType)
# print(result)

# result = session.query(Users).join(UserType,isouter=True)
# print(result)



# sql語句用sqlalchemy框架實現:
# 1.
# select * from b where id in (select id from tb2)      #sql語句
#sqlalchemy框架實現: ...

# 2
# select * from (select * from UserType where UserType.id > 0) as B     #sql語句
#sqlalchemy框架實現:
# q1 = session.query(UserType).filter(UserType.id > 0).subquery()   #subquery()子查詢
# print(q1)
# result = session.query(q1).all()
# print(result)


# 3
# select id ,(select * from users where users.user_type_id=usertype.id) from usertype;  #sql語句,嵌套sql語句select * from users where users.user_type_id=usertype.id
#sqlalchemy框架實現:
# session.query(UserType,session.query(Users).filter(Users.id == 1).subquery())  #subquery()子查詢的固定用法,有嵌套sql語句為子查詢,需使用subquery()
# session.query(UserType,Users)
# result = session.query(UserType.id,session.query(Users).as_scalar())  #as_scalar() 相當於臨時表的固定用法
# print(result)
# result = session.query(UserType.id,session.query(Users).filter(Users.user_type_id==UserType.id).as_scalar())
# print(result)






# 問題1. 獲取用戶信息以及與其關聯的用戶類型名稱(FK,Relationship=>正向操作)
# user_list = session.query(Users,UserType).join(UserType,isouter=True)   #聯表left ... join ...on...
# # print(user_list)
# for row in user_list:
#     print(row[0].id,row[0].name,row[0].email,row[0].user_type_id,row[1].title)      # row[0] 為表Users,row[1]為表UserType

# user_list= session.query(Users.name,UserType.title)     #不同表不可以直接獲取,需要聯表獲取
# print("user_list:",user_list) #SELECT users.name AS users_name, usertype.title AS usertype_title FROM users, usertype

# user_list = session.query(Users.name,UserType.title).join(UserType,isouter=True).all()    #左聯表
# user_list = session.query(UserType.title,Users.name).join(Users,isouter=True).all()        #換個位置后,就變成右聯表
#isouter=True代表左聯表left ... join ...on...,不加isouter=True代表inner ... join ... on ...
# print("user_list:",user_list)
# for row in user_list:
#     print("row:",row)           #打印結果是元組形式,所以row[0]與row.name,row[1]和row.title打印結果是一樣的
#     print(row[0],row[1],row.name,row.title)


# user_list = session.query(Users)
# for row in user_list:
#     print(row.name,row.id,row.user_type.title)  #row.user_type.title,建表users時使用了user_type = relationship("UserType",backref='xxoo')
# #--->xyp 1 超級用戶     #row.user_type.title,表user和usertype建立了連接,user的行可以直接調用usertype的內容
# #    xyp2 2 白金用戶



# Question 2: list every user type together with the users of that type.
# One extra Users query runs per usertype row; the commented variant below
# reads the same data through the 'xxoo' backref instead.
type_list = session.query(UserType)
for row in type_list:
    print(row.id,row.title,session.query(Users).filter(Users.user_type_id == row.id).all())
    # --->2 白金用戶 [<__main__.Users object at 0x00000000039639E8>]
    #     1 超級用戶 [<__main__.Users object at 0x0000000003963B00>]
    #     3 黑金用戶[]          # the users table had only two rows, so no user references usertype id 3


# type_list = session.query(UserType)
# for row in type_list:
#     print(row.id,row.title,row.xxoo)        #建表users時使用了user_type = relationship("UserType",backref='xxoo')
    # --->2 白金用戶 [<__main__.Users object at 0x0000000003963DA0>]
    #     1 超級用戶 [<__main__.Users object at 0x0000000003963F98>]
    #     3 黑金用戶 []





# ###### 刪除 ######
# session.query(UserType.id,UserType.title).filter(UserType.id > 2).delete()    # 刪除前需先查

# ###### 修改 ######
#  修改前需先查
#改變全部title列為"黑金":
# session.query(UserType.id,UserType.title).filter(UserType.id > 0).update({"title" : "黑金"})
#改變UserType.id > 0的title列加字符串"x",synchronize_session=False用在修改字符串固定用法
# session.query(UserType.id,UserType.title).filter(UserType.id > 0).update({UserType.title: UserType.title + "x"}, synchronize_session=False)
#改變UserType.id > 0的title列,num列建表時未建,Users.num + 1,整型的計算。synchronize_session="evaluate" 用在修改整型固定用法
# session.query(UserType.id,UserType.title).filter(UserType.id > 0).update({"num": Users.num + 1}, synchronize_session="evaluate")


# Commit, which saves the pending changes to the database:
session.commit()
# Close the session:
session.close()
補充

 

更多功能參見文檔,猛擊這里下載PDF

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM