1. About Celery:
What is a task queue?
A task queue is a mechanism for distributing work across threads or machines.
The input to a task queue is a unit of work called a task. Dedicated worker processes continuously monitor the task queue and execute new tasks as they arrive.
What is Celery?
Celery communicates via messages, usually using a broker to mediate between clients and workers. To start a task, the client sends a message to the message queue, the broker delivers that message to a worker, and the worker then executes the task the broker assigned to it.
Celery can run with multiple workers and brokers, which improves its availability and horizontal scalability.
Celery is written in Python, but the protocol can be implemented in any language. Besides the Python implementation, there are also node-celery for Node.js and celery-php for PHP.
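To make the mechanism concrete, here is a minimal stdlib-only sketch (my own illustration, not Celery itself) of a worker thread that keeps watching a queue and executes each unit of work it receives:

```python
import queue
import threading

def worker_loop(task_queue: queue.Queue, results: list) -> None:
    """The worker continuously monitors the queue and runs new tasks."""
    while True:
        task = task_queue.get()
        if task is None:          # sentinel value: no more work
            break
        results.append(task())    # execute the unit of work
        task_queue.task_done()

task_queue: queue.Queue = queue.Queue()
results: list = []
t = threading.Thread(target=worker_loop, args=(task_queue, results))
t.start()

# The "client" enqueues units of work; the worker picks them up.
for n in (1, 2, 3):
    task_queue.put(lambda n=n: n * n)
task_queue.put(None)              # signal shutdown
t.join()
print(results)  # [1, 4, 9]
```

Celery plays the same roles at a larger scale: the queue lives in an external broker, and the workers can be separate processes on separate machines.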
2. A simple example of using Celery from Python:
Create task.py:
import time

from celery import Celery

app = Celery('task', broker='amqp://', backend='redis://localhost')
app.config_from_object('config')

@app.task
def worker(name):
    print(f'{name} job is running')
    time.sleep(2)
    return f'{name}-ok'
Run the command: celery worker -A task --loglevel=info
Here task is the name of the task module, worker tells Celery to start a worker, and --loglevel=info sets the log level.
Output:
(base) [root@localhost mywork]# celery worker -A task --loglevel=info
/root/miniconda3/lib/python3.7/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,

 -------------- celery@localhost.localdomain v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Linux-3.10.0-1062.18.1.el7.x86_64-x86_64-with-centos-7.7.1908-Core 2020-03-23 22:25:02
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         task_log:0x7fef6d700150
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . task_log.worker

[2020-03-23 22:25:03,019: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2020-03-23 22:25:03,030: INFO/MainProcess] mingle: searching for neighbors
[2020-03-23 22:25:04,054: INFO/MainProcess] mingle: all alone
[2020-03-23 22:25:04,087: INFO/MainProcess] celery@localhost.localdomain ready.
Create run.py:
from task import worker

def run(name):
    w = worker.delay(name)
    while not w.ready():
        pass
    result = w.get()
    print(result)
    return result

run('log')
run('Riy')
run('test')
Run run.py; the worker log shows:
[tasks]
  . task.worker

[2020-03-23 22:54:32,337: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2020-03-23 22:54:32,348: INFO/MainProcess] mingle: searching for neighbors
[2020-03-23 22:54:33,377: INFO/MainProcess] mingle: all alone
[2020-03-23 22:54:33,397: INFO/MainProcess] celery@localhost.localdomain ready.
[2020-03-23 22:54:37,556: INFO/MainProcess] Received task: task.worker[2731ffec-d29e-4271-b41e-ca1c58b666c6]
[2020-03-23 22:54:37,557: WARNING/ForkPoolWorker-1] log job is running
[2020-03-23 22:54:39,567: INFO/MainProcess] Received task: task.worker[d4630eb6-15a6-4535-a007-6753d5173d7e]
[2020-03-23 22:54:39,568: INFO/ForkPoolWorker-1] Task task.worker[2731ffec-d29e-4271-b41e-ca1c58b666c6] succeeded in 2.011716676002834s: 'log-ok'
[2020-03-23 22:54:39,570: WARNING/ForkPoolWorker-1] Riy job is running
[2020-03-23 22:54:41,573: INFO/ForkPoolWorker-1] Task task.worker[d4630eb6-15a6-4535-a007-6753d5173d7e] succeeded in 2.0033114409889095s: 'Riy-ok'
[2020-03-23 22:54:41,576: INFO/MainProcess] Received task: task.worker[135cc550-0141-44c1-9719-1bed5d85c0ca]
[2020-03-23 22:54:41,577: WARNING/ForkPoolWorker-1] test job is running
[2020-03-23 22:54:43,580: INFO/ForkPoolWorker-1] Task task.worker[135cc550-0141-44c1-9719-1bed5d85c0ca] succeeded in 2.0028840559971286s: 'test-ok'
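For intuition, the delay() / ready() / get() pattern in run.py behaves much like a standard-library future. The sketch below is a stdlib-only analogy (not Celery code) that mirrors run.py's flow; note that in real Celery code the busy-wait loop is unnecessary, since w.get() already blocks until the result is ready and also accepts a timeout argument:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def worker(name: str) -> str:
    """Stand-in for the Celery task: sleep briefly, then return a result."""
    time.sleep(0.1)
    return f'{name}-ok'

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(worker, 'log')   # analogous to worker.delay('log')
    while not future.done():              # analogous to w.ready()
        time.sleep(0.01)
    print(future.result())                # analogous to w.get()
```

The difference in Celery is that submission goes through the broker and execution happens in a separate worker process, possibly on another machine.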
If you want finer control over when tasks execute, for example at a specific time or on a particular day of the week, you can use the crontab schedule type.
Create config.py as follows:
from celery.schedules import crontab

CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULE = {
    # Executes every Monday morning at 7:30 a.m.
    'add_time': {
        'task': 'task.worker',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        # worker() takes a single name argument
        'args': ('log',),
    },
}
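As a sanity check on what this schedule means, here is a rough stdlib-only sketch (my own illustration, not part of Celery) that computes the next time a "Monday at 07:30" rule would fire:

```python
from datetime import datetime, timedelta

def next_monday_0730(now: datetime) -> datetime:
    """Rough approximation of crontab(hour=7, minute=30, day_of_week=1):
    the next Monday at 07:30 strictly after `now`."""
    target = now.replace(hour=7, minute=30, second=0, microsecond=0)
    # In the datetime module, Monday is weekday() == 0
    target += timedelta(days=(0 - now.weekday()) % 7)
    if target <= now:
        target += timedelta(days=7)   # already past this Monday's slot
    return target

print(next_monday_0730(datetime(2020, 3, 23, 22, 25)))  # -> 2020-03-30 07:30:00
```

Keep in mind that periodic schedules are only dispatched when a beat scheduler is running alongside the workers (started with celery beat -A task).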