Python開發異步任務Celery的使用教程!


1. 生產者消費者設計模式


最常用的解耦方式之一,尋找中間人(broker)搭橋,保證兩個業務沒有直接關聯。
我們稱這一解耦方式為:生產者消費者設計模式

2.中間人broker


示例:此處演示Redis數據庫作為中間人broker
Celery需要一種解決消息的發送和接受的方式,我們把這種用來存儲消息的的中間裝置叫做message broker, 也可叫做消息中間人。
作為中間人,我們有幾種方案可選擇:


1.RabbitMQ


RabbitMQ是一個功能完備,穩定的並且易於安裝的broker. 它是生產環境中最優的選擇。
使用RabbitMQ的細節參照以下鏈接:http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#broker-rabbitmq
如果使用的是Ubuntu或者Debian發行版的Linux,可以直接通過命令安裝RabbitMQ:

sudo apt-get install rabbitmq-server

 

安裝完畢之后,RabbitMQ-server服務器就已經在后台運行。
如果用的並不是Ubuntu或Debian, 可以在以下網址:
http://www.rabbitmq.com/download.html
去查找自己所需要的版本軟件。


2.Redis


Redis也是一款功能完備的broker可選項,但是其更可能因意外中斷或者電源故障導致數據丟失的情況。
關於是由那個Redis作為Broker,可訪下面網址:http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis


1. Celery介紹


Celery介紹:
一個簡單、靈活且可靠、處理大量消息的分布式系統,可以在一台或者多台機器上運行。
單個 Celery 進程每分鍾可處理數以百萬計的任務。
通過消息進行通信,使用消息隊列(broker)在客戶端和消費者之間進行協調。
安裝Celery:

$ pip install -U Celery

 

Celery官方文檔
2. 創建Celery實例並加載配置


1.定義Celery包

 

2.創建Celery實例

 

 

celery_tasks.main.py

# celery啟動文件
from celery import Celery

# 為celery使用django配置文件進行設置
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xxx.settings")

# 創建celery實例
celery_app = Celery('celery_tasks')

 

3.加載Celery配置

 

 

celery_tasks.config.py

broker_url = "redis://127.0.0.1/14"
result_backend = "redis://127.0.0.1/15"

 

celery_tasks.main.py

# celery啟動文件
from celery import Celery


# 創建celery實例
celery_app = Celery('xxx')
# 加載celery配置
celery_app.config_from_object('celery_tasks.config')

 

3. 定義發送短信任務

 

1.注冊任務:celery_tasks.main.py

# celery啟動文件
from celery import Celery


# 創建celery實例
celery_app = Celery('celery_tasks')
# 加載celery配置
celery_app.config_from_object('celery_tasks.config')
# 自動注冊celery任務
celery_app.autodiscover_tasks(['celery_tasks.sms'])

 

2.定義任務:celery_tasks.sms.tasks.py
tasks

from apps.verifications import constants
from celery_tasks.main import celery_app
from libs.yuntongxun.sms import CCP
import logging
logger = logging.getLogger('django')

# bind:保證task對象會作為第一個參數自動傳入
# name:異步任務別名
# retry_backoff:異常自動重試的時間間隔 第n次(retry_backoff×2^(n-1))s
# max_retries:異常自動重試次數的上限
@celery_app.task(bind=True, name='send_sms_code', retry_backoff=3)
def send_sms_code(self, mobile, sms_code):
"""
發送短信異步任務
:param mobile: 手機號
:param sms_code: 短信驗證碼
:return: 成功0 或 失敗-1
"""
try:
send_ret = CCP().send_template_sms(mobile, [sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60], constants.SEND_SMS_TEMPLATE_ID)
except Exception as e:
logger.error(e)
# 有異常自動重試三次
raise self.retry(exc=e, max_retries=3)
if send_ret != 0:
# 有異常自動重試三次
raise self.retry(exc=Exception('發送短信失敗'), max_retries=3)

return send_ret

 

4. 啟動Celery服務

$ cd ~/xxx_project/xxx
$ celery -A celery_tasks.main worker -l info

 

-A指對應的應用程序, 其參數是項目中 Celery實例的位置。
worker指這里要啟動的worker。
-l指日志等級,比如info等級。

 

 

5. 調用發送短信任務

# 發送短信驗證碼
# CCP().send_template_sms(mobile,[sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60], constants.SEND_SMS_TEMPLATE_ID)
# Celery異步發送短信驗證碼

send_sms_code.delay(mobile, sms_code)

 

6. 補充celery worker的工作模式
默認是進程池方式,進程數以當前機器的CPU核數為參考,每個CPU開四個進程。
如何自己指定進程數:

celery worker -A proj --concurrency=4
如何改變進程池方式為協程方式:

celery worker -A proj --concurrency=1000 -P eventlet -c 1000

# 安裝eventlet模塊
$ pip install eventlet

# 啟用 Eventlet 池
$ celery -A celery_tasks.main worker -l info -P eventlet -c 1000

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM