Celery 和 Redis 入門


Celery 是一個廣泛應用於網絡應用程序的任務處理系統。

它可以在以下情況下使用:

在請求響應周期中做網絡調用。服務器應當立即響應任何網絡請求。如果在請求響應周期內需要進行網絡調用,則應在周期外完成調用。例如當用戶在網站上注冊時,需要發送激活郵件。發送郵件是一種網絡調用,耗時2到3秒。用戶應該無需等待這2到3秒。因此,發送激活郵件應當在請求響應周期外完成,celery 就能實現這一點。

將一個由幾個獨立部分組成的大任務分成多個小任務。假設你想知道臉書用戶的時間流。臉書提供不同的端點來獲取不同的數據。譬如,一個端點用以獲取用戶時間流中的圖片,一個端點獲取用戶時間流中的博文,一個端點得到用戶的點贊信息等。如果你的函數需要和臉書的5個端點依此通信,每個網絡調用平均耗時2秒,你將需要10秒完成一次函數執行。但是,你可以把這項工作分為5個獨立的任務(你很快就會發現這很容易做到),並讓 celery 來處理這些任務。Celery 可以並行地與這5個端點通信,在2秒之內就能得到所有端點的響應。

簡單的 celery 例子

假設我們有一個函數,並傳給它一個網址列表。該函數需要獲取這些網址的響應。

沒有使用 celery

創建文件celery_blog.py

import requests import time def func(urls): start = time.time() for url in urls: resp = requests.get(url) print resp.status_code print "It took", time.time() - start, "seconds" if __name__ == "__main__": func(["http://oneapm.com", "http://jd.com", "https://taobao.com", "http://baidu.com", "http://news.oneapm.com"])

運行:

python celery_blog.py

輸出:

使用 celery

調用 celery 的程序中最重要的組成部分為 celery worker。

在 web 應用程序注冊的例子中,celery worker 用於發送郵件。

在臉書的例子中, celery worker 用於獲取不同的網址。

在我們的 celery_blog.py 例子中, celery worker 用於獲取 URL。
celery worker 和你的應用程序/腳本是不同的進程,彼此獨立運行。所以你的應用程序/腳本和 celery 需要一些方法來相互溝通。

應用程序代碼需要把任務放在 celery worker 可以取出並執行的位置。譬如,應用程序代碼將任務放在消息隊列中,celery worker 從消息隊列領取任務並執行任務。我們將使用 Redis 作為消息隊列。

請確認你已安裝 Redis,並可以運行redis-server

請確認你已安裝 celery。

修改文件 celery_blog.py,如下:

from celery import Celery app = Celery('celery_blog',broker='redis://localhost:6379/1') @app.task def fetch_url(url): resp = requests.get(url) print resp.status_code def func(urls): for url in urls: fetch_url.delay(url) if __name__ == "__main__": func(["http://oneapm.com", "http://jd.com", "https://taobao.com", "http://baidu.com", "http://news.oneapm.com"])

代碼解釋:我們需要一個 celery 實例來啟動程序,因此創建了一個名為 app 的 celery 實例。

在3個終端中啟動:

第一個終端,運行 redis-server

第二個終端,運行 celery worker -A celery_blog -l info -c 5 ,通過輸出可以看到 celery 成功運行。

第三個終端,運行腳本 python celery_blog.py

可以看到第二個終端輸出如下:

將 celery 代碼和配置保存在不同文件中

上面的例子中,我們只寫了一個 celery 任務。但您的項目可能涉及多個模塊,您可能希望在不同的模塊中有不同的任務。所以讓我們將 celery 配置移到單獨的文件中。

創建 celery_config.py

from celery import Celery app = Celery('celery_config', broker='redis://localhost:6379/0', include=['celery_blog'])

修改 celery_blog.py 代碼如下:

import requests from celery_config import app @app.task def fetch_url(url): resp = requests.get(url) print resp.status_code def func(urls): for url in urls: fetch_url.delay(url) if __name__ == "__main__": func(["http://oneapm.com", "http://jd.com", "https://taobao.com", "http://baidu.com", "http://news.oneapm.com"])

停掉之前的 celery worker ,運行:

celery worker -A celery_config -l info -c 5

打開 ipython ,運行如下命令:

In [1]: from celery_blog import func In [2]: func(["http://oneapm.com", "http://jd.com", "https://taobao.com", "http://baidu.com", "http://news.oneapm.com"])

輸出如下:

在不同文件中添加新的任務

您可以添加新的模塊,並在該模塊中定義一個任務。用以下內容創建一個模塊 celery_add.py

from celery_config import app @app.task def add(a, b): return a + b

改變 celery_config.py 包含新的模塊 celery_add.py,如下:

from celery import Celery app = Celery('celery_config', broker='redis://localhost:6379/0', include=['celery_blog', 'celery_add'])

在 ipython 輸入:

In [1]: from celery_add import add In [2]: add.delay(4, 5)

輸出如下:

在不同的機器上分開使用 Redis 和 celery

到目前為止,我們的腳本、celery worker 和 Redis 都運行在同一機器中。其實並無這種必要,這三者可以運行在不同機器上。

celery 任務涉及到網絡請求,因此,在網絡優化的機器上使用 celery worker 能提高任務運行速度。Redis 是一種內存數據庫,在內存優化的機器上運行效率更高。

在這個例子中,我將在本地系統運行腳本和 celery worker,在分開的服務器上運行 Redis。

修改 celery_config.py 為:

app = Celery('celery_config', broker='redis://192.168.118.148:6379/0', include=['celery_blog'])

現在我運行任何任務,腳本都將把他放在 Redis 運行的服務器(192.168.118.148)上面。

celery worker 也與 192.168.118.148 溝通,在這個 Redis 服務器上得到任務並執行它。

注意:您必須使用正在運行 redis-server 的服務器地址。我的服務器已停止Redis,所以你將無法連接到 Redis。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM