Celery消息隊列,Celery單任務異步提交使用,Celery多任務異步提交使用,Celery執行定時任務


簡介

Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統
專注於實時處理的異步任務隊列,同時也支持任務調度

架構圖

......

Celery使用場景

異步任務:將耗時操作任務提交給Celery去異步執行,比如發送短信/郵件、消息推送、音視頻處理等等
定時任務:定時執行某件事情,比如每天數據統計

安裝

pip install eventlet
pip install celery
消息中間件:RabbitMQ/Redis
app=Celery('任務名',backend='xxx',broker='xxx')

Celery單任務異步提交使用

 

創建一個任意項目

在項目里創建一個 celery_app_task.py 文件 (相當於雇工人干活)

import celery
import time

# 消息隊列存儲在redis中,連接redis
# broker='redis://127.0.0.1:6379/2' 不加密碼
# backend = 'redis://:123456@127.0.0.1:6379/1' 有密碼連接方式

backend = 'redis://27.0.0.1:6379/1' # 消息存儲redis文件1
broker = 'redis://127.0.0.1:6379/2' # 結果存儲redis文件2

# 第一個參數,給celery命個名,可以任意命名
cel = celery.Celery('test', backend=backend, broker=broker)

# 往下寫任務(發郵件等一系列任務)
@cel.task   # 生成任務,以后需要提交的任務寫這里
def add(x, y):
    return x + y

創建一個 add_task.py 文件

from Django項目.day32.celerytest.celery_app_task import add

result = add.delay(4, 5)    # celery異步提交任務到消息中間件里
print(result.id)

創建一個 result.py 文件 (查看結果)

from celery.result import AsyncResult
from Django項目.day32.celerytest.celery_app_task import cel

async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel) # 注意 id 參數的值為add_task.py執行的結果

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 將結果刪除
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常后正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

celery提交單任務異步執行順序

1. 先執行   add.py 文件  
2. 在執行   linux系統下:celery worker -A celery_app_task -l info
           windows系統下:celery worker -A celery_app_task -l info -P eventlet
3. 最后執行 result.py  # 查看結果

Celery多任務異步提交使用

多任務結構圖

 

pro_cel
    ├── celery_task# celery相關文件夾
    │   ├── celery.py   # celery連接和配置相關文件,必須叫這個名字
    │   └── tasks1.py    #  所有任務函數
    │   └── tasks2.py    #  所有任務函數
    ├── check_result.py # 檢查結果
    └── send_task.py    # 觸發任務

按照結構圖創建文件夾celery_task

在celery_task里面創建 celery.py(必須取名為celery) 文件,(雇多個工人)子路徑

import celery
import time

# 消息隊列存儲在redis中,連接redis
# broker='redis://127.0.0.1:6379/2' 不加密碼
# backend = 'redis://:123456@127.0.0.1:6379/1' 有密碼連接方式

# 消息存儲redis文件1
backend = 'redis://127.0.0.1:6379/1'

# 結果存儲redis文件2
broker = 'redis://127.0.0.1:6379/2'

# 第一個參數,給celery命個名,可以任意命名
cel = celery.Celery('test', backend=backend, broker=broker,   
                    include=['celery_task.task1', 'celery_task.task2'])   # 異步執行多個任務(雇了多個工人)


# 處理時區問題
# 時區
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

雇佣工作工人,linux系統下:celery worker -A celery_app_task -l info
雇佣工作工人,windows系統下:celery worker -A celery_app_task -l info -P eventlet

創建task1.py文件,子路徑

from celery_task.celery import cel

@cel.task  # 生成task1的任務
def add(x, y):
    return x + y

創建task2.py文件,子路經

from celery_task.celery import cel

@cel.task  # 生成task2的任務
def add(x, y):
    return x - y

創建send_task.py文件(Celery的根路徑下),添加到消息中間件里

from celery_task import task1,task2

result1 = task1.add.delay(4, 5)    # celery異步提普通交任務1到消息中間件里
print(result1.id)

result2 = task2.add.delay(4, 5)  # celery異步提交普通任務2到消息中間件里
print(result2.id)

創建check_result.py文件(Celery的根路徑下),拿到任務執行結果

from celery.result import AsyncResult
from celery_task.celery import cel

async = AsyncResult(id="cbe2cb11-259a-4877-850d-8e736dd76eaf", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 將結果刪除
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常后正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

celery提交多任務異步執行順序

1.先執行   send_task.py 文件   # 一次性提交多個任務 2.在執行   linux系統下:celery worker -A celery_app_task -l info
          windows系統下:celery worker -A celery_app_task -l info -P eventlet
3.最后執行 check_result.py  # 查看結果  

 Celery執行定時任務

Celery架構圖

創建send_task.py文件,跟路徑,提交定時任務

from celery_task import task1, task2
from datetime import datetime

# 方法一
# 提交定時任務,設置定時任務執行時間
# v1 = datetime(2019, 4, 20, 16, 50, 50) # 參數為年月日時分秒
# print(v1)# 轉成utc時間
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = task1.add.apply_async(args=[1, 3], eta=v2)  # 提交定時任務
# print(result.id)


# 方法二(推薦)
ctime = datetime.now()
# 默認用utc時間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10) # seconds往后延遲多少秒的時間片段 
task_time = utc_ctime + time_delay # 使用apply_async並設定時間,

result = task1.add.apply_async(args=[4, 3], eta=task_time) # 提交定時任務,eta指定執行時間
print(result.id)

創建celery_task文件夾

在celery_task文件夾里,創建celery.py文件,子路徑,雇佣處理定時任務的工人

from celery import Celery
import time
from datetime import timedelta
# from celery.schedules import crontab


# 消息隊列存儲在redis中,連接redis
# broker='redis://127.0.0.1:6379/2' 不加密碼
# backend = 'redis://:123456@127.0.0.1:6379/1' 有密碼連接方式

backend = 'redis://127.0.0.1:6379/1'  # 消息存儲redis文件1
broker = 'redis://127.0.0.1:6379/2'  # 結果存儲redis文件2

# 第一個參數,給celery命個名,可以任意命名
cel = Celery('test', backend=backend, broker=broker,
# 包含以下兩個任務文件,去相應的py文件中找任務,對多個任務做分類
include=['celery_task.tasks1','celery_task.tasks2'])

# 處理時區問題 # 時區 cel.conf.timezone = 'Asia/Shanghai' # 是否使用UTC cel.conf.enable_utc = False

# 雇佣工人幫我提交任務到消息隊列里 cel.conf.beat_schedule = { # 名字隨意命名 'add-every-10-seconds': { # 執行tasks1下的test_celery函數 'task': 'celery_task.task1.add', # 每隔2秒執行一次 # 'schedule': 1.0, # 'schedule': crontab(minute="*/1"), # 每分鍾執行一次,也可以每小時等 'schedule': timedelta(seconds=2), 'args': ('test',) # 給執行函數傳遞參數 }   
    
   # 每年每月等提交任務到消息隊列里
# 'add-every-12-seconds': { # 'task': 'celery_task.tasks1.test_celery', # # 每年4月11號,8點42分執行 # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # 每年4月11日8點42分提交任務到消息隊列里 # 'schedule': crontab(minute=42, hour=8), # 每天8點42分提交任務到消息隊列里 # 'args': (16, 16) # }, }

雇佣提交定時任務到消息隊列里命令:celery beat -A celery_task -l info
雇佣工作工人,
linux系統下:celery worker -A celery_app_task -l info
雇佣工作工人windows系統下:celery worker -A celery_app_task -l info -P eventlet

創建task2.py文件,子路經,生成任務2

from celery_task.celery import cel

@cel.task  # 生成task2的任務
def add(x, y):
    return x + y

創建task1.py文件,子路經,生成任務1

from celery_task.celery import cel

@cel.task  # 生成task1的任務
def add(x, y):
    return x - y

創建check_result.py文件,跟路徑,查看定時任務執行結果

from celery.result import AsyncResult
from celery_task.celery import cel

async = AsyncResult(id="e6742e1f-a6a4-43b8-915a-a221e33db756", app=cel)  # id為提交任務時給我的id,send_task.py里的result if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 將結果刪除
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常后正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM