celery定時任務


 

 

1,celery介紹
Celery 是一個強大的分布式任務隊列,它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現異步任務( async task )和定時任務( crontab )。 異步任務比如是發送郵件、或者文件上傳, 圖像處理等等一些比較耗時的操作 ,定時任務是需要在特定時間執行的任務。它的架構組成如下圖:

 

 



任務隊列
任務隊列是一種跨線程、跨機器工作的一種機制.
任務隊列中包含稱作任務的工作單元。有專門的工作進程持續不斷的監視任務隊列,並從中獲得新的任務並處理.

任務模塊
包含異步任務和定時任務。其中,異步任務通常在業務邏輯中被觸發並發往任務隊列,而定時任務由 Celery Beat 進程周期性地將任務發往任務隊列。

消息中間件 Broker
Broker ,即為任務調度隊列,接收任務生產者發來的消息(即任務),將任務存入隊列。 Celery 本身不提供隊列服務,官方推薦使用 RabbitMQ 和 Redis 等。

任務執行單元 Worker
Worker 是執行任務的處理單元,它實時監控消息隊列,獲取隊列中調度的任務,並執行它。

任務結果存儲 Backend
Backend 用於存儲任務的執行結果,以供查詢。同消息中間件一樣,存儲也可使用 RabbitMQ, Redis 和 MongoDB 等。


2.1 使用Celery實現異步任務

a. 創建Celery實例
b. 啟動Celery Worker,通過delay()或者apply_async()將任務發布到broker
c. 應用程序調用異步任務
d. 存儲結果
Celery Beat: 任務調度器,Beat進程會讀取配置文件的內容,周期性的將配置中到期需要執行的任務發送給任務隊列

2.2 使用Celery定時任務

a. 創建Celery實例
b. 配置文件中配置任務,發送任務celery -A xxx beat
c. 啟動Celery Worker celery -A xxx worker -l info -P eventlet
d. 存儲結果

3. 代碼實現

3.1 test1.py

from .. import app
import time

def test11():
    time.sleep(1)
    print('test11')

def test22():
    time.sleep(2)
    print('test22')
    test11()
@app.task
def test1_run():
  test11()
test22()

3.2 test2.py

from .. import app
import time

def test33():
    time.sleep(3)
    print('test33')

def test44():
    time.sleep(4)
    print('test44')
    test33()

@app.task
def test2_run():
    test33()
    test44() 

3.3 celery_task.__init__.py

# 拒絕隱式引入,如果celery.py和celery模塊名字一樣,避免沖突,需要加上這條語句
# 該代碼中,名字是不一樣的,最好也要不一樣
from __future__ import absolute_import
from celery import Celery

app = Celery('tasks')
app.config_from_object('celery_task.celeryconfig') 

3.4 celeryconfig.py

from __future__ import absolute_import
from celery.schedules import crontab
from datetime import timedelta

# 使用redis存儲任務隊列
broker_url = 'redis://127.0.0.1:6379/7'
# 使用redis存儲結果
result_backend = 'redis://127.0.0.1:6379/8'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
# 時區設置
timezone = 'Asia/Shanghai'
# celery默認開啟自己的日志
# False表示不關閉
worker_hijack_root_logger = False
# 存儲結果過期時間,過期后自動刪除
# 單位為秒
result_expires = 60 * 60 * 24

# 導入任務所在文件
imports = [
    'celery_task.app_scripts.test1',
    'celery_task.app_scripts.test2'
]

# 需要執行任務的配置
beat_schedule = {
    'test1': {
        # 具體需要執行的函數
        # 該函數必須要使用@app.task裝飾
        'task': 'celery_task.app_scripts.test1.test1_run',
        # 定時時間
        # 每分鍾執行一次,不能為小數
        'schedule': crontab(minute='*/1'),
        # 或者這么寫,每小時執行一次
        # "schedule": crontab(minute=0, hour="*/1")
        # 執行的函數需要的參數
        'args': ()
    },
    'test2': {
        'task': 'celery_task.app_scripts.test2.test2_run',
        # 設置定時的時間,10秒一次
        'schedule': timedelta(seconds=10),
        'args': ()
    }
}

 

如果大寫的話,需要寫成:
 1 CELERYBEAT_SCHEDULE = {
 2     'celery_app.task.task1': {
 3         'task': 'celery_app.task.task1',
 4         'schedule': timedelta(seconds=20),
 5         'args': (1, 10)
 6     },
 7     'celery_app.task.task2': {
 8         'task': 'celery_app.task.task2',
 9         'schedule': crontab(minute='*/2'),
10         'args': ()
11     }
12 }

 

4. 執行定時任務

4.1 發布任務

在celery_task同級目錄下,執行命令:
celery -A celery_task beat

4.2 執行任務

在celery_task同級目錄下,執行命令:
celery -A celery_task worker -l info -P eventlet
可以看到輸出:

[2018-09-07 16:54:57,809: WARNING/MainProcess] test33 [2018-09-07 16:55:00,002: INFO/MainProcess] Received task: celery_task.app_scrip ts.test1.test1_run[0134cb52-29a3-4f57-890e-9730feac19e7] [2018-09-07 16:55:01,069: WARNING/MainProcess] test11 [2018-09-07 16:55:01,821: WARNING/MainProcess] test44 [2018-09-07 16:55:03,083: WARNING/MainProcess] test22 [2018-09-07 16:55:04,234: WARNING/MainProcess] test11 

如果同時在<b>兩個虛擬環境(服務器)</b>中都執行定時任務,都可以看到有以上LOG打印。

4.3 celery相關命令

發布任務
celery -A celery_task beat
執行任務
celery -A celery_task worker -l info -P eventlet
將以上兩條合並
celery -B -A celery_task worker
后台啟動celery worker進程
celery multi start work_1 -A appcelery
停止worker進程,如果無法停止,加上-A
celery multi stop WORKNAME
重啟worker進程
celery multi restart WORKNAME
查看進程數
celery status -A celery_task

4.4 定時方式

from celery.schedules import crontab from datetime import timedelta # 1 每10秒鍾執行一次 'schedule':timedelta(seconds=30) # 2 每分鍾執行一次 'schedule':crontab(minute='*/1') 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM