celery+rabbitmq基本使用


版本:

  (celery==4.3,rabbitmq==3.7)

一.使用rabbitmq

# 安裝rabbitMQ(mac)
brew install rabbitmq
# 配置環境變量(.bash_profile或.profile,注意路徑)
export RABBIT_HOME=/usr/local/Cellar/rabbitmq/3.7.15
export PATH=$PATH:$RABBIT_HOME/sbin

# 設置RabbitMQ:創建一個用戶,一個虛擬主機並設置權限,(需要先啟動服務)
sudo rabbitmqctl add_user myuser mypassword
sudo rabbitmqctl add_vhost myvhost  # 虛擬主機
sudo rabbitmqctl set_user_tags myuser mytag  # administrator
sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

#啟動服務
sudo rabbitmq-server
# 后台運行
sudo rabbitmq-server -detached
# 不要kill(1)來停止服務器
sudo rabbitmqctl stop

二.celery使用

# 安裝
pip install celery

  應用

# tasks.py
from celery import Celery

app = Celery(
    'tasks',  # 當前模塊的名字
    broker='amqp://guest@localhost:port//'  # 消息隊列的url
)

@app.task
def add(x, y):
    return x + y

  運行worker

celery -A tasks worker --loglevel=info

  調用任務

# delay
from tasks import add
add.delay(arg1,arg2,kwarg1='x',kwarg2='y')
add.delay(*args, **kwargs).apply_async(args, kwargs)

# apply_async
task.apply_async(args=[arg1,arg2], kwargs={'kwargs':'x','kwargs':'y'})
tasks.apply_async((arg,), {'kwarg': value})
# 從現在起10秒內執行
tasks.apply_async(countdown=10) 
# 從現在起10秒內執行,使用指定eta
tasks.apply_async(eta=now + timedelta(seconds=10))
# 從現在起一分鍾后執行,但在2分鍾后過期
tasks.apply_async(countdown=60, expires=120)
# 在2天后到期,設置使用datetime對象
T.apply_async(expires=now + timedelta(days=2))

# send_task:任務未在當前進程中注冊
app.send_task('任務', args=[arg,], queue='default')

# signature用於傳遞任務調用簽名的對象(例如通過網絡發送),並且它們也支持calling api
task.s(arg1,arg2,kwarg1='x',kwargs2='y').apply_async()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM