API Server with Scheduler
A typical API server only exposes API endpoints; each request triggers a single run of the business logic.
If the server should also execute business logic on a schedule in the background, add the APScheduler library so the backend takes on that periodic execution as well.
FastAPI
https://fastapi.tiangolo.com
FastAPI is a modern, fast (high-performance) web framework for building APIs with Python 3.6+ based on standard Python type hints.
The key features are:
- Fast: Very high performance, on par with NodeJS and Go (thanks to Starlette and Pydantic). One of the fastest Python frameworks available.
- Fast to code: Increase the speed to develop features by about 200% to 300%.
- Fewer bugs: Reduce about 40% of human (developer) induced errors.
- Intuitive: Great editor support. Completion everywhere. Less time debugging.
- Easy: Designed to be easy to use and learn. Less time reading docs.
- Short: Minimize code duplication. Multiple features from each parameter declaration. Fewer bugs.
- Robust: Get production-ready code. With automatic interactive documentation.
- Standards-based: Based on (and fully compatible with) the open standards for APIs: OpenAPI (previously known as Swagger) and JSON Schema.
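As a quick illustration, here is a minimal sketch of a FastAPI application; the endpoint path and response body are arbitrary examples, not part of the demo below.

from fastapi import FastAPI

app = FastAPI()

# A single GET endpoint; path and payload are illustrative only.
@app.get("/ping")
def ping():
    return {"message": "pong"}

# Run with: uvicorn main:app --reload
# Interactive OpenAPI docs are served automatically at /docs.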
APScheduler
https://apscheduler.readthedocs.io/en/latest/index.html
Supports adding and removing jobs on the fly.
Jobs can be persisted, so they are restored after a scheduler restart.
Advanced Python Scheduler (APScheduler) is a Python library that lets you schedule your Python code to be executed later, either just once or periodically. You can add new jobs or remove old ones on the fly as you please. If you store your jobs in a database, they will also survive scheduler restarts and maintain their state. When the scheduler is restarted, it will then run all the jobs it should have run while it was offline.
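A minimal sketch of that workflow; the job function, interval, job id, and SQLite URL are illustrative assumptions, and the SQLAlchemy job store additionally requires the sqlalchemy package.

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

def tick():
    print("tick")

# Persist jobs in SQLite so they survive a restart (URL is an example).
scheduler = BackgroundScheduler(
    jobstores={"default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite")}
)
scheduler.start()

# Add a job on the fly; it runs every 10 seconds until removed.
scheduler.add_job(tick, "interval", seconds=10, id="tick_job")

# ... later, remove it by its id.
scheduler.remove_job("tick_job")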
pydantic
https://pydantic-docs.helpmanual.io/
Used for data validation and settings management.
Data validation and settings management using python type annotations.
pydantic enforces type hints at runtime, and provides user friendly errors when data is invalid.
Define how data should be in pure, canonical python; validate it with pydantic.
Define a data model and populate it with data:
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel

class User(BaseModel):
    id: int
    name = 'John Doe'
    signup_ts: Optional[datetime] = None
    friends: List[int] = []

external_data = {
    'id': '123',
    'signup_ts': '2019-06-01 12:22',
    'friends': [1, 2, '3'],
}
user = User(**external_data)
print(user.id)
#> 123
print(repr(user.signup_ts))
#> datetime.datetime(2019, 6, 1, 12, 22)
print(user.friends)
#> [1, 2, 3]
print(user.dict())
"""
{
    'id': 123,
    'signup_ts': datetime.datetime(2019, 6, 1, 12, 22),
    'friends': [1, 2, 3],
    'name': 'John Doe',
}
"""
If errors occur while converting data into the model, they are all collected into a single validation error:
from pydantic import ValidationError

try:
    User(signup_ts='broken', friends=[1, 2, 'not number'])
except ValidationError as e:
    print(e.json())
output
[ { "loc": [ "id" ], "msg": "field required", "type": "value_error.missing" }, { "loc": [ "signup_ts" ], "msg": "invalid datetime format", "type": "value_error.datetime" }, { "loc": [ "friends", 2 ], "msg": "value is not a valid integer", "type": "type_error.integer" } ]
Demo
https://github.com/fanqingsong/fastapi_apscheduler
psutil is used to get the CPU usage percentage.
The app provides an API to fetch it directly, plus a scheduled job that prints it periodically.
cpu scanner
uvicorn cpu_scanner:app --reload
Description: demonstrates how to use FastAPI and APScheduler together.
Requirements: provide an API to get the CPU rate, and also scan it periodically.
(1) get_cpu_rate -- get current cpu rate by this call
(2) set_cpu_scanner_job -- set one scheduled job to scan cpu rate periodically
(3) del_cpu_scanner_job -- delete the scheduled job
code
# FastAPI and Pydantic related libraries
from fastapi import FastAPI
from pydantic import BaseModel, Field
from typing import List

# APScheduler related libraries
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

import uuid
import logging
import psutil
from datetime import datetime
import os

# Global variables
app = FastAPI(
    title="APP for demonstrating integration with FastAPI and APScheduler",
    version="2020.11.1",
    description="An example of scheduling CPU scanner info periodically",
)

Schedule = None

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def scan_cpu_rate(job_id):
    logging.info(f'!!!!!!!!!!!!!!!! Tick! call by job {job_id}')
    cpu_rate = psutil.cpu_percent(interval=1)
    logging.info(f"cpu_rate = {cpu_rate}")


class CPURateResponse(BaseModel):
    cpu_rate: float = Field(title="CPU Rate", description="The current CPU rate")


class SetCPUScannerJobResponse(BaseModel):
    job_id: str = Field(title="CPU Scanner Job ID", description="CPU Scanner Job ID")


class DelCPUScannerJobResponse(BaseModel):
    job_id: str = Field(title="CPU Scanner Job ID", description="CPU Scanner Job ID")


@app.on_event("startup")
async def load_schedule_or_create_blank():
    """
    Instantiate the Schedule object as a global and load existing schedules from SQLite.
    This allows for persistent schedules across server restarts.
    """
    global Schedule
    try:
        jobstores = {
            'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        }
        Schedule = AsyncIOScheduler(jobstores=jobstores)
        Schedule.start()
        logger.info("Created Schedule Object")
    except Exception:
        logger.error("Unable to Create Schedule Object")


@app.on_event("shutdown")
async def pickle_schedule():
    """
    Shut down the scheduler to avoid orphan jobs.
    """
    global Schedule
    Schedule.shutdown()
    logger.info("Disabled Schedule")


@app.post("/get_cpu_rate/", response_model=CPURateResponse, tags=["API"])
def get_cpu_rate():
    cpu_rate = psutil.cpu_percent(interval=1)
    logging.info(f"cpu_rate = {cpu_rate}")
    return {"cpu_rate": cpu_rate}


@app.post("/set_cpu_scanner_job/", response_model=SetCPUScannerJobResponse, tags=["API"])
def set_cpu_scanner_job():
    random_suffix = uuid.uuid1()
    job_id = str(random_suffix)

    cpu_scanner_job = Schedule.add_job(scan_cpu_rate, 'interval', seconds=30, id=job_id, args=[job_id])
    job_id = cpu_scanner_job.id
    logging.info(f"set cpu scanner job, id = {job_id}")

    return {"job_id": job_id}


@app.post("/del_cpu_scanner_job/", response_model=DelCPUScannerJobResponse, tags=["API"])
def del_cpu_scanner_job(job_id: str):
    Schedule.remove_job(job_id)
    logging.info(f"del cpu scanner job, id = {job_id}")
    return {"job_id": job_id}
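Once the server is running (uvicorn cpu_scanner:app --reload, which defaults to port 8000), the endpoints can be exercised with a small client. This is a sketch only; it assumes a local server on the default port and uses the third-party requests library.

import requests

BASE = "http://127.0.0.1:8000"

# Get the current CPU rate once.
print(requests.post(f"{BASE}/get_cpu_rate/").json())

# Start a scheduled scan (every 30 seconds); the response carries the job id.
job_id = requests.post(f"{BASE}/set_cpu_scanner_job/").json()["job_id"]

# Later, delete the scheduled job by its id (passed as a query parameter).
print(requests.post(f"{BASE}/del_cpu_scanner_job/", params={"job_id": job_id}).json())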
Reference
Scheduled Jobs with FastAPI and APScheduler
https://ahaw021.medium.com/scheduled-jobs-with-fastapi-and-apscheduler-5a4c50580b0e
Adding Job
https://apscheduler.readthedocs.io/en/latest/userguide.html#adding-jobs
Removing Job
https://apscheduler.readthedocs.io/en/latest/userguide.html#removing-jobs
UUID generation, used as the job id
https://docs.python.org/3/library/uuid.html#example
Getting the CPU rate with psutil
https://psutil.readthedocs.io/en/latest/#psutil.cpu_percent
Integration with Django
https://pypi.org/project/django-apscheduler/
This is a Django app that adds a lightweight wrapper around APScheduler. It enables storing persistent jobs in the database using Django's ORM.
django-apscheduler is a great choice for quickly and easily adding basic scheduling features to your Django applications with minimal dependencies and very little additional configuration. The ideal use case probably involves running a handful of tasks on a fixed execution schedule.
The tradeoff of this simplicity is that you need to be careful to ensure that you only have one scheduler actively running at a particular point in time.
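A hedged sketch of the typical setup; the job function, id, and interval are illustrative, and the scheduler is normally started from a single place (e.g. a management command) so that only one instance runs at a time.

# Sketch only: assumes Django is configured and django_apscheduler is in INSTALLED_APPS.
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore

def my_task():
    print("running scheduled task")

scheduler = BackgroundScheduler()
# Persist jobs in the Django database via the ORM-backed job store.
scheduler.add_jobstore(DjangoJobStore(), "default")
# replace_existing avoids duplicate jobs when the process restarts.
scheduler.add_job(my_task, "interval", minutes=5, id="my_task", replace_existing=True)
scheduler.start()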