一、需求:以最小的侵入的方式,對fastapi 中的接口進行日志記錄(主要描述接口是干什么的,最終結果是不是完成了)
1、現狀:
import time
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
t_start=time.time()
response = await call_next(request)
time_cost=time.time()-t_start
return response
request中沒有我們想要的某個接口的描述。比如描述接口的功能:”素材-插入“,”素材-刪除“等。而且對業務函數的結果進行攔截獲取的時候,會重新消耗到response中迭代器:
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
t_start=time.time()
response = await call_next(request)
time_cost=time.time()-t_start
result = b""
async for chunk in response.body_iterator:
result += chunk
# 處理結果
return Response(
content=result,
status_code=response.status_code,
headers=dict(response.headers),
)
無疑會導致整個時間的消耗會大一些。
2、變通的思路:
1、每個業務接口上加裝飾器: 幾百個接口,都加一遍裝飾器。有點呵呵了
2、埋點: 幾百個接口,都埋一下點。呵~~
3、能不能中間結果處理的時候,把想要的參數掛在request 上面。畢竟starlette.requests會貫穿整個響應的周期的。
3、根據上述的第3點進行解決
考慮到fastapi 業務代碼的書寫方式
@app.get("/",name="hahah",summary="這是summary",description="這是description")
def read_root(request:Request):
return {"Hello": "World"}
希望通過router里面的summary/name等描述,放在log 中的描述中。
4、最終解決方案
通過對源碼的觀察。這里重寫fastapi.routing.get_request_handler 函數。
# 重寫fastapi routing 把路由解析參數附加到requests
import asyncio
import email.message
import enum
import inspect
import json
from typing import (
Any,
Callable,
Coroutine,
Dict,
List,
Optional,
Sequence,
Set,
Type,
Union,
)
from fastapi import params
from fastapi.datastructures import Default, DefaultPlaceholder
from fastapi.dependencies.models import Dependant
from fastapi.dependencies.utils import (
get_body_field,
get_dependant,
get_parameterless_sub_dependant,
solve_dependencies,
)
from fastapi.encoders import DictIntStrAny, SetIntStr
from fastapi.exceptions import RequestValidationError
from fastapi.openapi.constants import STATUS_CODES_WITH_NO_BODY
from fastapi.routing import run_endpoint_function
from fastapi.utils import (
create_cloned_field,
create_response_field,
generate_operation_id_for_path,
)
from pydantic.error_wrappers import ErrorWrapper
from pydantic.fields import ModelField, Undefined
from starlette import routing
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.routing import BaseRoute
from starlette.routing import (
compile_path,
get_name,
request_response,
)
from fastapi.routing import serialize_response
from fastapi import routing as fastapi_routing
def get_request_handler(
dependant: Dependant,
body_field: Optional[ModelField] = None,
status_code: Optional[int] = None,
response_class: Union[Type[Response], DefaultPlaceholder] = Default(JSONResponse),
response_field: Optional[ModelField] = None,
response_model_include: Optional[Union[SetIntStr, DictIntStrAny]] = None,
response_model_exclude: Optional[Union[SetIntStr, DictIntStrAny]] = None,
response_model_by_alias: bool = True,
response_model_exclude_unset: bool = False,
response_model_exclude_defaults: bool = False,
response_model_exclude_none: bool = False,
dependency_overrides_provider: Optional[Any] = None,
customize_para:Dict=None,
) -> Callable[[Request], Coroutine[Any, Any, Response]]:
assert dependant.call is not None, "dependant.call must be a function"
is_coroutine = asyncio.iscoroutinefunction(dependant.call)
is_body_form = body_field and isinstance(body_field.field_info, params.Form)
if isinstance(response_class, DefaultPlaceholder):
actual_response_class: Type[Response] = response_class.value
else:
actual_response_class = response_class
async def app(request: Request) -> Response:
request.scope["customize_para"]=customize_para # 添加
try:
body: Any = None
if body_field:
if is_body_form:
body = await request.form()
else:
body_bytes = await request.body()
if body_bytes:
json_body: Any = Undefined
content_type_value = request.headers.get("content-type")
if not content_type_value:
json_body = await request.json()
else:
message = email.message.Message()
message["content-type"] = content_type_value
if message.get_content_maintype() == "application":
subtype = message.get_content_subtype()
if subtype == "json" or subtype.endswith("+json"):
json_body = await request.json()
if json_body != Undefined:
body = json_body
else:
body = body_bytes
except json.JSONDecodeError as e:
raise RequestValidationError([ErrorWrapper(e, ("body", e.pos))], body=e.doc)
except Exception as e:
raise HTTPException(
status_code=400, detail="There was an error parsing the body"
) from e
solved_result = await solve_dependencies(
request=request,
dependant=dependant,
body=body,
dependency_overrides_provider=dependency_overrides_provider,
)
values, errors, background_tasks, sub_response, _ = solved_result
if errors:
raise RequestValidationError(errors, body=body)
else:
raw_response = await run_endpoint_function(
dependant=dependant, values=values, is_coroutine=is_coroutine
)
if isinstance(raw_response, Response):
if raw_response.background is None:
raw_response.background = background_tasks
# 有結果了
request.scope["customize_result"] = raw_response.body # 添加
return raw_response
# 有結果了
request.scope["customize_result"] = raw_response # 添加
response_data = await serialize_response(
field=response_field,
response_content=raw_response,
include=response_model_include,
exclude=response_model_exclude,
by_alias=response_model_by_alias,
exclude_unset=response_model_exclude_unset,
exclude_defaults=response_model_exclude_defaults,
exclude_none=response_model_exclude_none,
is_coroutine=is_coroutine,
)
response_args: Dict[str, Any] = {"background": background_tasks}
# If status_code was set, use it, otherwise use the default from the
# response class, in the case of redirect it's 307
if status_code is not None:
response_args["status_code"] = status_code
response = actual_response_class(response_data, **response_args)
response.headers.raw.extend(sub_response.headers.raw)
if sub_response.status_code:
response.status_code = sub_response.status_code
return response
return app
class APIRoute(routing.Route):
def __init__(
self,
path: str,
endpoint: Callable[..., Any],
*,
response_model: Optional[Type[Any]] = None,
status_code: Optional[int] = None,
tags: Optional[List[str]] = None,
dependencies: Optional[Sequence[params.Depends]] = None,
summary: Optional[str] = None,
description: Optional[str] = None,
response_description: str = "Successful Response",
responses: Optional[Dict[Union[int, str], Dict[str, Any]]] = None,
deprecated: Optional[bool] = None,
name: Optional[str] = None,
methods: Optional[Union[Set[str], List[str]]] = None,
operation_id: Optional[str] = None,
response_model_include: Optional[Union[SetIntStr, DictIntStrAny]] = None,
response_model_exclude: Optional[Union[SetIntStr, DictIntStrAny]] = None,
response_model_by_alias: bool = True,
response_model_exclude_unset: bool = False,
response_model_exclude_defaults: bool = False,
response_model_exclude_none: bool = False,
include_in_schema: bool = True,
response_class: Union[Type[Response], DefaultPlaceholder] = Default(
JSONResponse
),
dependency_overrides_provider: Optional[Any] = None,
callbacks: Optional[List[BaseRoute]] = None,
openapi_extra: Optional[Dict[str, Any]] = None,
) -> None:
# normalise enums e.g. http.HTTPStatus
if isinstance(status_code, enum.IntEnum):
status_code = int(status_code)
self.path = path
self.endpoint = endpoint
self.name = get_name(endpoint) if name is None else name
self.path_regex, self.path_format, self.param_convertors = compile_path(path)
if methods is None:
methods = ["GET"]
self.methods: Set[str] = set([method.upper() for method in methods])
self.unique_id = generate_operation_id_for_path(
name=self.name, path=self.path_format, method=list(methods)[0]
)
self.response_model = response_model
if self.response_model:
assert (
status_code not in STATUS_CODES_WITH_NO_BODY
), f"Status code {status_code} must not have a response body"
response_name = "Response_" + self.unique_id
self.response_field = create_response_field(
name=response_name, type_=self.response_model
)
# Create a clone of the field, so that a Pydantic submodel is not returned
# as is just because it's an instance of a subclass of a more limited class
# e.g. UserInDB (containing hashed_password) could be a subclass of User
# that doesn't have the hashed_password. But because it's a subclass, it
# would pass the validation and be returned as is.
# By being a new field, no inheritance will be passed as is. A new model
# will be always created.
self.secure_cloned_response_field: Optional[
ModelField
] = create_cloned_field(self.response_field)
else:
self.response_field = None # type: ignore
self.secure_cloned_response_field = None
self.status_code = status_code
self.tags = tags or []
if dependencies:
self.dependencies = list(dependencies)
else:
self.dependencies = []
self.summary = summary
self.description = description or inspect.cleandoc(self.endpoint.__doc__ or "")
# if a "form feed" character (page break) is found in the description text,
# truncate description text to the content preceding the first "form feed"
self.description = self.description.split("\f")[0]
self.response_description = response_description
self.responses = responses or {}
response_fields = {}
for additional_status_code, response in self.responses.items():
assert isinstance(response, dict), "An additional response must be a dict"
model = response.get("model")
if model:
assert (
additional_status_code not in STATUS_CODES_WITH_NO_BODY
), f"Status code {additional_status_code} must not have a response body"
response_name = f"Response_{additional_status_code}_{self.unique_id}"
response_field = create_response_field(name=response_name, type_=model)
response_fields[additional_status_code] = response_field
if response_fields:
self.response_fields: Dict[Union[int, str], ModelField] = response_fields
else:
self.response_fields = {}
self.deprecated = deprecated
self.operation_id = operation_id
self.response_model_include = response_model_include
self.response_model_exclude = response_model_exclude
self.response_model_by_alias = response_model_by_alias
self.response_model_exclude_unset = response_model_exclude_unset
self.response_model_exclude_defaults = response_model_exclude_defaults
self.response_model_exclude_none = response_model_exclude_none
self.include_in_schema = include_in_schema
self.response_class = response_class
assert callable(endpoint), "An endpoint must be a callable"
self.dependant = get_dependant(path=self.path_format, call=self.endpoint)
for depends in self.dependencies[::-1]:
self.dependant.dependencies.insert(
0,
get_parameterless_sub_dependant(depends=depends, path=self.path_format),
)
self.body_field = get_body_field(dependant=self.dependant, name=self.unique_id)
self.dependency_overrides_provider = dependency_overrides_provider
self.callbacks = callbacks
self.app = request_response(self.get_route_handler({"summary":self.summary,"description":self.description,"name":self.name}))
self.openapi_extra = openapi_extra
def get_route_handler(self,customize_para:Dict=None) -> Callable[[Request], Coroutine[Any, Any, Response]]:
return get_request_handler(
dependant=self.dependant,
body_field=self.body_field,
status_code=self.status_code,
response_class=self.response_class,
response_field=self.secure_cloned_response_field,
response_model_include=self.response_model_include,
response_model_exclude=self.response_model_exclude,
response_model_by_alias=self.response_model_by_alias,
response_model_exclude_unset=self.response_model_exclude_unset,
response_model_exclude_defaults=self.response_model_exclude_defaults,
response_model_exclude_none=self.response_model_exclude_none,
dependency_overrides_provider=self.dependency_overrides_provider,
customize_para=customize_para,
)
fastapi_routing.get_request_handler=get_request_handler
fastapi_routing.APIRoute.__init__=APIRoute.__init__
fastapi_routing.APIRoute.get_route_handler=APIRoute.get_route_handler
使用此文件的時候,把此文件在fastapi之前引入即可。
效果是:
request.scope["customize_result"]中存儲業務函數的響應
request.scope["customize_para"]中存儲router 中的name、summary、description參數
這樣就可以愉快的在middleware中獲取想要的數據啦!
