Celery框架使用知識點匯總


Celery定義:

Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統

專注於實時處理的異步任務隊列

同時也支持任務調度

Celery原理:

Celery執行過程中redis中key的變化:

執行后可看到 redis 上生成了兩個 key

_kombu.binding.celery:這個不用管(我推測是 celery 服務的某個 id 標記)

celery:表示當前正在隊列中的 task,等待被 worker 所接收

然后啟動一個 worker

$celery worker -A a --loglevel=debug

執行后可看到 celery 這個 key 消失了,同時新增了 2 個 key

celery 消失說明任務已經被剛啟動的 worker 接收了,worker 會自己去執行這個 task,當前沒有等待被接收的任務

_kombu.binding.celery.pidbox:這個也不用管(我推測也是 celery 服務的某個 id 標記)

_kombu.binding.celeryev:這個我推測是當前連接到這個 redis 的 worker 的列表,每一行表示有一個 worker,但有時會多出來幾行,這個我也沒搞明白,總之這個不重要

下面我們試一下延時任務

test_task.apply_async(('==== ttttt2 =====', ), countdown=60)

然后啟動腳本,發起一個60秒后執行任務,並且開啟 celery 准備執行任務

celery worker -A a --loglevel=debug

在 60 秒內查看 redis,可以看到沒有出現 celery 這個 key,但多出了另外兩個 key

unacked:可以理解為這個是被 worker 接收了但是還沒開始執行的 task 列表(因為60秒后才會開始執行)

unacked_index:用戶標記上面 unacked 的任務的 id,理論上行數應該和 unacked 的行數是一樣的

60 秒后再次查看 redis,可以看到 redis 中的 key 又回到了無任務的狀態

這表示被 worker 領取的任務確實在 60 秒后執行了

 

結論,由此可以推測出 celery 和 redis 之間交互的基本原理:

celery 和 redis 之間交互的基本原理:

1、當發起一個 task 時,會向 redis 的 celery key 中插入一條記錄。

2、如果這時有正在待命的空閑 worker,這個 task 會立即被 worker 領取。

3、如果這時沒有空閑的 worker,這個 task 的記錄會保留在 celery key 中。

4、這時會將這個 task 的記錄從 key celery 中移除,並添加相關信息到 unacked 和 unacked_index 中。

5、worker 根據 task 設定的期望執行時間執行任務,如果接到的不是延時任務或者已經超過了期望時間,則立刻執行。

6、worker 開始執行任務時,通知 redis。(如果設置了 CELERY_ACKS_LATE = True 那么會在任務執行結束時再通知)

7、redis 接到通知后,將 unacked 和 unacked_index 中相關記錄移除。

8、如果在接到通知前,worker 中斷了,這時 redis 中的 unacked 和 unacked_index 記錄會重新回到 celery key 中。(這個回寫的操作是由 worker 在 “臨死” 前自己完成的,所以在關閉 worker 時為防止任務丟失,請務必使用正確的方法停止它,如: celery multi stop w1 -A proj1)

9、在 celery key 中的 task 可以再次重復上述 2 以下的流程。

使用場景:

異步任務,將耗時操作提交給Celery去異步執行,比如發送短信/郵件/消息推送。音視頻處理等等

定時任務。類似於crontab,比如每日數據統計

Celery架構:

組成架構圖:

Celery的架構由三部分組成,消息中間件(message broker)、任務執行單元(worker)和 任務執行結果存儲(task result store)組成。

2.1 消息中間件

Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等

2.2 任務執行單元

Worker是Celery提供的任務執行的單元,worker並發的運行在分布式的系統節點中。

2.3 任務結果存儲

Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP,Redis等

2.4 版本支持情況

Celery version 4.0 runs on

    Python ❨2.7, 3.4, 3.5❩

    PyPy ❨5.4, 5.5❩

This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

If you’re running an older version of Python, you need to be running an older version of Celery:

  Python 2.6: Celery series 3.1 or earlier.

  Python 2.5: Celery series 3.0 or earlier.

  Python 2.4 was Celery series 2.2 or earlier.

Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

功能架構圖:

 

組件介紹

Producer : 任務生產者. 調用 Celery API , 函數或者裝飾器, 而產生任務並交給任務隊列處理的都是任務生產者。

Broker : 消息代理, 隊列本身. 也稱為消息中間件. 接受任務生產者發送過來的任務消息, 存進隊列再按序分發給任務消費方(通常是消息隊列或者數據庫). 通常用 RabbitMQ或者Redis

Celery Beat : 任務調度器. Beat 進程會讀取配置文件的內容, 周期性的將配置中到期需要執行的任務發送給任務隊列.例如如下配置的一個周期性執行的一個任務:

CELERYBEAT_SCHEDULE = {

    'send_mail': {

        'task': 'work.notify.email.send_mail',

        # 'schedule': timedelta(minute=1),

        'schedule': crontab(minute='*/1'),

        'args': ('usr', 'sub', 'msg')

    }
}

 

任務調度主要是為了解決業務場景中定時或周期任務,分別使用timedelta和crontab來定義計划任務,crontab的精度無法精確到秒時可使用timedelta代替,CELERYBEAT_SCHEDULE下可以定義多個計划/周期任務,send_mail為任務名稱,task為任務單元導入名,schedule為具體調度,args為任務單元的參數.

運行時可先啟動work進程池(celery worker -A work.app -l info)然后再啟動beat進程池(celery beat -A work.app -l info),觀察會發現beat進程每分鍾生成一個任務,work進程發現任務后立即執行

Celery Worker : 執行任務的消費者, 通常會在多台服務器運行多個消費者, 提高運行效率.

Result Backend : 任務處理完成之后保存狀態信息和結果, 以供查詢.

Celery安裝配置:

pip install celery

pip install celery[redis]

Celery異步執行任務:

示例:使用任務調度配置,執行celery任務(add, div)

知識點:
  1. 配置CELERYBEAT_SCHEDULE定時任務配置(timedelta,crontab)
  2. 導入指定的任務模塊
  3. bind=True的使用(包括:內部如何調用self.retry方法)
  4. base=MyTask捕獲任務狀態函數使用(包括:添加循環引用時候使用)
代碼如下:
#=======新建proj1項目(python package包)

#=======修改init初始化文件內:

__author__ = "xiaoming"
from celery import Celery

app = Celery('celery-demo1')  ##, include=["proj.tasks"]
app.config_from_object("proj1.celeryconfig")
#=======修改celery配置文件:

from datetime import timedelta
from celery.schedules import crontab

BROKER_URL = "redis://127.0.0.1:6379/1"
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/2"


# 設置時區
CELERY_TIMEZONE = 'Asia/Shanghai'


# 導入指定的任務模塊
CELERY_IMPORTS = (
    'proj1.tasks'
)


CELERYBEAT_SCHEDULE = {

'task1': {     #每隔10秒鍾執行任務
    'task': 'proj1.tasks.add',
        'schedule': timedelta(seconds=10),
    'args': (2, 8)
},
#每天的19:28分執行任務
'task2':{
    'task': 'proj1.tasks.div',
    'schedule':crontab(hour=16, minute=2),
    'args': (4, 5)
}
}

# 使用
# Worker celery worker -A task  -l INFO     啟動worker
# Beat: celery beat -A task -l INFO         啟動beat
# celery -B -A task worker -l INFO         同時啟動beat worker、
# celery worker --help
#=======修改tasks.py文件


from celery import Task
from proj1 import app
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@app.task(bind=True)
def div(self, x, y):
    logger.info(('Executing task id {0.id}, args: {0.args!r}'
                 'kwargs: {0.kwargs!r}').format(self.request))
    try:
        result = x / y
    except ZeroDivisionError as e:
        raise self.retry(exc=e, countdown=5, max_retries=3)  # 發生 ZeroDivisionError 錯誤時, 每 5s 重試一次, 最多重試 3 次.

    return result


# tasks.py
class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print('task done: {0}'.format(retval))
        # add.apply_async(args)  #添加循環引用時候使用
        return super(MyTask, self).on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('task fail, reason: {0}'.format(exc))
        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)


# 正確函數, 執行 MyTask.on_success() :
@app.task(base=MyTask)
def add(x, y):
    return x + y


# # 錯誤函數, 執行 MyTask.on_failure() :
# @app.task  # 普通函數裝飾為 celery task
# def add(x, y):
#     raise KeyError
#     return x + y

 

啟動命令:

Windows啟動

啟動worker : celery worker -A proj1 -l info -P eventlet

啟動beat : celery beat -A proj1 -l INFO

執行效果:

 

 

 

 

示例:基於celery的Task類實現任務的調度執行

知識點:
  1. 通過重寫類中方法來跟蹤業務的狀態信息處理
  2. 定時任務配置,任務隊列設置
  3. Task類中使用的參數
  4. 無用任務如何殺掉任務,如何獲取任務的狀態信息
代碼如下:

Celery_task/__init__.py文件

 

#===============新建項目名稱: celery_task(python package)

#===============項目中初始化內容 init.py文件內容
from celery import Celery

app = Celery('tasks')
app.config_from_object('celery_task.celeryconfig')

 

/celery_task/celeryconfig.py文件

#===============celeryconfig配置文件內容

# from __future__ import absolute_import  #以絕對路徑引用文件
from celery.schedules import crontab
from datetime import timedelta
from kombu import Queue, Exchange


BROKER_URL = "redis://127.0.0.1:6379/1"

CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/2"
# 時區設置
CELERY_TIMEZONE = 'Asia/Shanghai'

CELERY_IMPORTS = (
 'celery_task.my_scheduler',
 'celery_task.tasks'
)

#定義執行任務的隊列
#定義隊列機制,來分別處理不同的task,達到充足的worker去執行
CELERY_QUEUES = (

    Queue('default', exchange=Exchange('default'), routing_key='default'),
    Queue('app_task2', exchange=Exchange('app_task2'), routing_key='app_task2'),
)

#然后定義routes用來決定不同的任務去哪一個queue
CELERY_ROUTES = {
    # 'celery_task.tasks.collectiontask': {'queue': 'default', 'routing_key': 'default'},
    'celery_task.my_scheduler.main_task': {'queue': 'app_task2', 'routing_key': 'app_task2'}
}
PERIOD = 10

CELERYBEAT_SCHEDULE = {
    # 'task1': {
    #     'task': 'my_collect',
    #     'schedule': timedelta(seconds=30),
    #     'options': {
    #         'queue': 'app_task1'
    #     }
    # },
    'task2': {
        'task': 'celery_task.my_scheduler.main_task',
        'schedule': timedelta(seconds=120),
        'options': {
            'queue': 'app_task2'
        }
    }
}

 

/celery_task/my_scheduler.py文件

 

 

#===============my_scheduler.py文件中的內容:

from celery_task import app
import time
from celery_task.tasks import collectiontask
from celery.result import AsyncResult
# from celery.app.control import Control

@app.task()
def main_task():
    a = []
    b = []
    for task in range(1,2):
        # task=1
        print("mian_task run")
        status = collectiontask.apply_async(args=('hello{}'.format(task),), queue='default', retry=True)
        print(f"status.id: {status.id}, status.task_id:{status.task_id}")
        a.append(status.id)
        b.append(status.task_id)
        # print("1setter",collectiontask.request.setter)
        # print("1deleter",collectiontask.request.deleter)
        # print("1fdel============================================",collectiontask.request.fdel)
        # print("1fset",collectiontask.request.fset)
        # print("1getter",collectiontask.request.getter)
        time.sleep(10)
    for ii in a:
        state = AsyncResult(id=str(ii), app=collectiontask.app)
        print("任務的狀態是1:=================================", state)
        with open("1.txt", 'a') as fd:
            fd.write("state:{}\n".format(state))

    for ii in b:
        # print("**********************************************************************")
        # celery_control = Control(collectiontask.app)
        # celery_control.revoke(str(ii), terminate=True, signal='SIGTERM')
        # time.sleep(2)
        state = AsyncResult(id=str(ii), app=collectiontask.app).status
        print("任務的狀態是2:=================================", state)
        with open("2.txt", 'a') as fd:
            fd.write("state:{}\n".format(state))

 

Celery_task/asks.py文件

 

#===============tasks.py文件中的內容是:
import time
from celery.result import AsyncResult
from celery.app.control import Control
from celery.task import Task
# from celery_task import app


def test1(x):
    print("test1", x)


def test2(x):
    print("test2", x)


class collectiontask(Task):
    name = 'my_collect'

    def run(self, *args, **kwargs):
        if args[0] == 'hello0':
            pass
        else:
            try:
                b = 0
                a = 1 / b
            except Exception as exc:
                # raise self.retry(countdown=3, exc=exc)
                # self.on_failure(exc=exc, task_id=self.request.id, args=args, kwargs=kwargs, einfo=self.request.delivery_info)
                self.on_failure(exc=exc, task_id=self.request.id, args=(11,22), kwargs=[], einfo=self.request.delivery_info)

    def on_success(self, retval, task_id, args, kwargs):
        print("task_id:", task_id)
        print("args:", args)
        print("kwargs:",kwargs)
        print(self.name)
        # collectiontask.apply_async(args, countdown=PERIOD)
        collectiontask.apply_async(args=args, queue='default',countdown=20)
        return super(collectiontask, self).on_success(retval, task_id, args, kwargs)
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print(f"f task_id: {task_id}")
        print(f"f args: {args}")
        print(f"f kwargs: {kwargs}")
        print(f"f einfo: {einfo}")
        print(f"f_exc: {exc}")
        print(f"type(exc): {type(exc)}")

        state = AsyncResult(id=str(task_id), app=self.app).state
        print(f"任務失敗后該狀態:{state}")
        celery_control = Control(collectiontask.app)

        celery_control.revoke(str(task_id), terminate=True, signal='SIGTERM')
        # collectiontask.app.control.revoke(task_id, terminate=True, signal='SIGTERM')
        time.sleep(5)
        state = AsyncResult(id=str(task_id), app=self.app).state
        print(f"失敗任務殺掉后的狀態是:{state}")
        return super(collectiontask, self).on_failure(exc, task_id, args,
                                                   kwargs, einfo)


    def on_retry(self, exc, task_id, args, kwargs, einfo):
        print(f"重試任務r_exc:{exc}")
        print(f"重試的次數:  {self.request.retries}")
        print(f"重試任務r_task_id:{task_id}")
        print(f"重試任務r_args: {args}")
        if self.request.retries == 2:
            celery_control = Control(self.app)
            celery_control.revoke(str(task_id), terminate=True, signal='SIGTERM')
            print(f"重試任務,殺任務成功!!!")

#===============任務的啟動命令:
# celery beat -A celery_task -l info
# celery worker -A celery_task -l info -P eventlet

 

import redis

result = redis.StrictRedis(host='127.0.0.1', db=1, port=6379)
print(result.keys())
result.flushall()

 

啟動命令:

# celery beat -A celery_task -l info

# celery worker -A celery_task -l info -P eventlet

執行效果:

 

 

 

 

 

 

 

遺留問題:
殺任務失敗(windows環境)

在window10系統中通過cmd啟動任務,這對殺死worker功能,沒有執行成功!

顯示如下的問題:

 

 

 

采用的方法是:

id = task_id["task_id"]

state = AsyncResult(id=str(id), app=main.app).state  # 這個是查看當前celery該任務的狀態

# 停止任務

celery_control = Control(main.app)

celery_control.revoke(str(id), terminate=True)

 

此種方法,我在linux系統中執行是成功的,不確定是什么原因導致,有知道的還望指點!!

Celery相關配置和命令:

相關配置(一般常用的):

 

# 注意,celery4版本后,CELERY_BROKER_URL改為BROKER_URL
BROKER_URL = 'amqp://username:passwd@host:port/虛擬主機名'
# 指定結果的接受地址
CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db'
# 指定任務序列化方式
CELERY_TASK_SERIALIZER = 'msgpack' 
# 指定結果序列化方式
CELERY_RESULT_SERIALIZER = 'msgpack'
# 任務過期時間,celery任務執行結果的超時時間
CELERY_TASK_RESULT_EXPIRES = 60 * 20   
# 指定任務接受的序列化類型.
CELERY_ACCEPT_CONTENT = ["msgpack"]   
# 任務發送完成是否需要確認,這一項對性能有一點影響     
CELERY_ACKS_LATE = True  
# 壓縮方案選擇,可以是zlib, bzip2,默認是發送沒有壓縮的數據
CELERY_MESSAGE_COMPRESSION = 'zlib' 
# 規定完成任務的時間
CELERYD_TASK_TIME_LIMIT = 5  # 在5s內完成任務,否則執行該任務的worker將被殺死,任務移交給父進程
# celery worker的並發數,默認是服務器的內核數目,也是命令行-c參數指定的數目
CELERYD_CONCURRENCY = 4 
# celery worker 每次去rabbitmq預取任務的數量
CELERYD_PREFETCH_MULTIPLIER = 4 
# 每個worker執行了多少任務就會死掉,默認是無限的
CELERYD_MAX_TASKS_PER_CHILD = 40 
# 設置默認的隊列名稱,如果一個消息不符合其他的隊列就會放在默認隊列里面,如果什么都不設置的話,數據都會發送到默認的隊列中
CELERY_DEFAULT_QUEUE = "default" 
# 設置詳細的隊列
CELERY_QUEUES = {
    "default": { # 這是上面指定的默認隊列
        "exchange": "default",
        "exchange_type": "direct",
        "routing_key": "default"
    },
    "topicqueue": { # 這是一個topic隊列 凡是topictest開頭的routing key都會被放到這個隊列
        "routing_key": "topic.#",
        "exchange": "topic_exchange",
        "exchange_type": "topic",
    },
    "task_eeg": { # 設置扇形交換機
        "exchange": "tasks",
        "exchange_type": "fanout",
        "binding_key": "tasks",
    },
}

 

 

相關命令:

celery -A app.celery_tasks.celery worker -Q queue --loglevel=info

# -A參數指定創建的celery對象的位置,該app.celery_tasks.celery指的是app包下面的celery_tasks.py模塊的celery實例,注意一定是初始化后的實例,后面加worker表示該實例就是任務執行者;

# -Q參數指的是該worker接收指定的隊列的任務,這是為了當多個隊列有不同的任務時可以獨立;如果不設會接收所有的隊列的任務;

發布任務

celery -A celery_task beat

執行任務

celery -A celery_task worker -l info -P eventlet

將以上兩條合並

celery -B -A celery_task worker

后台啟動celery worker進程

celery multi start work_1 -A appcelery

停止worker進程,如果無法停止,加上-A

celery multi stop WORKNAME

重啟worker進程

celery multi restart WORKNAME

查看進程數

celery status -A celery_task

# -l參數指定worker輸出的日志級別;

 

分析序列化的消息:

 

將序列化消息反序列化

 

{"body": "gAJ9cQAoWAQAAAB0YXNrcQFYGAAAAHRlc3RfY2VsZXJ5LmFkZF90b2dldGhlcnECWAIAAABpZHEDWCQAAAA2NmQ1YTg2Yi0xZDM5LTRjODgtYmM5OC0yYzE4YjJjOThhMjFxBFgEAAAAYXJnc3EFSwlLKoZxBlgGAAAAa3dhcmdzcQd9cQhYBwAAAHJldHJpZXNxCUsAWAMAAABldGFxCk5YBwAAAGV4cGlyZXNxC05YAwAAAHV0Y3EMiFgJAAAAY2FsbGJhY2tzcQ1OWAgAAABlcnJiYWNrc3EOTlgJAAAAdGltZWxpbWl0cQ9OToZxEFgHAAAAdGFza3NldHERTlgFAAAAY2hvcmRxEk51Lg==",  
# body是序列化后使用base64編碼的信息,包括具體的任務參數,其中包括了需要執行的方法、參數和一些任務基本信息
"content-encoding": "binary", # 序列化數據的編碼方式
"content-type": "application/x-python-serialize",  # 任務數據的序列化方式,默認使用python內置的序列化模塊pickle
"headers": {}, 
"properties": 
        {"reply_to": "b7580727-07e5-307b-b1d0-4b731a796652",       # 結果的唯一id
        "correlation_id": "66d5a86b-1d39-4c88-bc98-2c18b2c98a21",  # 任務的唯一id
        "delivery_mode": 2, 
        "delivery_info": {"priority": 0, "exchange": "celery", "routing_key": "celery"},  # 指定交換機名稱,路由鍵,屬性
        "body_encoding": "base64", # body的編碼方式
        "delivery_tag": "bfcfe35d-b65b-4088-bcb5-7a1bb8c9afd9"}}

 

常見的數據序列化方式

binary: 二進制序列化方式;python的pickle默認的序列化方法;

json:json 支持多種語言, 可用於跨語言方案,但好像不支持自定義的類對象;

XML:類似標簽語言;

msgpack:二進制的類 json 序列化方案, 但比 json 的數據結構更小, 更快;

yaml:yaml 表達能力更強, 支持的數據類型較 json 多, 但是 python 客戶端的性能不如 json

經過比較,為了保持跨語言的兼容性和速度,采用msgpack或json方式;

比如:

CELERY_TASK_SERIALIZER = 'msgpack'

CELERY_RESULT_SERIALIZER = 'msgpack'

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24   # 任務過期時間

CELERY_ACCEPT_CONTENT = ["msgpack"]            # 指定任務接受的內容序列化的類型.

 

Celery遇到的坑:

中間件redis無法連接:

問題如下:

[2020-02-15 14:45:03,340: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379/1: Error 10060 connecting to localhost:6379. WSAETIMEDOUT..

Trying again in 2.00 seconds...

問題分析:

Redis沒有啟動,查看windows10服務,redis服務已經啟動(重啟后還是無效)

關閉windows10關聯的redis服務,手動啟動redis服務(無效)

//手動啟動命令D:\Program Files\Redis>  .\redis-server.exe redis.windows.conf

解決方法:

Redis服務啟動保持不變,通過調整broker和backend

BROKER_URL = "redis://localhost:6379/1"

CELERY_RESULT_BACKEND = "redis://localhost:6379/2"

改為:

BROKER_URL = "redis://127.0.0.1:6379/1"

CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/2"

Celery任務出現未注冊的任務:

問題如下:

[2020-02-15 14:48:52,934: ERROR/MainProcess] Received unregistered task of type 'proj.tasks.add'.

The message has been ignored and discarded.

Did you remember to import the module containing this task?

Or maybe you're using relative imports?

問題分析:

'proj.tasks.add'.這個引用包,在celery中無法注冊,因此尋找能夠注冊該任務的方法

解決方法:

方法1:在配置文件里添加以下代碼:

CELERY_IMPORTS = (

    "proj.tasks"

)   #注意沒有逗號

方法2: 通過項目初始化init文件中添加如下的代碼,通過include 加載進來

from celery import Celery

app = Celery('celery-demo', include=["proj.tasks"])

app.config_from_object("proj.celeryconfig")’

解決后效果如下:

 

 

 

參考:

https://www.cnblogs.com/cwp-bg/p/8759638.html

https://www.cnblogs.com/wangyong123/p/11782965.html#_label3

https://www.jianshu.com/p/22753ba20546

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM