python淺學【網絡服務中間件】之Celery


一、關於Celery:

什么是任務隊列:

  任務隊列一般用於線程或計算機之間分配工作的一種機制。

  任務隊列的輸入是一個稱為任務的工作單元,有專門的工作進行不斷的監視任務隊列,進行執行新的任務工作。

 

什么的Celery:

  Celery 通過消息機制進行通信,通常使用中間人(Broker)作為客戶端和職程(Worker)調節。啟動一個任務,客戶端向消息隊列發送一條消息,然后中間人(Broker)將消息傳遞給一個職程(Worker),最后由職程(Worker)進行執行中間人(Broker)分配的任務。

  Celery 可以有多個職程(Worker)和中間人(Broker),用來提高Celery的高可用性以及橫向擴展能力。

  Celery 是用 Python 編寫的,但協議可以用任何語言實現。除了 Python 語言實現之外,還有Node.js的node-celery和php的celery-php

 

二、python對Celery的簡單使用:

編寫task.py

import time
from celery import Celery


app = Celery('task', broker='amqp://', backend='redis://localhost')
app.config_from_object('config')

@app.task
def worker(name):
    print(f'{name}工作正在運行')
    time.sleep(2)
    return f'{name}-ok'

 

執行命令:celery worker -A task --loglevel=info

task是任務文件名,worker任務角色,--loglevel=info 任務日志級別

結果:

(base) [root@localhost mywork]# celery worker -A task --loglevel=info
/root/miniconda3/lib/python3.7/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,
 
 -------------- celery@localhost.localdomain v4.4.2 (cliffs)
--- ***** ----- 
-- ******* ---- Linux-3.10.0-1062.18.1.el7.x86_64-x86_64-with-centos-7.7.1908-Core 2020-03-23 22:25:02
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         task_log:0x7fef6d700150
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . task_log.worker

[2020-03-23 22:25:03,019: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2020-03-23 22:25:03,030: INFO/MainProcess] mingle: searching for neighbors
[2020-03-23 22:25:04,054: INFO/MainProcess] mingle: all alone
[2020-03-23 22:25:04,087: INFO/MainProcess] celery@localhost.localdomain ready.

 

編寫run.py:

from task import worker

def run(name):
    w = worker.delay(name)
    while not w.ready():
        pass
    result = w.get()
    print(result)
    return result

run('log')
run('Riy')
run('test')

 

執行run.py 結果如下:

[tasks]
  . task.worker
[2020-03-23 22:54:32,337: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2020-03-23 22:54:32,348: INFO/MainProcess] mingle: searching for neighbors
[2020-03-23 22:54:33,377: INFO/MainProcess] mingle: all alone
[2020-03-23 22:54:33,397: INFO/MainProcess] celery@localhost.localdomain ready.
[2020-03-23 22:54:37,556: INFO/MainProcess] Received task: task.worker[2731ffec-d29e-4271-b41e-ca1c58b666c6]  
[2020-03-23 22:54:37,557: WARNING/ForkPoolWorker-1] log工作正在運行
[2020-03-23 22:54:39,567: INFO/MainProcess] Received task: task.worker[d4630eb6-15a6-4535-a007-6753d5173d7e]  
[2020-03-23 22:54:39,568: INFO/ForkPoolWorker-1] Task task.worker[2731ffec-d29e-4271-b41e-ca1c58b666c6] succeeded in 2.011716676002834s: 'log-ok'
[2020-03-23 22:54:39,570: WARNING/ForkPoolWorker-1] Riy工作正在運行
[2020-03-23 22:54:41,573: INFO/ForkPoolWorker-1] Task task.worker[d4630eb6-15a6-4535-a007-6753d5173d7e] succeeded in 2.0033114409889095s: 'Riy-ok'
[2020-03-23 22:54:41,576: INFO/MainProcess] Received task: task.worker[135cc550-0141-44c1-9719-1bed5d85c0ca]  
[2020-03-23 22:54:41,577: WARNING/ForkPoolWorker-1] test工作正在運行
[2020-03-23 22:54:43,580: INFO/ForkPoolWorker-1] Task task.worker[135cc550-0141-44c1-9719-1bed5d85c0ca] succeeded in 2.0028840559971286s: 'test-ok'

  

如果您想要更好地控制任務執行的時間,例如,特定時間或一周中的某天,您可以使用crontab計划類型:

創建config.py文件如下:

from celery.schedules import crontab

CELERY_TIMEZONE = 'Asia/Shanghai'

CELERYBEAT_SCHEDULE = {
# Executes every Monday morning at 7:30 a.m
    'add_time':{
        'task':'task_log.worker',
        'schedule':crontab(hour=7, minute=30, day_of_week=1),
        'args':(16, 16)
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM