Python之celery的簡介與使用


celery的簡介

  celery是一個基於分布式消息傳輸的異步任務隊列,它專注於實時處理,同時也支持任務調度。它的執行單元為任務(task),利用多線程,如Eventletgevent等,它們能被並發地執行在單個或多個職程服務器(worker servers)上。任務能異步執行(后台運行)或同步執行(等待任務完成)。
  在生產系統中,celery能夠一天處理上百萬的任務。它的完整架構圖如下:

celery架構圖

組件介紹:

  • Producer:調用了Celery提供的API、函數或者裝飾器而產生任務並交給任務隊列處理的都是任務生產者。
  • Celery Beat:任務調度器,Beat進程會讀取配置文件的內容,周期性地將配置中到期需要執行的任務發送給任務隊列。
  • Broker:消息代理,又稱消息中間件,接受任務生產者發送過來的任務消息,存進隊列再按序分發給任務消費方(通常是消息隊列或者數據庫)。Celery目前支持RabbitMQ、Redis、MongoDB、Beanstalk、SQLAlchemy、Zookeeper等作為消息代理,但適用於生產環境的只有RabbitMQ和Redis, 官方推薦 RabbitMQ。
  • Celery Worker:執行任務的消費者,通常會在多台服務器運行多個消費者來提高執行效率。
  • Result Backend:任務處理完后保存狀態信息和結果,以供查詢。Celery默認已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

  在客戶端和消費者之間傳輸數據需要序列化和反序列化。 Celery 支出的序列化方案如下所示:

celery序列化

准備工作

  在本文中,我們使用的celery的消息代理和后端存儲數據庫都使用redis,序列化和反序列化選擇msgpack。
  首先,我們需要安裝redis數據庫,具體的安裝方法可參考:http://www.runoob.com/redis/redis-install.html 。啟動redis,我們會看到如下界面:

redis啟動

在redis可視化軟件rdm中,我們看到的數據庫如下:

rdm

里面沒有任何數據。
  接着,為了能夠在python中使用celery,我們需要安裝以下模塊:

  • celery
  • redis
  • msgpack

這樣,我們的准備工作就完畢了。

一個簡單的例子

  我們創建的工程名稱為proj,結構如下圖:

  首先是主程序app_test.py,代碼如下:

from celery import Celery

app = Celery('proj', include=['proj.tasks'])
app.config_from_object('proj.celeryconfig')

if __name__ == '__main__':
    app.start()

分析一下這個程序:

  1. "from celery import Celery"是導入celery中的Celery類。
  2. app是Celery類的實例,創建的時候添加了proj.tasks這個模塊,也就是包含了proj/tasks.py這個文件。
  3. 把Celery配置存放進proj/celeryconfig.py文件,使用app.config_from_object加載配置。

  接着是任務函數文件tasks.py,代碼如下:

import time
from proj.app_test import app

@app.task
def add(x, y):
    time.sleep(1)
    return x + y

tasks.py只有一個任務函數add,讓它生效的最直接的方法就是添加app.task這個裝飾器。add的功能是先休眠一秒,然后返回兩個數的和。

  接着是配置文件celeryconfig.py,代碼如下:

BROKER_URL = 'redis://localhost' # 使用Redis作為消息代理

CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把任務結果存在了Redis

CELERY_TASK_SERIALIZER = 'msgpack' # 任務序列化和反序列化使用msgpack方案

CELERY_RESULT_SERIALIZER = 'json' # 讀取任務結果一般性能要求不高,所以使用了可讀性更好的JSON

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過期時間

CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的內容類型

  最后是調用文件diaoyong.py,代碼如下:

from proj.tasks import add
import time

t1 = time.time()

r1 = add.delay(1, 2)
r2 = add.delay(2, 4)
r3 = add.delay(3, 6)
r4 = add.delay(4, 8)
r5 = add.delay(5, 10)

r_list = [r1, r2, r3, r4, r5]
for r in r_list:
    while not r.ready():
        pass
    print(r.result)

t2 = time.time()

print('共耗時:%s' % str(t2-t1))

在這個程序中,我們調用了add函數五次,delay()用來調用任務。

例子的運行

  到此為止,我們已經理解了整個項目的結構與代碼。
  接下來,我們嘗試着把這個項目運行起來。
  首先,我們需要啟動redis。接着,切換至proj項目所在目錄,並運行命令:

celery -A proj.app_test worker -l info

界面如下:

celery啟動

然后,我們運行diaoyong.py,輸出的結果如下:

3
6
9
12
15
共耗時:1.1370790004730225

后台輸出如下:

celery后台運行

接着,我們看一下rdm中的數據:

rdm中的數據

至此,我們已經成功運行了這個項目。
  下面,我們嘗試着對這個運行結果做些分析。首先,我們一次性調用了五次add函數,但是運行的總時間才1秒多。這是celery異步運行的結果,如果是同步運行,那么,至少需要5秒多,因為每調用add函數一次,就會休眠一秒。這就是celery的強大之處。
  從后台輸出可以看到,程序會先將任務分發出來,每個任務一個ID,在后台統一處理,處理完后會有相應的結果返回,同時該結果也會儲存之后台數據庫。可以利用ready()判斷任務是否執行完畢,再用result獲取任務的結果。
  本文項目的github地址為:https://github.com/percent4/celery_example
  本次分享到此結束,感謝閱讀~
  注意:本人現已開通微信公眾號: Python爬蟲與算法(微信號為:easy_web_scrape), 歡迎大家關注哦~~

參考文獻

  1. Celery 初步:http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html#first-steps
  2. 使用Celery:https://zhuanlan.zhihu.com/p/22304455
  3. 異步神器celery:https://www.jianshu.com/p/9be4d8d30d8e


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM