celery中的生產者消費者問題


celery中的生產者消費者問題

task1.py文件中:

# demo1:task.py and celery.py in one file
# run it by
from celery import Celery
import time

# 定義worker(消費者),並指定broker和backend(共享緩沖區)
# 每啟動一個woker就相當於創建一個消費者(啟動woker方法:celery -A 創建app的語句所在的文件名)
# 給woker指定queue的方法:
# 1、啟動woker時-Q指定創建的這個消費者要從共享緩沖區中哪個隊列中取出產品並消費
# 2、如果沒有指定queue,則啟動的woker默認從共享緩沖區的default隊列中取出產品並消費
# 注意:兩個不同的woker監聽共享緩沖區中的同一個隊列會出錯(避免出錯的方法:給啟動的每個woker用-Q指定要監聽的隊列,並用@...(queue='')的方法給要在使用該woker處理的task函數指定相同的放入的隊列)

app2=Celery('task2_app2',broker='redis://127.0.0.1:6379/0',backend='redis://127.0.0.1:6379/0')

# 定義task(生產者)
# 在程序執行過程中,每調用一次add.delay就相當於生產者生產一個產品並放入共享緩沖區
@app2.task(queue='queue2')# 指定該生產者生產的產品要放入共享緩沖區中的哪個隊列
# 給task指定queue的方法:
# 1、此處是直接給task指定了queue
# 2、也可以采用給task命名,然后在app.conf.update中定義name與queue的對應規則來批量給多個task指定對應的queue。
# 3、如果沒有指定,則默認將task放入共享緩沖區中名為default的隊列中
def add(x,y):
print('running ',x,'+',y)
print(x+y)
time.sleep(10)
return x+y

 

 

生產者:應用程序

生產動作:調用add.delay()

共享緩沖區:broker和backend

消費者:每個woker都是一個消費者

消費動作:woker啟動后會自動監聽並從broker中取出任務並執行(消費)

 

創建消費者:celery -A task1 worker -l info

(開啟worker的實質實際上就是執行app=Celery(...)語句,可以使用 –-concurrency=個數 來限制每個消費者可以並行的線程數)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM