Django異步
- 參考文獻:
https://blog.csdn.net/Taneost/article/details/105757575
- 參考文獻:
https://www.cnblogs.com/pyedu/p/12461819.html
- 參考文獻:
https://www.cnblogs.com/hard-working-Bert/p/14236125.html
1.Clelery
1.1 什么是Clelery
- Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統專注於實時處理的異步任務隊列同時也支持任務調度;(分布式,高可用)
- Celery 通過消息機制進行通信,通常使用中間人(Broker)作為客戶端和職程(Worker)調節。啟動一個任務,客戶端向消息隊列發送一條消息,然后中間人(Broker)將消息傳遞給一個職程(Worker),最后由職程(Worker)進行執行中間人(Broker)分配的任務。
1.2 Celery架構
- Celery 的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。
1.3 消息中間件
- Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等
1.4 任務執行單元
- Worker是Celery提供的任務執行的單元,worker並發的運行在分布式的系統節點中。
1.5 任務結果存儲
- Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP, redis等
1.6 使用場景
-
異步任務:將耗時操作任務提交給Celery去異步執行,比如發送短信/郵件、消息推送、音視頻處理等等。
-
定時任務:定時執行某件事情,比如每天數據統計。
1.7 Celery的安裝配置
-
pip install celery # 消息中間件:RabbitMQ/Redis app=Celery('任務名',backend='xxx',broker='xxx')-
1.8 工作原理
-
結合生產消費者模型使用。
-
1.9 組件介紹
-
-
Producer:調用了Celery提供的API、函數或者裝飾器而產生任務並交給任務隊列處理的都是任務生產者。
-
Celery Beat:任務調度器,Beat進程會讀取配置文件的內容,周期性地將配置中到期需要執行的任務發送給任務隊列。
-
Broker:消息代理,又稱消息中間件,接受任務生產者發送過來的任務消息,存進隊列再按序分發給任務消費方(通常是消息隊列或者數據庫)。Celery目前支持RabbitMQ、Redis、MongoDB、Beanstalk、SQLAlchemy、Zookeeper等作為消息代理,但適用於生產環境的只有RabbitMQ和Redis, 官方推薦 RabbitMQ。
-
Celery Worker:執行任務的消費者,通常會在多台服務器運行多個消費者來提高執行效率。
-
Result Backend:任務處理完后保存狀態信息和結果,以供查詢。Celery默認已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式
1.10 序列化和反序列化
2.簡單使用
2.1 異步任務
1.celery_task.py
import celery
import time
backend='redis://:password@127.0.0.1:6379/1'
broker='redis://:password@127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def send_email(name):
print("向%s發送郵件..."%name)
time.sleep(5)
print("向%s發送郵件完成"%name)
return "ok"
@cel.task
def send_sms(name):
print("向%s發送短信..."%name)
time.sleep(5)
print("向%s發送短信完成"%name)
return "ok"
2.produce_task.py
from celery_task import send_email
result = send_email.delay("yuan")
print(result.id)
result2 = send_email.delay("alex")
print(result2.id)
3.result.py結果
from celery.result import AsyncResult
from celery_task import cel
async_result=AsyncResult(id="8e16f7e9-c59f-453d-8aba-b97cb7f8d06e", app=cel)
if async_result.successful():
result = async_result.get()
print(result)
# result.forget() # 將結果刪除
elif async_result.failed():
print('執行失敗')
elif async_result.status == 'PENDING':
print('任務等待中被執行')
elif async_result.status == 'RETRY':
print('任務異常后正在重試')
elif async_result.status == 'STARTED':
print('任務已經開始被執行')
4.celery 啟動
celery -A celery_task worker -l info -P eventlet
說明:如果不存在eventlet
,先手動安裝pip install eventlet
2.2 定時任務
修改produce_task.py中的代碼。
from celery_task import send_email
from datetime import datetime
# 方式一
# v1 = datetime(2020, 3, 11, 16, 19, 00)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = send_email.apply_async(args=["egon",], eta=v2)
# print(result.id)
# 方式二
ctime = datetime.now()
# 默認用utc時間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=30) # 設置當前時間為30秒后執行
task_time = utc_ctime + time_delay
# 使用apply_async並設定時間
result = send_email.apply_async(args=["egon"], eta=task_time)
print(result.id)
3.django中使用celery
3.1 調通django視圖
from django.shortcuts import render,HttpResponse
# Create your views here.
def index(request):
# 可放置異步功能模塊
return HttpResponse("...")
3.2 創建目錄結構
-
在項目的目錄下(與
manage.py
同級的目錄)創建目錄;mycelery/ ├── config.py ├── __init__.py ├── main.py └── sms/ # 一個功能呢過模塊 ├── __init__.py ├── tasks.py # 必須交tasks
3.3 編寫任務函數
# -*- coding: utf-8 -*-
'''
@Time : 2022/3/14 14:45
@Author : ziqingbaojian
@File : tasks.py
'''
# celery的任務必須寫在tasks.py的文件中,別的文件名稱不識別!!!
from mycelery.main import app
import time
import logging
log = logging.getLogger("django")
@app.task # name表示設置任務的名稱,如果不填寫,則默認使用函數名做為任務名
def send_sms(mobile):
"""發送短信"""
print("向手機號%s發送短信成功!"%mobile)
time.sleep(15)
return "send_sms OK"
@app.task # name表示設置任務的名稱,如果不填寫,則默認使用函數名做為任務名
def send_sms2(mobile):
print("向手機號%s發送短信成功!" % mobile)
time.sleep(5)
return "send_sms2 OK"
views.py
from django.shortcuts import render,HttpResponse
from mycelery.sms.tasks import send_sms,send_sms2
# Create your views here.
def index(request):
# 異步任務
# 1. 聲明一個和celery一模一樣的任務函數,但是我們可以導包來解決
send_sms.delay("110")
send_sms2.delay("119")
# send_sms.delay() # 如果調用的任務函數沒有參數,則不需要填寫任何內容
'''
按照邏輯應該停留在此處15分鍾。
'''
return HttpResponse("...")
main.py
# 主程序
import os
from celery import Celery
# 創建celery實例對象
app = Celery("sms")
# 把celery和django進行組合,識別和加載django的配置文件
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryPros.settings.dev')
# 通過app對象加載配置文件
app.config_from_object("mycelery.config")
# 加載任務
# 參數必須必須是一個列表,里面的每一個任務都是任務的路徑名稱
# app.autodiscover_tasks(["任務1","任務2"])
app.autodiscover_tasks(["mycelery.sms",])# 可以直接把相對應的路徑導入
# 啟動Celery的命令
# 強烈建議切換目錄到mycelery根目錄下啟動
# celery -A mycelery.main worker --loglevel=info
config.py
broker_url = 'redis://:password@127.0.0.1:6379/3'
result_backend = 'redis://:password@127.0.0.1:6379/4'
異步完成,結果直接返回。
3.4 另一種方式
1.在settings.py
同級目錄中,創建celery.py
;
import os
from celery import Celery
from django.conf import settings
# 設置celery的環境變量和django-celery的工作目錄
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangocelery.settings")
# 實例化celery應用,傳入服務器名稱
app = Celery("djangocelery")
# 加載celery配置
app.config_from_object("django.conf:settings")
# 如果在項目中,創建了task.py,那么celery就會沿着app去查找task.py來生成任務
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
2.添加配置文件
ROKER_URL = 'redis://:password@127.0.0.1:6379/14' # 任務容器地址,redis數據庫地址
CELERY_RESULT_BACKEND = 'redis://:password@127.0.0.1:6379/15' # 任務結束的地址
3.在settings.py
同級目錄中的__init__.py
中添加
from .celery import app as celery_app
__all__ = ['celery_app']
4.在app下新建tasks.py
from __future__ import absolute_import, unicode_literals
import time
from celery import shared_task
@shared_task
def adds():
for i in range(0, 10):
print(i)
time.sleep(1)
return 'finish'
5.編寫views
和url
from django.shortcuts import render,HttpResponse
from django.http.response import JsonResponse
# Create your views here.
from app01 import tasks
from celery.result import AsyncResult
def index(request):
""" 進入這個url的時候就觸發異步任務,並在session中記錄task_id """
res = tasks.adds.delay()
request.session['task_id'] = res.task_id
return JsonResponse({'status': 'successful', 'task_id': res.task_id})
def dasd(request):
""" 進入url就會去獲取session中的task_id,並檢測任務. 若任務還在進行就顯示頁面還在加載,若進行完成就顯示hahaha """
task_id = request.session.get('task_id')
if task_id and AsyncResult(task_id).state == 'PENDING': # 加載時的狀態為PENDING
return HttpResponse('頁面正在加載...')
return HttpResponse('hahaha')
urlpatterns = [
path('admin/', admin.site.urls),
re_path(r'^index/', views.index),
re_path(r'^dasd/', views.dasd),
]
繼續努力,終成大器!