一、簡介
FastAPI中你可以使用任何關系型數據庫,可以通過SQLAlchemy將其輕松的適應於任何的數據庫,比如:
- PostgreSQL
- MySQL
- SQLite
- Oracle
- Microsoft SQL Server
- ...
SQLAlchemy是一個ORM(object-relational mapping)的框架。在ORM中,你創建一個類就會通過SQLAlchemy將其自動轉成一張表,在類中的每一個屬性就會將其轉成表中的字段。
這里有一些實例,假如有一個大的項目,里面包含一個子包叫做sql_app:
. └── sql_app ├── __init__.py ├── crud.py ├── database.py ├── main.py ├── models.py └── schemas.py
- __init__.py 是一個空文件,但是說明sql_app是一個package
- database.py 數據庫配置相關
- models.py 數據庫模型表
- schemas.py 模型驗證
- crud.py 數據庫操作相關
- main.py 主文件
二、簡單實例
該實例以MySQL為例,SQLAlchemy需要借助於pymysql連接數據庫,所以需要進行安裝這兩個工具包:
pip install sqlalchemy
pip install pymysql
1、database.py
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:123456@127.0.0.1:3306/test" # echo=True表示引擎將用repr()函數記錄所有語句及其參數列表到日志 engine = create_engine( SQLALCHEMY_DATABASE_URL, encoding='utf8', echo=True ) # SQLAlchemy中,CRUD是通過會話進行管理的,所以需要先創建會話, # 每一個SessionLocal實例就是一個數據庫session # flush指發送到數據庫語句到數據庫,但數據庫不一定執行寫入磁盤 # commit是指提交事務,將變更保存到數據庫文件中 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) # 創建基本映射類 Base = declarative_base()
在數據庫相關的配置文件中,首先創建一個SQLAlchemy的"engine",然后創建SessionLocal實例進行會話,最后創建模型類的基類。
2、models.py
from sqlalchemy import Boolean, Column, Integer, String from database import Base class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True, index=True) email = Column(String(32), unique=True, index=True) hashed_password = Column(String(32)) is_active = Column(Boolean, default=True)
通過數據庫配置文件中的基類來創建模型類。
3、schemas.py
from pydantic import BaseModel class UserBase(BaseModel): email: str class UserCreate(UserBase): """ 請求模型驗證: email: password: """ password: str class User(UserBase): """ 響應模型: id: email: is_active 並且設置orm_mode與之兼容 """ id: int is_active: bool class Config: orm_mode = True
定義請求參數模型驗證與響應模型驗證的Pydantic模型,其中響應模型中設置orm_mode=True參數,表示與ORM模型兼容,因為后續中返回的數據庫查詢是orm模型,通過設置這個參數可以將orm模型通過pydantic模型進行驗證。
4、crud.py
from sqlalchemy.orm import Session import models, schemas # 通過id查詢用戶 def get_user(db: Session, user_id: int): return db.query(models.User).filter(models.User.id == user_id).first() # 新建用戶 def db_create_user(db: Session, user: schemas.UserCreate): fake_hashed_password = user.password + "notreallyhashed" db_user = models.User(email=user.email, hashed_password=fake_hashed_password) db.add(db_user) db.commit() # 提交保存到數據庫中 db.refresh(db_user) # 刷新 return db_user
通過傳入數據庫連接以及參數等進行數據庫操作,包括創建用戶、查詢用戶等,返回的是orm模型對象。
5、main.py
from fastapi import FastAPI, Depends, HTTPException import crud, schemas from database import SessionLocal, engine, Base from sqlalchemy.orm import Session import uvicorn Base.metadata.create_all(bind=engine) #數據庫初始化,如果沒有庫或者表,會自動創建 app = FastAPI() # Dependency def get_db(): """ 每一個請求處理完畢后會關閉當前連接,不同的請求使用不同的連接 :return: """ db = SessionLocal() try: yield db finally: db.close() # 新建用戶 @app.post("/users/", response_model=schemas.User) def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)): return crud.db_create_user(db=db, user=user) # 通過id查詢用戶 @app.get("/user/{user_id}", response_model=schemas.User) def read_user(user_id: int, db: Session = Depends(get_db)): db_user = crud.get_user(db, user_id=user_id) if not db_user: raise HTTPException(status_code=404, detail="User not found") return db_user if __name__ == '__main__': uvicorn.run(app=app, host="127.0.0.1", port=8000)
主文件進行數據庫初始化、FastAPI實例創建以及處理各種請求。
進入到交互文檔查看:
- http://127.0.0.1:8000/users/
# 請求 { "email": "hhh@example113.com", "password": "ss123456" } # 響應 { "email": "hhh@example113.com", "id": 7, "is_active": true }
- http://127.0.0.1:8000/user/7
# 響應 { "email": "hhh@example113.com", "id": 7, "is_active": true }
三、復雜實例
在之前的基礎上再加一個模型類Item,User與之是一對多的關系。
1、models.py
from sqlalchemy import Boolean, Column, Integer, String, ForeignKey from sqlalchemy.orm import relationship from database import Base class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True, index=True) email = Column(String(32), unique=True, index=True) hashed_password = Column(String(32)) is_active = Column(Boolean, default=True) items = relationship("Item", back_populates="owner") class Item(Base): __tablename__ = "items" id = Column(Integer, primary_key=True, index=True) title = Column(String(32), index=True) description = Column(String(32), index=True) owner_id = Column(Integer, ForeignKey("users.id")) owner = relationship("User", back_populates="items")
2、schemas.py
from typing import Optional,List from pydantic import BaseModel class ItemBase(BaseModel): title: str description: Optional[str] = None class ItemCreate(ItemBase): pass class Item(ItemBase): id: int owner_id: int class Config: orm_mode = True class UserBase(BaseModel): email: str class UserCreate(UserBase): """ 請求模型驗證: email: password: """ password: str class User(UserBase): """ 響應模型: id: email: is_active 並且設置orm_mode與之兼容 """ id: int is_active: bool items: List[Item] = [] class Config: orm_mode = True
3、crud.py
from sqlalchemy.orm import Session import models, schemas # 通過id查詢用戶 def get_user(db: Session, user_id: int): return db.query(models.User).filter(models.User.id == user_id).first() # 新建用戶 def db_create_user(db: Session, user: schemas.UserCreate): fake_hashed_password = user.password + "notreallyhashed" db_user = models.User(email=user.email, hashed_password=fake_hashed_password) db.add(db_user) db.commit() # 提交保存到數據庫中 db.refresh(db_user) # 刷新 return db_user # 獲取用戶擁有的item def get_item(db: Session, skip: int = 0, limit: int = 100): return db.query(models.Item).offset(skip).limit(limit).all() # 新建用戶的item def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int): db_item = models.Item(**item.dict(), owner_id=user_id) db.add(db_item) db.commit() db.refresh(db_item) return db_item
4、main.py
from typing import List from fastapi import FastAPI, Depends, HTTPException import crud, schemas from database import SessionLocal, engine, Base from sqlalchemy.orm import Session import uvicorn Base.metadata.create_all(bind=engine) app = FastAPI() # Dependency def get_db(): """ 每一個請求處理完畢后會關閉當前連接,不同的請求使用不同的連接 :return: """ db = SessionLocal() try: yield db finally: db.close() # 新建用戶 @app.post("/users/", response_model=schemas.User) def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)): return crud.db_create_user(db=db, user=user) # 通過id查詢用戶 @app.get("/user/{user_id}", response_model=schemas.User) def read_user(user_id: int, db: Session = Depends(get_db)): db_user = crud.get_user(db, user_id=user_id) if not db_user: raise HTTPException(status_code=404, detail="User not found") return db_user # 讀取用戶擁有的item @app.get("/items/", response_model=List[schemas.Item]) def read_items(skip: int = 0, limit: int = 0, db: Session = Depends(get_db)): items = crud.get_item(db=db, skip=skip, limit=limit) return items # 創建用戶的item @app.post("/users/{user_id}/items", response_model=schemas.Item) def create_item_user(user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)): return crud.create_user_item(db=db, item=item, user_id=user_id) if __name__ == '__main__': uvicorn.run(app=app, host="127.0.0.1", port=8000)
當啟動項目后,會生成新的Item數據表,以及與User表之間建立關系:
# User表 create table users ( id int auto_increment primary key, email varchar(32) null, hashed_password varchar(32) null, is_active tinyint(1) null, constraint ix_users_email unique (email) ); create index ix_users_id on users (id); # Item表 create table items ( id int auto_increment primary key, title varchar(32) null, description varchar(32) null, owner_id int null, constraint items_ibfk_1 foreign key (owner_id) references users (id) ); create index ix_items_description on items (description); create index ix_items_id on items (id); create index ix_items_title on items (title); create index owner_id on items (owner_id);
最后進入交互文檔進行測試。