作者:麥克煎蛋 出處:https://www.cnblogs.com/mazhiyong/ 轉載請保留這段聲明,謝謝!
OAuth2 scopes是一種細粒度的安全許可機制,通常用來對用戶或者第三方應用提供特定的訪問許可。
在OAuth2的規范中,scopes是一個基於空格分隔符的字符串列表。這些scopes代表着"許可"。
每一個scope項是一個不帶空格的字符串,通常用來表示特定的安全許可,例如:
users:read
或者users:write:這是通常的使用場景
instagram_basic:
Facebook / Instagram的使用場景https://www.googleapis.com/auth/drive
: Google使用場景
scope的具體內容根據業務需求而定,對OAuth2來說只是字符串。
我們可以在FastAPI中直接使用無縫集成的OAuth2 scopes。
一、通過token返回scopes信息
1、后台獲取權限
通常情況下,用戶登陸成功以后,我們可以獲取到用戶真實的權限,並通過token返回scopes信息。
示例中的scopes信息僅是演示,使用時應該根據用戶的實際系統權限來賦值。
@app.post("/login", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): # 首先校驗用戶信息 user = authenticate_user(db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) # 生成並返回token信息 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username, "scopes": "me"}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}
2、用戶登陸時選擇
用戶也可以在登陸時選擇scopes信息,這也是Google等登陸時所用的機制。
我們需要在OAuth2PasswordBearer
中添加scopes信息。
oauth2_scheme = OAuth2PasswordBearer( tokenUrl="token", scopes={"me": "Read information about the current user.", "items": "Read items."}, )
同樣我們也是通過token返回scopes信息,只不過scopes信息的來源是用戶端。
@app.post("/login2", response_model=Token) async def login_for_access_token2(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): # 首先校驗用戶信息 user = authenticate_user(db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) # 生成並返回token信息 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username, "scopes": form_data.scopes}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}
我們可以在交互式文檔中查看顯示效果:
二、scopes權限校驗
當有請求訪問時,需要從token中解析出有效信息,不僅需要完成用戶身份校驗,還需要完成基於scopes的權限校驗。
1、scopes數據傳遞
首先需要在路徑操作中添加對scopes的依賴項:
@app.get("/users/me/", response_model=User) async def read_users_me( current_user: User = Security(get_current_user, scopes=["me"]) ): return current_user
Security實際上是Depends的子類,只不過多了一個參數,可以接收scopes的列表信息。
通過使用Security而不是Depends,FastAPI將會知道它會聲明並內部使用scopes信息,並且在交互式文檔中顯示這些信息。
2、scopes數據解析
SecurityScopes的屬性scopes,是一個包含所有它需要的scopes以及所有依賴項(把它作為子依賴項)的列表。
SecurityScopes的屬性scope_str,是包含所有scopes的一個字符串(以空格分隔)。
class SecurityScopes: def __init__(self, scopes: List[str] = None): self.scopes = scopes or [] self.scope_str = " ".join(self.scopes)
async def get_current_user(security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)): if security_scopes.scopes: authenticate_value = f'Bearer scope="{security_scopes.scope_str}"' else: authenticate_value = f"Bearer" credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": authenticate_value}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") print(username) if username is None: raise credentials_exception # 讀取scopes信息 token_scopes = payload.get("scopes", []) token_data = TokenData(scopes=token_scopes, username=username) except (PyJWTError, ValidationError): raise credentials_exception # 用戶身份校驗 user = DBUser.get_by_username(db, token_data.username) if user is None: raise credentials_exception # 基於scope的權限校驗 for scope in security_scopes.scopes: if scope not in token_data.scopes: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Not enough permissions", headers={"WWW-Authenticate": authenticate_value}, ) return user
3、依賴項樹和scopes
修改部分代碼邏輯如下:
async def get_current_active_user( current_user: User = Security(get_current_user, scopes=["me"]) ): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user @app.get("/users/me/", response_model=User) async def read_users_me( current_user: User = Security(get_current_active_user, scopes=["me"]) ): return current_user @app.get("/items/") async def read_items(token: str = Depends(oauth2_scheme)): return {"token": token} @app.get("/users/me/items/") async def read_own_items( current_user: User = Security(get_current_active_user, scopes=["items"]) ): return [{"item_id": "Foo", "owner": current_user.username}] @app.get("/status/") async def read_system_status(current_user: User = Depends(get_current_user)): return {"status": "ok"}
關於依賴項和scopes的層次提下如下:
路徑操作read_own_items
有:
* 依賴項需要的scopes: ["items"]
* 依賴項函數 get_current_active_user:
* 依賴項需要的scopes: ["me"]
* 依賴項函數 get_current_user:
* 自身不需要scopes
* 依賴項 oauth2_scheme
* SecurityScopes類型的參數security_scopes
* 參數security_scopes
的屬性scopes包含所有以上聲明的scopes的一個列表,因此
* 對於路徑操作 read_own_items來說,security_scopes
.scopes 包含 ["me", "items"]
* 對於路徑操作 read_users_me來說,security_scopes
.scopes 包含 ["me"]
* 對於路徑操作 read_system_status來說,security_scopes
.scopes 的內容為[]