Celery 定時器以及出錯解決方案:Celery "Received unregistered task of type"


# coding=utf-8
import connexion
import copy
import requests
import json
import time
import uuid
from celery import Celery
from celery.schedules import crontab
from flask import jsonify
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)

# Connexion application; ``app.app`` is the application object whose config
# mapping is populated below and later handed to make_celery().
app = connexion.FlaskApp(__name__, specification_dir='.')
application = app.app

# Scheduler (Celery beat) notes.
# Problem: Celery reports "Received unregistered task of type ...".
# Fix: the Flask main.py and the Celery main script task_timed_del_post.py live
# in the same directory, and the scheduled function
# ``@celery.task(bind=True) def timed_del_post(self)`` is defined in that same
# script rather than split out into another module.
# The ``app`` prefix in the task path is the project directory: lookup starts in
# ``app``, then finds this module, then the function ``timed_del_post``.
# Configuring the schedule here has the same effect as assigning
# ``celery.conf.beat_schedule`` inside make_celery().
# A crontab schedule is an alternative, e.g. "schedule": crontab(minute="*/1").
application.config.update({
    # Celery configuration: broker and result backend both point at Redis db 7.
    # NOTE(review): the password is embedded in these URLs; consider supplying
    # them from the environment instead of committing them to source.
    'CELERY_BROKER_URL': 'redis://:pwd@r-xxx.redis.rds.aliyuncs.com:6379/7',
    'result_backend': 'redis://:pwd@r-xxx.redis.rds.aliyuncs.com:6379/7',
    'beat_schedule': {
        "test001": {
            'task': 'app.task_timed_del_post.timed_del_post',
            'schedule': 1.0,
            'args': (),
        },
    },
})


# Response envelope template (code / message / data); ``result`` is an
# independent deep copy, so mutating it leaves RESULT untouched.
RESULT = dict(code=0, message='Success', data={})
result = copy.deepcopy(RESULT)


def make_celery(app):
    """Create a Celery instance configured from a Flask application.

    Args:
        app: Application object exposing ``name``, a ``config`` mapping that
            contains ``result_backend`` and ``CELERY_BROKER_URL``, and
            ``app_context()``.

    Returns:
        The Celery instance. Its ``Task`` attribute is replaced by a subclass
        that runs every task call inside ``app.app_context()``.

    Raises:
        KeyError: If ``app.config`` lacks one of the two keys read here.
    """
    settings = app.config
    celery = Celery(
        app.name,
        backend=settings['result_backend'],
        broker=settings['CELERY_BROKER_URL'],
    )

    # Copy the whole application config across first, then pin both
    # serializers to JSON and the timezone to UTC.
    celery.conf.update(settings)
    celery.conf.update(task_serializer='json', result_serializer='json')
    celery.conf.timezone = 'UTC'
    # The beat schedule could alternatively be assigned right here through
    # ``celery.conf.beat_schedule`` instead of going through ``app.config``.

    base_task = celery.Task

    # Task base class that wraps each invocation in the application context.
    class ContextTask(base_task):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return base_task.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

# Module-level Celery instance; ``application`` is the same object as ``app.app``.
celery = make_celery(application)


# 核心函數
@celery.task(bind=True)
def timed_del_post(self):
    """Demo periodic task: print a greeting and report success.

    Declared with ``bind=True``, so it receives ``self`` as its first
    argument; ``self`` is not used in the body.

    Returns:
        The string ``"ok"``.
    """
    outcome = "ok"
    print("hello world!")
    return outcome


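# How to run:
# On the server, use a dedicated screen session named celery_service:
# screen -S celery_service
# cd /root/backend/services/app
# The Celery instance in this module is named `celery`, so it is spelled out after -A.
# If the instance were named `app` it should be possible to omit it; for any other name, use that name instead.
# celery -A app.task_timed_del_post.celery worker -B --loglevel=info
# celery -A app.task_timed_del_post worker -B -s /tmp/celerybeat-schedule --loglevel=info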

  

目錄結構:app 文件夾下有 task_timed_del_post.py。

timed_del_post 函數定義在 task_timed_del_post.py 中,因此任務的完整路徑是 app.task_timed_del_post.timed_del_post。

 

更簡單點的:

from celery import Celery, Task
from celery.utils.log import get_task_logger
from flask import Flask
from datetime import timedelta

# Flask application; here it supplies the name and the config mapping that are
# handed to Celery below.
app = Flask(__name__)
logger = get_task_logger(__name__)

# NOTE(review): the broker/backend URLs embed a password directly in source;
# consider supplying them from the environment and rotating this credential.
app.config.update(
    CELERY_NAME='test_celery',
    CELERY_BROKER_URL='redis://:lyp82nLF@r-2zeaf096da357c24.redis.rds.aliyuncs.com:6379/7',
    CELERY_RESULT_BACKEND='redis://:lyp82nLF@r-2zeaf096da357c24.redis.rds.aliyuncs.com:6379/7',
    CELERYBEAT_SCHEDULE={
        # NOTE(review): the entry name says "10 seconds" but the schedule
        # value is 1.0 -- confirm which interval is intended.
        'add-every-10-seconds': {
            'task': 'app.task_coretask.long_task',
            'schedule': 1.0,
            'args': (),
        },
    },
)

# Only the broker is passed explicitly; every other setting reaches Celery
# through the config copy on the next line.
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)


@celery.task(bind=True)
def long_task(self):
    """Placeholder periodic task; put the work to be scheduled here.

    Worker start command given by the original note::

        celery -A test_celery.celery worker -B --loglevel=debug

    NOTE(review): the module path in that command differs from the
    ``app.task_coretask`` path used in the schedule entry -- confirm which
    one matches where this file actually lives.

    Returns:
        The string ``"ok"``.
    """
    banner = '================'
    for message in (banner, '此處寫需要定時調用的任務', banner):
        logger.debug(message)
    print("hello world")
    return "ok"

# On the server, run this inside a dedicated screen session named celery_service:
# screen -S celery_service
# cd /root/backend/services/app
# celery -A app.task_coretask.celery worker -B --loglevel=info

# 測試client
# class Trigger(Task):
#     def run(self):
#         task = long_task.apply_async()
#         print(task)
#
#
# if __name__ == '__main__':
#     trigger = Trigger()
#     trigger.run()

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM