Python 並行分布式框架 Celery
Celery 官網:http://www.celeryproject.org
Celery 官方文檔英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文檔中文版:http://docs.jinkan.org/docs/celery
celery配置:http://docs.jinkan.org/docs/celery/configuration.html#configuration
參考:http://www.cnblogs.com/landpack/p/5564768.html http://blog.csdn.net/happyAnger6/article/details/51408266
http://www.cnblogs.com/forward-wang/p/5970806.html
分布式隊列神器 Celery:https://segmentfault.com/a/1190000008022050
celery最佳實踐:https://my.oschina.net/siddontang/blog/284107
Celery 分布式任務隊列快速入門:http://www.cnblogs.com/alex3714/p/6351797.html
異步任務神器 Celery 快速入門教程:https://blog.csdn.net/chenqiuge1984/article/details/80127446
定時任務管理之python篇celery使用:http://student-lp.iteye.com/blog/2093397
異步任務神器 Celery:http://python.jobbole.com/87086/
celery任務調度框架實踐:https://blog.csdn.net/qq_28921653/article/details/79555212
Celery-4.1 用戶指南: Monitoring and Management Guide:https://blog.csdn.net/libing_thinking/article/details/78592801
Celery安裝及使用:https://blog.csdn.net/u012325060/article/details/79292243
Celery學習筆記(一):https://blog.csdn.net/sdulsj/article/details/73741350
Celery 簡介
除了redis,還可以使用另外一個神器---Celery。Celery是一個異步任務的調度工具。
Celery 是 Distributed Task Queue,分布式任務隊列,分布式決定了可以有多個 worker 的存在,隊列表示其是異步操作,即存在一個產生任務提出需求的工頭,和一群等着被分配工作的碼農。
在 Python 中定義 Celery 的時候,我們要引入 Broker,中文翻譯過來就是“中間人”的意思,在這里 Broker 起到一個中間人的角色。在工頭提出任務的時候,把所有的任務放到 Broker 里面,在 Broker 的另外一頭,一群碼農等着取出一個個任務准備着手做。
這種模式注定了整個系統會是個開環系統,工頭對於碼農們把任務做的怎樣是不知情的。所以我們要引入 Backend 來保存每次任務的結果。這個 Backend 有點像我們的 Broker,也是存儲任務的信息用的,只不過這里存的是那些任務的返回結果。我們可以選擇只讓錯誤執行的任務返回結果到 Backend,這樣我們取回結果,便可以知道有多少任務執行失敗了。
Celery(芹菜)是一個異步任務隊列/基於分布式消息傳遞的作業隊列。它側重於實時操作,但對調度支持也很好。Celery用於生產系統每天處理數以百萬計的任務。Celery是用Python編寫的,但該協議可以在任何語言實現。它也可以與其他語言通過webhooks實現。Celery建議的消息隊列是RabbitMQ,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, 和數據庫(使用SQLAlchemy的或Django的 ORM) 。Celery是易於集成Django, Pylons and Flask,使用 django-celery, celery-pylons and Flask-Celery 附加包即可。
Celery 介紹
在Celery中幾個基本的概念,需要先了解下,不然不知道為什么要安裝下面的東西。概念:Broker、Backend。
什么是broker?
broker是一個消息傳輸的中間件,可以理解為一個郵箱。每當應用程序調用celery的異步任務的時候,會向broker傳遞消息,而后celery的worker將會取到消息,進行對於的程序執行。好吧,這個郵箱可以看成是一個消息隊列。其中Broker的中文意思是 經紀人 ,其實就是一開始說的 消息隊列 ,用來發送和接受消息。這個Broker有幾個方案可供選擇:RabbitMQ (消息隊列),Redis(緩存數據庫),數據庫(不推薦),等等
什么是backend?
通常程序發送的消息,發完就完了,可能都不知道對方時候接受了。為此,celery實現了一個backend,用於存儲這些消息以及celery執行的一些消息和結果。Backend是在Celery的配置中的一個配置項 CELERY_RESULT_BACKEND ,作用是保存結果和狀態,如果你需要跟蹤任務的狀態,那么需要設置這一項,可以是Database backend,也可以是Cache backend,具體可以參考這里: CELERY_RESULT_BACKEND 。
對於 brokers,官方推薦是 rabbitmq 和 redis,至於 backend,就是數據庫。為了簡單可以都使用 redis。
我自己演示使用RabbitMQ作為Broker,用MySQL作為backend。
來一張圖,這是在網上最多的一張Celery的圖了,確實描述的非常好
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。
消息中間件
Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ
任務執行單元
Worker是Celery提供的任務執行的單元,worker並發的運行在分布式的系統節點中。
任務結果存儲
Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP, redis,memcached, mongodb,SQLAlchemy, Django ORM,Apache Cassandra, IronCache 等。
這里我先不去看它是如何存儲的,就先選用redis來存儲任務執行結果。
因為涉及到消息中間件(在Celery幫助文檔中稱呼為中間人<broker>),為了更好的去理解文檔中的例子,可以安裝兩個中間件,一個是RabbitMQ,一個redis。
根據 Celery的幫助文檔 安裝和設置RabbitMQ, 要使用 Celery,需要創建一個 RabbitMQ 用戶、一個虛擬主機,並且允許這個用戶訪問這個虛擬主機。
$ sudo rabbitmqctl add_user forward password #創建了一個RabbitMQ用戶,用戶名為forward,密碼是password $ sudo rabbitmqctl add_vhost ubuntu #創建了一個虛擬主機,主機名為ubuntu # 設置權限。允許用戶forward訪問虛擬主機ubuntu,因為RabbitMQ通過主機名來與節點通信 $ sudo rabbitmqctl set_permissions -p ubuntu forward ".*" ".*" ".*" $ sudo rabbitmq-server # 啟用RabbitMQ服務器
結果如下,成功運行:
安裝Redis,它的安裝比較簡單
$ sudo pip install redis
然后進行簡單的配置,只需要設置 Redis 數據庫的位置:
BROKER_URL = 'redis://localhost:6379/0'
URL的格式為:
redis://:password@hostname:port/db_number
URL Scheme 后的所有字段都是可選的,並且默認為 localhost 的 6479 端口,使用數據庫 0。我的配置是:
redis://:password@ubuntu:6379/5
安裝Celery,我是用標准的Python工具pip安裝的,如下:
$ sudo pip install celery
Celery 是一個強大的 分布式任務隊列 的 異步處理框架,它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現異步任務(async task)和定時任務(crontab)。我們需要一個消息隊列來下發我們的任務。首先要有一個消息中間件,此處選擇rabbitmq (也可選擇 redis 或 Amazon Simple Queue Service(SQS)消息隊列服務)。推薦 選擇 rabbitmq 。使用RabbitMQ是官方特別推薦的方式,因此我也使用它作為我們的broker。它的架構組成如下圖:
可以看到,Celery 主要包含以下幾個模塊:
-
任務模塊 Task
包含異步任務和定時任務。其中,異步任務通常在業務邏輯中被觸發並發往任務隊列,而定時任務由 Celery Beat 進程周期性地將任務發往任務隊列。
-
消息中間件 Broker
Broker,即為任務調度隊列,接收任務生產者發來的消息(即任務),將任務存入隊列。Celery 本身不提供隊列服務,官方推薦使用 RabbitMQ 和 Redis 等。
-
任務執行單元 Worker
Worker 是執行任務的處理單元,它實時監控消息隊列,獲取隊列中調度的任務,並執行它。
-
任務結果存儲 Backend
Backend 用於存儲任務的執行結果,以供查詢。同消息中間件一樣,存儲也可使用 RabbitMQ, redis 和 MongoDB 等。
安裝
有了上面的概念,需要安裝這么幾個東西:RabbitMQ、SQLAlchemy、Celery
安裝rabbitmq
官網安裝方法:http://www.rabbitmq.com/install-windows.html
啟動管理插件:sbin/rabbitmq-plugins enable rabbitmq_management
啟動rabbitmq:sbin/rabbitmq-server -detached
rabbitmq已經啟動,可以打開頁面來看看
地址:http://localhost:15672/#/
用戶名密碼都是guest 。現在可以進來了,可以看到具體頁面。 關於rabbitmq的配置,網上很多 自己去搜以下就ok了。
消息中間件有了,現在該來代碼了,使用 celeby官網代碼。
剩下兩個都是Python的東西了,直接pip安裝就好了,對於從來沒有安裝過mysql驅動的同學可能需要安裝MySQL-python。安裝完成之后,啟動服務: $ rabbitmq-server[回車]。啟動后不要關閉窗口, 下面操作新建窗口(Tab)。
安裝celery
Celery可以通過pip自動安裝,如果你喜歡使用虛擬環境安裝可以先使用virtualenv創建一個自己的虛擬環境。反正我喜歡使用virtualenv建立自己的環境。
pip install celery
http://www.open-open.com/lib/view/open1441161168878.html
開始使用 Celery
使用celery包含三個方面:1. 定義任務函數。2. 運行celery服務。3. 客戶應用程序的調用。
創建一個文件 tasks.py
輸入下列代碼:
-
from celery import Celery
-
-
broker = 'redis://127.0.0.1:6379/5'
-
backend = 'redis://127.0.0.1:6379/6'
-
-
-
app = Celery( 'tasks', broker=broker, backend=backend)
-
-
-
def add(x, y):
-
return x + y
上述代碼導入了celery,然后創建了celery 實例 app,實例化的過程中指定了任務名tasks
(和文件名一致),傳入了broker和backend。然后創建了一個任務函數add
。下面啟動celery服務。在當前命令行終端運行(分別在 env1 和 env2 下執行):
celery -A tasks worker --loglevel=info
目錄結構 (celery -A tasks worker --loglevel=info 這條命令當前工作目錄必須和 tasks.py 所在的目錄相同。即 進入tasks.py所在目錄執行這條命令。)
使用 python 虛擬環境 模擬兩個不同的 主機。
此時會看見一對輸出。包括注冊的任務啦。
交互式客戶端程序調用方法
打開一個命令行,進入Python環境。
In [0]:from tasks import add In [1]: r = add.delay(2, 2) In [2]: add.delay(2, 2) Out[2]: <AsyncResult: 6fdb0629-4beb-4eb7-be47-f22be1395e1d> In [3]: r = add.delay(3, 3) In [4]: r.re r.ready r.result r.revoke In [4]: r.ready() Out[4]: True In [6]: r.result Out[6]: 6 In [7]: r.get() Out[7]: 6
調用 delay 函數即可啟動 add 這個任務。這個函數的效果是發送一條消息到broker中去,這個消息包括要執行的函數、函數的參數以及其他信息,具體的可以看 Celery官方文檔。這個時候 worker 會等待 broker 中的消息,一旦收到消息就會立刻執行消息。
啟動了一個任務之后,可以看到之前啟動的worker已經開始執行任務了。
現在是在python環境中調用的add函數,實際上通常在應用程序中調用這個方法。
注意:如果把返回值賦值給一個變量,那么原來的應用程序也會被阻塞,需要等待異步任務返回的結果。因此,實際使用中,不需要把結果賦值。
應用程序中調用方法
新建一個 main.py 文件 代碼如下:
-
from tasks import add
-
-
r = add.delay( 2, 2)
-
r = add.delay( 3, 3)
-
print r.ready()
-
print r.result
-
print r.get()
在celery命令行可以看見celery執行的日志。打開 backend的redis,也可以看見celery執行的信息。
使用 Redis Desktop Manager 查看 Redis 數據庫內容如圖:
使用配置文件
Celery 的配置比較多,可以在 官方配置文檔:http://docs.celeryproject.org/en/latest/userguide/configuration.html 查詢每個配置項的含義。
上述的使用是簡單的配置,下面介紹一個更健壯的方式來使用celery。首先創建一個python包,celery服務,姑且命名為proj。目錄文件如下:
-
-
☁ proj tree
-
.
-
├── __init__.py
-
├── celery.py # 創建 celery 實例
-
├── config.py # 配置文件
-
└── tasks.py # 任務函數
首先是 celery.py
-
#!/usr/bin/env python
-
# -*- coding:utf-8 -*-
-
-
from __future__ import absolute_import
-
from celery import Celery
-
-
app = Celery( 'proj', include=['proj.tasks'])
-
-
app.config_from_object( 'proj.config')
-
-
if __name__ == '__main__':
-
app.start()
這一次創建 app,並沒有直接指定 broker 和 backend。而是在配置文件中。
config.py
-
#!/usr/bin/env python
-
# -*- coding:utf-8 -*-
-
-
from __future__ import absolute_import
-
-
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
-
BROKER_URL = 'redis://127.0.0.1:6379/6'
剩下的就是tasks.py
-
#!/usr/bin/env python
-
# -*- coding:utf-8 -*-
-
-
from __future__ import absolute_import
-
from proj.celery import app
-
-
-
def add(x, y):
-
return x + y
使用方法也很簡單,在 proj 的同一級目錄執行 celery:
celery -A proj worker -l info
現在使用任務也很簡單,直接在客戶端代碼調用 proj.tasks 里的函數即可。
指定 路由 到的 隊列
Celery的官方文檔 。先看代碼(tasks.py):
-
from celery import Celery
-
-
app = Celery()
-
app.config_from_object( "celeryconfig")
-
-
-
def taskA(x,y):
-
return x + y
-
-
-
def taskB(x,y,z):
-
return x + y + z
-
-
-
def add(x,y):
-
return x + y
上面的tasks.py中,首先定義了一個Celery對象,然后用celeryconfig.py對celery對象進行設置,之后再分別定義了三個task,分別是taskA,taskB和add。接下來看一下celeryconfig.py 文件
-
from kombu import Exchange,Queue
-
-
BROKER_URL = "redis://10.32.105.227:6379/0" CELERY_RESULT_BACKEND = "redis://10.32.105.227:6379/0"
-
-
CELERY_QUEUES = (
-
Queue( "default",Exchange("default"),routing_key="default"),
-
Queue( "for_task_A",Exchange("for_task_A"),routing_key="task_a"),
-
Queue( "for_task_B",Exchange("for_task_B"),routing_key="task_a")
-
)
-
-
CELERY_ROUTES = {
-
'tasks.taskA':{"queue":"for_task_A","routing_key":"task_a"},
-
'tasks.taskB":{"queue":"for_task_B","routing_key:"task_b"}
-
}
在 celeryconfig.py 文件中,首先設置了brokel以及result_backend,接下來定義了三個Message Queue,並且指明了Queue對應的Exchange(當使用Redis作為broker時,Exchange的名字必須和Queue的名字一樣)以及routing_key的值。
現在在一台主機上面啟動一個worker,這個worker只執行for_task_A隊列中的消息,這是通過在啟動worker是使用-Q Queue_Name參數指定的。
celery -A tasks worker -l info -n worker.%h -Q for_task_A
然后到另一台主機上面執行taskA任務。首先 切換當前目錄到代碼所在的工程下,啟動python,執行下面代碼啟動taskA:
-
from tasks import *
-
-
task_A_re = taskA.delay( 100,200)
執行完上面的代碼之后,task_A消息會被立即發送到for_task_A隊列中去。此時已經啟動的worker.atsgxxx 會立即執行taskA任務。
重復上面的過程,在另外一台機器上啟動一個worker專門執行for_task_B中的任務。修改上一步驟的代碼,把 taskA 改成 taskB 並執行。
-
from tasks import *
-
-
task_B_re = taskB.delay( 100,200)
在上面的 tasks.py 文件中還定義了add任務,但是在celeryconfig.py文件中沒有指定這個任務route到那個Queue中去執行,此時執行add任務的時候,add會route到Celery默認的名字叫做celery的隊列中去。
因為這個消息沒有在celeryconfig.py文件中指定應該route到哪一個Queue中,所以會被發送到默認的名字為celery的Queue中,但是我們還沒有啟動worker執行celery中的任務。接下來我們在啟動一個worker執行celery隊列中的任務。
celery -A tasks worker -l info -n worker.%h -Q celery
然后再查看add的狀態,會發現狀態由PENDING變成了SUCCESS。
Scheduler ( 定時任務,周期性任務 )
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
一種常見的需求是每隔一段時間執行一個任務。
在celery中執行定時任務非常簡單,只需要設置celery對象的CELERYBEAT_SCHEDULE屬性即可。
配置如下
config.py
-
#!/usr/bin/env python
-
# -*- coding:utf-8 -*-
-
-
from __future__ import absolute_import
-
-
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
-
BROKER_URL = 'redis://127.0.0.1:6379/6'
-
-
CELERY_TIMEZONE = 'Asia/Shanghai'
-
-
from datetime import timedelta
-
-
CELERYBEAT_SCHEDULE = {
-
'add-every-30-seconds': {
-
'task': 'proj.tasks.add',
-
'schedule': timedelta(seconds=30),
-
'args': (16, 16)
-
},
-
}
注意配置文件需要指定時區。這段代碼表示每隔30秒執行 add 函數。一旦使用了 scheduler, 啟動 celery需要加上-B 參數。
celery -A proj worker -B -l info
設置多個定時任務
CELERY_TIMEZONE = 'UTC' CELERYBEAT_SCHEDULE = { 'taskA_schedule' : { 'task':'tasks.taskA', 'schedule':20, 'args':(5,6) }, 'taskB_scheduler' : { 'task':"tasks.taskB", "schedule":200, "args":(10,20,30) }, 'add_schedule': { "task":"tasks.add", "schedule":10, "args":(1,2) } }
定義3個定時任務,即每隔20s執行taskA任務,參數為(5,6),每隔200s執行taskB任務,參數為(10,20,30),每隔10s執行add任務,參數為(1,2).通過下列命令啟動一個定時任務: celery -A tasks beat。使用 beat 參數即可啟動定時任務。
crontab
計划任務當然也可以用crontab實現,celery也有crontab模式。修改 config.py
-
#!/usr/bin/env python
-
# -*- coding:utf-8 -*-
-
-
from __future__ import absolute_import
-
-
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
-
BROKER_URL = 'redis://127.0.0.1:6379/6'
-
-
CELERY_TIMEZONE = 'Asia/Shanghai'
-
-
from celery.schedules import crontab
-
-
CELERYBEAT_SCHEDULE = {
-
# Executes every Monday morning at 7:30 A.M
-
'add-every-monday-morning': {
-
'task': 'tasks.add',
-
'schedule': crontab(hour=7, minute=30, day_of_week=1),
-
'args': (16, 16),
-
},
-
}
scheduler的切分度很細,可以精確到秒。crontab模式就不用說了。
當然celery還有更高級的用法,比如 多個機器 使用,啟用多個 worker並發處理 等。
發送任務到隊列中
apply_async(args[, kwargs[, …]])、delay(*args, **kwargs) :http://docs.celeryproject.org/en/master/userguide/calling.html
send_task :http://docs.celeryproject.org/en/master/reference/celery.html#celery.Celery.send_task
from celery import Celery celery = Celery() celery.config_from_object('celeryconfig') send_task('tasks.test1', args=[hotplay_id, start_dt, end_dt], queue='hotplay_jy_queue')
Celery 監控 和 管理 以及 命令幫助
輸入 celery -h 可以看到 celery 的命令和幫助
更詳細的幫助可以看官方文檔:http://docs.celeryproject.org/en/master/userguide/monitoring.html
Celery 官網 示例
官網示例:http://docs.celeryproject.org/en/master/getting-started/first-steps-with-celery.html#first-steps
一個簡單例子
第一步
編寫簡單的純python函數
-
def say(x,y):
-
return x+y
-
-
if __name__ == '__main__':
-
say( 'Hello','World')
第二步
如果這個函數不是簡單的輸出兩個字符串相加,而是需要查詢數據庫或者進行復雜的處理。這種處理需要耗費大量的時間,還是這種方式執行會是多么糟糕的事情。為了演示這種現象,可以使用sleep函數來模擬高耗時任務。
-
import time
-
-
def say(x,y):
-
time.sleep( 5)
-
return x+y
-
-
if __name__ == '__main__':
-
say( 'Hello','World')
第三步
這時候我們可能會思考怎么使用多進程或者多線程去實現這種任務。對於多進程與多線程的不足這里不做討論。現在我們可以想想celery到底能不能解決這種問題。
-
import time
-
from celery import Celery
-
-
app = Celery( 'sample',broker='amqp://guest@localhost//')
-
-
-
def say(x,y):
-
time.sleep( 5)
-
return x+y
-
-
if __name__ == '__main__':
-
say( 'Hello','World')
現在來解釋一下新加入的幾行代碼,首先說明一下加入的新代碼完全不需要改變原來的代碼。導入celery模塊就不用解釋了,聲明一個celery實例app的參數需要解釋一下。
- 第一個參數是這個python文件的名字,注意到已經把.py去掉了。
- 第二個參數是用到的rabbitmq隊列。可以看到其使用的方式非常簡單,因為它是默認的消息隊列端口號都不需要指明。
第四步
現在我們已經使用了celery框架了,我們需要讓它找幾個工人幫我們干活。好現在就讓他們干活。
celery -A sample worker --loglevel=info
這條命令有些長,我來解釋一下吧。
- -A 代表的是Application的首字母,我們的應用就是在 sample 里面 定義的。
- worker 就是我們的工人了,他們會努力完成我們的工作的。
- -loglevel=info 指明了我們的工作后台執行情況,雖然工人們已經向你保證過一定努力完成任務。但是謹慎的你還是希望看看工作進展情況。
回車后你可以看到類似下面這樣一個輸出,如果是沒有紅色的輸出那么你應該是沒有遇到什么錯誤的。
第五步
現在我們的任務已經被加載到了內存中,我們不能再像之前那樣執行python sample.py來運行程序了。我們可以通過終端進入python然后通過下面的方式加載任務。輸入python語句。
-
from sample import say
-
say.delay( 'hello','world')
我們的函數會立即返回,不需要等待。就那么簡單celery解決了我們的問題。可以發現我們的say函數不是直接調用了,它被celery 的 task 裝飾器 修飾過了。所以多了一些屬性。目前我們只需要知道使用delay就行了。
簡單案例
確保你之前的RabbitMQ已經啟動。還是官網的那個例子,在任意目錄新建一個tasks.py的文件,內容如下:
-
from celery import Celery
-
-
app = Celery( 'tasks', broker='amqp://guest@localhost//')
-
-
-
def add(x, y):
-
return x + y
使用redis作為消息隊列
-
app = Celery( 'task', broker='redis://localhost:6379/4')
-
app.conf.update(
-
CELERY_TASK_SERIALIZER= 'json',
-
CELERY_ACCEPT_CONTENT=[ 'json'], # Ignore other content
-
CELERY_RESULT_SERIALIZER= 'json',
-
CELERYD_CONCURRENCY = 8
-
)
-
-
-
def add(x, y):
-
return x + y
在同級目錄執行:
$ celery -A tasks.app worker --loglevel=info
該命令的意思是啟動一個worker ( tasks文件中的app實例,默認實例名為app,-A 參數后也可直接加文件名,不需要 .app),把tasks中的任務(add(x,y))把任務放到隊列中。保持窗口打開,新開一個窗口進入交互模式,python或者ipython:
到此為止,你已經可以使用celery執行任務了,上面的python交互模式下簡單的調用了add任務,並傳遞 4,4 參數。
但此時有一個問題,你突然想知道這個任務的執行結果和狀態,到底完了沒有。因此就需要設置backend了
修改之前的tasks.py中的代碼為:
-
# coding:utf-8
-
import subprocess
-
from time import sleep
-
-
from celery import Celery
-
-
backend = 'db+mysql://root:@192.168.0.102/celery'
-
broker = 'amqp://guest@192.168.0.102:5672'
-
-
app = Celery( 'tasks', backend=backend, broker=broker)
-
-
-
-
def add(x, y):
-
sleep( 10)
-
return x + y
-
-
-
-
def hostname():
-
return subprocess.check_output(['hostname'])
除了添加backend之外,上面還添加了一個who的方法用來測試多服務器操作。修改完成之后,還按之前的方式啟動。
同樣進入python的交互模型:
-
-
-
-
>>>
-
-
測試多服務器
做完上面的測試之后,產生了一個疑惑,Celery叫做分布式任務管理,那它的分布式體現在哪?它的任務都是怎么執行的?在哪個機器上執行的?在當前服務器上的celery服務不關閉的情況下,按照同樣的方式在另外一台服務器上安裝Celery,並啟動:
$ celery -A tasks worker --loglevel=info
發現前一個服務器的Celery服務中輸出你剛啟動的服務器的hostname,前提是那台服務器連上了你的rabbitmq。然后再進入python交互模式:
-
-
>>>
-
-
-
-
>>>
看你輸入的內容已經觀察兩台服務器上你啟動celery服務的輸出。
Celery的使用技巧(Celery配置文件和發送任務)
在實際的項目中我們需要明確前后台的分界線,因此我們的celery編寫的時候就應該是分成前后台兩個部分編寫。在celery簡單入門中的總結部分我們也提出了另外一個問題,就是需要分離celery的配置文件。
第一步
編寫后台任務tasks.py腳本文件。在這個文件中我們不需要再聲明celery的實例,我們只需要導入其task裝飾器來注冊我們的任務即可。后台處理業務邏輯完全獨立於前台,這里只是簡單的hello world程序需要多少個參數只需要告訴前台就可以了,在實際項目中可能你需要的是后台執行發送一封郵件的任務或者進行復雜的數據庫查詢任務等。
-
import time
-
from celery.task import task
-
-
-
-
def say(x,y):
-
time.sleep( 5)
-
return x+y
第二步
有了那么完美的后台,我們的前台編寫肯定也輕松不少。到底簡單到什么地步呢,來看看前台的代碼吧!為了形象的表明其職能,我們將其命名為client.py腳本文件。
-
from celery import Celery
-
-
app = Celery()
-
-
app.config_from_object( 'celeryconfig')
-
app.send_task( "tasks.say",['hello','world'])
可以看到只需要簡單的幾步:1. 聲明一個celery實例。2. 加載配置文件。3. 發送任務。
第三步
繼續完成celery的配置。官方的介紹使用celeryconfig.py作為配置文件名,這樣可以防止與你現在的應用的配置同名。
CELERY_IMPORTS = ('tasks') CELERY_IGNORE_RESULT = False BROKER_HOST = '127.0.0.1' BROKER_PORT = 5672 BROKER_URL = 'amqp://' CELERY_RESULT_BACKEND = 'amqp'
可以看到我們指定了CELERY_RESULT_BACKEND為amqp默認的隊列!這樣我們就可以查看處理后的運行狀態了,后面將會介紹處理結果的查看。
第四步
啟動celery后台服務,這里是測試與學習celery的教程。在實際生產環境中,如果是通過這種方式啟動的后台進程是不行的。所謂后台進程通常是需要作為守護進程運行在后台的,在python的世界里總是有一些工具能夠滿足你的需要。這里可以使用supervisor作為進程管理工具。在后面的文章中將會介紹如何使用supervisor工具。
celery worker -l info --beat
注意現在運行worker的方式也與前面介紹的不一樣了,下面簡單介紹各個參數。
-l info 與--loglevel=info的作用是一樣的。
--beat 周期性的運行。即設置 心跳。
第五步
前台的運行就比較簡單了,與平時運行的python腳本一樣。python client.py。
現在前台的任務是運行了,可是任務是被寫死了。我們的任務大多數時候是動態的,為演示動態工作的情況我們可以使用終端發送任務。
在python終端導入celery模塊聲明實例然后加載配置文件,完成了這些步驟后就可以動態的發送任務並且查看任務狀態了。注意在配置文件celeryconfig.py中我們已經開啟了處理的結果回應模式了CELERY_IGNORE_RESULT = False並且在回應方式配置中我們設置了CELERY_RESULT_BACKEND = 'amqp'這樣我們就可以查看到處理的狀態了。
-
-
-
False
-
-
'PENDING'
-
-
TRUE
-
-
u'SUCCESS'
可以看到任務發送給celery后馬上查看任務狀態會處於PENDING狀態。稍等片刻就可以查看到SUCCESS狀態了。這種效果真棒不是嗎?在圖像處理中或者其他的一些搞耗時的任務中,我們只需要把任務發送給后台就不用去管它了。當我們需要結果的時候只需要查看一些是否成功完成了,如果返回成功我們就可以去后台數據庫去找處理后生成的數據了。
celery使用mangodb保存數據
第一步
安裝好mongodb了!就可以使用它了,首先讓我們修改celeryconfig.py文件,使celery知道我們有一個新成員要加入我們的項目,它就是mongodb配置的方式如下。
ELERY_IMPORTS = ('tasks') CELERY_IGNORE_RESULT = False BROKER_HOST = '127.0.0.1' BROKER_PORT = 5672 BROKER_URL = 'amqp://' #CELERY_RESULT_BACKEND = 'amqp' CELERY_RESULT_BACKEND = 'mongodb' CELERY_RESULT_BACKEND_SETTINGS = { "host":"127.0.0.1", "port":27017, "database":"jobs", "taskmeta_collection":"stock_taskmeta_collection", }
把#CELERY_RESULT_BACKEND = 'amp'注釋掉了,但是沒有刪除目的是對比前后的改變。為了使用mongodb我們有簡單了配置一下主機端口以及數據庫名字等。顯然你可以按照你喜歡的名字來配置它。
第二步
啟動 mongodb 數據庫:mongod。修改客戶端client.py讓他能夠動態的傳人我們的數據,非常簡單代碼如下。
-
import sys
-
from celery import Celery
-
-
app = Celery()
-
-
app.config_from_object( 'celeryconfig')
-
app.send_task( "tasks.say",[sys.argv[1],sys.argv[2]])
任務tasks.py不需要修改!
-
import time
-
from celery.task import task
-
-
-
-
def say(x,y):
-
time.sleep( 5)
-
return x+y
第三步
測試代碼,先啟動celery任務。
celery worker -l info --beat
再來啟動我們的客戶端,注意這次啟動的時候需要給兩個參數啦!
mongo
python client.py welcome landpack
等上5秒鍾,我們的后台處理完成后我們就可以去查看數據庫了。
第四步
查看mongodb,需要啟動一個mongodb客戶端,啟動非常簡單直接輸入 mongo 。然后是輸入一些簡單的mongo查詢語句。
最后查到的數據結果可能是你不想看到的,因為mongo已經進行了處理。想了解更多可以查看官方的文檔。