Celery是由Python開發的一個簡單、靈活、可靠的處理大量任務的分發系統,它不僅支持實時處理也支持任務調度。
- user:用戶程序,用於告知celery去執行一個任務。
- broker: 存放任務(依賴RabbitMQ或Redis,進行存儲)
- worker:執行任務
celery需要rabbitMQ、Redis、Amazon SQS、Zookeeper(測試中) 充當broker來進行消息的接收,並且也支持多個broker和worker來實現高可用和分布式。http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html

Celery version 4.0 runs on Python ❨2.7, 3.4, 3.5❩ PyPy ❨5.4, 5.5❩ This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required. If you’re running an older version of Python, you need to be running an older version of Celery: Python 2.6: Celery series 3.1 or earlier. Python 2.5: Celery series 3.0 or earlier. Python 2.4 was Celery series 2.2 or earlier. Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
環境准備:
- 安裝rabbitMQ或Redis
見:http://www.cnblogs.com/wupeiqi/articles/5132791.html - 安裝celery
pip3 install celery
快速上手

import time from celery import Celery app = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379') @app.task def xxxxxx(x, y): time.sleep(10) return x + y

#!/usr/bin/env python # -*- coding:utf-8 -*- from s1 import xxxxxx # 立即告知celery去執行xxxxxx任務,並傳入兩個參數 result = xxxxxx.delay(4, 4) print(result.id)

from celery.result import AsyncResult from s1 import app async = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app) if async.successful(): result = async.get() print(result) # result.forget() # 將結果刪除 elif async.failed(): print('執行失敗') elif async.status == 'PENDING': print('任務等待中被執行') elif async.status == 'RETRY': print('任務異常后正在重試') elif async.status == 'STARTED': print('任務已經開始被執行')
執行 s1.py 創建worker(終端執行命令):
注釋: 要在項目目錄里執行《 在windows是不支持這個命令得 要安裝 pip3 install eventle》在執行得時候
celery worker -A s1 -l info -P eventlet # 在windows 下執行的命令
celery worker -A s1 -l info
執行 s2.py ,創建一個任務並獲取任務ID:
python3 s2.py
執行 s3.py ,檢查任務狀態並獲取結果:
python3 s3.py
多任務結構
pro_cel ├── celery_tasks# celery相關文件夾 │ ├── celery.py # celery連接和配置相關文件 │ └── tasks.py # 所有任務函數 ├── check_result.py # 檢查結果 └── send_task.py # 觸發任務

#!/usr/bin/env python # -*- coding:utf-8 -*- from celery import Celery celery = Celery('xxxxxx', broker='redis://192.168.0.111:6379', backend='redis://192.168.0.111:6379', include=['celery_tasks.tasks']) # 時區 celery.conf.timezone = 'Asia/Shanghai' # 是否使用UTC celery.conf.enable_utc = False

#!/usr/bin/env python # -*- coding:utf-8 -*- import time from .celery import celery @celery.task def xxxxx(*args, **kwargs): time.sleep(5) return "任務結果" @celery.task def hhhhhh(*args, **kwargs): time.sleep(5) return "任務結果"

#!/usr/bin/env python # -*- coding:utf-8 -*- from celery.result import AsyncResult from celery_tasks.celery import celery async = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery) if async.successful(): result = async.get() print(result) # result.forget() # 將結果刪除 elif async.failed(): print('執行失敗') elif async.status == 'PENDING': print('任務等待中被執行') elif async.status == 'RETRY': print('任務異常后正在重試') elif async.status == 'STARTED': print('任務已經開始被執行')

#!/usr/bin/env python # -*- coding:utf-8 -*- import celery_tasks.tasks # 立即告知celery去執行xxxxxx任務,並傳入兩個參數 result = celery_tasks.tasks.xxxxx.delay(4, 4) print(result.id)
更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html
定時任務
1. 設定時間讓celery執行一個任務
import datetime from celery_tasks.tasks import xxxxx """ from datetime import datetime v1 = datetime(2017, 4, 11, 3, 0, 0) print(v1) v2 = datetime.utcfromtimestamp(v1.timestamp()) print(v2) """ ctime = datetime.datetime.now() utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp()) s10 = datetime.timedelta(seconds=10) ctime_x = utc_ctime + s10 # 使用apply_async並設定時間 result = xxxxx.apply_async(args=[1, 3], eta=ctime_x) print(result.id)
2. 類似於contab的定時任務
""" celery beat -A proj celery worker -A proj -l info """ from celery import Celery from celery.schedules import crontab app = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672', include=['proj.s1', ]) app.conf.timezone = 'Asia/Shanghai' app.conf.enable_utc = False app.conf.beat_schedule = { # 'add-every-10-seconds': { # 'task': 'proj.s1.add1', # 'schedule': 10.0, # 'args': (16, 16) # }, 'add-every-12-seconds': { 'task': 'proj.s1.add1', 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), 'args': (16, 16) }, }
注:如果想要定時執行類似於crontab的任務,需要定制 Scheduler來完成。
Flask中應用Celery
pro_flask_celery/ ├── app.py ├── celery_tasks ├── celery.py # 必須得有一個 celery.py的文件 這里放連接 └── tasks.py

#!/usr/bin/env python # -*- coding:utf-8 -*- from flask import Flask from celery.result import AsyncResult from celery_tasks import tasks from celery_tasks.celery import celery app = Flask(__name__) TASK_ID = None @app.route('/') def index(): global TASK_ID result = tasks.xxxxx.delay() # result = tasks.task.apply_async(args=[1, 3], eta=datetime(2018, 5, 19, 1, 24, 0)) TASK_ID = result.id return "任務已經提交" @app.route('/result') def result(): global TASK_ID result = AsyncResult(id=TASK_ID, app=celery) if result.ready(): return result.get() return "xxxx" if __name__ == '__main__': app.run()

#!/usr/bin/env python # -*- coding:utf-8 -*- from celery import Celery from celery.schedules import crontab celery = Celery('xxxxxx', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379', include=['celery_tasks.tasks']) # 時區 celery.conf.timezone = 'Asia/Shanghai' # 是否使用UTC celery.conf.enable_utc = False

#!/usr/bin/env python # -*- coding:utf-8 -*- import time from .celery import celery @celery.task def hello(*args, **kwargs): print('執行hello') return "hello" @celery.task def xxxxx(*args, **kwargs): print('執行xxxxx') return "xxxxx" @celery.task def hhhhhh(*args, **kwargs): time.sleep(5) return "任務結果"
春生Flask中應用Celery

from flask import Flask,render_template,request,redirect import time from celery_tasks import tasks from celery.result import AsyncResult from celery_tasks.celery import cel app = Flask(__name__) GOODS = [ # {'title':'商品名稱','pirce':100,'ticket':'7ec48f84-7160-4c1d-bb78-9c9327f7a978'} ] @app.route('/index') def index(): return render_template('index.html',goods = GOODS) @app.route('/add',methods=['GET','POST']) def add(): if request.method == "GET": return render_template('add.html',goods = GOODS) title = request.form.get('title') price = request.form.get('price') # 處理業務邏輯 # 耗時 1分鍾 # 立即交給broker去執行 result = tasks.x1.delay(1,8) # 去觸發 函數 result.id 拿到一個 字符串憑證 # 10s之后,broker才開始執行 import datetime # 可以 t = "2018-8-8" ctime = datetime.datetime.now() # 獲取當前時間 utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp()) # 當前時間轉換成UTC時間 ctime_x = utc_ctime + datetime.timedelta(seconds=10) # 時間 utc時間 seconds=10 ----就是當前時間的10秒后執行 result = tasks.x1.apply_async(args=[1, 8], eta=ctime_x) # apply_async 是 GOODS.append({'title':title,'price':price,'ticket':result.id}) return redirect('/index') @app.route('/detail') def detail(): ticket = request.args.get('ticket') result = AsyncResult(id=ticket, app=cel) if result.successful(): val = result.get() return "執行完成,結果:%s" %val else: return '正在處理中...' if __name__ == '__main__': app.run()

from celery import Celery from celery.schedules import crontab cel = Celery('tasks', # 是一個名字 broker='redis://:beta@140.143.227.206:8888/0', # 放任務 backend='redis://:beta@140.143.227.206:8888/0', # 取結果 include=['celery_tasks.tasks','celery_tasks.xxx'] ) # 如果需要 每天都要執行的 任務之前 要 執行這個 celery beat -A celery_tasks cel.conf.beat_schedule = { # 'add-every-10-seconds': { # 'task': 'celery_tasks.tasks.x2', # 找到 那個函數 # 'args': (98, 10), # 給 x2 傳參數 # 'schedule': 10.0, # 每10秒執行下這個任務 # }, 'add-every-12-seconds': { 'task': 'celery_tasks.tasks.x2', 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # month_of_year = 月 day_of_month = 日 hour = 時 minute = 分 'args': (26, 16) }, }

import time from .celery import cel from celery import shared_task @shared_task def x1(x, y): time.sleep(10) return x + y @cel.task def x2(x, y): time.sleep(5) return x - y @cel.task def x3(x, y): time.sleep(2) return x * y