Celery分布式任務隊列快速入門


本節內容

1. Celery介紹和基本使用

2. 項目中使用Celery

3. Celery定時任務

4. Celery與Django結合

5. Django中使用計划任務

 

一  Celery介紹和基本使用

 

 

需求場景

1.  對100台命令執行一條批量命令,命令執行需要很長時間,但是不想讓主程序等着結果返回,而是給主程序返回一個任務ID,task_id

主程序過一段時間根據task_id,獲取執行結果即可,再命令執行期間,主程序 可以繼續做其他事情

2.  定時任務,比如每天檢測一下所有的客戶資料,發現是客戶的生日,發個祝福短信

 

解決方案

1.  邏輯view 中啟一個進程

父進程結束,子進程跟着結束,子進程任務沒有完成,不符合需求

父進程結束,等着子進程結束,父進程需等着結果返回,不符合需求

小結:該方案解決不了阻塞問題,即需要等待 

2. 啟動 subprocess,任務托管給操作系統執行

實現task_id,實現異步,解決阻塞

小結:大批量高並發,主服務器會出現問題,解決不了並發

3. celery

celery提供多子節點,解決並發問題

 

celery介紹

celery是一個基於python開發的分布式異步消息隊列,輕松實現任務的異步處理

celery在執行任務時需要一個消息中間件來接收和發送任務消息,以及存儲任務結果,一般使用RabbitMQ 或 Redis

 

celery優點

簡單:熟悉celery的工作流程后,配置使用簡單

高可用:當任務執行失敗或執行過程中發生連接中斷,celery會自動嘗試重新執行任務

快速:一個單進程的celery每分鍾可處理上百萬個任務

靈活:幾乎celery的各個組件都可以被擴展及自定制

 

celery基本工作流程

其中中間隊列用於分配任務以及存儲執行結果

 

celery安裝及使用

1.  安裝python模塊

pip3 install celery
pip3 install redis

2.  安裝redis服務

wget  http://download.redis.io/releases/redis-3.2.8.tar.gz
tar -zxvf redis-3.2.8.tar.gz
cd redis-3.2.8
make

src/redis-server # 啟動redis 服務

3.  創建一個celery application 用來定義任務列表

創建一個任務 tasks.py

from celery import Celery
 
app = Celery('TASK',
             broker='redis://localhost',
             backend='redis://localhost')
 
@app.task
def add(x,y):
    print("running...",x,y)
    return x+y

 4.  啟動celery worker 來開始監聽並執行任務

celery -A tasks worker --loglevel=info

tasks 任務文件名,worker 任務角色,--loglevel=info 任務日志級別

5.  調用任務

打開另外終端,進入命令行模式,調用任務

6.  celery常用接口

  • tasks.add(4,6) ---> 本地執行

  • tasks.add.delay(3,4) --> worker執行

  • t=tasks.add.delay(3,4)  --> t.get()  獲取結果,或卡住,阻塞

  • t.ready()---> False:未執行完,True:已執行完

  • t.get(propagate=False) 拋出簡單異常,但程序不會停止

  • t.traceback 追蹤完整異常

 

補充:如何使用第三方工具

1. 導入第三方包,如 from celery import Celery

2. 實例化第三方類,如 app = Celery(......)

3. 實例化的對象去關聯執行任務的方法,如 @app.task

4. 分區角色  worker 執行任務,broker分配任務

 

二  項目中使用Celery

 

 

1.  項目目錄結構

project
    |-- __init__.py
    |-- celery.py   # 配置文檔 |-- tasks.py    # 任務函數 |-- tasks2.py   # 任務函數

2.  項目文件

project/celery.py

# from celery import Celery 默認當前路徑,更改為絕對路徑(當前路徑有個celery.py文件啦)
from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('project',
             broker='redis://localhost',
             backend='redis://localhost',
             include=['project.tasks','project.tasks2'])  # 配置文件和任務文件分開了,可以寫多個任務文件

# app 擴展配置
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

celery.py作用相當於配置文件

project/tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

project/tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app

@app.task
def hello():
    return 'Hello World'

3.  啟動項目worker

celery -A project worker -l info

其中 project 為項目名

另啟終端,與project同目錄進入python3

 

4.  實現分布式 
當啟動多個時 celery -A project worker -l info,去broker去相應任務,實現分布式
 
 
5.  后台啟動woker
celery multi start w1 -A project -l info
celery multi start w2 -A project -l info
celery multi start w3 -A project -l info
 
celery multi restart w1 -A project -l info
celery multi stop w1 w2 w3        # 任務立刻停止
celery multi stopwait w1 w2 w3    # 任務執行完,停止

 

 

 
三  Celery定時任務
 
 
 
celery支持定時任務,設定好任務的執行時間,celery就會定時幫你執行,這個定時任務模塊叫 celery beat
 
項目目錄結構
project
    |-- __init__.py
    |-- celery.py          # 配置文件 |-- periodic_task.py   # 定時任務文件

 

腳本celery.py

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('project',
             broker='redis://localhost',
             backend='redis://localhost',
             include=['project.periodic_task',])

app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

腳本periodic_task.py

from __future__ import absolute_import, unicode_literals
from .celery import app
from celery.schedules import crontab

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # 每10s調用 test('hello')
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # 每20s調用 test('world')
    sender.add_periodic_task(20.0, test.s('world'), expires=10)

    # 每周一早上7:30 執行 test('Happy Mondays!')
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1), # 可靈活修改
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)
 
啟動角色 worker  執行任務
celery -A project worker -l info
啟動角色 beat 將定時任務放到隊列中
celery -A  project.periodic_task  beat  -l  debug

 

也可以在配置文件celery.py 里添加定時任務

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'project.tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
app.conf.timezone = 'UTC'

每周1的早上7.30執行project.tasks.add任務

 

還有更多定時配置方式如下:

Example Meaning
crontab() 每分鍾執行  
crontab(minute=0,hour=0) 每天0點執行
crontab(minute=0,hour='*/3') 每3小時執行: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
crontab(minute=0, hour='0,3,6,9,12,15,18,21')
同上
crontab(minute='*/15') 每15分鍾執行
crontab(day_of_week='sunday') 周天的每分鍾執行
crontab(minute='*', hour='*',day_of_week='sun')
同上
crontab(minute='*/10', hour='3,17,22',day_of_week='thu,fri')

周三、五,3-4 am, 5-6 pm, and 10-11 pm,每10分鍾執行

crontab(minute=0,hour='*/2,*/3') 每小時/2和每小時/3,執行
crontab(minute=0, hour='*/5') 每小時/5,執行
crontab(minute=0, hour='*/3,8-17') 每小時/3,8am-5pm,執行
crontab(0,0,day_of_month='2') Execute on the second day of every month.
crontab(0,0, day_of_month='2-30/3')
Execute on every even numbered day.
crontab(0,0, day_of_month='1-7,15-21')
Execute on the first and third weeks of the month.
crontab(0,0,day_of_month='11', month_of_year='5')
Execute on the eleventh of May every year.
crontab(0,0, month_of_year='*/3')
Execute on the first month of every quarter.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
四  Celery與Django結合
 
 
 
1.  項目目錄結構
LearnCelery
   |-- app1
        |-- tasks.py
        |-- models.py
   |-- app2
        |-- tasks.py
        |-- models.py
   |-- LearnCelery
        |-- __init__.py
        |-- celery.py
        |-- settings.py

2.  腳本代碼

LearnCelery/app/tasks.py   # 必須叫這個名字

from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time

# 所有的app都可以調用
@shared_task
def add(x, y):
    time.sleep(10)
    return x + y

@shared_task
def mul(x, y):
    time.sleep(10)
    return x * y

LearnCelery/LearnCelery/__init__.py

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

LearnCelery/LearnCelery/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# 單獨腳本調用Django內容時,需配置腳本的環境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')

app = Celery('mysite')

#  CELERY_ 作為前綴,在settings中寫配置
app.config_from_object('django.conf:settings', namespace='CELERY')

# 到Django各個app下,自動發現tasks.py 任務腳本
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

LearnCelery/LearnCelery/settings.py

# For celery
CELERY_BROKER_URL = 'redis://localhost'
CELERY_RESULT_BACKEND = 'redis://localhost'

3.  啟動celery

celery -A LearnCelery worker -l debug
4.  urls.py 視圖處理
urlpatterns = [
    url(r'^celery_call/$', views.celery_call),
    url(r'^celery_res/$', views.celery_res),
]

 

五  Django中使用計划任務
 
 
1.  安裝插件
pip3 install django-celery-beat
2.  修改配置 settings.py
INSTALLED_APPS = [
    ....   
    'django_celery_beat',
]

3. 數據庫遷移

python manage.py migrate

4.  啟動 celery beat

celery -A LearnCelery beat -l info -S django

定時任務存到數據庫里,啟動beat定時取任務放到隊列里執行

5.  admin管理

 

 

 

 

啟動celery beat和worker,會發現每隔2秒,beat會發起一個任務消息讓worker執行tasks任務

注意,經測試,每添加或修改一個任務,celery beat都需要重啟一次,要不然新的配置不會被celery beat進程讀到

 
更多信息請參考
http://www.cnblogs.com/alex3714/articles/6351797.html
 
代碼示例請參考
https://github.com/Jonathan1314/LearnCelery 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM