Integration with FastAPI and APScheduler [with ray]


Integration with FastAPI and APScheduler

https://www.cnblogs.com/lightsong/p/15054120.html

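The previous post showed how to add APScheduler to an API server so that it can run scheduled background jobs.

There is a problem, however: if a scheduled job is time-consuming, it consumes a large share of the computing resources of the main process (the API server), so the server is slow to respond to new connections.

This post introduces the Ray framework to address this problem.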
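
To make the problem concrete, here is a minimal sketch that uses only the Python standard library (neither FastAPI nor Ray is involved). All names are illustrative assumptions: `slow_job` stands in for a time-consuming scheduled task, and `heartbeat` stands in for the server answering requests. It measures how late the heartbeat ticks are when the slow job runs inside the event loop, and when it is handed to an executor thread instead.

```python
import asyncio
import time


def slow_job():
    # Hypothetical stand-in for a time-consuming scheduled task.
    time.sleep(0.3)


async def heartbeat(delays):
    # Hypothetical stand-in for the server handling requests.
    # Each tick should take about 0.05 s; we record how long it really took.
    for _ in range(3):
        start = time.perf_counter()
        await asyncio.sleep(0.05)
        delays.append(time.perf_counter() - start)


async def run(offload):
    delays = []
    hb = asyncio.create_task(heartbeat(delays))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    if offload:
        # Run the slow job in a worker thread; the event loop stays free.
        await asyncio.get_running_loop().run_in_executor(None, slow_job)
    else:
        # Run the slow job directly; the event loop is blocked meanwhile.
        slow_job()
    await hb
    return max(delays)


blocked = asyncio.run(run(offload=False))
offloaded = asyncio.run(run(offload=True))
print(f"worst tick when blocking:   {blocked:.2f}s")
print(f"worst tick when offloading: {offloaded:.2f}s")
```

Moving the work to Ray follows the same idea, except that the job leaves the server process entirely rather than moving to another thread.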

 

Ray

https://github.com/ray-project/ray

In essence, Ray is a framework for distributed computing that also supports reinforcement learning and hyperparameter tuning.

An open source framework that provides a simple, universal API for building distributed applications. Ray is packaged with RLlib, a scalable reinforcement learning library, and Tune, a scalable hyperparameter tuning library.

It supports three modes:

Embedded in the host process

Standalone process

Cluster

 

Sample code: when the main process runs the code below, the function decorated with @ray.remote is pushed to a Ray worker process, which may run on any host.

import ray
ray.init()

@ray.remote
def f(x):
    return x * x

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))

 

https://zhuanlan.zhihu.com/p/111340572

The paper describes a typical remote call flow:

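Ray compared with Celery

Celery is also a distributed computing framework.

However, when you deploy a Celery worker process you must specify the script that contains the tasks,

so the worker's environment is tightly coupled to the scripts it executes.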

 

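Ray, by contrast, is closer to the master/agent model of Jenkins: the script to be executed can be pushed to a worker node and run there.

This decouples application deployment. Ray acts as a distributed runtime environment, and any script can be submitted to the platform for execution,

much like the Spark platform.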

 

https://github.com/fanqingsong/distributed_computing_on_celery/blob/master/tasks.py

# tasks.py
import time
from celery import Celery

celery = Celery('tasks', broker='pyamqp://localhost:5672')

@celery.task
def sendmail(mail):
    print('sending mail to %s...' % mail['to'])
    time.sleep(2.0)
    print('mail sent.')

 

https://github.com/fanqingsong/distributed_computing_on_celery/blob/master/taskscaller.py

# tasks caller

from tasks import sendmail

sendmail.delay(dict(to='celery@python.org'))

print("call done")

 

Run

# run the tasks worker process
pipenv run celery -A tasks worker --loglevel=info -P eventlet

# run producer
pipenv run python taskscaller.py

 

Ray Cluster Overview

https://docs.ray.io/en/master/cluster/index.html

What is a Ray cluster?

One of Ray’s strengths is the ability to leverage multiple machines in the same program. Ray can, of course, be run on a single machine (and is done so often), but the real power is using Ray on a cluster of machines.

A Ray cluster consists of a head node and a set of worker nodes. The head node needs to be started first, and the worker nodes are given the address of the head node to form the cluster:

[Figure: ray-cluster.jpg]

https://docs.ray.io/en/master/configure.html#cluster-resources

# To start a head node.
$ ray start --head --num-cpus=<NUM_CPUS> --num-gpus=<NUM_GPUS>

# To start a non-head node.
$ ray start --address=<address> --num-cpus=<NUM_CPUS> --num-gpus=<NUM_GPUS>

# Specifying custom resources
ray start [--head] --num-cpus=<NUM_CPUS> --resources='{"Resource1": 4, "Resource2": 16}'

Code reference

# Connect to ray. Notice if connected to existing cluster, you don't specify resources.
ray.init(address=<address>)

 

also refer to

https://docs.ray.io/en/releases-0.8.5/using-ray-on-a-cluster.html#deploying-ray-on-a-cluster

 

Autoscaling clusters with Ray

https://medium.com/distributed-computing-with-ray/autoscaling-clusters-with-ray-36bad4da6b9c

 

 

Ray Dashboard

https://docs.ray.io/en/master/ray-dashboard.html#ray-dashboard

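The dashboard provides a comprehensive set of diagnostic tools:

(1) Cluster metrics

(2) Errors and exceptions, which are easy to locate

(3) Logs from each machine

...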

Ray’s built-in dashboard provides metrics, charts, and other features that help Ray users to understand Ray clusters and libraries.

The dashboard lets you:

  • View cluster metrics.

  • See errors and exceptions at a glance.

  • View logs across many machines in a single pane.

  • Understand Ray memory utilization and debug memory errors.

  • See per-actor resource usage, executed tasks, logs, and more.

  • Kill actors and profile your Ray jobs.

  • See Tune jobs and trial information.

  • Detect cluster anomalies and debug them.

 

Logging directory structure

https://docs.ray.io/en/master/ray-logging.html#id1

By default, Ray logs are stored in a /tmp/ray/session_*/logs directory.

 

worker-[worker_id]-[job_id]-[pid].[out|err]: Python/Java part of Ray drivers and workers. All of stdout and stderr from tasks/actors are streamed here. Note that job_id is an id of the driver.
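
When looking for the log of one particular job or process, it can help to split these file names into their parts. The following is a minimal sketch based only on the naming pattern quoted above. The helper name `parse_worker_log_name` and the sample file name are made up for illustration, and the sketch assumes that the ids themselves contain no "-" characters.

```python
import re

# Pattern for "worker-[worker_id]-[job_id]-[pid].[out|err]".
# Assumption: worker_id and job_id contain no "-" characters.
WORKER_LOG = re.compile(
    r"^worker-(?P<worker_id>[^-]+)-(?P<job_id>[^-]+)-(?P<pid>\d+)\.(?P<stream>out|err)$"
)


def parse_worker_log_name(name):
    """Return the parts of a worker log file name, or None if it does not match."""
    match = WORKER_LOG.match(name)
    if match is None:
        return None
    parts = match.groupdict()
    parts["pid"] = int(parts["pid"])
    return parts


# Made-up file name, for illustration only.
info = parse_worker_log_name("worker-0123abcd-01000000-47411.out")
print(info)
```

The `pid` field can then be matched against the `(pid=...)` prefix that Ray prints in front of task output.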

 

Add print statements to the code to help locate problems:

import ray
# Initiate a driver.
ray.init()

@ray.remote
def task():
    print(f"task_id: {ray.get_runtime_context().task_id}")

ray.get(task.remote())

 

(pid=47411) task_id: TaskID(a67dc375e60ddd1affffffffffffffffffffffff01000000)

 

API for log

https://docs.ray.io/en/master/package-ref.html#runtime-context-apis

Runtime Context APIs

ray.runtime_context.get_runtime_context()

Get the runtime context of the current driver/worker.

Example:

>>> ray.get_runtime_context().job_id # Get the job id.
>>> ray.get_runtime_context().get() # Get all the metadata.

PublicAPI (beta): This API is in beta and may change before becoming stable.

 

You can also look up node_id and task_id:

property job_id

Get current job ID for this worker or driver.

Job ID is the id of your Ray drivers that create tasks or actors.

Returns

If called by a driver, this returns the job ID. If called in a task, return the job ID of the associated driver.

property node_id

Get current node ID for this worker or driver.

Node ID is the id of a node that your driver, task, or actor runs.

Returns

a node id for this worker or driver.

property task_id

Get current task ID for this worker or driver.

Task ID is the id of a Ray task. This shouldn’t be used in a driver process.

 

ray.wait() for Pipeline data processing

https://docs.ray.io/en/master/auto_examples/tips-for-first-time.html#tip-4-pipeline-data-processing

import time
import random
import ray

ray.init(num_cpus = 4)

@ray.remote
def do_some_work(x):
    time.sleep(random.uniform(0, 4)) # Replace this with work you need to do.
    return x

def process_incremental(sum, result):
    time.sleep(1) # Replace this with some processing code.
    return sum + result

start = time.time()
result_ids = [do_some_work.remote(x) for x in range(4)]
sum = 0
while len(result_ids):
    done_id, result_ids = ray.wait(result_ids)
    sum = process_incremental(sum, ray.get(done_id[0]))
print("duration =", time.time() - start, "\nresult = ", sum)
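
The same "process each result as soon as it is ready" pattern can be tried without a Ray installation. The sketch below is a standard-library analogue, not Ray code: it swaps `ray.wait` for `concurrent.futures.wait` with `FIRST_COMPLETED`, and a thread pool for the Ray workers. The sleep durations are arbitrary assumptions chosen so that later inputs tend to finish first.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def do_some_work(x):
    # Arbitrary durations: larger inputs sleep for less time.
    time.sleep(0.05 * (4 - x))
    return x


completion_order = []
total = 0
with ThreadPoolExecutor(max_workers=4) as pool:
    pending = {pool.submit(do_some_work, x) for x in range(4)}
    while pending:
        # Block only until at least one future has finished,
        # then handle the finished ones and keep waiting for the rest.
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            completion_order.append(future.result())
            total += future.result()

print("completion order:", completion_order)
print("result =", total)
```

As with `ray.wait`, results are consumed in completion order rather than submission order, so the incremental processing overlaps with the work that is still running.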

 

AsyncIOScheduler of APScheduler

https://apscheduler.readthedocs.io/en/stable/modules/schedulers/asyncio.html

AsyncIOScheduler was meant to be used with the AsyncIO event loop. By default, it will run jobs in the event loop’s thread pool.

If you have an application that runs on an AsyncIO event loop, you will want to use this scheduler.

 

Asynchronous scheduler

https://stackoverflow.com/questions/63001954/python-apscheduler-how-does-asyncioscheduler-work

from apscheduler.schedulers.asyncio import AsyncIOScheduler
import asyncio

async def job():
    print('hi')

scheduler = AsyncIOScheduler()
scheduler.add_job(job, "interval", seconds=3)

scheduler.start()

asyncio.get_event_loop().run_forever()

 

Code Demo

https://github.com/fanqingsong/fastapi_apscheduler

Purpose

With the help of fastapi and apscheduler, implement API to get cpu rate and set/delete periodical cpu scan job.

reference: https://ahaw021.medium.com/scheduled-jobs-with-fastapi-and-apscheduler-5a4c50580b0e

 

Architecture

Separate the workload from the FastAPI server, so that the server does not become too busy to respond.

APScheduler manages the timing policy.

Ray acts as the logic node that executes the workload.

Calls from FastAPI or APScheduler to the Ray cluster are asynchronous, so the communication is reactive and nothing blocks.

components

 

Description:

Demonstrates how to use FastAPI and APScheduler.

Requirements: provide an API to get the CPU rate, and to collect it periodically

(1) get_cpu_rate -- get current cpu rate by this call

(2) set_cpu_scanner_job -- set one scheduled job to scan cpu rate periodically

(3) del_cpu_scanner_job -- delete the scheduled job

 

#FastAPI and Pydantic Related Libraries
from fastapi import FastAPI
from pydantic import BaseModel,Field
from typing import List
import asyncio

#APScheduler Related Libraries
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

import uuid

import logging
import psutil
from datetime import datetime
import os

import ray

import time

ray.init(address="192.168.1.10:6379")

# Global Variables
app = FastAPI(title="APP for demonstrating integration with FastAPI and APScheduler", version="2020.11.1",
              description="An Example of Scheduling CPU scanner info periodically")

Schedule = None

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class CPURateResponse(BaseModel):
    cpu_rate:float=Field(title="CPU Rate", description="The current CPU rate")


class SetCPUScannerJobResponse(BaseModel):
    job_id:str=Field(title="CPU Scanner Job ID", description="CPU Scanner Job ID")


class DelCPUScannerJobResponse(BaseModel):
    job_id:str=Field(title="CPU Scanner Job ID", description="CPU Scanner Job ID")



@app.on_event("startup")
async def load_schedule_or_create_blank():
    """
    Instantiate the Schedule Object as a Global Param and also load existing Schedules from SQLite
    This allows for persistent schedules across server restarts.
    """
    print("#####startup event is called.")

    global Schedule
    try:
        jobstores = {
            'default': SQLAlchemyJobStore(url='sqlite:///../store/jobs.sqlite')
        }
        Schedule = AsyncIOScheduler(jobstores=jobstores)
        Schedule.start()
        # asyncio.get_event_loop().run_forever()
        logger.info("Created Schedule Object")
    except Exception:
        # Avoid a bare except, and log the traceback so the failure can be diagnosed.
        logger.exception("Unable to Create Schedule Object")


@app.on_event("shutdown")
async def pickle_schedule():
    """
    An Attempt at Shutting down the schedule to avoid orphan jobs
    """
    print("#####shutdown event is called.")

    global Schedule
    Schedule.shutdown()
    logger.info("Disabled Schedule")


@ray.remote
def get_cpu_rate_on_ray():
    logging.info("get_cpu_rate_on_ray called.")
    print("get_cpu_rate_on_ray called. !!")

    job_id = ray.get_runtime_context().job_id
    print(f"job_id={job_id}")

    # time.sleep(10)

    cpu_rate = psutil.cpu_percent(interval=1)

    logging.info(f"cpu_rate = {cpu_rate}")

    return cpu_rate

async def scan_cpu_rate(job_id):
    logging.info(f'###!!!!!!!!!!!!! Tick! call by apscheduler job {job_id}')

    future = get_cpu_rate_on_ray.remote()

    logging.info(future)

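    # Await the ObjectRef (Ray ObjectRefs are awaitable) instead of calling the
    # blocking ray.get(), so the event loop stays free while the task runs.
    cpu_rate = await future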

    logging.info(f"cpu_rate = {cpu_rate}")

@app.post("/get_cpu_rate/", response_model=CPURateResponse, tags=["API"])
def get_cpu_rate():
    future = get_cpu_rate_on_ray.remote()

    logging.info(future)

    cpu_rate = ray.get(future)

    logging.info(f"cpu_rate = {cpu_rate}")

    return {"cpu_rate": cpu_rate}


@app.post("/set_cpu_scanner_job/", response_model=SetCPUScannerJobResponse, tags=["API"])
def set_cpu_scanner_job():
    random_suffix = uuid.uuid1()
    job_id = str(random_suffix)

    cpu_scanner_job = Schedule.add_job(scan_cpu_rate, 'interval', seconds=30, id=job_id, args=[job_id])

    job_id = cpu_scanner_job.id
    logging.info(f"set cpu scanner job, id = {job_id}")

    return {"job_id": job_id}


@app.post("/del_cpu_scanner_job/", response_model=DelCPUScannerJobResponse, tags=["API"])
def del_cpu_scanner_job(job_id:str):

    Schedule.remove_job(job_id)

    logging.info(f"deleted cpu scanner job, id = {job_id}")

    return {"job_id": job_id}

 

