本節內容
一 Celery介紹和基本使用 |
需求場景
1. 對100台命令執行一條批量命令,命令執行需要很長時間,但是不想讓主程序等着結果返回,而是給主程序返回一個任務ID,task_id
主程序過一段時間根據task_id,獲取執行結果即可,再命令執行期間,主程序 可以繼續做其他事情
2. 定時任務,比如每天檢測一下所有的客戶資料,發現是客戶的生日,發個祝福短信
解決方案
1. 邏輯view 中啟一個進程
父進程結束,子進程跟着結束,子進程任務沒有完成,不符合需求
父進程結束,等着子進程結束,父進程需等着結果返回,不符合需求
小結:該方案解決不了阻塞問題,即需要等待
2. 啟動 subprocess,任務托管給操作系統執行
實現task_id,實現異步,解決阻塞
小結:大批量高並發,主服務器會出現問題,解決不了並發
3. celery
celery提供多子節點,解決並發問題
celery介紹
celery是一個基於python開發的分布式異步消息隊列,輕松實現任務的異步處理
celery在執行任務時需要一個消息中間件來接收和發送任務消息,以及存儲任務結果,一般使用RabbitMQ 或 Redis
celery優點
簡單:熟悉celery的工作流程后,配置使用簡單
高可用:當任務執行失敗或執行過程中發生連接中斷,celery會自動嘗試重新執行任務
快速:一個單進程的celery每分鍾可處理上百萬個任務
靈活:幾乎celery的各個組件都可以被擴展及自定制
celery基本工作流程
其中中間隊列用於分配任務以及存儲執行結果
celery安裝及使用
1. 安裝python模塊
pip3 install celery
pip3 install redis
2. 安裝redis服務
wget http://download.redis.io/releases/redis-3.2.8.tar.gz tar -zxvf redis-3.2.8.tar.gz cd redis-3.2.8 make
src/redis-server # 啟動redis 服務
3. 創建一個celery application 用來定義任務列表
創建一個任務 tasks.py
from celery import Celery app = Celery('TASK', broker='redis://localhost', backend='redis://localhost') @app.task def add(x,y): print("running...",x,y) return x+y
4. 啟動celery worker 來開始監聽並執行任務
celery -A tasks worker --loglevel=info
tasks 任務文件名,worker 任務角色,--loglevel=info 任務日志級別
5. 調用任務
打開另外終端,進入命令行模式,調用任務
6. celery常用接口
-
tasks.add(4,6) ---> 本地執行
-
tasks.add.delay(3,4) --> worker執行
-
t=tasks.add.delay(3,4) --> t.get() 獲取結果,或卡住,阻塞
-
t.ready()---> False:未執行完,True:已執行完
-
t.get(propagate=False) 拋出簡單異常,但程序不會停止
-
t.traceback 追蹤完整異常
補充:如何使用第三方工具
1. 導入第三方包,如 from celery import Celery
2. 實例化第三方類,如 app = Celery(......)
3. 實例化的對象去關聯執行任務的方法,如 @app.task
4. 分區角色 worker 執行任務,broker分配任務
二 項目中使用Celery |
1. 項目目錄結構
project |-- __init__.py |-- celery.py # 配置文檔 |-- tasks.py # 任務函數 |-- tasks2.py # 任務函數
2. 項目文件
project/celery.py
# from celery import Celery 默認當前路徑,更改為絕對路徑(當前路徑有個celery.py文件啦) from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('project', broker='redis://localhost', backend='redis://localhost', include=['project.tasks','project.tasks2']) # 配置文件和任務文件分開了,可以寫多個任務文件 # app 擴展配置 app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start()
celery.py作用相當於配置文件
project/tasks.py
from __future__ import absolute_import, unicode_literals from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y
project/tasks.py
from __future__ import absolute_import, unicode_literals from .celery import app @app.task def hello(): return 'Hello World'
3. 啟動項目worker
celery -A project worker -l info
其中 project 為項目名
另啟終端,與project同目錄進入python3
celery multi start w1 -A project -l info celery multi start w2 -A project -l info celery multi start w3 -A project -l info celery multi restart w1 -A project -l info celery multi stop w1 w2 w3 # 任務立刻停止 celery multi stopwait w1 w2 w3 # 任務執行完,停止

三 Celery定時任務 |
project |-- __init__.py |-- celery.py # 配置文件 |-- periodic_task.py # 定時任務文件
腳本celery.py
from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('project', broker='redis://localhost', backend='redis://localhost', include=['project.periodic_task',]) app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start()
腳本periodic_task.py
from __future__ import absolute_import, unicode_literals from .celery import app from celery.schedules import crontab @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # 每10s調用 test('hello') sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') # 每20s調用 test('world') sender.add_periodic_task(20.0, test.s('world'), expires=10) # 每周一早上7:30 執行 test('Happy Mondays!') sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), # 可靈活修改 test.s('Happy Mondays!'), ) @app.task def test(arg): print(arg)
celery -A project worker -l info
celery -A project.periodic_task beat -l debug
也可以在配置文件celery.py 里添加定時任務
app.conf.beat_schedule = { 'add-every-30-seconds': { 'task': 'project.tasks.add', 'schedule': 30.0, 'args': (16, 16) }, } app.conf.timezone = 'UTC'
每周1的早上7.30執行project.tasks.add任務
還有更多定時配置方式如下:
Example | Meaning |
crontab() |
每分鍾執行 |
crontab(minute=0,hour=0) |
每天0點執行 |
crontab(minute=0,hour='*/3') |
每3小時執行: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm. |
|
同上 |
crontab(minute='*/15') |
每15分鍾執行 |
crontab(day_of_week='sunday') |
周天的每分鍾執行 |
|
同上 |
|
周三、五,3-4 am, 5-6 pm, and 10-11 pm,每10分鍾執行 |
crontab(minute=0,hour='*/2,*/3') |
每小時/2和每小時/3,執行 |
crontab(minute=0, hour='*/5') |
每小時/5,執行 |
crontab(minute=0, hour='*/3,8-17') |
每小時/3,8am-5pm,執行 |
crontab(0,0,day_of_month='2') |
Execute on the second day of every month. |
|
Execute on every even numbered day. |
|
Execute on the first and third weeks of the month. |
|
Execute on the eleventh of May every year. |
|
Execute on the first month of every quarter. |
四 Celery與Django結合 |
LearnCelery |-- app1 |-- tasks.py |-- models.py |-- app2 |-- tasks.py |-- models.py |-- LearnCelery |-- __init__.py |-- celery.py |-- settings.py
2. 腳本代碼
LearnCelery/app/tasks.py # 必須叫這個名字
from __future__ import absolute_import, unicode_literals from celery import shared_task import time # 所有的app都可以調用 @shared_task def add(x, y): time.sleep(10) return x + y @shared_task def mul(x, y): time.sleep(10) return x * y
LearnCelery/LearnCelery/__init__.py
from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ['celery_app']
LearnCelery/LearnCelery/celery.py
from __future__ import absolute_import, unicode_literals import os from celery import Celery # 單獨腳本調用Django內容時,需配置腳本的環境變量 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings') app = Celery('mysite') # CELERY_ 作為前綴,在settings中寫配置 app.config_from_object('django.conf:settings', namespace='CELERY') # 到Django各個app下,自動發現tasks.py 任務腳本 app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
LearnCelery/LearnCelery/settings.py
# For celery CELERY_BROKER_URL = 'redis://localhost' CELERY_RESULT_BACKEND = 'redis://localhost'
3. 啟動celery
celery -A LearnCelery worker -l debug
urlpatterns = [ url(r'^celery_call/$', views.celery_call), url(r'^celery_res/$', views.celery_res), ]
五 Django中使用計划任務 |
pip3 install django-celery-beat
INSTALLED_APPS = [ .... 'django_celery_beat', ]
3. 數據庫遷移
python manage.py migrate
4. 啟動 celery beat
celery -A LearnCelery beat -l info -S django
定時任務存到數據庫里,啟動beat定時取任務放到隊列里執行
5. admin管理

啟動celery beat和worker,會發現每隔2秒,beat會發起一個任務消息讓worker執行tasks任務
注意,經測試,每添加或修改一個任務,celery beat都需要重啟一次,要不然新的配置不會被celery beat進程讀到