python celery 多work多隊列


1.Celery模塊調用

既然celery是一個分布式的任務調度模塊,那么celery是如何和分布式掛鈎呢,celery可以支持多台不通的計算機執行不同的任務或者相同的任務。

如果要說celery的分布式應用的話,就要提到celery的消息路由機制,AMQP協議。具體的可以查看AMQP的文檔。簡單地說就是可以有多個消息隊列(Message Queue),不同的消息可以指定發送給不同的Message Queue,而這是通過Exchange來實現的。發送消息到Message Queue中時,可以指定routiing_key,Exchange通過routing_key來把消息路由(routes)到不同的Message Queue中去。

多worker,多隊列,實例:

1.在服務器上編寫文件tasks.py。首先定義一個Celery的對象,然后通過celeryconfig.py對celery對象進行設置。之后又分別定義了三個task,分別是taskA, taskB和add。

 

#!/usr/bin/env
#-*-conding:utf-8-*-
from celery import Celery,platforms
platforms.C_FORCE_ROOT = True

app = Celery()
app.config_from_object("celeryconfig")

@app.task
def tashA(x,y):
	return x*y

@app.task
def taskB(x,y,z):
	return x+y+z

@app.task
def add(x,y):
	return x+y

2.編寫celeryconfig.py文件。

 

#!/usr/bin/env python
#-*- coding:utf-8 -*-
from kombu import Exchange,Queue
from celery import platforms
platforms.C_FORCE_ROOT = True

BROKER_URL = "redis://localhost:6379/7" 
CELERY_RESULT_BACKEND = "redis://localhost:6379/8"

CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B") 
)

CELERY_ROUTES = {
'tasks.taskA':{"queue":"for_task_A","routing_key":"for_task_A"},
'tasks.taskB':{"queue":"for_task_B","routing_key":"for_task_B"}
}

3.啟動worker來指定task

celery -A tasks worker -l info -n workerA.%h -Q for_task_A

celery -A tasks worker -l info -n workerB.%h -Q for_task_B

4.傳入參數

將上面兩個文件導出到pycharm中:

 

編寫文件傳參:

 

from tasks import *
re1 = taskA.delay(100, 200)
re2 = taskB.delay(1,2, 3)print(re3.status)          #查看re3的狀態
print(re3.id)               #查看re3的id

運行之后可見:taskA,taskB都已正常執行。

5.我們可以看到add(re3)的狀態是PENDING,表示沒有執行,這個是因為沒有celeryconfig.py文件中指定改route到哪一個Queue中,所以會被發動到默認的名字celery的Queue中,但是我們還沒有啟動worker執行celery中的任務。下面,我們來啟動一個worker來執行celery隊列中的任務。

celery -A tasks worker -l info -n worker.%h -Q celery 

這樣我們再次運行pycharm就可以看見add也被運行了,並且redis數據庫中也有該id了。

2.Celery與定時任務

1.在celery中執行定時任務非常簡單,只需要設置celery對象中的CELERYBEAT_SCHEDULE屬性即可。
下面我們接着在celeryconfig.py中添加CELERYBEAT_SCHEDULE變量:

 

CELERY_TIMEZONE = 'UTC'
CELERYBEAT_SCHEDULE = {
    'taskA_schedule' : {
        'task':'tasks.taskA',
        'schedule':20,
        'args':(5,6)
    },
    'taskB_scheduler' : {
        'task':"tasks.taskB",
        "schedule":200,
        "args":(10,20,30)
    },
    'add_schedule': {
        "task":"tasks.add",
        "schedule":10,
        "args":(1,2)
    }
}

2.Celery啟動定時任務

celery -A tasks worker -l info -n workerA.%h -Q for_task_A -B

 

啟動完成后:

taskA每20秒執行一次taskA.delay(5, 6)

taskB每200秒執行一次taskB.delay(10, 20, 30)

Celery每10秒執行一次add.delay(1, 2)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM