接上一篇文章FastAPI(六十六)實戰開發《在線課程學習系統》接口開發--用戶注冊接口開發。這次我們分享實際開發--用戶登陸接口開發。
我們先來梳理下邏輯
1.查詢用戶是否存在2.校驗密碼是否正確3.密碼校驗失敗記錄失敗次數4.失敗次數大於10次,當天不能登陸5.密碼校驗通過產生對應的token返回
接着我們去設計pydantic,用於校驗用戶登陸
class UserLogin(UserBase):
password: str
這里我們繼承的是之前的UserBase。
對應操作數據庫的curd我們用之前注冊的時候使用的get_user_username即可。
我們把密碼輸入失敗和token放在redis中,那么redis對應的配置。
config.py配置 redishost='127.0.0.1' redisport='6379' redisdb=0
我們在main.py增加配置
from fastapi import FastAPI
from aioredis import create_redis_pool, Redis
from routers.user import usersRouter
from routers.websoocket import socketRouter
from routers.file import fileRouter
from config import *
app = FastAPI()
async def get_redis_pool() -> Redis:
redis = await create_redis_pool(f"redis://:@"+redishost+":"+redisport+"/"+redisdb+"?encoding=utf-8")
return redis
@app.on_event("startup")
async def startup_event():
app.state.redis = await get_redis_pool()
@app.on_event("shutdown")
async def shutdown_event():
app.state.redis.close()
await app.state.redis.wait_closed()
我們把產生token的配置也一並配置進去
#config.py SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30
那么產生token的代碼如何實現呢。
from jose import JWTError, jwt
#routers/user.py
def create_access_token(data: dict):
to_encode = data.copy()
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
接下來我們就是去根據邏輯去實現最后的代碼了。
@usersRouter.post("/login", response_model=UsersToken)
async def login(request: Request, user: UserCreate, db: Session = Depends(get_db)):
db_crest = get_user_username(db, user.username)
if not db_crest:
logger.info("login:"+user.username+"不存在")
return reponse(code=100205,message='用戶不存在',data="")
verifypassowrd = verify_password(user.password, db_crest.password)
if verifypassowrd:
useris = await request.app.state.redis.get(user.username)
if not useris:
try:
token = create_access_token(data={"sub": user.username})
except Exception as e:
logger.exception(e)
return reponse(code=100203,message='產生token失敗',data='')
request.app.state.redis.set(user.username, token, expire=ACCESS_TOKEN_EXPIRE_MINUTES * 60)
return reponse(code=200,message='成功',data={"token":token})
return reponse(code=100202,message='重復登陸',data='')
else:
result=await request.app.state.redis.hgetall(user.username+"_password", encoding='utf8')
if not result:
times = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
request.app.state.redis.hmset_dict(user.username+"_password",num=0,time=times)
else:
errornum=int(result['num'])
numtime=(datetime.now() - datetime.strptime(result['time'],'%Y-%m-%d %H:%M:%S')).seconds / 60
if errornum<10 and numtime<30:
#更新錯誤次數
errornum += 1
request.app.state.redis.hmset_dict(user.username + "_password", num=errornum)
return reponse(code=100206,data='',message='密碼錯誤')
elif errornum<10 and numtime>30:
#次數置於1,時間設置現在時間
errornum=1
times = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
request.app.state.redis.hmset_dict(user.username + "_password", num=errornum,time=times)
return reponse(code=100206,data='',message='密碼錯誤')
elif errornum>10 and numtime<30:
#次數設置成最大,返回
errornum+=1
request.app.state.redis.hmset_dict(user.username + "_password", num=errornum)
return reponse(code=100204,message='輸入密碼錯誤次數過多,賬號暫時鎖定,請30min再來登錄',data='')
else:
errornum = 1
times = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
request.app.state.redis.hmset_dict(user.username + "_password", num=errornum, time=times)
return reponse(code=100206, data='', message='密碼錯誤')
我們按照最后的代碼邏輯實現去完成。
一個完整的登陸接口就實現完畢了。

