06: django+celery+redis


目錄:

1.1 Celery介紹     返回頂部

  參考博客:http://www.cnblogs.com/alex3714/p/6351797.html

  參考博客:  https://www.jianshu.com/p/027538ffb8c1

  1、celery應用舉例

      1、Celery 是一個 基於python開發的分布式異步消息任務隊列,通過它可以輕松的實現任務的異步處理,
          如果你的業務場景中需要用到異步任務,就可以考慮使用celery

      2、你想對100台機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,
        你過一段時間只需要拿着這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情

      3、Celery 在執行任務時需要通過一個消息中間件來接收和發送任務消息,以及存儲任務結果, 一般使用rabbitMQ or Redis

  2、Celery有以下優點

      1、簡單:一單熟悉了celery的工作流程后,配置和使用還是比較簡單的

      2、高可用:當任務執行失敗或執行過程中發生連接中斷,celery 會自動嘗試重新執行任務

      3、快速:一個單進程的celery每分鍾可處理上百萬個任務

      4、靈活: 幾乎celery的各個組件都可以被擴展及自定制

  3、Celery基本工作流程圖

      

    user:用戶程序,用於告知celery去執行一個任務。
    broker: 存放任務(依賴RabbitMQ或Redis,進行存儲)
    worker:執行任務

  4、Celery 特性 

      1)方便查看定時任務的執行情況, 如 是否成功, 當前狀態, 執行任務花費的時間等.

      2)可選 多進程, Eventlet 和 Gevent 三種模型並發執行.

      3)Celery 是語言無關的.它提供了python 等常見語言的接口支持.

1.2 celery 組件     返回頂部

  1、Celery 扮演生產者和消費者的角色

      Celery Beat : 任務調度器. Beat 進程會讀取配置文件的內容, 周期性的將配置中到期需要執行的任務發送給任務隊列.

      Celery Worker : 執行任務的消費者, 通常會在多台服務器運行多個消費者, 提高運行效率.

      Broker : 消息代理, 隊列本身. 也稱為消息中間件. 接受任務生產者發送過來的任務消息, 存進隊列再按序分發給任務消費方(通常是消息隊列或者數據庫).

      Producer : 任務生產者. 調用 Celery API , 函數或者裝飾器, 而產生任務並交給任務隊列處理的都是任務生產者.

      Result Backend : 任務處理完成之后保存狀態信息和結果, 以供查詢.

  2、celery架構圖

      

  3. 產生任務的方式 

      1) 發布者發布任務(WEB 應用)

      2) 任務調度按期發布任務(定時任務)

  4. celery 依賴三個庫: 這三個庫, 都由 Celery 的開發者開發和維護.

      billiard : 基於 Python2.7 的 multisuprocessing 而改進的庫, 主要用來提高性能和穩定性.

      librabbitmp : C 語言實現的 Python 客戶端

      kombu : Celery 自帶的用來收發消息的庫, 提供了符合 Python 語言習慣的, 使用 AMQP 協議的高級借口.

1.3 安裝相關包 與 管理命令     返回頂部

  1、安裝相關軟件包

pip3 install Django==2.0.4
pip3 install celery==4.3.0
pip3 install redis==3.2.1
pip3 install  django-celery==3.1.17
pip3 install ipython==7.6.1 

find ./ -type f | xargs sed -i 's/\r$//g'  # 批量將當前文件夾下所有文件裝換成unix格式

  2、celery管理

celery multi start w1 w2 -A celery_pro -l info     #一次性啟動w1,w2兩個worker
celery -A celery_pro status                        #查看當前有哪些worker在運行
celery multi stop w1 w2 -A celery_pro              #停止w1,w2兩個worker

celery  multi start celery_test -A celery_test -l debug --autoscale=50,5        # celery並發數:最多50個,最少5個
ps auxww|grep "celery worker"|grep -v grep|awk '{print $2}'|xargs kill -9       # 關閉所有celery進程

  3、django_celery_beat管理

celery -A celery_test beat -l info -S django                   #啟動心跳任務
ps -ef | grep -E "celery -A celery_test beat" | grep -v grep| awk '{print $2}' | xargs kill -TERM &> /dev/null  # 殺死心跳所有進程

1.3 安裝相關包 與 管理命令     返回頂部

  1、在Django中使用celery介紹(celery無法再windows下運行)

    1)在Django中使用celery時,celery文件必須以tasks.py

    2)Django會自動到每個APP中找tasks.py文件

  2、創建一個Django項目celery_test,和app01

  3、在與項目同名的目錄下創建celery.py

# -*- coding: utf-8 -*-
from __future__ import absolute_import
import os
from celery import Celery

# 只要是想在自己的腳本中訪問Django的數據庫等文件就必須配置Django的環境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_test.settings')

# app名字
app = Celery('celery_test')

# 配置celery
class Config:
    BROKER_URL = 'redis://192.168.56.11:6379'
    CELERY_RESULT_BACKEND = 'redis://192.168.56.11:6379'

app.config_from_object(Config)
# 到各個APP里自動發現tasks.py文件
app.autodiscover_tasks()
celery.py

  4、在與項目同名的目錄下的 init.py 文件中添加下面內容

# -*- coding:utf8 -*-
from __future__ import absolute_import, unicode_literals

# 告訴Django在啟動時別忘了檢測我的celery文件
from .celery import app as celery_ap
__all__ = ['celery_app']
__init__.py

  5、創建app01/tasks.py文件

# -*- coding:utf8 -*-
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time

# 這里不再使用@app.task,而是用@shared_task,是指定可以在其他APP中也可以調用這個任務
@shared_task
def add(x,y):
    print('########## running add #####################')
    return x + y

@shared_task
def minus(x,y):
    time.sleep(30)
    print('########## running minus #####################')
    return x - y
app01/tasks.py

  6、將celery_test這個Django項目拷貝到centos7.3的django_test文件夾中

  7、保證啟動了redis-server

  8、啟動一個celery的worker

celery multi start w1 w2 -A celery_pro -l info     #一次性啟動w1,w2兩個worker
celery -A celery_pro status                        #查看當前有哪些worker在運行
celery multi stop w1 w2 -A celery_pro              #停止w1,w2兩個worker

celery  multi start celery_test -A celery_test -l debug --autoscale=50,5        # celery並發數:最多50個,最少5個
ps auxww|grep "celery worker"|grep -v grep|awk '{print $2}'|xargs kill -9       # 關閉所有celery進程

  9、測試celery

./manage.py shell
import tasks
t1 = tasks.minus.delay(5,3)
t2 = tasks.add.delay(3,4)
t1.get()
t2.get()
測試

1.5 在django中使用計划任務功能     返回頂部

  1、在Django中使用celery的定時任務需要安裝django-celery-beat

      pip3 install django-celery-beat

  2、在Django的settings中注冊django_celery_beat

INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)

  3、執行創建表命令

      python3 manage.py makemigrations

      python3 manage.py migrate

  4、在與項目同名的目錄下的celery.py中添加定時任務

# -*- coding: utf-8 -*-
from __future__ import absolute_import
import os
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
from kombu import Queue

# 只要是想在自己的腳本中訪問Django的數據庫等文件就必須配置Django的環境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_test.settings')

# app名字
app = Celery('celery_test')

# 配置celery
class Config:
    BROKER_URL = 'redis://192.168.56.11:6379'  # broker
    CELERY_RESULT_BACKEND = 'redis://192.168.56.11:6379'  # backend
    CELERY_ACCEPT_CONTENT = ['application/json']  # 指定任務接受的內容類型(序列化)
    CELERY_TASK_SERIALIZER = 'json'  # 任務的序列化方式
    CELERY_RESULT_SERIALIZER = 'json'  # 任務執行結果的序列化方式
    CELERY_TIMEZONE = 'Asia/Shanghai'  # 時區設置,計划任務需要,推薦 Asia/Shanghai
    ENABLE_UTC = False  # 不使用UTC時區
    CELERY_TASK_RESULT_EXPIRES = 60 * 60  # celery任務執行結果的超時時間
    CELERY_ANNOTATIONS = {'*': {'rate_limit': '500/s'}}
    # CELERYD_PREFETCH_MULTIPLIER = 10  # 每次取任務的數量
    CELERYD_MAX_TASKS_PER_CHILD = 16  # 每個worker執行了多少任務就會死掉,防止內存泄漏

app.config_from_object(Config)
app.autodiscover_tasks()

#crontab config
app.conf.update(
    CELERYBEAT_SCHEDULE = {
        # 每隔3s執行一次add函數
        'every-3-min-add': {
            'task': 'app01.tasks.add',
            'schedule': timedelta(seconds=10)
        },
        # 每天下午15:420執行
        'add-every-day-morning@14:50': {
            'task': 'app01.tasks.minus',
            'schedule': crontab(hour=19, minute=50, day_of_week='*/1'),
        },
    },
)

# kombu : Celery 自帶的用來收發消息的庫, 提供了符合 Python 語言習慣的, 使用 AMQP 協議的高級接口
Queue('transient', routing_key='transient',delivery_mode=1)
celery.py

  5、app01/tasks.py  

# -*- coding:utf8 -*-
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time

# 這里不再使用@app.task,而是用@shared_task,是指定可以在其他APP中也可以調用這個任務
@shared_task
def add():
    print('########## running add #####################')
    return 'add'

@shared_task
def minus():
    time.sleep(30)
    print('########## running minus #####################')
    return 'minus'
app01/tasks.py

  6、管理命令

'''1、celery管理 '''
celery  multi start celery_test -A celery_test -l debug --autoscale=50,5        # celery並發數:最多50個,最少5個
ps auxww|grep "celery worker"|grep -v grep|awk '{print $2}'|xargs kill -9       # 關閉所有celery進程

'''2、django-celery-beat心跳服務管理 '''
celery -A celery_test beat -l info -S django                   #啟動心跳任務
ps -ef | grep -E "celery -A celery_test beat" | grep -v grep| awk '{print $2}' | xargs kill -TERM &> /dev/null  # 殺死心跳所有進程

1.6 使用 Celery Once 來防止 Celery 重復執行同一個任務

  1、產生重復執行原因 

      1. 當我們設置一個ETA(預估執行時間)比visibility_timeout(超時時間)長的任務時,會出現重復執行問題

      2. 因為每過一次 visibility_timeout 時間,celery就會認為這個任務沒被worker執行成功,重新分配給其它worker再執行

  2、Celery Once解決方法

      1. Celery Once 也是利用 Redis 加鎖來實現,他的使用非常簡單,參照 GitHub 的使用很快就能夠用上。

      2. Celery Once 在 Task 類基礎上實現了 QueueOnce 類,該類提供了任務去重的功能

      3. 所以在使用時,我們自己實現的方法需要將 QueueOnce 設置為 base

@celery.task(base=QueueOnce, once={'keys': ['a']}) def slow_add(a, b): sleep(30) return a + b

      4. 后面的 once 參數表示,在遇到重復方法時的處理方式,默認 graceful 為 False,那樣 Celery 會拋出 AlreadyQueued 異常,手動設置為 True,則靜默處理。

      5. 可以手動設置任務的 key,可以指定 keys 參數。

  3、celery once使用

      參考官方:https://github.com/cameronmaske/celery-once

#! /usr/bin/env python
# -*- coding: utf-8 -*-
'''第一步: 安裝'''
pip install -U celery_once

'''第二步: 增加配置'''
from celery import Celery
from celery_once import QueueOnce
from time import sleep

celery = Celery('tasks', broker='amqp://guest@localhost//')
celery.conf.ONCE = {
  'backend': 'celery_once.backends.Redis',
  'settings': {
    'url': 'redis://localhost:6379/0',
    'default_timeout': 60 * 60
  }
}

'''第三步: 修改 delay 方法'''
example.delay(10)
# 修改為
result = example.apply_async(args=(10))

'''第四步: 修改 task 參數'''
@celery.task(base=QueueOnce, once={'graceful': True, keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

# 參考官方:https://github.com/cameronmaske/celery-once
celery once配置使用方法

1.7 redis會丟失消息 RabbitMQ不會丟失消息的原因 

  1、redis丟失消息的原因

      1. 用 Redis 作 broker 的話,任務會存在內存里面,如果 celery 進程要結束了,就會在臨死之前把隊列存進 Redis,下次啟動時再從 Redis 讀取。

      2. 但是如果可見性超時時間過長在斷電或者強制終止職程(Worker)的情況會“丟失“重新分配的任務。

      3. 比如當 celery 被 kill -9 了,任務將無法存進 Redis,內存中的任務會丟失,或者任務太多導致celery出現異常。

  2、RabbitMQ如何保證可靠消費

      Redis: 沒有相應的機制保證消息的消費,當消費者消費失敗的時候,消息體丟失,需要手動處理

      RabbitMQ: 具有消息消費確認,即使消費者消費失敗,也會自動使消息體返回原隊列,同時可全程持久化,保證消息體被正確消費

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM