FastAPI 數據庫訪問(一)使用SQLAlchemy訪問關系數據庫


作者:麥克煎蛋   出處:https://www.cnblogs.com/mazhiyong/ 轉載請保留這段聲明,謝謝!

 

SQLAlchemy是一個基於Python實現的ORM框架。它提供了一種方法,用於將用戶定義的Python類與數據庫表相關聯,並將這些類(對象)的實例與其對應表中的行相關聯。它包括一個透明地同步對象及其相關行之間狀態的所有變化的系統,以及根據用戶定義的類及其定義的彼此之間的關系表達數據庫查詢的系統。

關於SQLAlchemy的具體使用細節這里不再贅述,重點講述數據庫模型與Pydantic模型使用、以及數據庫Session有關的內容。

這里我們以MySQL為例。SQLAlchemy本身無法操作數據庫,其必須借助pymysql等第三方插件。

pip install pymysql
pip install sqlalchemy

一、 首先實現對數據庫的操作

這里以聯系人為例,實現了對聯系人數據的新增、讀取以及更新操作:

注意,這里的數據模型DBUser指的是與數據庫相關的數據模型。

from sqlalchemy import Column, DateTime, String, text, create_engine
from sqlalchemy.dialects.mysql import INTEGER, VARCHAR
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy.orm import Session

# db connect config(略,可自行填寫)
MYSQL_USER = ''
MYSQL_PASS = ''
MYSQL_HOST = ''
MYSQL_PORT = '3306'
MYSQL_DB = ''

SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://%s:%s@%s:%s/%s' % (MYSQL_USER, MYSQL_PASS, MYSQL_HOST, MYSQL_PORT, MYSQL_DB)

# 創建對象的基類:
Base = declarative_base()

# 初始化數據庫連接:
engine = create_engine(SQLALCHEMY_DATABASE_URI)

# SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
SessionLocal = sessionmaker(bind=engine)


class DBUser(Base):
    __tablename__ = 'test_user'

    id = Column(INTEGER(64), primary_key=True, comment='編號')
    username = Column(String(100))
    password = Column(String(100))
    sex = Column(VARCHAR(10), server_default=text("''"), comment='性別')
    login_time = Column(INTEGER(11), server_default=text("'0'"), comment='登陸時間,主要為了登陸JWT校驗使用')
    create_date = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
    update_date = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))

    @classmethod
    def add(cls, db: Session, data):
        db.add(data)
        db.commit()
        # db.refresh(data)

    @classmethod
    def get_by_username(cls, db: Session, username):
        data = db.query(cls).filter_by(username=username).first()

        return data

    @classmethod
    def update(cls, db: Session, username, sex):
        db.query(cls).filter_by(username=username).update({cls.sex: sex})

        db.commit()

這里的db:Session從調用者中傳入,每次請求只會用一個數據庫Session,請求結束后關閉。

二、實現業務邏輯

這里以聯系人注冊、登陸、數據讀取等常用流程為例。

注意以下基礎數據模型,指的是Pydantic數據模型,用於返回給終端。

同時要注意到,SQLAlchemy模型用 "="來定義屬性,而Pydantic模型用":"來聲明類型,不要弄混了。

class User(BaseModel):
    id: Optional[int] = None
    username: str
    sex: Optional[str] = None
    login_time: Optional[int] = None

    class Config:
        orm_mode = True

 

注意,我們給Pydantic模型添加了一個 Config類。Config用來給Pydantic提供配置信息,這里我們添加了配置信息"orm_mode = True"。

配置項"orm_mode"除了可以讓Pydantic讀取字典類型的數據,還支持Pydantic讀取屬性數據,比如SQLAlchemy模型的數據。

這樣Pydantic數據模型就可以兼容SQLAlchemy數據模型,我們可以在路徑操作函數中直接返回SQLAlchemy數據模型(沒有這個配置項的支持是不行的)。

1、用戶注冊

@app.post("/register", response_model=User)
async def register(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    # 密碼加密
    password = get_password_hash(form_data.password)

    db_user = DBUser.get_by_username(db, form_data.username)
    if db_user:
        return db_user

    db_user = DBUser(username=form_data.username, password=password)
    DBUser.add(db, db_user)

    return db_user

通過get_db來獲取數據庫Session,請求結束后關閉。

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

2、用戶登陸

@app.post("/login", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    # 首先校驗用戶信息
    user = authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # 生成並返回token信息
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )

    return {"access_token": access_token, "token_type": "bearer"}

在登陸的時候要對用戶名和密碼進行校驗:

# 用戶信息校驗:username和password分別校驗
def authenticate_user(db: Session, username: str, password: str):
    user = DBUser.get_by_username(db, username)
    if not user:
        return False
    if not verify_password(password, user.password):
        return False
    return user

如果登陸成功則返回token信息:

# 生成token,帶有過期時間
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

3、接口訪問示例

async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        print(username)

        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except PyJWTError:
        raise credentials_exception
    user = DBUser.get_by_username(db, token_data.username)
    if user is None:
        raise credentials_exception
    return user


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user

這里以讀取用戶信息為例,請求端要在頭信息中攜帶token信息。

 

后端收到請求后,要對token進行解析,如果合法則繼續訪問,如果非法則返回401錯誤信息。

其他接口的校驗過程與此類似。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM