Python: flask使用celery編寫異步任務


flask使用celery編寫異步任務

轉載請以鏈接方式注明出處

flask是一個阻塞式的框架。這里的“阻塞”是指flask處理請求的時候,一次只能處理一個,當多個requests過來,flask會說,大家不要急,一個一個來。如果恰好這時候某個請求耗費了大量的時間(種種原因),在這段時間內,flask說:我忙着呢,后來的排好隊。然后服務進程被占用着,后來的請求得不到處理。這就叫做被阻塞住了。說得再多不如一只栗子:

我的老爺機運行這只函數需要大約8s的時間,可以很好地用來充當“阻塞俠”的角色。如果你的本配置比較好,可以適當增大數字。然后我們這么構建一個flask服務端:

###  flask_celery.py  ###

運行一下,同時訪問‘/’與‘/t’你就能明白,當‘/’未返回結果的時候,‘/t’是沒有響應的。

多進程的方案只能說可以緩解這種“阻塞”,但無法根治,因為進程有限,訪問量不可控,早晚所有進程都被阻塞。所以一個分布式的任務隊列框架被搬上台面:celery。

celery會這么說:flask,你把那些請求起來比較耗時的任務丟給我,我幫你處理,你留着資源去處理下一個請求吧!很仗義的樣子有木有!只是flask說,好啊,不過,你有地方放嗎?因為celery進程也是按照先來后到的原則處理任務,多個任務到來,flask也會來不及處理的時候。它想到:那就先存起來!不過它自己卻不提供存儲任務的機制。所以這個時候celery需要一個中間人,可以將所有任務統一保存在中間人這里,然后依次處理。這個中間人,官網上給出了兩個:RabbitMQ 或者 Redis。本教程選擇Redis。

【本教程基於Debian  python3.5.1】

所以上述栗子,我將使用celery來優化:

官網上已經給出了使用celery的方法,可以照搬:

1 第一步 配置celery

###  flask_celery.py  ###

這樣子,app、celery、redis三者便建立起了一只通道。

2 第二步 傳遞耗時任務

app此時可以通過一個裝飾器來標記耗時任務,所以這首先要求將耗時任務寫到一個函數里面,所幸我已經做到了:

@celery.task()這只裝飾器好像在說:這函數是celery的任務,可以交給celery。

直接調用long_time_def()就可以將任務傳遞給celery?當然不是,你需要這么調用函數:

apply_async()這只函數才是將任務提交給celery的動作。所以路由‘/’就便成了這樣子:

ok,至此,代碼工作就完成了,但先別急着啟動服務,因為還差一步。

3 第三步 運行celery

在終端里輸入:

-l INFO 可以詳細輸出任務信息。

默認情況下,celery會開啟4個線程來處理任務,想要更多的任務線程,可以通過參數 -c 來指定,例如:

這樣,便開啟了100個任務線程。

4 第四步 運行

運行flask_celery.py,可以驗證一下是不是耗時的任務就醬紫提交給celery處理了!

總結:

這樣實現的異步,我個人的評價是:與其號稱“異步”,不如就跟隨官網的說法,叫“將耗時任務交給celery后台”。理性地對待,就會發現其最大的局限性:函數long_time_def()是有一個返回值“hello”的,將函數交給celery后,flask立即返回了值給前端,至於函數的返回結果“hello”,flask無法獲取到,因為此時request已經結束了。

有人說,使用task的wait()函數,獲取celery執行玩任務之后的返回值,然后再response。形如:

可行,只是有一個問題:wait()會保存阻塞,直到該任務執行完,那使用celery還有什么意義啊!不還是會阻塞?

所以你說:粑粑,我想要實現這么一個異步請求,它有一個很耗時的操作(比如數據庫讀取,網址訪問等),但人家希望flask在它處理的這段時間里,不會被阻塞掉!而且,重點是,那個耗時操作的返回結果很重要,我想將其返回給相應的request,而不是丟棄掉!說白了,就是讓阻塞式的同步框架flask去做一個異步框架該做的事情!

僅憑flask,我想自信地說,做不到做不到不到到到到……

有種來打我

本篇文章跟之前的所有文章一樣,沒有彩蛋!

很長的字幕……

flask:再給我celery我也做不到不到到……但是……

gevent打了一個冷戰:放心吧猴子,我會永遠保護你……


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM