在 Flask 中使用 Celery
后台運行任務的話題是有些復雜,因為圍繞這個話題會讓人產生困惑。為了簡單起見,在以前我所有的例子中,我都是在線程中執行后台任務,但是我一直注意到更具有擴展性以及具備生產解決方案的任務隊列像 Celery 應該可以替代線程中執行后台任務。
不斷有讀者問我關於 Celery 問題,以及怎樣在 Flask 應用中使用它,因此今天我將會向你們展示兩個例子,我希望能夠覆蓋大部分的應用需求。
什么是 Celery?
Celery 是一個異步任務隊列。你可以使用它在你的應用上下文之外執行任務。總的想法就是你的應用程序可能需要執行任何消耗資源的任務都可以交給任務隊列,讓你的應用程序自由和快速地響應客戶端請求。
使用 Celery 運行后台任務並不像在線程中這樣做那么簡單。但是好處多多,Celery 具有分布式架構,使你的應用易於擴展。一個 Celery 安裝有三個核心組件:
致行動派讀者
如果你是行動派,本文開頭的截圖勾起你的好奇心的話,那么可以直接到 Github repository 獲取本文用到的代碼。 README 文件將會給你快速和直接的方式去運行示例應用。
接着可以回到本文來了解工作機制!
Flask 和 Celery 一起工作
Flask 與 Celery 整合是十分簡單,不需要任何插件。一個 Flask 應用需要使用 Celery 的話只需要初始化 Celery 客戶端像這樣:
from flask import Flask
from celery import Celery
app = Flask(name)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
正如你所見,Celery 通過創建一個 Celery 類對象來初始化,傳入應用名稱以及消息代理的連接 URL,這個 URL 我把它放在 app.config 中的 CELERY_BROKER_URL 的鍵值。URL 告訴 Celery 代理服務在哪里運行。如果你運行的不是 Redis,或者代理服務運行在一個不同的機器上,相應地你需要改變 URL。
Celery 其它任何配置可以直接用 celery.conf.update() 通過 Flask 的配置直接傳遞。CELERY_RESULT_BACKEND 選項只有在你必須要 Celery 任務的存儲狀態和運行結果的時候才是必須的。展示的第一個示例是不需要這個功能的,但是第二個示例是需要的,因此最好從一開始就配置好。
任何你需要作為后台任務的函數需要用 celery.task 裝飾器裝飾。例如:
@celery.task
def my_background_task(arg1, arg2):
# some long running task here
return result
接着 Flask 應用能夠請求這個后台任務的執行,像這樣:
task = my_background_task.delay(10, 20)
delay() 方法是強大的 apply_async() 調用的快捷方式。這樣相當於使用 apply_async():
task = my_background_task.apply_async(args=[10, 20])
當使用 apply_async(),你可以給 Celery 后台任務如何執行的更詳細的說明。一個有用的選項就是要求任務在未來的某一時刻執行。例如,這個調用將安排任務運行在大約一分鍾后:
task = my_background_task.apply_async(args=[10, 20], countdown=60)
delay() 和 apply_async() 的返回值是一個表示任務的對象,這個對象可以用於獲取任務狀態。我將會在本文的后面展示如何獲取任務狀態等信息,但現在讓我們保持簡單些,不用擔心任務的執行結果。
更多可用的選項請參閱 Celery 文檔 。
簡單例子:異步發送郵件¶
我要舉的第一個示例是應用程序非常普通的需求:能夠發送郵件但是不阻塞主應用。
在這個例子中我會用到 Flask-Mail 擴展,我會假設你們熟悉這個擴展。
我用來說明的示例應用是一個只有一個輸入文本框的簡單表單。要求用戶在此文本框中輸入一個電子郵件地址,並在提交,服務器會發送一個測試電子郵件到這個郵件地址。表單中包含兩個提交按鈕,一個立即發送郵件,一個是一分鍾后發送郵件。表單的截圖在文章開始。
這里就是支持這個示例的 HTML 模板:
<html>
<head>
<title>Flask + Celery Examples</title>
</head>
<body>
<h1>Flask + Celery Examples</h1>
<h2>Example 1: Send Asynchronous Email</h2>
{% for message in get_flashed_messages() %}
<p style="color: red;">{{ message }}</p>
{% endfor %}
<form method="POST">
<p>Send test email to: <input type="text" name="email" value="{{ email }}"></p>
<input type="submit" name="submit" value="Send">
<input type="submit" name="submit" value="Send in 1 minute">
</form>
</body>
</html>
這里沒有什么特別的東西。只是一個普通的 HTML 表單,再加上 Flask 閃現消息。
Flask-Mail 擴展需要一些配置,尤其是電子郵件服務器發送郵件的時候會用到一些細節。為了簡單我使用我的 Gmail 賬號作為郵件服務器:
# Flask-Mail configuration
app.config['MAIL_SERVER'] = 'smtp.googlemail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = os.environ.get('MAIL_USERNAME')
app.config['MAIL_PASSWORD'] = os.environ.get('MAIL_PASSWORD')
app.config['MAIL_DEFAULT_SENDER'] = 'flask@example.com'
注意為了避免我的賬號丟失的風險,我將其設置在系統的環境變量,這是我從應用中導入的。
有一個單一的路由來支持這個示例:
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'GET':
return render_template('index.html', email=session.get('email', ''))
email = request.form['email']
session['email'] = email
再次說明,這是一個很標准的 Flask 應用。由於這是一個非常簡單的表單,我決定在沒有擴展的幫助下處理它,因此我用 request.method 和 request.form 來完成所有的管理。我保存用戶在文本框中輸入的值在 session 中,這樣在頁面重新加載后就能記住它。
在這個函數中讓人有興趣的是發送郵件的時候是通過調用一個叫做 send_async_email 的 Celery 任務,該任務調用 delay() 或者 apply_async() 方法。
這個應用的最后一部分就是能夠完成作業的異步任務:
@celery.task
def send_async_email(msg):
"""Background task to send an email with Flask-Mail."""
with app.app_context():
mail.send(msg)
這個任務使用 celery.task 裝飾使得成為一個后台作業。這個函數唯一值得注意的就是 Flask-Mail 需要在應用的上下文中運行,因此需要在調用 send() 之前創建一個應用上下文。
重點注意在這個示例中從異步調用返回值並不保留,因此應用不能知道調用成功或者失敗。當你運行這個示例的時候,需要檢查 Celery worker 的輸出來排查發送郵件的問題。
復雜例子:顯示狀態更新和結果¶
上面的示例過於簡單,后台作業啟動然后應用忘記它。大部分 Celery 針對網頁開發的教程就到此為止,但是事實上許多應用程序有必要監控它的后台任務並且獲取運行結果。
我現在將要做的就是擴展上面的應用程序成為第二個示例,這個示例展示一個虛構的長時間運行的任務。用戶點擊按鈕啟動一個或者更多的長時間運行的任務,在瀏覽器上的頁面使用 ajax 輪詢服務器更新所有任務的狀態。每一個任務,頁面都會顯示一個圖形的狀態欄,進度條,一個狀態消息,並且當任務完成的時候,也會顯示任務的執行結果。示例的截圖在本文的最開始。
狀態更新的后台任務¶
讓我向你們展示我在第二個示例中使用的后台任務:
@celery.task(bind=True)
def long_task(self):
"""Background task that runs a long function with progress reports."""
verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
message = ''
total = random.randint(10, 50)
for i in range(total):
if not message or random.random() < 0.25:
message = '{0} {1} {2}...'.format(random.choice(verb),
random.choice(adjective),
random.choice(noun))
self.update_state(state='PROGRESS',
meta={'current': i, 'total': total,
'status': message})
time.sleep(1)
return {'current': 100, 'total': 100, 'status': 'Task completed!',
'result': 42}
對於這個任務,我在 Celery 裝飾器中添加了 bind=True 參數。這個參數告訴 Celery 發送一個 self 參數到我的函數,我能夠使用它(self)來記錄狀態更新。
因為這個任務真沒有干什么有用的事情,我決定使用隨機的動詞,形容詞和名詞組合的幽默狀態信息。你可以在代碼上看到我用來生成上述信息的毫無意義的列表。
self.update_state() 調用是 Celery 如何接受這些任務更新。有一些內置的狀態,比如 STARTED, SUCCESS 等等,但是 Celery 也支持自定義狀態。這里我使用一個叫做 PROGRESS 的自定義狀態。連同狀態,還有一個附件的元數據,該元數據是 Python 字典形式,包含目前和總的迭代數以及隨機生成的狀態消息。客戶端可以使用這些元素來顯示一個漂亮的進度條。每迭代一次休眠一秒,以模擬正在做一些工作。
當循環退出,一個 Python 字典作為函數結果返回。這個字典包含了更新迭代計數器,最后的狀態消息和幽默的結果。
上面的 long_task() 函數在一個 Celery worker 進程中運行。下面你能看到啟動這個后台作業的 Flask 應用路由:
@app.route('/longtask', methods=['POST'])
def longtask():
task = long_task.apply_async()
return jsonify({}), 202, {'Location': url_for('taskstatus',
task_id=task.id)}
正如你所見,客戶端需要發起一個 POST 請求到 /longtask 來掀開這些任務中的一個的序幕。服務器啟動任務,並且存儲返回值。對於響應我使用狀態碼 202,這個狀態碼通常是在 REST APIs 中使用用來表明一個請求正在進行中。我也添加了 Location 頭,值為一個客戶端用來獲取狀態信息的 URL。這個 URL 指向另一個叫做 taskstatus 的 Flask 路由,並且有 task.id 作為動態的要素。
從 Flask 應用中訪問任務狀態¶
上面提及到 taskstatus 路由負責報告有后台任務提供的狀態更新。這里就是這個路由的實現:
@app.route('/status/<task_id>')
def taskstatus(task_id):
task = long_task.AsyncResult(task_id)
if task.state == 'PENDING':
// job did not start yet
response = {
'state': task.state,
'current': 0,
'total': 1,
'status': 'Pending...'
}
elif task.state != 'FAILURE':
response = {
'state': task.state,
'current': task.info.get('current', 0),
'total': task.info.get('total', 1),
'status': task.info.get('status', '')
}
if 'result' in task.info:
response['result'] = task.info['result']
else:
# something went wrong in the background job
response = {
'state': task.state,
'current': 1,
'total': 1,
'status': str(task.info), # this is the exception raised
}
return jsonify(response)
這個路由生成一個 JSON 響應,該響應包含任務的狀態以及設置在 update_state() 調用中作為 meta 的參數的所有值,客戶端可以使用這些構建一個進度條。遺憾地是這個函數需要檢查一些條件,因此代碼有些長。為了能夠訪問任務的數據,我重新創建了任務對象,該對象是 AsyncResult 類的實例,使用了 URL 中給的任務 id。
第一個 if 代碼塊是當任務還沒有開始的時候(PENDING 狀態)。在這種情況下暫時沒有狀態信息,因此我人為地制造了些數據。接下來的 elif 代碼塊返回后台的任務的狀態信息。任務提供的信息可以通過訪問 task.info 獲得。如果數據中包含鍵 result ,這就意味着這是最終的結果並且任務已經結束,因此我把這些信息也加到響應中。最后的 else 代碼塊是任務執行失敗的情況,這種情況下 task.info 中會包含異常的信息。
不管你是否相信,服務器所有要做的事情已經完成了。剩下的部分就是需要客戶端需要實現的,在這里也就是用 JavaScript 腳本的網頁來實現。
客戶端的 Javascript¶
這一部分就不是本文的重點,如果你有興趣的話,可以自己研究研究。
對於圖形進度條我使用 nanobar.js,我從 CDN 上引用它。同樣還需要引入 jQuery,它能夠簡化 ajax 的調用。
<script src="//cdnjs.cloudflare.com/ajax/libs/nanobar/0.2.1/nanobar.min.js"></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/jquery/2.1.3/jquery.min.js"></script>
啟動連接后台作業的按鈕的 Javascript 處理程序如下:
function start_long_task() {
// add task status elements
div = $('<div class="progress"><div></div><div>0%</div><div>...</div><div> </div></div><hr>');
$('#progress').append(div);
// create a progress bar
var nanobar = new Nanobar({
bg: '#44f',
target: div[0].childNodes[0]
});
// send ajax POST request to start background job
$.ajax({
type: 'POST',
url: '/longtask',
success: function(data, status, request) {
status_url = request.getResponseHeader('Location');
update_progress(status_url, nanobar, div[0]);
},
error: function() {
alert('Unexpected error');
}
});
}
div 的代碼:
<div class="progress">
<div></div> <-- Progress bar
<div>0%</div> <-- Percentage
<div>...</div> <-- Status message
<div> </div> <-- Result
</div>
<hr>
最后 Javascript 的 update_progress 函數代碼如下:
function update_progress(status_url, nanobar, status_div) {
// send GET request to status URL
$.getJSON(status_url, function(data) {
// update UI
percent = parseInt(data['current'] * 100 / data['total']);
nanobar.go(percent);
$(status_div.childNodes[1]).text(percent + '%');
$(status_div.childNodes[2]).text(data['status']);
if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
if ('result' in data) {
// show result
$(status_div.childNodes[3]).text('Result: ' + data['result']);
}
else {
// something unexpected happened
$(status_div.childNodes[3]).text('Result: ' + data['state']);
}
}
else {
// rerun in 2 seconds
setTimeout(function() {
update_progress(status_url, nanobar, status_div);
}, 2000);
}
});
}
這一部分的代碼就不一一解釋了。
運行示例¶
首先下載代碼,代碼的位於 Github repository,接着執行以下的命令:
$ git clone https://github.com/miguelgrinberg/flask-celery-example.git
$ cd flask-celery-example
$ virtualenv venv
$ source venv/bin/activate
(venv) $ pip install -r requirements.txt
接着,啟動 redis,關於 redis 的安裝,啟動以及配置,請參閱 Redis 文檔。
最后,執行如下命令運行示例:
$ export MAIL_USERNAME=<your-gmail-username>
$ export MAIL_PASSWORD=<your-gmail-password>
$ source venv/bin/activate
(venv) $ celery worker -A app.celery --loglevel=info
運行你的 Flask 應用來感受 Flask 和 Celery 一起工作的快樂:
$ source venv/bin/activate
(venv) $ python app.py
</div>