flask使用celery编写异步任务
【转载请以链接方式注明出处】
flask是一个阻塞式的框架。这里的“阻塞”是指flask处理请求的时候,一次只能处理一个,当多个requests过来,flask会说,大家不要急,一个一个来。如果恰好这时候某个请求耗费了大量的时间(种种原因),在这段时间内,flask说:我忙着呢,后来的排好队。然后服务进程被占用着,后来的请求得不到处理。这就叫做被阻塞住了。说得再多不如一只栗子:
1
2
3
4
5
|
def long_time_def():
for _ in range(10000):
for j in range(10000):
i = 1
return 'hello'
|
我的老爷机运行这只函数需要大约8s的时间,可以很好地用来充当“阻塞侠”的角色。如果你的本配置比较好,可以适当增大数字。然后我们这么构建一个flask服务端:
### flask_celery.py ###
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
app = Flask(__name__)
@app.route("/")
def hello():
print ("耗时的请求")
result=long_time_def()
return result
@app.route('/t')
def test():
return 'test!'
if __name__ == "__main__":
app.run(debug=True)
|
运行一下,同时访问‘/’与‘/t’你就能明白,当‘/’未返回结果的时候,‘/t’是没有响应的。
多进程的方案只能说可以缓解这种“阻塞”,但无法根治,因为进程有限,访问量不可控,早晚所有进程都被阻塞。所以一个分布式的任务队列框架被搬上台面:celery。
celery会这么说:flask,你把那些请求起来比较耗时的任务丢给我,我帮你处理,你留着资源去处理下一个请求吧!很仗义的样子有木有!只是flask说,好啊,不过,你有地方放吗?因为celery进程也是按照先来后到的原则处理任务,多个任务到来,flask也会来不及处理的时候。它想到:那就先存起来!不过它自己却不提供存储任务的机制。所以这个时候celery需要一个中间人,可以将所有任务统一保存在中间人这里,然后依次处理。这个中间人,官网上给出了两个:RabbitMQ 或者 Redis。本教程选择Redis。
【本教程基于Debian python3.5.1】
所以上述栗子,我将使用celery来优化:
官网上已经给出了使用celery的方法,可以照搬:
1 第一步 配置celery
### flask_celery.py ###
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
app = Flask(__name__)
from celery import Celery
from celery import platforms #如果你不是linux的root用户,这两行没必要
platforms.C_FORCE_ROOT=True #允许root权限运行celery
def make_celery(app):
celery = Celery('flask_celery', #此处官网使用app.import_name,因为这里将所有代码写在同一个文件flask_celery.py,所以直接写名字。
broker=app.config['CELERY_BROKER_URL'],
backend=app.config['CELERY_RESULT_BACKEND']
)
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
app.config.update(
CELERY_BROKER_URL='redis://localhost:6379/0',
CELERY_RESULT_BACKEND='redis://localhost:6379/1'
)
celery = make_celery(app)
|
这样子,app、celery、redis三者便建立起了一只通道。
2 第二步 传递耗时任务
app此时可以通过一个装饰器来标记耗时任务,所以这首先要求将耗时任务写到一个函数里面,所幸我已经做到了:
1
2
3
4
5
6
|
@celery.task()
def long_time_def():
for _ in range(10000):
for j in range(10000):
i = 1
return 'hello'
|
@celery.task()这只装饰器好像在说:这函数是celery的任务,可以交给celery。
直接调用long_time_def()就可以将任务传递给celery?当然不是,你需要这么调用函数:
1
|
long_time_def.apply_async()
|
apply_async()这只函数才是将任务提交给celery的动作。所以路由‘/’就便成了这样子:
1
2
3
4
5
|
@app.route("/")
def hello():
print ("耗时的任务")
result=long_time_def.apply_async() #变化在这里
return '耗时的任务已经交给了celery'
|
ok,至此,代码工作就完成了,但先别急着启动服务,因为还差一步。
3 第三步 运行celery
在终端里输入:
1
|
celery worker -l INFO -A flask_celery.celery
|
-l INFO 可以详细输出任务信息。
默认情况下,celery会开启4个线程来处理任务,想要更多的任务线程,可以通过参数 -c 来指定,例如:
1
|
celery worker -l INFO -c 100 -A flask_celery.celery
|
这样,便开启了100个任务线程。
4 第四步 运行
运行flask_celery.py,可以验证一下是不是耗时的任务就酱紫提交给celery处理了!
总结:
这样实现的异步,我个人的评价是:与其号称“异步”,不如就跟随官网的说法,叫“将耗时任务交给celery后台”。理性地对待,就会发现其最大的局限性:函数long_time_def()是有一个返回值“hello”的,将函数交给celery后,flask立即返回了值给前端,至于函数的返回结果“hello”,flask无法获取到,因为此时request已经结束了。
有人说,使用task的wait()函数,获取celery执行玩任务之后的返回值,然后再response。形如:
1
2
3
|
resp=result.wait() #获取celery任务的返回值,这里是‘hello’
return resp
# return ‘耗时的任务已经交给了celery
|
可行,只是有一个问题:wait()会保存阻塞,直到该任务执行完,那使用celery还有什么意义啊!不还是会阻塞?
所以你说:粑粑,我想要实现这么一个异步请求,它有一个很耗时的操作(比如数据库读取,网址访问等),但人家希望flask在它处理的这段时间里,不会被阻塞掉!而且,重点是,那个耗时操作的返回结果很重要,我想将其返回给相应的request,而不是丢弃掉!说白了,就是让阻塞式的同步框架flask去做一个异步框架该做的事情!
仅凭flask,我想自信地说,做不到做不到不到到到到……
本篇文章跟之前的所有文章一样,没有彩蛋!
很长的字幕……
flask:再给我celery我也做不到不到到……但是……
gevent打了一个冷战:放心吧猴子,我会永远保护你……