轉載請注明出處:點我
我的第一篇博客!嘿嘿!
在公司實習,接觸到的第一個項目就用到了Celery,之前是完全沒有接觸過Celery這玩意,然后花了點時間仔細的研究了下怎么用。在學習過程中也遇到了些問題,所以把自己的學習過程記錄下來,供他人參考下。
先說一下我的實驗環境:兩台ubuntu的機子,一台win7的機子,都安裝好了必須的軟件。用戶名為atsgxxx的機子跑的是ubuntu的系統,Redis就運行在這個上面,另外一台ubuntu的機子的用戶名是sclu084。
Celery
那么什么是Celery呢?
Celery是一個用Python開發的異步的分布式任務調度模塊。
Celery本身不包含消息服務,使用第三方消息服務,也就是Broker,來傳遞任務,目前支持的有Rebbimq,Redis,數據庫以及其他的一些比如Amazon SQS,Monogdb和IronMQ 。
因為項目里面用的是Redis,所以這里以Redis作為Broker。
安裝Celery
sudo apt-get install celery
使用Redis作為Broker的話,可以兩者一塊安裝
sudo pip install -U celery[redis]
當然如果正式生產環境中,有可能redis服務器和Celery在不同的機器上面的話,就要兩者單獨安裝
sudo apt-get install redis-server 這個命令可以安裝redis,包括了redi-cli工具
第一個簡單的例子
這個例子來自於Celery的官方文檔。先看代碼:
1 from celery import Celery 2 app = Celery('tasks',broker="redis://127.0.0.1:6379/0") 3 4 @app.task 5 def add(x,y): 6 return x + y
把代碼保存為tasks.py文件(這個例子運行在atsgxxx這台機器上,上面運行了Redis,所以broker是127.0.0.1)。然后再terminal下啟動worker。
celery -A tasks worker -l info
這個命令會啟動一個worker來執行task。執行完這條命令后,不出意外的出現下面這個界面的話表示worker已經啟動成功,正在等待執行任務。
然后啟動另外一個終端,進入python工作環境,執行任務,如下圖所示:
調用delay函數即可啟動add這個任務,add函數的參數為4,4,這個函數的效果是發送一條消息到broker中去,這個消息包括要執行的函數已經執行函數的參數,還有一些其他信息,具體的可以看Celery的文檔。
因為之前已經啟動了一個worker,這個worker會等待broker中的消息,一旦收到消息就會立刻執行消息
啟動了一個任務之后,可以看到之前啟動的worker已經開始執行任務了。效果如下圖所示:
從上圖中可以看到,任務已經被執行成功。
Celery與分布式
既然Celery是一個分布式的任務調度模塊,那么Celery是如何跟分布式掛上鈎的呢?首先得明白什么是分布式。我的理解是所謂的分布式就是由多台分布在不同地方的計算機通過網絡共同完成任務。在Celery里面,就可以是多台不同的計算機執行不同的任務或者是相同的任務。
如果要說Celery的分布式應用的話,我覺得就要提到Celery的消息路由機制,就要提一下AMQP協議。具體的可以查看AMQP的文檔。簡單地說就是可以有多個消息隊列(Message Queue),不同的消息可以指定發送給不同的Message Queue,而這是通過Exchange來實現的。發送消息到Message Queue中時,可以指定routiing_key,Exchange通過routing_key來把消息路由(routes)到不同的Message Queue中去。具體的可以參考下這個網頁,上面講的很詳細的了。
現在來看下代碼:(代碼實現的功能是在兩台ubuntu上面啟動worker,每個worker執行指定的Queue中的Task,然后在win7上面執行消息。同時演示了默認消息隊列的使用。)
1 from celery import Celery 2 3 app = Celery() 4 app.config_from_object("celeryconfig") 5 6 @app.task 7 def taskA(x,y): 8 return x + y 9 10 @app.task 11 def taskB(x,y,z): 12 return x + y + z 13 14 @app.task 15 def add(x,y): 16 return x + y
上面的tasks.py中,首先定義了一個Celery對象,然后用celeryconfig.py對celery對象進行設置,之后再分別定義了三個task,分別是taskA,taskB和add。接下來看一下celeryconfig.py文件
1 from kombu import Exchange,Queue 2 3 BROKER_URL = "redis://10.32.105.227:6379/0" CELERY_RESULT_BACKEND = "redis://10.32.105.227:6379/0" 4 5 CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_task_A",Exchange("for_task_A"),routing_key="task_a"),
Queue("for_task_B",Exchange("for_task_B"),routing_key="task_a")
) 6 7 CELERY_ROUTES = { 8 'tasks.taskA':{"queue":"for_task_A","routing_key":"task_a"}, 9 'tasks.taskB":{"queue":"for_task_B","routing_key:"task_b"} 10 }
在celeryconfig.py文件中,首先設置了brokel以及result_backend,接下來定義了三個Message Queue,並且指明了Queue對應的Exchange(當使用Redis作為broker時,Exchange的名字必須和Queue的名字一樣)以及routing_key的值。
現在現在其中一台ubuntu上面啟動一個worker,這個worker只執行for_task_A隊列中的消息,這是通過在啟動worker是使用-Q Queue_Name參數指定的。
celery -A tasks worker -l info -n worker.%h -Q for_task_A
其中-n參數表示這個worker的name,-Q參數指定了這個worker執行for_task_A隊列中的消息。執行結果如下圖所示:
上面的執行結果表明名字為worker.atsgxxx的任務已經啟動,等待執行for_task_A中的任務。
然后再win7上面執行taskA任務。在win7上,進入CMD,切換當前目錄到代碼坐在的工程下,啟動python,執行下面代碼啟動taskA:
from tasks import * task_A_re = taskA.delay(100,200)
執行完上面的代碼之后,task_A消息會被立即發送到for_task_A隊列中去。此時已經啟動的worker.atsgxxx 會立即執行taskA任務。效果如下圖所示:
可以看到taskA已經被worker.atsgxxx執行成功.
然后再win7上面查看taskA的執行狀態:
也顯示taskA已經成功被執行了。
重復上面的過程,在另外一台機器上啟動一個worker專門執行for_task_B中的任務,在win7上執行taskB任務。整個過程及結果如下面的圖所示:
在上面的tasks.py文件中還定義了add任務,但是在celeryconfig.py文件中沒有指定這個任務route到那個Queue中去執行,此時執行add任務的時候,add會route到Celery默認的名字叫做celery的隊列中去。
下面現在wind7上面執行add任務,然后再另外一個終端上面啟動一個worker執行名字為celery的隊列中的消息(這個名字叫做celery的Queue不是我們定義的,是Celery默認的)。結果如下圖所示:
此時可以看到add任務的狀態是PENDING,表示還沒有被執行,因為這個消息沒有在celeryconfig.py文件中指定應該route到哪一個Queue中,所以會被發送到默認的名字為celery的Queue中,但是我們還沒有啟動worker執行celery中的任務。接下來我們在啟動一個worker執行celery隊列中的任務。
1 celery -A tasks worker -l info -n worker.%h -Q celery
然后再查看add的狀態,會發現狀態由PENDING變成了SUCCESS。效果如下圖所示:
Celery與定時任務
在celery中執行定時任務非常簡單,只需要設置celery對象的CELERYBEAT_SCHEDULE屬性即可。
下面我們接着上面的代碼,在celeryconfig.py中添加CELERYBEAT_SCHEDULE變量:
1 CELERY_TIMEZONE = 'UTC' 2 CELERYBEAT_SCHEDULE = { 3 'taskA_schedule' : { 4 'task':'tasks.taskA', 5 'schedule':20, 6 'args':(5,6) 7 }, 8 'taskB_scheduler' : { 9 'task':"tasks.taskB", 10 "schedule":200, 11 "args":(10,20,30) 12 }, 13 'add_schedule': { 14 "task":"tasks.add", 15 "schedule":10, 16 "args":(1,2) 17 } 18 }
其中定義了3個定時任務,即每隔20s執行taskA任務,參數為(5,6),每隔200s執行taskB任務,參數為(10,20,30),每隔10s執行add任務,參數為(1,2).通過下列命令啟動一個定時任務:
1 celery -A tasks beat
使用beat參數即可啟動定時任務。
下面分別在三台機器上面啟動三個worker分別執行for_task_A,for_task_B和celery這三個Queue中的任務。啟動之后,再在其中一台機器上面啟動定時任務。結果如下圖所示(第一張為啟動定時任務一段時間后的截圖):
可以看到一旦scheduler啟動起來,就會按照CELERYBEAT_SCHEDULE指定的時間執行指定的任務。然后已經啟動的worker已接受到一消息就會執行任務,如下圖所示: