Celery分布式任務隊列原理,用法


提交一個耗時的請求, 立即返回ID,
拿着這個ID去查詢任務處理進程

自己想到的方案:
前端請求接口a,(接口a立即開始處理任務,把任務的處理到哪一步放在redis中)
前端請求接口b,(接口b立即返回redis中進行到哪一步)

前端先調用接口a(此時接口a阻塞)
前端繼續輪詢接口b獲取a進行到哪一步了
當接口a成功返回結果,停止輪詢


Alex講的方案

前端請求接口a,接口立馬生成一個任務id,然后啟動一個進程執行這個任務,然后把id返回給前端(為什么不是線程?如果啟動子線程執行任務,主線程同樣會卡主,一樣阻塞,如果是守護線程,主線程執行完,守護線程也結束了,所以這兩種線程都不行),
把任務執行過程寫在redis/數據庫/文件里 whatever,
前端可以請求接口b,接口b拿着這個任務id去redis里找到結果並返回就OK了。(假設存在redis里)

這個方案看似沒有問題,實際上有個坑,就是不能支持高並發,為什么呢?
Alex的原話是:你提供的這個任務管理可能有多個用戶都用它,對不對?比如一個用戶,執行100台並發,20個用戶同時,每個人執行100台,就是2000個並發,你覺得你的這個任務服務器撐得住嗎?肯定不行的,是不是?
你現在是直接在任務服務器上調的腳本(調腳本就視為一個耗時的任務),隨着並發的用戶越多,機器會越來越慢,第21個用戶可能都打不開網站了。這就是個坑


一開始就不該把任務執行的模塊,放到任務分發的服務器上,
一開始就該分清楚,誰負責任務分發,誰負責任務執行。
意思是:一台服務器a分發任務(任務按先后順序放在隊列里),后面四台服務器b,c,d,e從a上從隊列里拿任務去執行

Celery工作原理:

自己的領悟:首先由五台機器,view是1台機器,后面有4台worker,拿Excel表入庫的任務來說

用戶將Excel文件發給view,view要將文件放入隊列中,4台worker從隊列中拿excel文件,那么必須要在4台worker上啟動某個程序

拿到文件解析入庫,成功后將信息寫到隊列中(是不是也可以在執行的過程中將執行的進度寫入隊列里?),可以和excel文件的隊列一樣,也可以不一樣。

Celery 扮演生產者和消費者的角色,brokers 就是生產者和消費者存放/拿取產品的地方(隊列)

但是`放產品`這個動作還是需要客戶端來做,例如:tasks.add.delay(1,2)   或者使用beat: celery -A proj beat -l info

 

celery在windows上不好使。要用linux,

啟動一台linux服務器,IP:192.168.1.10

`pip3 install celery` 

`yum install redis`

定義tasks.py: 

from celery import Celery
import time

# worker上能執行的任務是提前定義好的,這個任務就是用app來定義,app包括哪些任務呢,就是@app.task,broker是生產者隊列,backend是結果隊列

app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')

@app.task
def add(x, y):
time.sleep(5)
return x + y

`redis-server`

`celery -A tasks worker-l debug`    # tasks是tasks.py省略擴展名,worker是角色,-l日志級別

 

 

 重新啟動一個終端:

[root@localhost ~]# python3
Python 3.6.8 (default, Apr 2 2020, 13:34:55)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import tasks
>>> tasks.add(3,4)
7

>>> t= tasks.add.delay(3,4)
>>> t   # 返回的是AsyncResult對象
<AsyncResult: 4ffaa775-0823-470b-b245-f3465dd921f8>

>>> t.ready()
False
>>> t.ready()
False
>>> t.ready()
True
>>> t.get()
7
>>>t.get(timeout=1)    # 給任務設置等待時間,提高用戶體驗

>>> t.get()    #如果任務出錯
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 228, in get
on_message=on_message,
File "/usr/local/lib/python3.6/site-packages/celery/backends/asynchronous.py", line 202, in wait_for_pending
return result.maybe_throw(callback=callback, propagate=propagate)
File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 333, in maybe_throw
self.throw(value, self._to_remote_traceback(tb))
File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 326, in throw
self.on_ready.throw(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/vine/promises.py", line 244, in throw
reraise(type(exc), exc, tb)
File "/usr/local/lib/python3.6/site-packages/vine/five.py", line 195, in reraise
raise value
NameError: name 'aa' is not defined
>>> t.get(propagate=False)  # 出錯只取結果
NameError("name 'aa' is not defined",)
>>> tt.traceback   # 錯誤結果
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
NameError: name 'tt' is not defined
>>>

celery接收到任務:

 

啟動一個celery的worker,有5個進程,一個主進程3575 , 和4個工作進程,都是由主進程啟動的。工作進程用來執行任務

 工作進程應該可以設置, `celery -A proj worker -l info --concurrency=4 `

所以,執行`celery -A proj worker -l debug`命令2次,啟動2個worker,就是10個celery進程,每個worker5個進程(一個主進程和四個工作進程)

 

如果在項目中使用Celery?

在項目中使用Celery,之前是配置和任務寫在一起,現在把配置和任務分開

proj/__init__.py

     /celery .py
     /tasks .py         
               /periodic_task .py   #定時任務
 
from __future__ import absolute_import, unicode_literals
#導入安裝的celery(所以上面要寫絕對導入),而不是自己導自己(from .celery)
from celery import Celery  
 
app = Celery('proj',
             broker='redis://',
             backend='redis://',
             include=['proj.tasks', 'proj.periodic_task'])  # 這個celery管理了哪些task文件可以有多個,其中有個定時任務periodic_task
 
# Optional configuration, see the application user guide.
# update方法更新配置,也可以直接寫在上面初始化Celery里面
app.conf.update(
    result_expires=3600,  # 任務結果一小時內沒人取就丟棄
)
 
if __name__ == '__main__':
    app.start()
celery.py

 

from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)
tasks.py

 

from __future__ import absolute_import, unicode_literals
from .celery import app
from celery.schedules import crontab
 
#該裝飾器的作用是celery一連上就執行被裝飾的函數
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # 每10秒執行test('hello')
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
 
    # 每30秒執行test('world')
    sender.add_periodic_task(30.0, test.s('world'), expires=10)
 
    # 每周一上午7:30執行
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )
 
@app.task
def test(arg):
    print(arg)
periodic_task.py

 

 

worker啟動:`celery -A proj worker -l debug`

發起任務: 

from proj import  tasks,tasks1    #要cd到proj的上一層目錄

t1 = tasks.add.delay(2,4)

t2 = tasks1.get_rand.delay(1,20)

啟動定時任務調度器beat:

`celery -A proj.periodic_task beat -l debug`

celery單獨啟動一個進程來定時發起這些任務,就是beat, 注意, 這里是發起任務,不是執行,這個進程只會不斷的去檢查你的任務計划, 每發現有任務需要執行了,就發起一個任務調用消息,交給celery worker去執行

 

上面是通過調用函數添加定時任務,也可以像寫配置文件 一樣的形式添加, 下面是每30s執行的任務

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'proj.tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
app.conf.timezone = 'UTC'

復雜的,例如:每周1的早上7.30執行tasks.add任務

from celery.schedules import crontab
 
app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'proj.tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

 

可以在1台服務器上啟動4個worker (后台運行)

[root@localhost ~]# celery multi start w1 -A proj -l debug
celery multi v4.4.5 (cliffs)
> Starting nodes...
    > w1@localhost.localdomain: OK
[root@localhost ~]# celery multi start w2 -A proj -l debug
celery multi v4.4.5 (cliffs)
> Starting nodes...
    > w2@localhost.localdomain: OK
[root@localhost ~]# celery multi start w3 -A proj -l debug
celery multi v4.4.5 (cliffs)
> Starting nodes...
    > w3@localhost.localdomain: OK
[root@localhost ~]# celery multi start w4 -A proj -l debug
celery multi v4.4.5 (cliffs)
> Starting nodes...
    > w4@localhost.localdomain: OK
[root@localhost ~]# 

可以在多台服務器上運行worker,只需要修改redis配置就行。他們都從同一個redis領任務

重啟 w1 w2 :`celery multi restart w1 w2 -A proj`

停止w1 w2:`celery multi stop w1 w2 -A proj`

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM