作者:麥克煎蛋 出處:https://www.cnblogs.com/mazhiyong/ 轉載請保留這段聲明,謝謝!
SQLAlchemy是一個基於Python實現的ORM框架。它提供了一種方法,用於將用戶定義的Python類與數據庫表相關聯,並將這些類(對象)的實例與其對應表中的行相關聯。它包括一個透明地同步對象及其相關行之間狀態的所有變化的系統,以及根據用戶定義的類及其定義的彼此之間的關系表達數據庫查詢的系統。
這里我們以MySQL為例。SQLAlchemy本身無法操作數據庫,其必須借助pymysql等第三方插件。
pip install pymysql
pip install sqlalchemy
一、 首先實現對數據庫的操作
這里以聯系人為例,實現了對聯系人數據的新增、讀取以及更新操作:
注意,這里的數據模型DBUser指的是與數據庫相關的數據模型。
from sqlalchemy import Column, DateTime, String, text, create_engine from sqlalchemy.dialects.mysql import INTEGER, VARCHAR from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import Session # db connect config(略,可自行填寫) MYSQL_USER = '' MYSQL_PASS = '' MYSQL_HOST = '' MYSQL_PORT = '3306' MYSQL_DB = '' SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://%s:%s@%s:%s/%s' % (MYSQL_USER, MYSQL_PASS, MYSQL_HOST, MYSQL_PORT, MYSQL_DB) # 創建對象的基類: Base = declarative_base() # 初始化數據庫連接: engine = create_engine(SQLALCHEMY_DATABASE_URI) # SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) SessionLocal = sessionmaker(bind=engine) class DBUser(Base): __tablename__ = 'test_user' id = Column(INTEGER(64), primary_key=True, comment='編號') username = Column(String(100)) password = Column(String(100)) sex = Column(VARCHAR(10), server_default=text("''"), comment='性別') login_time = Column(INTEGER(11), server_default=text("'0'"), comment='登陸時間,主要為了登陸JWT校驗使用') create_date = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP")) update_date = Column(DateTime, nullable=False, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP")) @classmethod def add(cls, db: Session, data): db.add(data) db.commit() # db.refresh(data) @classmethod def get_by_username(cls, db: Session, username): data = db.query(cls).filter_by(username=username).first() return data @classmethod def update(cls, db: Session, username, sex): db.query(cls).filter_by(username=username).update({cls.sex: sex}) db.commit()
這里的db:Session從調用者中傳入,每次請求只會用一個數據庫Session,請求結束后關閉。
二、實現業務邏輯
這里以聯系人注冊、登陸、數據讀取等常用流程為例。
注意以下基礎數據模型,指的是Pydantic數據模型,用於返回給終端。
同時要注意到,SQLAlchemy模型用 "="來定義屬性,而Pydantic模型用":"來聲明類型,不要弄混了。
class User(BaseModel): id: Optional[int] = None username: str sex: Optional[str] = None login_time: Optional[int] = None class Config: orm_mode = True
注意,我們給Pydantic模型添加了一個 Config類。Config用來給Pydantic提供配置信息,這里我們添加了配置信息"orm_mode = True"。
配置項"orm_mode"除了可以讓Pydantic讀取字典類型的數據,還支持Pydantic讀取屬性數據,比如SQLAlchemy模型的數據。
這樣Pydantic數據模型就可以兼容SQLAlchemy數據模型,我們可以在路徑操作函數中直接返回SQLAlchemy數據模型(沒有這個配置項的支持是不行的)。
1、用戶注冊
@app.post("/register", response_model=User) async def register(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): # 密碼加密 password = get_password_hash(form_data.password) db_user = DBUser.get_by_username(db, form_data.username) if db_user: return db_user db_user = DBUser(username=form_data.username, password=password) DBUser.add(db, db_user) return db_user
通過get_db來獲取數據庫Session,請求結束后關閉。
def get_db(): db = SessionLocal() try: yield db finally: db.close()
2、用戶登陸
@app.post("/login", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): # 首先校驗用戶信息 user = authenticate_user(db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) # 生成並返回token信息 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}
在登陸的時候要對用戶名和密碼進行校驗:
# 用戶信息校驗:username和password分別校驗 def authenticate_user(db: Session, username: str, password: str): user = DBUser.get_by_username(db, username) if not user: return False if not verify_password(password, user.password): return False return user
如果登陸成功則返回token信息:
# 生成token,帶有過期時間 def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt
3、接口訪問示例
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") print(username) if username is None: raise credentials_exception token_data = TokenData(username=username) except PyJWTError: raise credentials_exception user = DBUser.get_by_username(db, token_data.username) if user is None: raise credentials_exception return user @app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_user)): return current_user
這里以讀取用戶信息為例,請求端要在頭信息中攜帶token信息。
后端收到請求后,要對token進行解析,如果合法則繼續訪問,如果非法則返回401錯誤信息。
其他接口的校驗過程與此類似。