Celery 是一個 基於python開發的分布式異步消息任務隊列,通過它可以輕松的實現任務的異步處理, 如果你的業務場景中需要用到異步任務,就可以考慮使用celery, 舉幾個實例場景中可用的例子:
- 你想對100台機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿着這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情。
- 你想做一個定時任務,比如每天檢測一下你們所有客戶的資料,如果發現今天 是客戶的生日,就給他發個短信祝福
Celery 在執行任務時需要通過一個消息中間件來接收和發送任務消息,以及存儲任務結果, 一般使用rabbitMQ or Redis,后面會講
1.1 Celery有以下優點:
- 簡單:一單熟悉了celery的工作流程后,配置和使用還是比較簡單的
- 高可用:當任務執行失敗或執行過程中發生連接中斷,celery 會自動嘗試重新執行任務
- 快速:一個單進程的celery每分鍾可處理上百萬個任務
- 靈活: 幾乎celery的各個組件都可以被擴展及自定制
Celery基本工作流程圖
1.2 Celery安裝使用
Celery的默認broker是RabbitMQ, 僅需配置一行就可以
broker_url = 'amqp://guest:guest@localhost:5672//'
rabbitMQ 沒裝的話請裝一下,安裝看這里 http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#id3
使用Redis做broker也可以
配置
Configuration is easy, just configure the location of your Redis database:
app.conf.broker_url = 'redis://localhost:6379/0'
Where the URL is in the format of:
redis://:password@hostname:port/db_number
all fields after the scheme are optional, and will default to localhost
on port 6379, using database 0.
如果想獲取每個任務的執行結果,還需要配置一下把任務結果存在哪
If you also want to store the state and return values of tasks in Redis, you should configure these settings:
app.conf.result_backend = 'redis://localhost:6379/0'
celery
1. 基本使用

""" celery worker -A tasks -l info """ import pytz from celery import Celery app = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672') @app.task def add(x, y): print("running...", x, y) import time time.sleep(10) return x + y

#!/usr/bin/env python # -*- coding:utf-8 -*- from s1 import add """ # 執行任務 result = add.delay(4, 4) print(result) # 檢查任務是否已經完成 print(result.ready()) # 獲取任務結果:可以設置timeout超時 v = result.get() print(v) """ result = add.delay(4, 4) from celery.result import AsyncResult print(result,type(result)) """ from celery.task.control import revoke revoke(id, terminate=True) """

#!/usr/bin/env python # -*- coding:utf-8 -*- from celery import Celery from celery.result import AsyncResult app = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672') result = AsyncResult(id="f380db43-8998-4fb0-b3a4-cd1cbd49f14e", app=app) print(result.get())

#!/usr/bin/env python # -*- coding:utf-8 -*- from datetime import datetime from s1 import add from celery.result import AsyncResult # result = add.apply_async(args=[1, 3], eta=datetime(2018, 4, 11, 2, 32, 0)) """ from datetime import datetime v1 = datetime(2017, 4, 11, 3, 0, 0) print(v1) v2 = datetime.utcfromtimestamp(v1.timestamp()) print(v2) """ result = add.apply_async(args=[1, 3], eta=datetime(2018, 4, 11, 3, 0, 0)) print(type(result)) # result.revoke() print(result.get())
2. 多文件
part2
├── __pycache__
├── s2.py
└── tasks
├── __init__.py
├── celery.py
└── s1.py

#!/usr/bin/env python # -*- coding:utf-8 -*- from celery import Celery # app = Celery('tasks', broker='redis://192.168.0.100:6379/0', backend='redis://192.168.0.100:6379/0') app = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672', include=['tasks.s1']) app.conf.update( result_expires=3600, )

#!/usr/bin/env python # -*- coding:utf-8 -*- import time from .celery import app @app.task def add1(x, y): time.sleep(1) return x + y @app.task def add2(x, y): time.sleep(2) return x + y @app.task def add3(x, y): time.sleep(3) return x + y

#!/usr/bin/env python # -*- coding:utf-8 -*- from tasks.s1 import add1 result = add1.delay(4, 4) from celery.result import AsyncResult print(result,type(result))
celery worker -A tasks celery multi start n1 -A task celery multi stop n1 -A task celery multi stopwait n1 -A task
3. 定時任務
a. 函數版本

#!/usr/bin/env python # -*- coding:utf-8 -*- """ 啟動定制任務: celery beat -A s1 需要依賴celerybeat-schedule.db,所以要對文件夾有寫的權限 或 celery -A periodic_task beat -s /home/celery/var/run/celerybeat-schedule 執行: celery worker -A s1 """ from celery import Celery from celery.schedules import crontab app = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672') @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # 每10s執行一次:test('hello') sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') # 每30s執行一次:test('world') sender.add_periodic_task(30.0, test.s('world'), expires=10) # 每天早上7:30執行一次:test('Happy Mondays!') sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), test.s('Happy Mondays!'), ) # 每周3,5的3,7,20點 每12分鍾執行一次:test('Happy Mondays!') sender.add_periodic_task( crontab( minute=12, hour="3,7,20", day_of_week='thu,fri', day_of_month="*", day_of_year='*', ), test.s('11111'), ) # 每周3,5的3,7,20點 每12分鍾執行一次:test('Happy Mondays!') sender.add_periodic_task( crontab( minute=25, hour=7, day_of_month=11, month_of_year=4, ), test.s('11111'), ) @app.task def test(arg): print(arg)
b. 配置版本
proj/
├── celery.py
└── s1.py

#!/usr/bin/env python # -*- coding:utf-8 -*- """ celery beat -A proj celery worker -A proj -l info """ from celery import Celery from celery.schedules import crontab app = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672', include=['proj.s1', ]) app.conf.timezone = 'Asia/Shanghai' app.conf.enable_utc = False app.conf.beat_schedule = { # 'add-every-10-seconds': { # 'task': 'proj.s1.add1', # 'schedule': 10.0, # 'args': (16, 16) # }, 'add-every-12-seconds': { 'task': 'proj.s1.add1', 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), 'args': (16, 16) }, }

#!/usr/bin/env python # -*- coding:utf-8 -*- import time from .celery import app @app.task def add1(x, y): import datetime print(datetime.datetime.now()) return x + y @app.task def add2(x, y): return x + y @app.task def add3(x, y): return x + y
flask celery應用
proj
├── app.py
└── celery_tasks
└── tasks.py

#!/usr/bin/env python # -*- coding:utf-8 -*- """ celery worker -A app.celery -l info python3 app.py """ from flask import Flask from celery import Celery from celery.result import AsyncResult app = Flask(__name__) celery = Celery('xxxxxx', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672', include=['celery_tasks.tasks']) TASK_ID = None from celery_tasks import tasks @app.route('/') def index(): global TASK_ID result = tasks.task.delay() # result = tasks.task.apply_async(args=[1, 3], eta=datetime(2018, 4, 11, 1, 24, 0)) TASK_ID = result.id return "xxxx" @app.route('/ready') def ready(): global TASK_ID result = AsyncResult(id=TASK_ID, app=celery) return str(result.ready()) @app.route('/result') def result(): global TASK_ID result = AsyncResult(id=TASK_ID, app=celery) if result.ready(): return result.get() return "xxxx" if __name__ == '__main__': app.run()

#!/usr/bin/env python # -*- coding:utf-8 -*- from app import celery @celery.task def task(*args, **kwargs): import time time.sleep(5) print('..........') return "任務結果"