1. 生產者消費者設計模式
最常用的解耦方式之一,尋找中間人(broker)搭橋,保證兩個業務沒有直接關聯。
我們稱這一解耦方式為:生產者消費者設計模式
2.中間人broker
示例:此處演示Redis數據庫作為中間人broker
Celery需要一種解決消息的發送和接受的方式,我們把這種用來存儲消息的的中間裝置叫做message broker, 也可叫做消息中間人。
作為中間人,我們有幾種方案可選擇:
1.RabbitMQ
RabbitMQ是一個功能完備,穩定的並且易於安裝的broker. 它是生產環境中最優的選擇。
使用RabbitMQ的細節參照以下鏈接:http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#broker-rabbitmq
如果使用的是Ubuntu或者Debian發行版的Linux,可以直接通過命令安裝RabbitMQ:
sudo apt-get install rabbitmq-server
安裝完畢之后,RabbitMQ-server服務器就已經在后台運行。
如果用的並不是Ubuntu或Debian, 可以在以下網址:
http://www.rabbitmq.com/download.html
去查找自己所需要的版本軟件。
2.Redis
Redis也是一款功能完備的broker可選項,但是其更可能因意外中斷或者電源故障導致數據丟失的情況。
關於是由那個Redis作為Broker,可訪下面網址:http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis
1. Celery介紹
Celery介紹:
一個簡單、靈活且可靠、處理大量消息的分布式系統,可以在一台或者多台機器上運行。
單個 Celery 進程每分鍾可處理數以百萬計的任務。
通過消息進行通信,使用消息隊列(broker)在客戶端和消費者之間進行協調。
安裝Celery:
$ pip install -U Celery
Celery官方文檔
2. 創建Celery實例並加載配置
1.定義Celery包
2.創建Celery實例
celery_tasks.main.py
# celery啟動文件 from celery import Celery # 為celery使用django配置文件進行設置 import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xxx.settings") # 創建celery實例 celery_app = Celery('celery_tasks')
3.加載Celery配置
celery_tasks.config.py
broker_url = "redis://127.0.0.1/14" result_backend = "redis://127.0.0.1/15"
celery_tasks.main.py
# celery啟動文件 from celery import Celery # 創建celery實例 celery_app = Celery('xxx') # 加載celery配置 celery_app.config_from_object('celery_tasks.config')
3. 定義發送短信任務
1.注冊任務:celery_tasks.main.py
# celery啟動文件 from celery import Celery # 創建celery實例 celery_app = Celery('celery_tasks') # 加載celery配置 celery_app.config_from_object('celery_tasks.config') # 自動注冊celery任務 celery_app.autodiscover_tasks(['celery_tasks.sms'])
2.定義任務:celery_tasks.sms.tasks.py
tasks
from apps.verifications import constants from celery_tasks.main import celery_app from libs.yuntongxun.sms import CCP import logging logger = logging.getLogger('django') # bind:保證task對象會作為第一個參數自動傳入 # name:異步任務別名 # retry_backoff:異常自動重試的時間間隔 第n次(retry_backoff×2^(n-1))s # max_retries:異常自動重試次數的上限 @celery_app.task(bind=True, name='send_sms_code', retry_backoff=3) def send_sms_code(self, mobile, sms_code): """ 發送短信異步任務 :param mobile: 手機號 :param sms_code: 短信驗證碼 :return: 成功0 或 失敗-1 """ try: send_ret = CCP().send_template_sms(mobile, [sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60], constants.SEND_SMS_TEMPLATE_ID) except Exception as e: logger.error(e) # 有異常自動重試三次 raise self.retry(exc=e, max_retries=3) if send_ret != 0: # 有異常自動重試三次 raise self.retry(exc=Exception('發送短信失敗'), max_retries=3) return send_ret
4. 啟動Celery服務
$ cd ~/xxx_project/xxx
$ celery -A celery_tasks.main worker -l info
-A指對應的應用程序, 其參數是項目中 Celery實例的位置。
worker指這里要啟動的worker。
-l指日志等級,比如info等級。
5. 調用發送短信任務
# 發送短信驗證碼 # CCP().send_template_sms(mobile,[sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60], constants.SEND_SMS_TEMPLATE_ID) # Celery異步發送短信驗證碼 send_sms_code.delay(mobile, sms_code)
6. 補充celery worker的工作模式
默認是進程池方式,進程數以當前機器的CPU核數為參考,每個CPU開四個進程。
如何自己指定進程數:
celery worker -A proj --concurrency=4
如何改變進程池方式為協程方式:
celery worker -A proj --concurrency=1000 -P eventlet -c 1000
# 安裝eventlet模塊 $ pip install eventlet # 啟用 Eventlet 池 $ celery -A celery_tasks.main worker -l info -P eventlet -c 1000