1,celery介紹
Celery 是一個強大的分布式任務隊列,它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現異步任務( async task )和定時任務( crontab )。 異步任務比如是發送郵件、或者文件上傳, 圖像處理等等一些比較耗時的操作 ,定時任務是需要在特定時間執行的任務。它的架構組成如下圖:
任務隊列
任務隊列是一種跨線程、跨機器工作的一種機制.
任務隊列中包含稱作任務的工作單元。有專門的工作進程持續不斷的監視任務隊列,並從中獲得新的任務並處理.
任務模塊
包含異步任務和定時任務。其中,異步任務通常在業務邏輯中被觸發並發往任務隊列,而定時任務由 Celery Beat 進程周期性地將任務發往任務隊列。
消息中間件 Broker
Broker ,即為任務調度隊列,接收任務生產者發來的消息(即任務),將任務存入隊列。 Celery 本身不提供隊列服務,官方推薦使用 RabbitMQ 和 Redis 等。
任務執行單元 Worker
Worker 是執行任務的處理單元,它實時監控消息隊列,獲取隊列中調度的任務,並執行它。
任務結果存儲 Backend
Backend 用於存儲任務的執行結果,以供查詢。同消息中間件一樣,存儲也可使用 RabbitMQ, Redis 和 MongoDB 等。
2.1 使用Celery實現異步任務
a. 創建Celery實例
b. 啟動Celery Worker,通過delay()或者apply_async()將任務發布到broker
c. 應用程序調用異步任務
d. 存儲結果
Celery Beat: 任務調度器,Beat進程會讀取配置文件的內容,周期性的將配置中到期需要執行的任務發送給任務隊列
2.2 使用Celery定時任務
a. 創建Celery實例
b. 配置文件中配置任務,發送任務celery -A xxx beat
c. 啟動Celery Worker celery -A xxx worker -l info -P eventlet
d. 存儲結果
3. 代碼實現
3.1 test1.py
from .. import app
import time
def test11():
time.sleep(1)
print('test11')
def test22():
time.sleep(2)
print('test22')
test11()
@app.task
def test1_run():
test11()
test22()
3.2 test2.py
from .. import app
import time
def test33():
time.sleep(3)
print('test33')
def test44():
time.sleep(4)
print('test44')
test33()
@app.task
def test2_run():
test33()
test44()
3.3 celery_task.__init__.py
# 拒絕隱式引入,如果celery.py和celery模塊名字一樣,避免沖突,需要加上這條語句
# 該代碼中,名字是不一樣的,最好也要不一樣
from __future__ import absolute_import
from celery import Celery
app = Celery('tasks')
app.config_from_object('celery_task.celeryconfig')
3.4 celeryconfig.py
from __future__ import absolute_import
from celery.schedules import crontab
from datetime import timedelta
# 使用redis存儲任務隊列
broker_url = 'redis://127.0.0.1:6379/7'
# 使用redis存儲結果
result_backend = 'redis://127.0.0.1:6379/8'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
# 時區設置
timezone = 'Asia/Shanghai'
# celery默認開啟自己的日志
# False表示不關閉
worker_hijack_root_logger = False
# 存儲結果過期時間,過期后自動刪除
# 單位為秒
result_expires = 60 * 60 * 24
# 導入任務所在文件
imports = [
'celery_task.app_scripts.test1',
'celery_task.app_scripts.test2'
]
# 需要執行任務的配置
beat_schedule = {
'test1': {
# 具體需要執行的函數
# 該函數必須要使用@app.task裝飾
'task': 'celery_task.app_scripts.test1.test1_run',
# 定時時間
# 每分鍾執行一次,不能為小數
'schedule': crontab(minute='*/1'),
# 或者這么寫,每小時執行一次
# "schedule": crontab(minute=0, hour="*/1")
# 執行的函數需要的參數
'args': ()
},
'test2': {
'task': 'celery_task.app_scripts.test2.test2_run',
# 設置定時的時間,10秒一次
'schedule': timedelta(seconds=10),
'args': ()
}
}
如果大寫的話,需要寫成:
1 CELERYBEAT_SCHEDULE = { 2 'celery_app.task.task1': { 3 'task': 'celery_app.task.task1', 4 'schedule': timedelta(seconds=20), 5 'args': (1, 10) 6 }, 7 'celery_app.task.task2': { 8 'task': 'celery_app.task.task2', 9 'schedule': crontab(minute='*/2'), 10 'args': () 11 } 12 }
4. 執行定時任務
4.1 發布任務
在celery_task同級目錄下,執行命令:
celery -A celery_task beat
4.2 執行任務
在celery_task同級目錄下,執行命令:
celery -A celery_task worker -l info -P eventlet
可以看到輸出:
[2018-09-07 16:54:57,809: WARNING/MainProcess] test33 [2018-09-07 16:55:00,002: INFO/MainProcess] Received task: celery_task.app_scrip ts.test1.test1_run[0134cb52-29a3-4f57-890e-9730feac19e7] [2018-09-07 16:55:01,069: WARNING/MainProcess] test11 [2018-09-07 16:55:01,821: WARNING/MainProcess] test44 [2018-09-07 16:55:03,083: WARNING/MainProcess] test22 [2018-09-07 16:55:04,234: WARNING/MainProcess] test11
如果同時在<b>兩個虛擬環境(服務器)</b>中都執行定時任務,都可以看到有以上LOG打印。
4.3 celery相關命令
發布任務
celery -A celery_task beat
執行任務
celery -A celery_task worker -l info -P eventlet
將以上兩條合並
celery -B -A celery_task worker
后台啟動celery worker進程
celery multi start work_1 -A appcelery
停止worker進程,如果無法停止,加上-A
celery multi stop WORKNAME
重啟worker進程
celery multi restart WORKNAME
查看進程數
celery status -A celery_task
4.4 定時方式
from celery.schedules import crontab from datetime import timedelta # 1 每10秒鍾執行一次 'schedule':timedelta(seconds=30) # 2 每分鍾執行一次 'schedule':crontab(minute='*/1')
