摘要:
1.場景描述
2.flask介紹
3.celery介紹
4.項目偽代碼記錄
5.幾個備注點
內容:
1.場景描述
最近在優化用戶畫像的東西,要開發一個給文本打標簽的服務;我這邊需要提供一個HTTP的異步回調接口,具體來說就是客戶端請求我之后,我判斷請求體有沒有問題,如果沒有返回200狀態嗎;之后開始我的具體計算邏輯,客戶端不用關心這中間要消耗多長時間,當我計算完成之后通過調用另一個HTTP接口,把計算結果返還客戶端。
2.flask介紹
這個最近才接觸,所以不敢妄自總結。所以還是搬來官網文檔:http://docs.jinkan.org/docs/flask/
3.celery介紹
這個是我解決異步計算的分布式任務隊列,其中用到了redis(這是之前總結的一些文檔redis總結1,redis總結2)做消息中間件,具體的介紹還是引用官網:http://docs.jinkan.org/docs/celery/
4.項目偽代碼記錄
1 import json 2 import logging 3 from logging.handlers import TimedRotatingFileHandler 4 from time import sleep 5 6 from celery import Celery 7 from flask import Flask 8 from flask import request 9 import requests10 11 ##Flask configure 12 app = Flask(__name__) 13 14 ##celery configure 15 app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0' 16 app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0' 17 celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) 18 celery.conf.update(app.config) 19 20
21 @celery.task
22 def my_background_task(args1,args2):
23 sleep(50)#這里是具體的處理邏輯,使用sleep代替
24 headers = {'content-type': 'application/json;charset=UTF-8'}
25 requests.post(url=reURL, data=data=json.dumps({'somedata':'xxxxx'}).encode('utf-8'),headers=headers)
26 27 @app.route('/get_tags') 28 def server_desc(): 29 param = json.loads(request.data) 30 if param: 31 return json.dump({'status':200}) 32 return json.dump({'status':400}) 33 34 35 if __name__ == '__main__': 36 app.run(host='0.0.0.0', port=4020)
5.幾個備注點
1.服務啟動順序:redis-server,celery,flask
2.celery版本,我用的3.1.24,之前用的4.x版本報錯not enough values to unpack,后來發現了這個issue:https://github.com/celery/celery/issues/4178
3.關於報錯
worker accepts messages serialized with pickle is a very bad idea!
If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).
解決方案:
export C_FORCE_ROOT="true"