使用celery之深入celery配置(轉)


原文:http://www.dongwm.com/archives/shi-yong-celeryzhi-shen-ru-celerypei-zhi/

前言

celery的官方文檔其實相對還是寫的很不錯的.但是在一些深層次的使用上面卻顯得雜亂甚至就沒有某些方面的介紹, 通過我的一個測試環境的settings.py來說明一些使用celery的技巧和解決辦法

amqp交換類型

其實一共有4種交換類型,還有默認類型和自定義類型. 但是對我們配置隊列只會用到其中之三,我來一個個說明,英語好的話可以直接去看英文文檔

首先思考一下流程:

  1. celerybeat生成任務消息,然后發送消息到一個exchange(交換機)

  2. 交換機決定那個(些)隊列會接收這個消息,這個其實就是根據下面的exchange的類型和綁定到這個交換機所用的bindingkey

我們這里要說的其實就是怎么樣決定第二步誰接收的問題

  1. Direct Exchange

如其名,直接交換,也就是指定一個消息被那個隊列接收, 這個消息被celerybeat定義個一個routing key,如果你發送給交換機並且那個隊列綁定的bindingkey 那么就會直接轉給這個隊列

  1. Topic Exchange

你設想一下這樣的環境(我舉例個小型的應該用場景): 你有三個隊列和三個消息, A消息可能希望被X,Y處理,B消息你希望被,X,Z處理,C消息你希望被Y,Z處理.並且這個不是隊列的不同而是消息希望被相關的隊列都去執行,看一張圖可能更好理解:

對,Topic可以根據同類的屬性進程通配, 你只需要routing key有’.’分割:比如上圖中的usa.news, usa.weather, europe.news, europe.weather

  1. Fanout Exchange

先想一下廣播的概念, 在設想你有某個任務,相當耗費時間,但是卻要求很高的實時性,那么你可以需要多台服務器的多個workers一起工作,每個服務器負擔其中的一部分,但是celerybeat只會生成一個任務,被某個worker取走就沒了, 所以你需要讓每個服務器的隊列都要收到這個消息.這里很需要注意的是:你的fanout類型的消息在生成的時候為多份,每個隊列一份,而不是一個消息發送給單一隊列的次數

我的settings.py

這里只是相關於celery的部分:

import djcelery
djcelery.setup_loader()

INSTALLED_APPS = (
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.sites',
    #'django.contrib.staticfiles',
    'django.contrib.messages',
    # Uncomment the next line to enable the admin:
    'django.contrib.admin',
    'django.contrib.staticfiles',
    # Uncomment the next line to enable admin documentation:
    # 'django.contrib.admindocs',
    'dongwm.smhome',
    'dongwm.apply',
    'djcelery', # 這里增加了djcelery 也就是為了在django admin里面可一直接配置和查看celery
    'django_extensions',
    'djsupervisor',
    'django.contrib.humanize',
    'django_jenkins'
)

BROKER_URL = 'amqp://username:password@localhost:5672/yourvhost'

CELERY_IMPORTS = (
    'dongwm.smhome.tasks',
    'dongwm.smdata.tasks',
)

CELERY_RESULT_BACKEND = "amqp" # 官網優化的地方也推薦使用c的librabbitmq
CELERY_TASK_RESULT_EXPIRES = 1200 # celery任務執行結果的超時時間,我的任務都不需要返回結果,只需要正確執行就行
CELERYD_CONCURRENCY = 50 # celery worker的並發數 也是命令行-c指定的數目,事實上實踐發現並不是worker也多越好,保證任務不堆積,加上一定新增任務的預留就可以
CELERYD_PREFETCH_MULTIPLIER = 4 # celery worker 每次去rabbitmq取任務的數量,我這里預取了4個慢慢執行,因為任務有長有短沒有預取太多
CELERYD_MAX_TASKS_PER_CHILD = 40 # 每個worker執行了多少任務就會死掉,我建議數量可以大一些,比如200
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 這是使用了django-celery默認的數據庫調度模型,任務執行周期都被存在你指定的orm數據庫中
CELERY_DEFAULT_QUEUE = "default_dongwm" # 默認的隊列,如果一個消息不符合其他的隊列就會放在默認隊列里面

CELERY_QUEUES = {
    "default_dongwm": { # 這是上面指定的默認隊列
        "exchange": "default_dongwm",
        "exchange_type": "direct",
        "routing_key": "default_dongwm"
    },
    "topicqueue": { # 這是一個topic隊列 凡是topictest開頭的routing key都會被放到這個隊列
        "routing_key": "topictest.#",
        "exchange": "topic_exchange",
        "exchange_type": "topic",
    },
    "test2": { # test和test2是2個fanout隊列,注意他們的exchange相同
        "exchange": "broadcast_tasks",
        "exchange_type": "fanout",
        "binding_key": "broadcast_tasks",
    },
    "test": {
        "exchange": "broadcast_tasks",
        "exchange_type": "fanout",
        "binding_key": "broadcast_tasks2",
    },
}

class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):

        if task.startswith('topictest'):
            return {
                'queue': 'topicqueue',
            }
        # 我的dongwm.tasks文件里面有2個任務都是test開頭
        elif task.startswith('dongwm.tasks.test'):
            return {
                "exchange": "broadcast_tasks",
            }
        # 剩下的其實就會被放到默認隊列
        else:
            return None

# CELERY_ROUTES本來也可以用一個大的含有多個字典的字典,但是不如直接對它做一個名稱統配
CELERY_ROUTES = (MyRouter(), )

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM