接上一篇文章FastAPI(六十六)實戰開發《在線課程學習系統》接口開發--用戶注冊接口開發。這次我們分享實際開發--用戶登陸接口開發。
我們先來梳理下邏輯
1.查詢用戶是否存在
2.校驗密碼是否正確
3.密碼校驗失敗記錄失敗次數
4.失敗次數大於10次,當天不能登陸
5.密碼校驗通過產生對應的token返回
接着我們去設計pydantic,用於校驗用戶登陸
class UserLogin(UserBase): password: str
這里我們繼承的是之前的UserBase。
對應操作數據庫的curd我們用之前注冊的時候使用的get_user_username即可。
我們把密碼輸入失敗和token放在redis中,那么redis對應的配置。
config.py配置 redishost='127.0.0.1' redisport='6379' redisdb=0
我們在main.py增加配置
from fastapi import FastAPI from aioredis import create_redis_pool, Redis from routers.user import usersRouter from routers.websoocket import socketRouter from routers.file import fileRouter from config import * app = FastAPI() async def get_redis_pool() -> Redis: redis = await create_redis_pool(f"redis://:@"+redishost+":"+redisport+"/"+redisdb+"?encoding=utf-8") return redis @app.on_event("startup") async def startup_event(): app.state.redis = await get_redis_pool() @app.on_event("shutdown") async def shutdown_event(): app.state.redis.close() await app.state.redis.wait_closed()
我們把產生token的配置也一並配置進去
#config.py SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30
那么產生token的代碼如何實現呢。
from jose import JWTError, jwt #routers/user.py def create_access_token(data: dict): to_encode = data.copy() encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt
接下來我們就是去根據邏輯去實現最后的代碼了。
@usersRouter.post("/login", response_model=UsersToken) async def login(request: Request, user: UserCreate, db: Session = Depends(get_db)): db_crest = get_user_username(db, user.username) if not db_crest: logger.info("login:"+user.username+"不存在") return reponse(code=100205,message='用戶不存在',data="") verifypassowrd = verify_password(user.password, db_crest.password) if verifypassowrd: useris = await request.app.state.redis.get(user.username) if not useris: try: token = create_access_token(data={"sub": user.username}) except Exception as e: logger.exception(e) return reponse(code=100203,message='產生token失敗',data='') request.app.state.redis.set(user.username, token, expire=ACCESS_TOKEN_EXPIRE_MINUTES * 60) return reponse(code=200,message='成功',data={"token":token}) return reponse(code=100202,message='重復登陸',data='') else: result=await request.app.state.redis.hgetall(user.username+"_password", encoding='utf8') if not result: times = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S") request.app.state.redis.hmset_dict(user.username+"_password",num=0,time=times) else: errornum=int(result['num']) numtime=(datetime.now() - datetime.strptime(result['time'],'%Y-%m-%d %H:%M:%S')).seconds / 60 if errornum<10 and numtime<30: #更新錯誤次數 errornum += 1 request.app.state.redis.hmset_dict(user.username + "_password", num=errornum) return reponse(code=100206,data='',message='密碼錯誤') elif errornum<10 and numtime>30: #次數置於1,時間設置現在時間 errornum=1 times = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S") request.app.state.redis.hmset_dict(user.username + "_password", num=errornum,time=times) return reponse(code=100206,data='',message='密碼錯誤') elif errornum>10 and numtime<30: #次數設置成最大,返回 errornum+=1 request.app.state.redis.hmset_dict(user.username + "_password", num=errornum) return reponse(code=100204,message='輸入密碼錯誤次數過多,賬號暫時鎖定,請30min再來登錄',data='') else: errornum = 1 times = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S") request.app.state.redis.hmset_dict(user.username + "_password", num=errornum, time=times) return reponse(code=100206, data='', message='密碼錯誤')
我們按照最后的代碼邏輯實現去完成。
一個完整的登陸接口就實現完畢了。