Celery配置實踐筆記


說點什么:

整理下工作中配置celery的一些實踐,寫在這里,一方面是備忘,另外一方面是整理成文檔給其他同事使用。

演示用的項目,同時也發布在Github上: https://github.com/blackmatrix7/celery-demo

這份筆記會隨着經驗的積累,逐步調整完善,不過通常情況下,Github上的更新會比較快。


Celery 配置實踐筆記

Celery 配置實踐筆記,目前已記錄:

  • 異步執行任務
  • 為不同任務分配不同的隊列
  • 計划任務

需要補充:

  • 為不同的任務配置不同的優先級
  • Celey任務的返回結果

創建Celery

配置Celery參數

在創建celery實例之前,需要對celery的參數進行一些配置。

在這里列出一些比較常用的Celery配置項:

配置項名稱 說明
CELERY_DEFAULT_QUEUE 默認的隊列名稱,當沒有為task特別指定隊列時,采用此隊列
CELERY_BROKER_URL 消息代理,用於發布者傳遞消息給消費者,推薦RabbitMQ
CELERY_RESULT_BACKEND 后端,用於存儲任務執行結果,推薦redis
CELERY_TASK_SERIALIZER 任務的序列化方式
CELERY_RESULT_SERIALIZER 任務執行結果的序列化方式
CELERY_ACCEPT_CONTENT
CELERYD_CONCURRENCY 任務消費者的並發數
CELERY_TIMEZONE 時區設置,計划任務需要,推薦 Asia/Shanghai
CELERY_QUEUES Celery隊列設定
CELERY_ROUTES Celery路由設定,用於給不同的任務指派不同的隊列
CELERYBEAT_SCHEDULE Celery計划任務設定

為了使用方便,在這里引入了之前寫的config模塊(支持本地配置不提交到github),本身是非必須的,用dict存儲celery配置一樣可以。所以在下面的示例代碼中,會以注釋的形式,演示不使用config模塊實現celery配置。

# 等價於直接定義一個broker,如
# broker = 'amqp://user:password@127.0.0.1:5672//'
broker = current_config.CELERY_BROKER_URL
# 將config模塊的配置轉換成dict,等價於直接通過dict定義一組celery配置
celery_conf = {k: v for k, v in current_config.items()}

創建celery實例

將之前創建的broker和celery_conf傳遞給celery,用於創建celery實例。

manage.py

# 創建celery實例
celery = Celery('demo',  broker=broker)
# 載入celery配置
celery.config_from_object(celery_conf)

定義模擬函數

在項目的handlers下,定義一些用來異步任務或計划任務的調用的函數。

這里定義了兩個py文件,分別是async_tasks.pyschedules.py,前者用於異步任務,后者用於計划任務。

異步任務下有兩個函數,模擬發送郵件和推送消息;計划任務下有數個根據計划任務執行事件定義的模擬函數。

創建celery worker

推薦以celery.start的方式來啟動celery

啟動參數

在創建完celery實例后,調用celery實例的start方法,來啟動celery。

在demo中,在manage.py中創建celery實例。

celery = Celery('demo',  broker=broker)
celery.config_from_object(celery_conf)
# argv中傳入啟動celery所需的參數
celery.start(argv=['celery', 'worker', '-l', 'info', '-f', 'logs/celery.log'])

第一個參數是固定的,用於啟動celery

第二個參數是啟動的celery組件,這里啟動的是worker,用於執行任務

第三個參數和第四個參數為一組,指定日志的級別,這里記錄級別為info的日子

第五個參數和第六個參數為一組,指定日志文件的位置,這里將日志記錄在log/celery.log

指定消費的隊列

在啟動命令中,增加一個-Q的參數,用於指定消費的隊列。

celery.start(argv=['celery', 'worker', '-Q', 'message_queue', '-l', 'info', '-f', 'logs/message.log'])

上述的參數中,'-Q', 'message_queue'兩個參數,是指定這個worker消費名為“message_queue”的隊列。

創建異步任務

對於需要在celery中異步執行的函數,只需要在函數上增加一個裝飾器。

# 導入manage.py中創建的celery實例
from manage import celery

# 需要在celery中異步執行的函數,增加celery.task裝飾器
@celery.task
def async_send_email(send_from, send_to, subject, content):
    """
    忽略方法實現
    """
    pass

異步執行任務

對於以celery.task裝飾的函數,可以同步運行,也可以異步運行。

接上一個例子的async_send_email函數。

同步執行時,直接調用這個函數即可。

async_push_message(send_to=['張三', '李四'], content='老板喊你來背鍋了')
async_push_message(send_to=['劉五', '趙六'], content='老板喊你來領獎金了')

需要異步執行時,調用函數的delay()方法執行,此時會將任務委托給celery后台的worker執行。

async_push_message.delay(send_to=['張三', '李四'], content='老板喊你來背鍋了')
async_push_message.delay(send_to=['劉五', '趙六'], content='老板喊你來領獎金了')

為任務指定不同隊列

其實在之前配置celery參數時,就已經實現為不同任務指定不同隊列的目標。

在這里詳細說明下配置項:

# 導入Queue
from kombu import Queue
# 導入Task所在的模塊,所有使用celery.task裝飾器裝飾過的函數,所需要把所在的模塊導入
# 我們之前創建的幾個測試用函數,都在handlers.async_tasks和handlers.schedules中
# 所以在這里需要導入這兩個模塊,以str表示模塊的位置,模塊組成tuple后賦值給CELERY_IMPORTS
# 這樣Celery在啟動時,會自動找到這些模塊,並導入模塊內的task
CELERY_IMPORTS = ('handlers.async_tasks', 'handlers.schedules')
# 為Celery設定多個隊列,CELERY_QUEUES是個tuple,每個tuple的元素都是由一個Queue的實例組成
# 創建Queue的實例時,傳入name和routing_key,name即隊列名稱
CELERY_QUEUES = (
    Queue(name='email_queue', routing_key='email_router'),
    Queue(name='message_queue', routing_key='message_router'),
    Queue(name='schedules_queue', routing_key='schedules_router'),
)
# 最后,為不同的task指派不同的隊列
# 將所有的task組成dict,key為task的名稱,即task所在的模塊,及函數名
# 如async_send_email所在的模塊為handlers.async_tasks
# 那么task名稱就是handlers.async_tasks.async_send_email
# 每個task的value值也是為dict,設定需要指派的隊列name,及對應的routing_key
# 這里的name和routing_key需要和CELERY_QUEUES設定的完全一致
CELERY_ROUTES = {
    'handlers.async_tasks.async_send_email': {
        'queue': 'email_queue',
        'routing_key': 'vcan.email_router',
    },
    'handlers.async_tasks.async_push_message': {
        'queue': 'message_queue',
        'routing_key': 'message_router',
    },
    'handlers.schedules.every_30_seconds': {
        'queue': 'schedules_queue',
        'routing_key': 'schedules_router',
    }
}

配置完成后,不同的task會根據CELERY_ROUTES的設置,指派到不同的消息隊列。

計划任務

celery除用於異步執行任務外,還可以用於執行計划任務。

設定celery時區

在開始配置計划任務之前對配置文件中設定celery時區。

# 設定 Celery 時區
CELERY_TIMEZONE = 'Asia/Shanghai'

定義計划任務

在配置文件中定義計划任務

計划任務也是異步執行任務的一種方式,所以也需要參考之前的說明,為計划任務分配不同的隊列。

在配置文件中,新增一項CELERYBEAT_SCHEDULE的配置

from datetime import timedelta
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
	# 給計划任務取一個獨一無二的名字吧
    'every-30-seconds': {
    	# task就是需要執行計划任務的函數
         'task': 'handlers.schedules.every_30_seconds',
         # 配置計划任務的執行時間,這里是每30秒執行一次
         'schedule': timedelta(seconds=30),
         # 傳入給計划任務函數的參數
         'args': {'value': '2333333'}
    }
}

CELERYBEAT_SCHEDULE是一個dict,每個key為計划任務的名稱,value也是dict,包含task、schedule、args。

task即需要執行計划任務的函數,這里是handlers.schedules.every_30_seconds,即handlers的schedules的every_30_seconds函數,如需配置其他的函數,依照此規則定義即可。

args是傳遞給計划任務函數的參數,在這個例子中,即傳遞給every_30_seconds的參數,如果無需參數,則args配置為None。

schedule即配置計划任務的執行時間,例子中使用的是timedelta實例,用於實現固定間隔某些時間執行。除此之外,還可以設定某個時間點執行,或重復某個時間點執行。這個就需要用到celery的crontab類。

'push_occupancy_rates': {
            'task': 'handlers.schedules.test_func_b',
            # 每天中午12點執行
            'schedule': crontab(hour='12', minute='0'),
            'args': None
        },

關於crontab更詳細的配置方式,可以參考官方手冊

Example Meaning
crontab() Execute every minute.
crontab(minute=0, hour=0) Execute daily at midnight.
crontab(minute=0, hour='*/3') Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
crontab(minute=0, hour='0,3,6,9,12,15,18,21') Same as previous.
crontab(minute='*/15') Execute every 15 minutes.
crontab(day_of_week='sunday') Execute every minute (!) at Sundays.
crontab(minute='', hour='', day_of_week='sun') Same as previous.
crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri') Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays.
crontab(minute=0,hour='/2,/3') Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm
crontab(minute=0, hour='*/5') Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5).
crontab(minute=0,hour='*/3,8-17') Execute every hour divisible by 3, and every hour during office hours (8am-5pm).
crontab(0, 0,day_of_month='2') Execute on the second day of every month.
crontab(0, 0, day_of_month='2-30/3') Execute on every even numbered day.
crontab(0, 0, day_of_month='1-7,15-21') Execute on the first and third weeks of the month.
crontab(0, 0,day_of_month='11', month_of_year='5') Execute on the eleventh of May every year.
crontab(0, 0, month_of_year='*/3') Execute on the first month of every quarter.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM