Celery


Celery is a distributed asynchronous task queue written in Python that makes asynchronous task processing easy. If your business scenario needs async tasks, Celery is worth considering. A couple of practical examples:

  1. You want to run a batch command across 100 machines, which may take a long time. Instead of having your program block waiting for the result, it immediately gets back a task ID; later you use that ID to fetch the result, and while the task is running you are free to do other work.
  2. You want a scheduled job, for example: check all of your customers' records every day and, if today is a customer's birthday, send them an SMS greeting.

 

To send and receive task messages and to store task results, Celery needs a message broker; RabbitMQ or Redis is typically used, as covered below.
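Conceptually, the flow is: the client pushes a task message to the broker, a worker picks it up, runs it, and writes the return value to the result backend. A toy standard-library sketch of that flow (not Celery itself, just the idea — `broker` and `backend` here are stand-ins for RabbitMQ/Redis):

```python
import queue
import threading

broker = queue.Queue()  # stands in for RabbitMQ/Redis
backend = {}            # stands in for the result backend

def worker():
    # a worker blocks on the broker, runs the task, stores the result
    task_id, func, args = broker.get()
    backend[task_id] = func(*args)
    broker.task_done()

threading.Thread(target=worker, daemon=True).start()
broker.put(("task-1", lambda x, y: x + y, (4, 4)))  # like add.delay(4, 4)
broker.join()
print(backend["task-1"])  # 8
```

The client only ever holds the task ID ("task-1"); fetching `backend["task-1"]` later is what `AsyncResult(id).get()` does in real Celery.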

1.1 Advantages of Celery:

  1. Simple: once you are familiar with Celery's workflow, configuration and usage are fairly straightforward.
  2. Highly available: if a task fails or the connection is lost mid-execution, Celery automatically retries the task.
  3. Fast: a single Celery process can handle millions of tasks per minute.
  4. Flexible: almost every Celery component can be extended or customized.

Celery basic workflow diagram

 

 

1.2 Installing and using Celery

Celery's default broker is RabbitMQ; enabling it takes a single configuration line:

broker_url = 'amqp://guest:guest@localhost:5672//'

If you don't have RabbitMQ installed yet, see the installation guide: http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#id3

 

Redis also works as the broker.

Configuration is easy: just point Celery at your Redis database:

app.conf.broker_url = 'redis://localhost:6379/0' 

Where the URL is in the format of:

redis://:password@hostname:port/db_number

All fields after the scheme are optional and default to localhost, port 6379, database 0.
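The pieces of such a URL can be inspected with Python's standard `urllib.parse` — a quick sketch (the host and password here are made up):

```python
from urllib.parse import urlparse

parts = urlparse("redis://:s3cret@cache.example.com:6380/2")
print(parts.password)          # s3cret
print(parts.hostname)          # cache.example.com
print(parts.port)              # 6380
print(parts.path.lstrip("/"))  # 2 -- the db_number
```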

 

If you also want to store the state and return values of tasks, configure a result backend; for Redis:

app.conf.result_backend = 'redis://localhost:6379/0'


1. Basic usage

"""
celery worker -A tasks -l info
"""
import time

from celery import Celery

app = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672')


@app.task
def add(x, y):
    print("running...", x, y)
    time.sleep(10)  # simulate a slow task
    return x + y
s1.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from s1 import add

"""
# run the task
result = add.delay(4, 4)
print(result)

# check whether the task has finished
print(result.ready())

# get the task result; a timeout can be passed to get()
v = result.get()
print(v)
"""

result = add.delay(4, 4)
print(result, type(result))

"""
# revoke (cancel) a task by its id
from celery.task.control import revoke
revoke(id, terminate=True)
"""
s2.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery
from celery.result import AsyncResult

app = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672')

# fetch the result of a previously dispatched task by its id
result = AsyncResult(id="f380db43-8998-4fb0-b3a4-cd1cbd49f14e", app=app)
print(result.get())
s3.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from datetime import datetime
from s1 import add
from celery.result import AsyncResult

# result = add.apply_async(args=[1, 3], eta=datetime(2018, 4, 11, 2, 32, 0))

"""
# eta is interpreted as UTC by default (enable_utc=True),
# so convert a local datetime before passing it:
from datetime import datetime

v1 = datetime(2017, 4, 11, 3, 0, 0)
print(v1)

v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)
"""
result = add.apply_async(args=[1, 3], eta=datetime(2018, 4, 11, 3, 0, 0))
print(type(result))
# result.revoke()
print(result.get())
s4.py
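With the default `enable_utc=True`, Celery treats `eta` as UTC, so a naive local datetime should be converted before it is passed. A minimal stdlib sketch (assuming Python 3.6+, where `astimezone()` on a naive datetime uses the system local zone):

```python
from datetime import datetime, timezone

local_run_at = datetime(2018, 4, 11, 3, 0, 0)       # naive local time
utc_run_at = local_run_at.astimezone(timezone.utc)  # aware, converted to UTC
# this aware datetime can then be passed as eta=utc_run_at
print(utc_run_at.tzinfo)  # UTC
```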

2. Multiple files

part2
├── __pycache__
├── s2.py
└── tasks
    ├── __init__.py
    ├── celery.py
    └── s1.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery

# app = Celery('tasks', broker='redis://192.168.0.100:6379/0', backend='redis://192.168.0.100:6379/0')
app = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672', include=['tasks.s1'])

app.conf.update(
    result_expires=3600,
)
celery.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from .celery import app


@app.task
def add1(x, y):
    time.sleep(1)
    return x + y


@app.task
def add2(x, y):
    time.sleep(2)
    return x + y


@app.task
def add3(x, y):
    time.sleep(3)
    return x + y
s1.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from tasks.s1 import add1


result = add1.delay(4, 4)
print(result, type(result))
s2.py
celery worker -A tasks

celery multi start n1 -A tasks
celery multi stop n1 -A tasks
celery multi stopwait n1 -A tasks

3. Periodic tasks

a. Function version

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
Start the beat scheduler:
    celery beat -A s1
    beat writes celerybeat-schedule.db, so it needs write permission on the directory,
    or give an explicit path: celery -A periodic_task beat -s /home/celery/var/run/celerybeat-schedule

Run the worker:
    celery worker -A s1
"""
from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672')


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # run test('hello') every 10 seconds
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # run test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # run test('Happy Mondays!') every Monday at 7:30
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

    # run test('11111') at minute 12 of hours 3, 7 and 20, every Thursday and Friday
    sender.add_periodic_task(
        crontab(
            minute=12, hour="3,7,20", day_of_week='thu,fri', day_of_month="*", day_of_year='*',
        ),
        test.s('11111'),
    )

    # run test('11111') at 7:25 on April 11
    sender.add_periodic_task(
        crontab(
            minute=25, hour=7, day_of_month=11, month_of_year=4,
        ),
        test.s('11111'),
    )


@app.task
def test(arg):
    print(arg)
function version
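A quick way to sanity-check a crontab spec is to enumerate its firings: the Thursday/Friday spec above (minute 12, at hours 3, 7 and 20) fires 2 days x 3 hours x 1 minute = 6 times per week. A tiny sketch of that counting:

```python
from itertools import product

days = ("thu", "fri")   # day_of_week='thu,fri'
hours = (3, 7, 20)      # hour="3,7,20"
minutes = (12,)         # minute=12

# every combination of the field values is one firing
firings = list(product(days, hours, minutes))
print(len(firings))  # 6 runs per week
```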

b. Configuration version

proj/
├── celery.py
└── s1.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
celery beat -A proj
celery worker -A proj -l info

"""
from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672', include=['proj.s1', ])
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False

app.conf.beat_schedule = {
    # 'add-every-10-seconds': {
    #     'task': 'proj.s1.add1',
    #     'schedule': 10.0,
    #     'args': (16, 16)
    # },
    'add-every-12-seconds': {
        'task': 'proj.s1.add1',
        'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
        'args': (16, 16)
    },
}
celery.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from .celery import app


@app.task
def add1(x, y):
    import datetime
    print(datetime.datetime.now())
    return x + y


@app.task
def add2(x, y):
    return x + y


@app.task
def add3(x, y):
    return x + y
s1.py

Flask + Celery application

proj
├── app.py
└── celery_tasks
    └── tasks.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
celery worker -A app.celery -l info
python3 app.py

"""
from flask import Flask
from celery import Celery
from celery.result import AsyncResult

app = Flask(__name__)

celery = Celery('xxxxxx', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672',
                include=['celery_tasks.tasks'])

TASK_ID = None

from celery_tasks import tasks


@app.route('/')
def index():
    global TASK_ID
    result = tasks.task.delay()
    # result = tasks.task.apply_async(args=[1, 3], eta=datetime(2018, 4, 11, 1, 24, 0))
    TASK_ID = result.id

    return "xxxx"


@app.route('/ready')
def ready():
    global TASK_ID
    result = AsyncResult(id=TASK_ID, app=celery)
    return str(result.ready())


@app.route('/result')
def result():
    global TASK_ID
    result = AsyncResult(id=TASK_ID, app=celery)
    if result.ready():
        return str(result.get())
    return "xxxx"


if __name__ == '__main__':
    app.run()
app.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from app import celery


@celery.task
def task(*args, **kwargs):
    import time
    time.sleep(5)
    print('..........')
    return "task result"
tasks.py

 

 

 

