提交一個耗時的請求, 立即返回ID,
拿着這個ID去查詢任務處理進程
自己想到的方案:
前端請求接口a,(接口a立即開始處理任務,把任務的處理到哪一步放在redis中)
前端請求接口b,(接口b立即返回redis中進行到哪一步)
前端先調用接口a(此時接口a阻塞)
前端繼續輪詢接口b獲取a進行到哪一步了
當接口a成功返回結果,停止輪詢
Alex講的方案
前端請求接口a,接口立馬生成一個任務id,然后啟動一個進程執行這個任務,然后把id返回給前端(為什么不是線程?如果啟動子線程執行任務,主線程同樣會卡主,一樣阻塞,如果是守護線程,主線程執行完,守護線程也結束了,所以這兩種線程都不行),
把任務執行過程寫在redis/數據庫/文件里 whatever,
前端可以請求接口b,接口b拿着這個任務id去redis里找到結果並返回就OK了。(假設存在redis里)
這個方案看似沒有問題,實際上有個坑,就是不能支持高並發,為什么呢?
Alex的原話是:你提供的這個任務管理可能有多個用戶都用它,對不對?比如一個用戶,執行100台並發,20個用戶同時,每個人執行100台,就是2000個並發,你覺得你的這個任務服務器撐得住嗎?肯定不行的,是不是?
你現在是直接在任務服務器上調的腳本(調腳本就視為一個耗時的任務),隨着並發的用戶越多,機器會越來越慢,第21個用戶可能都打不開網站了。這就是個坑
一開始就不該把任務執行的模塊,放到任務分發的服務器上,
一開始就該分清楚,誰負責任務分發,誰負責任務執行。
意思是:一台服務器a分發任務(任務按先后順序放在隊列里),后面四台服務器b,c,d,e從a上從隊列里拿任務去執行
Celery工作原理:
自己的領悟:首先由五台機器,view是1台機器,后面有4台worker,拿Excel表入庫的任務來說
用戶將Excel文件發給view,view要將文件放入隊列中,4台worker從隊列中拿excel文件,那么必須要在4台worker上啟動某個程序
拿到文件解析入庫,成功后將信息寫到隊列中(是不是也可以在執行的過程中將執行的進度寫入隊列里?),可以和excel文件的隊列一樣,也可以不一樣。
Celery 扮演生產者和消費者的角色,brokers 就是生產者和消費者存放/拿取產品的地方(隊列)
但是`放產品`這個動作還是需要客戶端來做,例如:tasks.add.delay(1,2) 或者使用beat: celery -A proj beat -l info
celery在windows上不好使。要用linux,
啟動一台linux服務器,IP:192.168.1.10
`pip3 install celery`
`yum install redis`
定義tasks.py:
from celery import Celery
import time
# worker上能執行的任務是提前定義好的,這個任務就是用app來定義,app包括哪些任務呢,就是@app.task,broker是生產者隊列,backend是結果隊列
app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')
@app.task
def add(x, y):
time.sleep(5)
return x + y
`redis-server`
`celery -A tasks worker-l debug` # tasks是tasks.py省略擴展名,worker是角色,-l日志級別
重新啟動一個終端:
[root@localhost ~]# python3
Python 3.6.8 (default, Apr 2 2020, 13:34:55)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import tasks
>>> tasks.add(3,4)
7
>>> t= tasks.add.delay(3,4)
>>> t # 返回的是AsyncResult對象
<AsyncResult: 4ffaa775-0823-470b-b245-f3465dd921f8>
>>> t.ready()
False
>>> t.ready()
False
>>> t.ready()
True
>>> t.get()
7
>>>t.get(timeout=1) # 給任務設置等待時間,提高用戶體驗
>>> t.get() #如果任務出錯
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 228, in get
on_message=on_message,
File "/usr/local/lib/python3.6/site-packages/celery/backends/asynchronous.py", line 202, in wait_for_pending
return result.maybe_throw(callback=callback, propagate=propagate)
File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 333, in maybe_throw
self.throw(value, self._to_remote_traceback(tb))
File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 326, in throw
self.on_ready.throw(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/vine/promises.py", line 244, in throw
reraise(type(exc), exc, tb)
File "/usr/local/lib/python3.6/site-packages/vine/five.py", line 195, in reraise
raise value
NameError: name 'aa' is not defined
>>> t.get(propagate=False) # 出錯只取結果
NameError("name 'aa' is not defined",)
>>> tt.traceback # 錯誤結果
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
NameError: name 'tt' is not defined
>>>
celery接收到任務:
啟動一個celery的worker,有5個進程,一個主進程3575 , 和4個工作進程,都是由主進程啟動的。工作進程用來執行任務
工作進程應該可以設置, `celery -A proj worker -l info --concurrency=4 `
所以,執行`celery -A proj worker -l debug`命令2次,啟動2個worker,就是10個celery進程,每個worker5個進程(一個主進程和四個工作進程)
如果在項目中使用Celery?
在項目中使用Celery,之前是配置和任務寫在一起,現在把配置和任務分開
proj
/__init__
.py
/celery
.py
/tasks
.py
/periodic_task
.py #定時任務

from __future__ import absolute_import, unicode_literals #導入安裝的celery(所以上面要寫絕對導入),而不是自己導自己(from .celery) from celery import Celery app = Celery('proj', broker='redis://', backend='redis://', include=['proj.tasks', 'proj.periodic_task']) # 這個celery管理了哪些task文件可以有多個,其中有個定時任務periodic_task # Optional configuration, see the application user guide. # update方法更新配置,也可以直接寫在上面初始化Celery里面 app.conf.update( result_expires=3600, # 任務結果一小時內沒人取就丟棄 ) if __name__ == '__main__': app.start()

from __future__ import absolute_import, unicode_literals from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers)

from __future__ import absolute_import, unicode_literals from .celery import app from celery.schedules import crontab #該裝飾器的作用是celery一連上就執行被裝飾的函數 @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # 每10秒執行test('hello') sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') # 每30秒執行test('world') sender.add_periodic_task(30.0, test.s('world'), expires=10) # 每周一上午7:30執行 sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), test.s('Happy Mondays!'), ) @app.task def test(arg): print(arg)
worker啟動:`celery -A proj worker -l debug`
發起任務:
from proj import tasks,tasks1 #要cd到proj的上一層目錄
t1 = tasks.add.delay(2,4)
t2 = tasks1.get_rand.delay(1,20)
啟動定時任務調度器beat:
`celery -A proj.periodic_task beat -l debug`
celery單獨啟動一個進程來定時發起這些任務,就是beat, 注意, 這里是發起任務,不是執行,這個進程只會不斷的去檢查你的任務計划, 每發現有任務需要執行了,就發起一個任務調用消息,交給celery worker去執行
上面是通過調用函數添加定時任務,也可以像寫配置文件 一樣的形式添加, 下面是每30s執行的任務
app.conf.beat_schedule = { 'add-every-30-seconds': { 'task': 'proj.tasks.add', 'schedule': 30.0, 'args': (16, 16) }, } app.conf.timezone = 'UTC'
復雜的,例如:每周1的早上7.30執行tasks.add任務
from celery.schedules import crontab app.conf.beat_schedule = { # Executes every Monday morning at 7:30 a.m. 'add-every-monday-morning': { 'task': 'proj.tasks.add', 'schedule': crontab(hour=7, minute=30, day_of_week=1), 'args': (16, 16), }, }
可以在1台服務器上啟動4個worker (后台運行)
[root@localhost ~]# celery multi start w1 -A proj -l debug celery multi v4.4.5 (cliffs) > Starting nodes... > w1@localhost.localdomain: OK [root@localhost ~]# celery multi start w2 -A proj -l debug celery multi v4.4.5 (cliffs) > Starting nodes... > w2@localhost.localdomain: OK [root@localhost ~]# celery multi start w3 -A proj -l debug celery multi v4.4.5 (cliffs) > Starting nodes... > w3@localhost.localdomain: OK [root@localhost ~]# celery multi start w4 -A proj -l debug celery multi v4.4.5 (cliffs) > Starting nodes... > w4@localhost.localdomain: OK [root@localhost ~]#
可以在多台服務器上運行worker,只需要修改redis配置就行。他們都從同一個redis領任務
重啟 w1 w2 :`celery multi restart w1 w2 -A proj`
停止w1 w2:`celery multi stop w1 w2 -A proj`