Python celery原理及運行流程


1. Celery介紹

1.1 celery應用舉例

  • Celery 是一個 基於python開發的分布式異步消息任務隊列,通過它可以輕松的實現任務的異步處理,如果你的業務場景中需要用到異步任務,就可以考慮使用celery;
  • 你想對100台機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿着這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情;
  • Celery 在執行任務時需要通過一個消息中間件來接收和發送任務消息,以及存儲任務結果, 一般使用rabbitMQ or Redis。

1.2 Celery有以下優點

  • 簡單:一單熟悉了celery的工作流程后,配置和使用還是比較簡單的;
  • 高可用:當任務執行失敗或執行過程中發生連接中斷,celery 會自動嘗試重新執行任務;
  • 快速:一個單進程的celery每分鍾可處理上百萬個任務;
  • 靈活: 幾乎celery的各個組件都可以被擴展及自定制。

1.3 Celery 特性

  • 方便查看定時任務的執行情況, 如 是否成功, 當前狀態, 執行任務花費的時間等.
  • 可選 多進程, Eventlet 和 Gevent 三種模型並發執行.
  • Celery 是語言無關的.它提供了python 等常見語言的接口支持.

參考鏈接: https://www.cnblogs.com/xiaonq/p/11166235.html#i1

2. celery 組件

2.1 Celery 扮演生產者和消費者的角色

  • Celery Beat : 任務調度器. Beat 進程會讀取配置文件的內容, 周期性的將配置中到期需要執行的任務發送給任務隊列.
  • Celery Worker : 執行任務的消費者, 通常會在多台服務器運行多個消費者, 提高運行效率.
  • Broker : 消息代理, 隊列本身. 也稱為消息中間件. 接受任務生產者發送過來的任務消息, 存進隊列再按序分發給任務消費方(通常是消息隊列或者數據庫).
  • Producer : 任務生產者. 調用 Celery API , 函數或者裝飾器, 而產生任務並交給任務隊列處理的都是任務生產者.
  • Result Backend : 任務處理完成之后保存狀態信息和結果, 以供查詢.

2.2 celery架構圖

2.3 產生任務的方式

  • 發布者發布任務(WEB 應用)
  • 任務調度按期發布任務(定時任務)

2.4 celery 依賴三個庫: 這三個庫, 都由 Celery 的開發者開發和維護.

  • billiard : 基於 Python2.7 的 multisuprocessing 而改進的庫, 主要用來提高性能和穩定性.
  • librabbitmp : C 語言實現的 Python 客戶端
  • kombu : Celery 自帶的用來收發消息的庫, 提供了符合 Python 語言習慣的, 使用 AMQP 協議的高級接口.

參考鏈接: https://www.cnblogs.com/xiaonq/p/11166235.html#i2

3.celery配置與基本使用

3.1 安裝celery

 pip install celery @ 
 https://github.com/celery/celery/tarball/master

3.2 新建celery/main.py配置celery

 # celery_task/main.py
 import os
 from celery import Celery

 # 定義celery實例, 需要的參數, 1, 實例名, 2, 任務發布位置, 3, 結果保存位置
 app = Celery('mycelery',

 broker='redis://127.0.0.1:6379/14',  # 任務存放的地方 

 backend='redis://127.0.0.1:6379/15')  # 結果存放的地方

 @app.task
 def add(x, y):
     return x + y

4.測試celery

啟動celery命令

 '''1.啟動celery'''
 #1.1 單進程啟動celery
 celery -A main worker -l INFO
 #1.2 celery管理
 celery  multi start celery_test -A celery_test 
 -l debug --autoscale=50,5        # celery並發
 數:最多50個,最少5個
 ps auxww|grep "celery worker"|grep -v grep|awk  
 '{print $2}'|xargs kill -9       # 關閉所有
 celery進程

5. 使用celery異步發送短信

5.1 在celery_task/mian.py中添加發送短信函數

 # celery項目中的所有導包地址, 都是以
 CELERY_BASE_DIR為基准設定.
 # 執行celery命令時, 也需要進入CELERY_BASE_DIR目錄執行.
 CELERY_BASE_DIR = 
 os.path.dirname(os.path.abspath(__file__))

 
 @app.task(bind=True)
 def send_sms_code(self, mobile, datas):
     sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../syl'))
     # 在方法中導包
     from libs.rl_sms import send_message
     # time.sleep(5)
     try:
         # 用 res 接收發送結果, 成功是:0, 失敗是:-1
         res = send_message(mobile, datas)
     except Exception as e:
         res = '-1'

     if res == '-1':
         # 如果發送結果是 -1  就重試.
         self.retry(countdown=5, max_retries=3, exc=Exception('短信發送失敗'))

5.2 在verifications/views.py中添加celery發送短信試圖函數

 class SmsCodeView(APIView):
     """使用apiview的限流"""
     # 1. 所有人可以訪問
     permission_classes = (AllowAny,)

     def post(self, request):
         # 1. 獲取參數
         phone = request.data.get('phone')  # 手機號
         image_code = request.data.get('image_code')  # 圖片驗證碼
         image_code_uuid = request.data.get('image_code_uuid')  # 前端生成的uuid

         # 2. 檢查參數
         if not all([phone, image_code, image_code_uuid]):
             return Response({"code": 999, "msg": "參數不全"})
         if not re.match(r'^1[3456789]\d{9}$', phone):
             return Response({"code": 999, "msg": "手機號碼不正確"})

         # 3. 檢查是否發送
         redis_client = get_redis_connection('img_code')
         phone_exists = redis_client.get(phone)
         if phone_exists:
             return Response({"code": 999, "msg": "頻繁發送, 請稍后再試"})

         # 驗證圖形驗證碼
         redis_image_code = redis_client.get(image_code_uuid)  # bytes
         if redis_image_code:
             # bytes 轉成 string
             redis_image_code = redis_image_code.decode()

         # 比較用戶提供的圖片內容是否和redis中保存的一致
         if image_code.upper() != redis_image_code:
             return Response({'code': 999, 'msg': '圖片驗證碼不正確'})

         # 4. 發送
         code = '%06d' % random.randint(0, 999999)  # 隨機6位驗證碼
 
         from syl.settings import BASE_DIR
         sys.path.insert(0, os.path.join(BASE_DIR, '../celery_task'))
         from main import send_sms_code  # 必須這么寫, 從main中導包

         send_sms_code.delay(phone, (code, "5"))
         print(code)

         # 5.使用 pipeline 批量操作
         pl = redis_client.pipeline()    # 實例化pipeline對象
         pl.setex(phone, 60 * 5, code)   # 存儲phone:code, 5分鍾有效期
         pl.delete(image_code_uuid)      # 從redis中刪除這個圖片驗證碼, 以防再次被使用
         pl.execute()

         # 6. 返回結果
         return Response({"code": 0, "msg": "短信發送成功"})

5.3 添加路由

  xurlpatterns = [    
       path('sms_codes/', 
  views.SmsCodeView.as_view()),
  ]

6.測試接口

  • 接口URL
    http://192.168.56.100:8888/user/sms_codes/
  • 請求攜帶參數
 {   
     "phone": 15303478492,    
     "image_code":"aed3",                                          # 前端生成的 圖形驗證碼                
     "image_code_uuid":"de8edce2-fc9f-11ea-9325-005056c00008"      # 前端生成的uuid                 
 }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM