1、隊列介紹
任務隊列用作跨線程或機器分配工作的機制。
任務隊列的輸入是稱為任務的工作單元。
專用工作進程不斷監視任務隊列以執行新工作。
Celery通過消息進行通信,通常使用經紀人(brokers)在客戶和工人之間進行調解。
為了啟動任務,客戶端向隊列添加消息,然后經紀人(brokers)將該消息傳遞給工作者。
Celery系統可以由多個工作人員和經紀人組成,讓位於高可用性和水平擴展。
Celery是用Python編寫的,但協議可以用任何語言實現。
還可以實現語言互操作性,從而暴露HTTP端點並具有請求它的任務(webhooks)。
1.1如何安裝
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#id6
1.2 環境 ,自定義包的引入
# 導入包,添加目錄的時候只用添加它的父目錄。
# 如果想要添加包中的文件,就必須,把包的目錄放到path環境中
sys.path.append(r"C:\Users\Administrator\PycharmProjects\untitled1")
1.3啟動task、
# 只要啟動了celery服務,可以在任何地方添加任務,有兩種方式啟動
1.3.1 # KeyError: 'module.tasks.tasks.add'# 這是因為在tasks中沒有這個任務名 # 我們需要正確的導入, # 如果你想要用源文件的方法記得只能導入那個方法 # from XXXX import tasks.add 這是錯誤的
1.3.2 # 在相應的文件中添加app的對象,然后重寫想要的方法,記得@ # 因為只是校驗包名,所以我們可以隨便寫個方法,但要名字對
2、celery使用
2.1 borkers
Celery需要消息傳輸(borkers)來發送和接收消息。
2.2 backend
選擇並安裝消息傳輸(borkers)。安裝Celery並創建您的第一個任務。啟動worker並調用任務。在任務轉換到不同狀態時跟蹤任務,並檢查返回值。
注意:如果您想跟蹤任務的狀態,Celery需要在某處存儲或發送狀態。(backend)(我們需要配置后端)
有幾個內置的結果后端可供選擇:SQLAlchemy / Django ORM,Memcached,Redis,RPC(RabbitMQ / AMQP),以及 - 或者您可以定義自己的。
在本例中,我們使用rpc結果后端,它將狀態作為瞬態消息發回。后端是通過Celery的后端參數指定的(或者如果您選擇使用配置模塊,則通過result_backend設置):
如果存儲在redis中,celery表代表沒有完成,celery-task-meta-f418abea-7827-4220-b72e-a0669e8b8a08表代表完成。
2.3 使用方式
2.3.1 單文件使用,模塊形式
#創建task.py from celery import Celery
#您需要的第一件事是Celery實例。我們稱之為Celery應用程序或簡稱app。
#由於此實例用作您希望在Celery中執行的所有操作的入口點,例如創建任務和管理工作程序,因此其他模塊必須可以導入它。
#在本教程中,我們將所有內容保存在單個模塊中,但對於較大的項目,您需要創建專用模塊。
app = Celery('tasks', broker='pyamqp://guest@localhost//')
#Celery的第一個參數是當前模塊的名稱。只有在__main__模塊中定義任務時才能自動生成名稱。
#第二個參數是broker關鍵字參數,指定要使用的消息代理的URL。這里使用RabbitMQ(也是默認選項)。redis://localhost BROKER_URL = 'redis://localhost:6379/0' 0代表0數據庫 @app.task def add(x, y): return x + y
運行的命令:(必須在tasks當前目錄下)
$ celery -A tasks worker --loglevel=info
該命令中-A參數表示的是Celery APP的名稱,這個實例中指的就是tasks.py,就是這個tasks文件,他會自動從中找到app然后執行,worker是一個執行任務角色,后面的
loglevel=info記錄日志類型默認是info,這個命令啟動了一個worker,用來執行程序中add這個加法任務(task)
回調任務:
from tasks import add add.delay(4, 4)
跟蹤狀態:
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')
>>> result = add.delay(4, 4)
ready()方法返回任務是否已完成處理:
>>> result.ready()
Ture
>>> result.traceback
如果任務引發異常,您還可以訪問原始回溯:
>>> result.get(propagate=False)
同步操作,timeout可以指定時間;可以直接打印異常;可以拒絕打印異常如上。
后端使用資源來存儲和傳輸結果。要確保釋放資源,最后必須在調用任務后返回的EVERY AsyncResult實例上調用get()或forget()。要不然它會一直存在數據庫中
2.3.2 配置文件使用
與上面的單文件模塊一起使用或者在包方式中使用
您可以通過調用app.config_from_object()方法告訴Celery實例使用配置模塊在task.py:
app.config_from_object('celeryconfig')
此模塊通常稱為“celeryconfig”,但您可以使用任何模塊名稱。在上面的例子中,一個名為celeryconfig.py的模塊必須可以從當前目錄或Python路徑加載。它可能看起來像這樣:
broker_url = 'pyamqp://' result_backend = 'rpc://' task_serializer = 'json' result_serializer = 'json' accept_content = ['json'] timezone = 'Europe/Oslo' enable_utc = True
與消費類應用一樣,不需要太多配置即可運行。 它有一個輸入和一個輸出。輸入必須連接到代理,輸出可以選擇連接到結果后端。 但是,如果仔細觀察背面,會有一個蓋子顯示滑塊,刻度盤和按鈕的負載:這是配置。 對於大多數用例,默認配置應該足夠好,但是可以配置許多選項以使Celery完全按照需要工作。 閱讀可用選項是一個好主意,熟悉可配置的內容。您可以在“配置”和“默認值”參考中閱讀有關選項的信息。可以直接在應用程序上或使用專用配置模塊設置配置。
app.命令 都是可以直接放到tasks文件中的 例如,您可以通過更改task_serializer設置來配置用於序列化任務有效負載的默認序列化程序: app.conf.task_serializer = 'json'
如果您一次配置了許多設置,則可以使用update: app.conf.update( task_serializer='json', accept_content=['json'], # Ignore other content result_serializer='json', timezone='Europe/Oslo', enable_utc=True, ) 對於大型項目,建議使用專用配置模塊。 不鼓勵硬編碼周期性任務間隔和任務路由選項。將它們保存在集中位置要好得多。 對於庫來說尤其如此,因為它使用戶能夠控制其任務的行為方式。集中配置還允許您的SysAdmin在發生系統故障時進行簡單的更改。 為了演示配置文件的強大功能,您可以將行為不當的任務路由到專用隊列: celeryconfig.py: task_routes = { 'tasks.add': 'low-priority', } Or instead of routing it you could rate limit the task instead, so that only 10 tasks of this type can be processed in a minute (10/m): celeryconfig.py: task_annotations = { 'tasks.add': {'rate_limit': '10/m'} } If you’re using RabbitMQ or Redis as the broker then you can also direct the workers to set a new rate limit for the task at runtime: $ celery -A tasks control rate_limit tasks.add 10/m worker
2.3.3 包形式使用
文件結構:(必須這樣結構,與運行的命令有關)
proj/__init__.py /celery.py /tasks.py
celery.py
from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('proj', broker='amqp://', backend='amqp://', include=['proj.tasks']) # Optional configuration, see the application user guide. app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start()
task.py
from __future__ import absolute_import, unicode_literals from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers)
運行的命令(我們需要移到項目目錄的上一級)
celery -A proj worker -l info
2.3.4 一些參數:
- transport:是您在celery模塊的broker參數中指定的URL,您還可以使用-b選項在命令行上指定其他代理。 - concurrency:是用於同時處理任務的prefork工作進程的數量,當所有這些進程忙於工作時,新任務必須等待其中一個任務完成才能處理。默認並發數是該計算機上CPU的數量(包括核心),您可以使用celery worker -c選項指定自定義數字。沒有建議值,因為最佳數量取決於許多因素,但如果您的任務主要是I / O限制,那么您可以嘗試增加它,實驗表明添加超過CPU數量的兩倍很少有效,而且可能會降低性能。包括默認的prefork池,Celery還支持使用Eventlet,Gevent,並在單個線程中運行(請參閱並發)。 - events:是一個選項,啟用后會導致Celery為工作中發生的操作發送監視消息(事件)。這些可用於監視程序,如芹菜事件和Flower - 實時Celery監視器,您可以在監視和管理指南中閱讀。 -queues:是工作人員將從中使用任務的隊列列表。可以告訴工作人員同時從多個隊列中進行消費,這用於將消息路由到特定工作人員,作為服務質量,關注點分離和優先級排序的手段,所有這些都在“路由指南”中進行了描述。
2.3.5 后台運行
celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \ --logfile=/var/log/celery/%n%I.log
如果不指定 pid 和log的話會在當前生成 w1.pid,w1.log,w1-1.log,w1-2.log (2線程)
w1就是worker1的名稱,線程會根據你的cpu自己定 。
十個進程的話 w1 換成10
在生產中,您將需要在后台運行worker,這在daemonization教程中有詳細描述。守護程序腳本使用celery multi命令在后台啟動一個或多個worker: $ celery multi start w1 -A proj -l info celery multi v4.0.0 (latentcall) > Starting nodes... > w1.halcyon.local: OK You can restart it too: $ celery multi restart w1 -A proj -l info celery multi v4.0.0 (latentcall) > Stopping nodes... > w1.halcyon.local: TERM -> 64024 > Waiting for 1 node..... > w1.halcyon.local: OK > Restarting node w1.halcyon.local: OK celery multi v4.0.0 (latentcall) > Stopping nodes... > w1.halcyon.local: TERM -> 64052 or stop it: $ celery multi stop w1 -A proj -l info The stop command is asynchronous so it won’t wait for the worker to shutdown. You’ll probably want to use the stopwait command instead, this ensures all currently executing tasks are completed before exiting: $ celery multi stopwait w1 -A proj -l info celery multi不存儲有關worker的信息,因此在重新啟動時需要使用相同的命令行參數。停止時,只能使用相同的pidfile和logfile參數。 默認情況下,它會在當前目錄中創建pid和日志文件,以防止多個工作人員在彼此之上啟動,鼓勵您將這些文件放在專用目錄中: $ mkdir -p /var/run/celery $ mkdir -p /var/log/celery $ celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \ --logfile=/var/log/celery/%n%I.log 使用multi命令可以啟動多個worker,並且還有一個強大的命令行語法來為不同的worker指定參數,例如: $ celery multi start 10 -A proj -l info -Q:1-3 images,video -Q:4,5 data \ -Q default -L:4,5 debug
2.3.6 定時器功能
使用定時器功能首先得配置一下schedule,剛剛我們是直接在Celery函數加入配置,現在我們專門用一個文件來放配置文件,schedule也會寫在這里面。我們可以用celery的beat去周期的生成任務和執行任務
https://blog.csdn.net/freeking101/article/details/74707619
修改tasks.py
from celery import Celery from time import sleep import celeryconfig app = Celery('tasks')#, backend='amqp', broker='amqp://guest@localhost//') app.config_from_object('celeryconfig') @app.task def add(x, y): sleep(5) return x + y
增加配置文件celeryconfig.py
from celery.schedules import crontab BROKER_URL = 'amqp://guest@localhost//' CELERY_RESULT_BACKEND = 'amqp://' CELERYBEAT_SCHEDULE={ "every-1-minute": { 'task': 'tasks.add', 'schedule': crontab(minute='*/1'), #如果是一秒中直接寫
'args': (5,6)# 執行的參數 } }
表示一分鍾觸發一次add的函數,args是傳入的參數,表示一分鍾執行一次add(5,6),注意如果再添加一個任務,不能與every-1-minute重復,不然只有最后一個生效了。
然后執行celery -A tasks worker -B --loglevel=info 就能夠增加觸發beat任務了,會在本地生成一個celerybeat-schedule文件。(linux可以直接運行 )
最好在-B后面加一個 -s /tmp/celerybeat-schedule ,不然很可能導致當前目錄沒有寫權限而報permission refused
運行:(這是因為window中必須分離)
celery -A tasks worker --loglevel=info
celery -A tasks beat (beat就是領導發布定時任務)-i info
Celery ValueError: not enough values to unpack (expected 3, got 0)的解決方案
pip install eventlet
然后啟動worker的時候加一個參數,如下:(總之在最后加上P)
celery -A <mymodule> worker -l info -P eventlet