確保任務不重疊解決方法:
from celery import task
from celery.five import monotonic
from celery.utils.log import get_task_logger
from contextlib import contextmanager
from django.core.cache import cache
from hashlib import md5
from djangofeeds.models import Feed
logger = get_task_logger(__name__)
LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
@contextmanager
def memcache_lock(lock_id, oid):
timeout_at = monotonic() + LOCK_EXPIRE - 3
# cache.add fails if the key already exists
status = cache.add(lock_id, oid, LOCK_EXPIRE)
try:
yield status
finally:
# memcache delete is very slow, but we have to use it to take
# advantage of using add() for atomic locking
if monotonic() < timeout_at and status:
# don't release the lock if we exceeded the timeout
# to lessen the chance of releasing an expired lock
# owned by someone else
# also don't release the lock if we didn't acquire it
cache.delete(lock_id)
@task(bind=True)
def import_feed(self, feed_url):
# The cache key consists of the task name and the MD5 digest
# of the feed URL.
feed_url_hexdigest = md5(feed_url).hexdigest()
lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
logger.debug('Importing feed: %s', feed_url)
with memcache_lock(lock_id, self.app.oid) as acquired:
if acquired:
return Feed.objects.import_feed(feed_url).url
logger.debug(
'Feed %s is already being imported by another worker', feed_url)
celery 特性:
Celery 是一個簡單、靈活且可靠的,處理大量消息的分布式系統,並且提供維護這樣一個系統的必需工具。由於在工作的平台中用到Celery系統(用於發送郵件、發送短信、發送上線等任務),記錄一下學習的知識。
使用rabbitmq做celery的broker和redis做celery的broker的特性
使用RabbitMQ作為Celery Broker的優點:
Highly customizable routing(高度定制路由)
Persistent queues(一致性隊列)
使用redis作為celery brocker的優點:
high speed due to in memory datastore(速度極快的內存數據庫)
can double up as both key-value datastore and job queue(可以保證key-value 數據存儲及job序列)
celery-安裝
pip3 install celery(4.0版本celery beat不支持熱加載)
celery-flower監控安裝
pip3 install flower
django celery 安裝
pip3 install django-celery
celery 原理介紹
某個方法的消息請求celery執行,首先celery根據綁定的規則把任務消息放到制定的路由隊列中去,此隊列對應的worker節點取出執行。
說明:
為什么要定義多個worker?每個worker都會新建一個進程,充分利用服務器資源,提高執行效率。
同一個服務器可以啟動多個worker節點?可以,啟動參數里面寫上不同的–hostname即可。
celery默認會創建一個celery任務隊列,沒有任何綁定的任務將會發送到此消息隊列中。
celery 多woker實驗
celery加redis的多節點配置實例,由於資源限制只找了兩台機器做測試
10.10.42.33 10.10.190.234
我們把redis服務放在10.10.190.234那台服務器上
我們把flower服務也啟動在10.10.42.33那台服務器上
代碼中定義的隊列有queue_add、queue_sum (還有個默認隊列celery)
33、234服務器用於啟動worker節點
33服務器上啟動處理celery和queue_add隊列的worker節點
234服務器上啟動處理celery和queue_sum隊列的worker節點
配置文件展示
celeryconfig配置文件:
cat celeryconfig.py
#!/usr/bin/python
#coding:utf-8
from kombu import Queue
CELERY_TIMEZONE = 'Asia/Shanghai'
####################################
# 一般配置 #
####################################
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
# List of modules to import when celery starts.
CELERY_IMPORTS = ('tasks', )
CELERYD_MAX_TASKS_PER_CHILD = 40 # 每個worker執行了多少任務就會死掉
BROKER_POOL_LIMIT = 10 #默認celery與broker連接池連接數
CELERY_DEFAULT_QUEUE='default'
CELERY_DEFAULT_ROUTING_KEY='task.default'
CELERY_RESULT_BACKEND='redis://:fafafa@10.10.190.234:6379/0'
BROKER_URL='redis://:fafafa@10.19.190.234:6379/0'
#默認隊列
CELERY_DEFAULT_QUEUE = 'celery' #定義默認隊列
CELERY_DEFAULT_ROUTING_KEY = 'celery' #定義默認路由
CELERYD_LOG_FILE="./logs/celery.log"
CELERY_QUEUES = (
Queue("queue_add", routing_key='queue_add'),
Queue('queue_reduce', routing_key='queue_sum'),
Queue('celery', routing_key='celery'),
)
CELERY_ROUTES = {
'task.add':{'queue':'queue_add', 'routing_key':'queue_add'},
'task.reduce':{'queue':'queue_reduce', 'routing_key':'queue_sum'},
}
查看任務配置文件:
cat task.py
import os
import sys
import datetime
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from celery import Celery
from celery import chain, group, chord, Task
from celeryservice import celeryconfig
app = Celery()
app.config_from_object(celeryconfig)
__all__ = ['add', 'reduce','sum_all', 'other']
####################################
# task定義 #
####################################
@app.task
def add(x, y):
return x + y
@app.task
def reduce(x, y):
return x - y
@app.task
def sum(values):
return sum([int(value) for value in values])
@app.task
def other(x, y):
return x * y
flower任務配置文件
cat flower.py
#!/usr/bin/env python #coding:utf-8 broker_api = 'redis://:afafafafa@10.10.190.234:6379/0' logging = 'DEBUG' address = '0.0.0.0' port = 5555 basic_auth = ['zero:zero'] #外部訪問密碼 persistent=True #持久化celery tasks(如果為false的話,重啟flower之后,監控的task就消失了) db="./flower/flower_db"
啟動服務:
在33上啟動服務
celery worker -A task --loglevel=info --queues=celery,queue_add --hostname=celery_worker33 >/dev/null 2>&1 &
在234上啟動服務
celery worker -A task --loglevel=info --queues=celery,queue_add --hostname=celery_worker33 >/dev/null 2>&1 &
服務驗證:
在任一台有celeryservice項目代碼的服務器上,運行add、reduce、sum、other任務(測試可簡單使用add.delay(1,2)等)
add只會在33上運行,
sum任務,可能會在33或234服務器的worker節點運行
reduce任務,只會在234上運行。
other任務可能會在33或者234上運行。
關於使用過程中的優化
使用celery的錯誤處理機制
如下內容來自於網站,還沒實踐,存檔用。
大多數任務並沒有使用錯誤處理,如果任務失敗,那就失敗了。在一些情況下這很不錯,但是作者見到的多數失敗任務都是去調用第三方API然后出現了網絡錯誤,
或者資源不可用這些錯誤,而對於這些錯誤,最簡單的方式就是重試一下,也許就是第三方API臨時服務或者網絡出現問題,沒准馬上就好了,那么為什么不試着加個重試測試一下呢?
@app.task(bind=True, default_retry_delay=300, max_retries=5)
def my_task_A():
try:
print("doing stuff here...")
except SomeNetworkException as e:
print("maybe do some clenup here....")
self.retry(e)
定時任務遇到的問題:

通過flower 查看 跑多線程報錯, 需要減少線程數.
celery配置文件的一些詳細解釋:
# -*- coding:utf-8 -*-
from datetime import timedelta
from settings import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_DB_NUM
# 某個程序中出現的隊列,在broker中不存在,則立刻創建它
CELERY_CREATE_MISSING_QUEUES = True
CELERY_IMPORTS = ("async_task.tasks", "async_task.notify")
# 使用redis 作為任務隊列
BROKER_URL = 'redis://:' + REDIS_PASSWORD + '@' + REDIS_HOST + ':' + str(REDIS_PORT) + '/' + str(REDIS_DB_NUM)
#CELERY_RESULT_BACKEND = 'redis://:' + REDIS_PASSWORD + '@' + REDIS_HOST + ':' + str(REDIS_PORT) + '/10'
CELERYD_CONCURRENCY = 20 # 並發worker數
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYD_FORCE_EXECV = True # 非常重要,有些情況下可以防止死鎖
CELERYD_PREFETCH_MULTIPLIER = 1
CELERYD_MAX_TASKS_PER_CHILD = 100 # 每個worker最多執行萬100個任務就會被銷毀,可防止內存泄露
# CELERYD_TASK_TIME_LIMIT = 60 # 單個任務的運行時間不超過此值,否則會被SIGKILL 信號殺死
# BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 90}
# 任務發出后,經過一段時間還未收到acknowledge , 就將任務重新交給其他worker執行
CELERY_DISABLE_RATE_LIMITS = True
# 定時任務
CELERYBEAT_SCHEDULE = {
'msg_notify': {
'task': 'async_task.notify.msg_notify',
'schedule': timedelta(seconds=10),
#'args': (redis_db),
'options' : {'queue':'my_period_task'}
},
'report_result': {
'task': 'async_task.tasks.report_result',
'schedule': timedelta(seconds=10),
#'args': (redis_db),
'options' : {'queue':'my_period_task'}
},
#'report_retry': {
# 'task': 'async_task.tasks.report_retry',
# 'schedule': timedelta(seconds=60),
# 'options' : {'queue':'my_period_task'}
#},
}
################################################
# 啟動worker的命令
# *** 定時器 ***
# nohup celery beat -s /var/log/boas/celerybeat-schedule --logfile=/var/log/boas/celerybeat.log -l info &
# *** worker ***
# nohup celery worker -f /var/log/boas/boas_celery.log -l INFO &
celery整體架構圖:

