python 使用sqlalchemy進行數據庫操作


sqlalchemy是python下一個著名的數據庫orm庫,可以方便地進行數據表創建、數據增刪改查等操作

最詳細的教程,見官方:https://docs.sqlalchemy.org

 

這里列舉一些常用操作:

一、創建數據表

代碼以及相關的注釋:

import datetime
import sqlalchemy as db
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# 為了使用MySQL支持的類型,如unsigned bigint
import sqlalchemy.dialects.mysql as mysql

# 創建對象的基類:
Base = declarative_base()
# 定義User對象, 盡量展示一些創建選項:
class User(Base):
    # 表的名字:
    __tablename__ = 't_user'
    # 創建表的擴展設置,這里設置表的字符集為utf8
    __table_args__ = (
        # 聯合唯一索引
        db.UniqueConstraint('id', 'name', name='uix_id_name'),

        # 聯合索引
        db.Index('sex_createtime', 'sex', 'createtime'),

        # 設置引擎和字符集,注意:這個map只能放到元組的最后,否則會報錯
        {
            'mysql_engine': 'InnoDB',
            'mysql_charset': 'utf8',
        }
    )

    # 表的結構字段定義
    # id字段,類型為unsigned bigint,定義為主鍵,自增,db.BigInteger不支持unsigned,所以使用mysql特別支持的類型
    # id = db.Column(db.BigInteger(), primary_key=True, autoincrement=True)
    id = db.Column(mysql.BIGINT(unsigned=True), primary_key=True, autoincrement=True)
    # name字段,定義為varchar(20),默認允許空,帶注釋
    name = db.Column(db.String(20), comment='姓名')
    # phone字段,定義為varchar(11),不允許空,有唯一限制,帶注釋
    phone = db.Column(db.String(11), nullable=False, unique=True, comment='電話')
    # score, float類型,帶索引, 允許空
    score = db.Column(db.Float, index=True, comment='成績')
    # sex,性別, int類型, 默認值為1
    sex = db.Column(db.Integer, default=1, comment='性別,1-男;2-女')
    # createtime, datetime類型, 不允許空
    createtime = db.Column(db.DateTime, nullable=False, comment='創建時間')
    # modifytime, datetime類型, 帶索引, 默認使用datetime.now()生成當前時間,注意,不帶()
    modifytime = db.Column(db.DateTime, default=datetime.datetime.now, comment='修改時間')

    # 只是用於打印對象
    def __str__(self):
        return "(" + ', '.join(['%s:%s' % item for item in self.__dict__.items()]) + ")"


# 初始化數據庫連接:
# 連接字符串模式:數據庫類型+連接庫+用戶名+密碼+主機,字符編碼,是否打印建表細節
# 其中,連接庫是當前用於操作數據庫的庫,對於python2.7,一般使用MysqlDb,對於Python3,一般使用pymysql
# 連接的例子如:create_engine("mysql+pymysql://cai:123@localhost/test?charset=utf8", echo=True)
engine = db.create_engine('mysql+pymysql://user:pass@localhost:3306/test')

# 刪除現有的表,謹慎決定是否需要這樣操作
Base.metadata.drop_all(engine)

# 創建表
Base.metadata.create_all(engine)

 

在mysql中生成的表結構如下:

CREATE TABLE `t_user` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(20) DEFAULT NULL COMMENT '姓名',
  `phone` varchar(11) NOT NULL COMMENT '電話',
  `score` float DEFAULT NULL COMMENT '成績',
  `sex` int(11) DEFAULT NULL COMMENT '性別,1-男;2-女',
  `createtime` datetime NOT NULL COMMENT '創建時間',
  `modifytime` datetime DEFAULT NULL COMMENT '修改時間',
  PRIMARY KEY (`id`),
  UNIQUE KEY `phone` (`phone`),
  UNIQUE KEY `uix_id_name` (`id`,`name`),
  KEY `sex_createtime` (`sex`,`createtime`),
  KEY `ix_t_user_score` (`score`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

最常用的SQLAlchemy Column類型

類型名 Python類型 說 明
Integer int 普通整數,一般是 32 位
SmallInteger int 取值范圍小的整數,一般是 16 位
BigInteger int 或 long 不限制精度的整數
Float float 浮點數
Numeric decimal.Decimal 定點數
String str 變長字符串
Text str 變長字符串,對較長或不限長度的字符串做了優化
Unicode unicode 變長 Unicode 字符串
UnicodeText unicode 變長 Unicode 字符串,對較長或不限長度的字符串做了優化
Boolean bool 布爾值
Date datetime.date 日期
Time datetime.time 時間
DateTime datetime.datetime 日期和時間
Interval datetime.timedelta 時間間隔
Enum str 一組字符串
PickleType 任何 Python 對象 自動使用 Pickle 序列化
LargeBinary str 二進制文件

最常使用的SQLAlchemy列選項

選項名 說 明
primary_key 如果設為 True ,這列就是表的主鍵
unique 如果設為 True ,這列不允許出現重復的值
index 如果設為 True ,為這列創建索引,提升查詢效率
nullable 如果設為 True ,這列允許使用空值;如果設為 False ,這列不允許使用空值
default 為這列定義默認值

 

mysql的特別定義列類型選項

BIGINT, BINARY, BIT, BLOB, BOOLEAN, CHAR, DATE, DATETIME, DECIMAL, DECIMAL, DOUBLE, ENUM, FLOAT, INTEGER, LONGBLOB, LONGTEXT, MEDIUMBLOB, MEDIUMINT, MEDIUMTEXT, NCHAR, NUMERIC, NVARCHAR, REAL, SET, SMALLINT, TEXT, TIME, TIMESTAMP, TINYBLOB, TINYINT, TINYTEXT, VARBINARY, VARCHAR, YEAR


二、數據的增刪改查

對數據操作之前,需要定義一個會話類型,並在操作數據時,生成一個會話實例進行操作

# 定義一個會話類型,用來進行數據操作(engine是前面步驟生成的引擎對象)
DBSession = sessionmaker(bind=engine)
#實例一個會話
session = DBSession()
# 數據的操作,如session.add(..)
# 這一句不能少,否則不會提交
session.commit()
session.close()

 

2.1 新增數據

# 插入一條數據
user1 = User(id=1,name="小明", phone="13111223344", score=98.2, sex=1, createtime=datetime.datetime.now())
session.add(user1)

# 插入多條數據
users = [
    User(id=2,name="小芳", phone="13111223345", score=83.1, sex=2, createtime=datetime.datetime.now()),
    User(id=3,name="小李", phone="13111223346", score=100, sex=2, createtime=datetime.datetime.now()),
    User(id=4,name="小牛", phone="13111223347", score=62.5, sex=1, createtime=datetime.datetime.now()),
]

session.add_all(users)

# 非orm方式,特點是快,可定制,如支持ON DUPLICATE KEY UPDATE
session.execute(User.__table__.insert(),
        [
            {"id":5,"name":"小五", "phone":"13111223348", "score":72.5, "sex":1, "createtime":datetime.datetime.now()},
        ]
    )

 

如何實現ON DUPLICATE KEY UPDATE?

sqlalchemy不支持ON DUPLICATE KEY UPDATE, 可以自己實現一個。

基本的實現方式:

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def append_string(insert, compiler, **kw):
    s = compiler.visit_insert(insert, **kw)
    if 'append_string' in insert.kwargs:
        return s + " " + insert.kwargs['append_string']
    return s


my_connection.execute(my_table.insert(append_string = 'ON DUPLICATE KEY UPDATE foo=foo'), my_values)

在實際使用中,這種使用方式顯得比較粗糙,一般來說,可以枚舉對象的字段,並對每個字段設置xxx=VALUES(xxx)進行更新。

 

2.2 查詢、更新和刪除

三者的操作直接合並來說了

query = session.query(User)
print query # 顯示SQL 語句
print query.statement # 同上
for user in query: # 遍歷時查詢
    print user.name
print query.all() # 返回的是一個類似列表的對象
print query.first().name # 記錄不存在時,first() 會返回 None
# print query.one().name # 不存在,或有多行記錄時會拋出異常
print query.filter(User.id == 2).first().name
print query.get(2).name # 以主鍵獲取,等效於上句
print query.filter('id = 2').first().name # 支持字符串
 
query2 = session.query(User.name)
print query2.all() # 每行是個元組
print query2.limit(1).all() # 最多返回 1 條記錄
print query2.offset(1).all() # 從第 2 條記錄開始返回
print query2.order_by(User.name).all()
print query2.order_by('name').all()
print query2.order_by(User.name.desc()).all()
print query2.order_by('name desc').all()
print session.query(User.id).order_by(User.name.desc(), User.id).all()
 
print query2.filter(User.id == 1).scalar() # 如果有記錄,返回第一條記錄的第一個元素
print session.query('id').select_from(User).filter('id = 1').scalar()
print query2.filter(User.id > 1, User.name != 'a').scalar() # and
query3 = query2.filter(User.id > 1) # 多次拼接的 filter 也是 and
query3 = query3.filter(User.name != 'a')
print query3.scalar()
print query2.filter(or_(User.id == 1, User.id == 2)).all() # or
print query2.filter(User.id.in_((1, 2))).all() # in
 
query4 = session.query(User.id)
print query4.filter(User.name == None).scalar()
print query4.filter('name is null').scalar()
print query4.filter(not_(User.name == None)).all() # not
print query4.filter(User.name != None).all()
 
print query4.count()
print session.query(func.count('*')).select_from(User).scalar()
print session.query(func.count('1')).select_from(User).scalar()
print session.query(func.count(User.id)).scalar()
print session.query(func.count('*')).filter(User.id > 0).scalar() # filter() 中包含 User,因此不需要指定表
print session.query(func.count('*')).filter(User.name == 'a').limit(1).scalar() == 1 # 可以用 limit() 限制 count() 的返回數
print session.query(func.sum(User.id)).scalar()
print session.query(func.now()).scalar() # func 后可以跟任意函數名,只要該數據庫支持
print session.query(func.current_timestamp()).scalar()
print session.query(func.md5(User.name)).filter(User.id == 1).scalar()
 
query.filter(User.id == 1).update({User.name: 'c'})
user = query.get(1)
print user.name
 
user.name = 'd'
session.flush() # 寫數據庫,但並不提交
print query.get(1).name
 
session.delete(user)
session.flush()
print query.get(1)
 
session.rollback()
print query.get(1).name
query.filter(User.id == 1).delete()
session.commit()
print query.get(1)

 

三、一些進階操作

如何批量插入大批數據?
可以使用非 ORM 的方式:

session.execute(
    User.__table__.insert(),
    [{'name': `randint(1, 100)`,'age': randint(1, 100)} for i in xrange(10000)]
)
session.commit()

上面我批量插入了 10000 條記錄,半秒內就執行完了;而 ORM 方式會花掉很長時間。

 

如何讓執行的 SQL 語句增加前綴?

使用 query 對象的 prefix_with() 方法:

session.query(User.name).prefix_with('HIGH_PRIORITY').all()
session.execute(User.__table__.insert().prefix_with('IGNORE'), {'id': 1, 'name': '1'})

 

如何替換一個已有主鍵的記錄?
使用 session.merge() 方法替代 session.add(),其實就是 SELECT + UPDATE:

user = User(id=1, name='ooxx')
session.merge(user)
session.commit()

或者使用 MySQL 的 INSERT … ON DUPLICATE KEY UPDATE,需要用到 @compiles 裝飾器,有點難懂,自己看吧:《SQLAlchemy ON DUPLICATE KEY UPDATE》 和 sqlalchemy_mysql_ext

 

如何使用無符號整數?
可以使用 MySQL 的方言:

如何使用無符號整數?
可以使用 MySQL 的方言:

 

模型的屬性名需要和表的字段名不一樣怎么辦?
開發時遇到過一個奇怪的需求,有個其他系統的表里包含了一個“from”字段,這在 Python 里是關鍵字,於是只能這樣處理了:

from_ = Column('from', CHAR(10))

 

如何獲取字段的長度?
Column 會生成一個很復雜的對象,想獲取長度比較麻煩,這里以 User.name 為例:

User.name.property.columns[0].type.length

 

如何指定使用 InnoDB,以及使用 UTF-8 編碼?
最簡單的方式就是修改數據庫的默認配置。如果非要在代碼里指定的話,可以這樣:

class User(BaseModel):
    __table_args__ = {
        'mysql_engine': 'InnoDB',
        'mysql_charset': 'utf8'
    }

 

MySQL 5.5 開始支持存儲 4 字節的 UTF-8 編碼的字符了,iOS 里自帶的 emoji(如 � 字符)就屬於這種。

如果是對表來設置的話,可以把上面代碼中的 utf8 改成 utf8mb4,DB_CONNECT_STRING 里的 charset 也這樣更改。
如果對庫或字段來設置,則還是自己寫 SQL 語句比較方便,具體細節可參考《How to support full Unicode in MySQL databases》
不建議全用 utf8mb4 代替 utf8,因為前者更慢,索引會占用更多空間。

如何設置外鍵約束?

from random import randint
from sqlalchemy import ForeignKey
 
 
class User(BaseModel):
    __tablename__ = 'user'
 
    id = Column(Integer, primary_key=True)
    age = Column(Integer)
 
 
class Friendship(BaseModel):
    __tablename__ = 'friendship'
 
    id = Column(Integer, primary_key=True)
    user_id1 = Column(Integer, ForeignKey('user.id'))
    user_id2 = Column(Integer, ForeignKey('user.id'))
 
 
for i in xrange(100):
    session.add(User(age=randint(1, 100)))
session.flush() # 或 session.commit(),執行完后,user 對象的 id 屬性才可以訪問(因為 id 是自增的)
 
for i in xrange(100):
    session.add(Friendship(user_id1=randint(1, 100), user_id2=randint(1, 100)))
session.commit()
 
session.query(User).filter(User.age < 50).delete()

執行這段代碼時,你應該會遇到一個錯誤:

sqlalchemy.exc.IntegrityError: (IntegrityError) (1451, 'Cannot delete or update a parent row: a foreign key constraint fails (`ooxx`.`friendship`, CONSTRAINT `friendship_ibfk_1` FOREIGN KEY (`user_id1`) REFERENCES `user` (`id`))') 'DELETE FROM user WHERE user.age < %s' (50,)

原因是刪除 user 表的數據,可能會導致 friendship 的外鍵不指向一個真實存在的記錄。在默認情況下,MySQL 會拒絕這種操作,也就是 RESTRICT。InnoDB 還允許指定 ON DELETE 為 CASCADE 和 SET NULL,前者會刪除 friendship 中無效的記錄,后者會將這些記錄的外鍵設為 NULL。
除了刪除,還有可能更改主鍵,這也會導致 friendship 的外鍵失效。於是相應的就有 ON UPDATE 了。其中 CASCADE 變成了更新相應的外鍵,而不是刪除。
而在 SQLAlchemy 中是這樣處理的:

class Friendship(BaseModel):
    __tablename__ = 'friendship'
 
    id = Column(Integer, primary_key=True)
    user_id1 = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE'))
    user_id2 = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE'))

 

如何連接表?

from sqlalchemy import distinct
from sqlalchemy.orm import aliased
 
 
Friend = aliased(User, name='Friend')
 
print session.query(User.id).join(Friendship, User.id == Friendship.user_id1).all() # 所有有朋友的用戶
print session.query(distinct(User.id)).join(Friendship, User.id == Friendship.user_id1).all() # 所有有朋友的用戶(去掉重復的)
print session.query(User.id).join(Friendship, User.id == Friendship.user_id1).distinct().all() # 同上
print session.query(Friendship.user_id2).join(User, User.id == Friendship.user_id1).order_by(Friendship.user_id2).distinct().all() # 所有被別人當成朋友的用戶
print session.query(Friendship.user_id2).select_from(User).join(Friendship, User.id == Friendship.user_id1).order_by(Friendship.user_id2).distinct().all() # 同上,join 的方向相反,但因為不是 STRAIGHT_JOIN,所以 MySQL 可以自己選擇順序
print session.query(User.id, Friendship.user_id2).join(Friendship, User.id == Friendship.user_id1).all() # 用戶及其朋友
print session.query(User.id, Friendship.user_id2).join(Friendship, User.id == Friendship.user_id1).filter(User.id < 10).all() # id 小於 10 的用戶及其朋友
print session.query(User.id, Friend.id).join(Friendship, User.id == Friendship.user_id1).join(Friend, Friend.id == Friendship.user_id2).all() # 兩次 join,由於使用到相同的表,因此需要別名
print session.query(User.id, Friendship.user_id2).outerjoin(Friendship, User.id == Friendship.user_id1).all() # 用戶及其朋友(無朋友則為 None,使用左連接)

這里我沒提到 relationship,雖然它看上去很方便,但需要學習的內容實在太多,還要考慮很多性能上的問題,所以干脆自己 join 吧。


為什么無法刪除 in 操作查詢出來的記錄?

session.query(User).filter(User.id.in_((1, 2, 3))).delete()

拋出這樣的異常:

sqlalchemy.exc.InvalidRequestError: Could not evaluate current criteria in Python.  Specify 'fetch' or False for the synchronize_session parameter.

但這樣是沒問題的:

session.query(User).filter(or_(User.id == 1, User.id == 2, User.id == 3)).delete()

搜了下找到《Sqlalchemy delete subquery》這個問題,提到了 delete 的一個注意點:刪除記錄時,默認會嘗試刪除 session 中符合條件的對象,而 in 操作估計還不支持,於是就出錯了。解決辦法就是刪除時不進行同步,然后再讓 session 里的所有實體都過期:

session.query(User).filter(User.id.in_((1, 2, 3))).delete(synchronize_session=False)
session.commit() # or session.expire_all()

此外,update 操作也有同樣的參數,如果后面立刻提交了,那么加上 synchronize_session=False 參數會更快。

 

如何對一個字段進行自增操作
最簡單的辦法就是獲取時加上寫鎖:

user = session.query(User).with_lockmode('update').get(1)
user.age += 1
session.commit()

如果不想多一次讀的話,這樣寫也是可以的:

session.query(User).filter(User.id == 1).update({
    User.age: User.age + 1
})
session.commit()
# 其實字段之間也可以做運算:
session.query(User).filter(User.id == 1).update({
    User.age: User.age + User.id
})

 

 

參考資料:

https://blog.csdn.net/tastelife/article/details/25218895

https://www.cnblogs.com/Oliver.net/p/7345647.html

https://www.cnblogs.com/Orangeorchard/p/8097547.html

https://www.runoob.com/python3/python3-mysql.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM