Sanic是異步庫,想要發揮其強大的性能,當需要使用第三方庫的時候,就需要使用異步的庫,在python中,異步orm較為常見的就兩個可,一個SQLAlchemy,一個Tortoise-ORM
SQLAlchemy 在1.4版本之后,已經支持異步了,既然要用異步,那同步庫的PyMYSQL肯就就不能滿足了,所以需要用異步庫aiomysql
SQLAlchemy官網:https://www.sqlalchemy.org/
SQLAlchemy中文網站:https://www.osgeo.cn/sqlalchemy/index.html
aiomysql官網:https://aiomysql.readthedocs.io/en/latest/
安裝SQLAlchemy: pip install SQLAlchemy
安裝aiomysql: pip install aiomysql
使用示例
先確保有數據庫:庫名隨便取,比如test
創建模型
由於沒有sqlalchemy1.4以后的教程太少,沒有找到用aiomysql作為驅動的映射教程,所以這里將對象映射到數據庫的操作還是用mysql,系統運行中用aiomysql
orm與app綁定,這里就用異步的aiomysql了
在中間件中定義獲取數據庫session和釋放資源
路由
插入數據
查詢
models
from sqlalchemy import Column, String, Integer
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class BaseModel(Base):
__abstract__ = True
id = Column(Integer, primary_key=True, comment='id,主鍵')
class User(BaseModel):
""" 用戶表 """
__tablename__ = "user"
name = Column(String(64), comment='名字')
age = Column(Integer, comment='年齡')
def to_dict(self):
return {"id": self.id, "name": self.name, "age": self.age}
模型映射到數據庫
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Base
# 導入相應的模塊
engine = create_engine("mysql+pymysql://root:123456@localhost/test")
# 創建session對象
session = sessionmaker(engine)()
# 創建表,執行所有BaseModel類的子類
Base.metadata.create_all(engine)
# 提交,必須
session.commit()
業務邏輯
from contextvars import ContextVar
from sanic import Sanic, response
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
from models import User
app = Sanic("SanicAndSqlalchemy")
# 創建異步數據庫引擎
bind = create_async_engine("mysql+aiomysql://root:123456@localhost/test", echo=True)
_base_model_session_ctx = ContextVar("session")
@app.middleware("request")
async def inject_session(request):
"""
請求中間件
創建一個可用的 AsyncSession 對象並且將其綁定至 request.ctx 中,
而 _base_model_session_ctx 也會在這是被賦予可用的值,
如果需要在其他地方使用 session 對象(而非從 request.ctx 中取值),該全局變量或許能幫助您(它是線程安全的)。
"""
request.ctx.session = sessionmaker(bind, AsyncSession, expire_on_commit=False)()
request.ctx.session_ctx_token = _base_model_session_ctx.set(request.ctx.session)
@app.middleware("response")
async def close_session(request, response):
""" 響應中間件,將創建的 AsyncSession 關閉,並重置 _base_model_session_ctx 的值,進而釋放資源 """
if hasattr(request.ctx, "session_ctx_token"):
_base_model_session_ctx.reset(request.ctx.session_ctx_token)
await request.ctx.session.close()
@app.post("/user")
async def create_user(request):
session = request.ctx.session
async with session.begin():
user = User(**request.json)
session.add(user)
return response.json(user.to_dict())
@app.get("/user/<user_id:int>")
async def get_user(request, user_id):
session = request.ctx.session
async with session.begin():
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar()
return response.json({'code': 200, 'message': '查詢成功', 'data': user.to_dict() if user else {}})
if __name__ == '__main__':
import uvicorn
uvicorn.run('main:app', host='0.0.0.0', port=8000, debug=True)