1. Celery介紹
1.1 celery應用舉例
- Celery 是一個 基於python開發的分布式異步消息任務隊列,通過它可以輕松的實現任務的異步處理,如果你的業務場景中需要用到異步任務,就可以考慮使用celery;
- 你想對100台機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿着這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情;
- Celery 在執行任務時需要通過一個消息中間件來接收和發送任務消息,以及存儲任務結果, 一般使用rabbitMQ or Redis。
1.2 Celery有以下優點
- 簡單:一單熟悉了celery的工作流程后,配置和使用還是比較簡單的;
- 高可用:當任務執行失敗或執行過程中發生連接中斷,celery 會自動嘗試重新執行任務;
- 快速:一個單進程的celery每分鍾可處理上百萬個任務;
- 靈活: 幾乎celery的各個組件都可以被擴展及自定制。
1.3 Celery 特性
- 方便查看定時任務的執行情況, 如 是否成功, 當前狀態, 執行任務花費的時間等.
- 可選 多進程, Eventlet 和 Gevent 三種模型並發執行.
- Celery 是語言無關的.它提供了python 等常見語言的接口支持.
參考鏈接: https://www.cnblogs.com/xiaonq/p/11166235.html#i1
2. celery 組件
2.1 Celery 扮演生產者和消費者的角色
- Celery Beat : 任務調度器. Beat 進程會讀取配置文件的內容, 周期性的將配置中到期需要執行的任務發送給任務隊列.
- Celery Worker : 執行任務的消費者, 通常會在多台服務器運行多個消費者, 提高運行效率.
- Broker : 消息代理, 隊列本身. 也稱為消息中間件. 接受任務生產者發送過來的任務消息, 存進隊列再按序分發給任務消費方(通常是消息隊列或者數據庫).
- Producer : 任務生產者. 調用 Celery API , 函數或者裝飾器, 而產生任務並交給任務隊列處理的都是任務生產者.
- Result Backend : 任務處理完成之后保存狀態信息和結果, 以供查詢.
2.2 celery架構圖
2.3 產生任務的方式
- 發布者發布任務(WEB 應用)
- 任務調度按期發布任務(定時任務)
2.4 celery 依賴三個庫: 這三個庫, 都由 Celery 的開發者開發和維護.
billiard
: 基於 Python2.7 的 multisuprocessing 而改進的庫, 主要用來提高性能和穩定性.librabbitmp :
C 語言實現的 Python 客戶端kombu :
Celery 自帶的用來收發消息的庫, 提供了符合 Python 語言習慣的, 使用 AMQP 協議的高級接口.
參考鏈接: https://www.cnblogs.com/xiaonq/p/11166235.html#i2
3.celery配置與基本使用
3.1 安裝celery
pip install celery @
https://github.com/celery/celery/tarball/master
3.2 新建celery/main.py
配置celery
# celery_task/main.py
import os
from celery import Celery
# 定義celery實例, 需要的參數, 1, 實例名, 2, 任務發布位置, 3, 結果保存位置
app = Celery('mycelery',
broker='redis://127.0.0.1:6379/14', # 任務存放的地方
backend='redis://127.0.0.1:6379/15') # 結果存放的地方
@app.task
def add(x, y):
return x + y
4.測試celery
啟動celery命令
'''1.啟動celery'''
#1.1 單進程啟動celery
celery -A main worker -l INFO
#1.2 celery管理
celery multi start celery_test -A celery_test
-l debug --autoscale=50,5 # celery並發
數:最多50個,最少5個
ps auxww|grep "celery worker"|grep -v grep|awk
'{print $2}'|xargs kill -9 # 關閉所有
celery進程
5. 使用celery異步發送短信
5.1 在celery_task/mian.py
中添加發送短信函數
# celery項目中的所有導包地址, 都是以
CELERY_BASE_DIR為基准設定.
# 執行celery命令時, 也需要進入CELERY_BASE_DIR目錄執行.
CELERY_BASE_DIR =
os.path.dirname(os.path.abspath(__file__))
@app.task(bind=True)
def send_sms_code(self, mobile, datas):
sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../syl'))
# 在方法中導包
from libs.rl_sms import send_message
# time.sleep(5)
try:
# 用 res 接收發送結果, 成功是:0, 失敗是:-1
res = send_message(mobile, datas)
except Exception as e:
res = '-1'
if res == '-1':
# 如果發送結果是 -1 就重試.
self.retry(countdown=5, max_retries=3, exc=Exception('短信發送失敗'))
5.2 在verifications/views.py
中添加celery發送短信試圖函數
class SmsCodeView(APIView):
"""使用apiview的限流"""
# 1. 所有人可以訪問
permission_classes = (AllowAny,)
def post(self, request):
# 1. 獲取參數
phone = request.data.get('phone') # 手機號
image_code = request.data.get('image_code') # 圖片驗證碼
image_code_uuid = request.data.get('image_code_uuid') # 前端生成的uuid
# 2. 檢查參數
if not all([phone, image_code, image_code_uuid]):
return Response({"code": 999, "msg": "參數不全"})
if not re.match(r'^1[3456789]\d{9}$', phone):
return Response({"code": 999, "msg": "手機號碼不正確"})
# 3. 檢查是否發送
redis_client = get_redis_connection('img_code')
phone_exists = redis_client.get(phone)
if phone_exists:
return Response({"code": 999, "msg": "頻繁發送, 請稍后再試"})
# 驗證圖形驗證碼
redis_image_code = redis_client.get(image_code_uuid) # bytes
if redis_image_code:
# bytes 轉成 string
redis_image_code = redis_image_code.decode()
# 比較用戶提供的圖片內容是否和redis中保存的一致
if image_code.upper() != redis_image_code:
return Response({'code': 999, 'msg': '圖片驗證碼不正確'})
# 4. 發送
code = '%06d' % random.randint(0, 999999) # 隨機6位驗證碼
from syl.settings import BASE_DIR
sys.path.insert(0, os.path.join(BASE_DIR, '../celery_task'))
from main import send_sms_code # 必須這么寫, 從main中導包
send_sms_code.delay(phone, (code, "5"))
print(code)
# 5.使用 pipeline 批量操作
pl = redis_client.pipeline() # 實例化pipeline對象
pl.setex(phone, 60 * 5, code) # 存儲phone:code, 5分鍾有效期
pl.delete(image_code_uuid) # 從redis中刪除這個圖片驗證碼, 以防再次被使用
pl.execute()
# 6. 返回結果
return Response({"code": 0, "msg": "短信發送成功"})
5.3 添加路由
xurlpatterns = [
path('sms_codes/',
views.SmsCodeView.as_view()),
]
6.測試接口
- 接口URL
http://192.168.56.100:8888/user/sms_codes/
- 請求攜帶參數
{
"phone": 15303478492,
"image_code":"aed3", # 前端生成的 圖形驗證碼
"image_code_uuid":"de8edce2-fc9f-11ea-9325-005056c00008" # 前端生成的uuid
}