一、Celery架構介紹
Celery是分布式異步任務框架,Celery的架構由三部分組成,消息中間件(message broker)、任務執行單元(worker)和 任務執行結果存儲(task result store)組成。
消息中間件
Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等
任務執行單元
Worker是Celery提供的任務執行的單元,worker並發的運行在分布式的系統節點中。
任務結果存儲
Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP, redis等
"""
1)可以不依賴任何服務器,通過自身命令,啟動服務(內部支持socket)
2)celery服務為為其他項目服務提供異步解決任務需求的
注:會有兩個服務同時運行,一個是項目服務,一個是celery服務,項目服務將需要異步處理的任務交給celery服務,celery就會在需要時異步完成項目的需求
人是一個獨立運行的服務 | 醫院也是一個獨立運行的服務
正常情況下,人可以完成所有健康情況的動作,不需要醫院的參與;但當人生病時,就會被醫院接收,解決人生病問題
人生病的處理方案交給醫院來解決,所有人不生病時,醫院獨立運行,人生病時,醫院就來解決人生病的需求
"""
1、使用場景
異步執行:解決耗時任務,將耗時操作任務提交給Celery去異步執行,比如發送短信/郵件、消息推送、音視頻處理等等
延遲執行:解決延遲任務
定時執行:解決周期(周期)任務,比如每天數據統計
二、Celery簡單使用
pip install celery==5.1.2 # 安裝
1、使用
# 使用(舉例,假設放在s1.py文件上)
from celery import Celery
# broker = 'redis://:密碼@127.0.0.1:6379/1' # -有密碼的寫法
broker = 'redis://127.0.0.1:6379/1' # redis地址
backend = 'redis://127.0.0.1:6379/2' # redis地址
# 實例化得到celery對象,第一個參數是起個名字的意思
app = Celery(__name__, backend=backend, broker=broker)
# 使用裝飾器包裹任務,將任務管理起來
@app.task() # 寫一個任務函數(示例,實際情況下任務應該單獨創建文件)
def add(x, y):
return x + y
#############################################################
# 任務提交(在另外一個s2.py文件提交任務)
import s1
# 異步執行
# 第一步:提交(使用的任務名.apply_async(參數))
res = s1.add.apply_async(args=[2, 3])
print(res) # 會返回一個任務id號,唯一標識
#############################################################
# 第二步:讓worker執行-->結果存在redis,使用命令啟動
# 【非Windows】
celery worker -A s1文件名 -l info
# 【Windows】需要安裝一個模塊 pip3 install eventlet
celery worker -A 文件名 -l info -P eventlet # 5.X前的啟動命令
celery -A 文件名 worker -l info -P eventlet # 5.X的啟動命令
#############################################################
# 第三步:查看任務結果(放在任意文件查看,如s3.py)
from s1 import app
from celery.result import AsyncResult
id = '1b814481-08aa-484f-a1f9-3952a762df9a'
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
result = async.get()
print(result)
elif async.failed():
print('任務失敗')
elif async.status == 'PENDING':
print('任務等待中被執行')
elif async.status == 'RETRY':
print('任務異常后正在重試')
elif async.status == 'STARTED':
print('任務已經開始被執行')
三、celery包解構
# 目錄結構
-celery_task # 包名
__init__.py
celery.py # app所在py文件
course_task.py # 任務
order_task.py # 任務
user_task.py # 任務
提交任務.py # 提交任務
查看結果.py # 查看結果
1、執行異步任務
"""首先是celery.py文件:存放celery的使用"""
from celery import Celery
# broker = 'redis://:密碼@127.0.0.1:6379/1' # -有密碼的寫法
broker = 'redis://127.0.0.1:6379/1' # redis地址
backend = 'redis://127.0.0.1:6379/2' # redis地址
"""
實例化得到celery對象,第一個參數是起個名字的意思,第二和第三個參數對應上面的redis地址,
第三個參數include是一個列表,放被管理的task的py文件
"""
app = Celery(__name__, backend=backend, broker=broker, include=[
'celery_task.user_task',
'celery_task.order_task',
])
##################################################################################
"""然后就是user_task.py這些任務文件,舉個例子,模擬一個發送短信驗證碼的任務"""
from .celery import app
@app.task() # 裝飾器管理任務
def send_sms(phone, code):
import time
time.sleep(3) # 模擬發送短信延遲
print('短信發送成功,手機號是:%s,驗證碼是:%s' % (phone, code))
return '短信發送成功'
##################################################################################
"""然后就是提交我們所寫的任務,放在提交任務.py文件中,然后運行該文件,再去啟動worker命令"""
from celery_task import user_task
# 提交一個短信任務
a = user_task.send_sms.apply_async(args=(137000, 4691)) # 使用位置傳參
# a = user_task.send_sms.apply_async(kwargs={'x': 137000, 'y': 4691}) 也可以使用關鍵字傳參
# a = user_task.send_sms.delay('137000','4691') 也可以寫delay這種方式,可以直接傳參
print(a) # 會以字符串形式返回一個任務id號,可以用來查看任務結果
###################################################################################
執行worker命令后,提交一次任務,worker就執行一次,如果沒有任務就會一直阻塞着,等待下一次任務執行。(執行worker記得把位置切到需要啟動的包的上一層文件夾里)
# 【非Windows】
celery worker -A 文件名 -l info
# 【Windows】需要安裝一個模塊 pip3 install eventlet
celery worker -A 文件名 -l info -P eventlet # 5.X前的啟動命令
celery -A 文件名 worker -l info -P eventlet # 5.X的啟動命令
# 文件名就是目錄結構的包名
###################################################################################
"""查看結果.py文件,用於查看任務執行的結果,下面基本就是固定代碼"""
from celery_task.celery import app # 導入celery文件下的app對象
from celery.result import AsyncResult
id = '1b814481-08aa-484f-a1f9-3952a762df9a' # 使用提交任務返回的字符串,來查看結果
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
result = async.get()
print(result)
elif async.failed():
print('任務失敗')
elif async.status == 'PENDING':
print('任務等待中被執行')
elif async.status == 'RETRY':
print('任務異常后正在重試')
elif async.status == 'STARTED':
print('任務已經開始被執行')
2、執行延遲任務
拿上面user_task的短信任務為例,將短信任務延遲10秒之后執行,所以直接在 "提交任務.py"下寫
from celery_task import user_task # 導入celery_task包下的user_task短信任務文件
from datetime import datetime, timedelta # 導入關於時間的模塊
# utcnow當前utc時間
eta = datetime.utcnow() + timedelta(seconds=10) # 這句代碼的意思是 當前utc時間的10秒后
# 10s之后發送短信
res = user_task.send_sms.apply_async(args=(137000, 4691),eta=eta)
print(res)
3、執行定時任務
"""對定時任務進行配置,放在celery.py里"""
# 時區
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任務的定時配置
from datetime import timedelta
from celery.schedules import crontab # 這個模塊有各種指定任務時間的命令
app.conf.beat_schedule = {
'send_sms_every_3_seconds': { # 給配置起名字
'task': 'celery_task.user_task.send_sms', # 指定執行的任務
'schedule': timedelta(seconds=3), # 執行任務內容,例如這個是每三秒發送一次短信
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八點
'args': (137000, 4691), # 傳參數,如果任務沒有參數,就不用寫
}
}
"""
這時候worker啟動后並沒有執行,是因為我們之前執行任務是手動提交任務,為了能自動提交任務,我們需要啟動beat
再開一個終端,不要把啟動worker的終端頂掉了,然后輸入命令啟動beat
"""
celery beat -A celery_task -l # 5.X之前的命令
celery -A celery_task beat -l info # 5.X之后的命令
四、Django集成Celery
# 了解性質內容:django-celery,一個把django和celery集成起來的第三方模塊,但是第三方寫的包的版本,需要跟celery和django版本完全對應。略顯麻煩,所以我們不用
# 我們自己使用包結構集成到django中
# 第一步,把寫好的包,直接復制到項目根路徑
# 第二步,在視圖類/函數中
from celery_task.user_task import send_sms # 導入想要使用的功能
def test(request):
mobile = request.GET.get('mobile')
code = '9999'
res = send_sms.delay(mobile, code) # 同步發送假設3分支鍾,異步發送,直接就返回id了,后期通過id查詢發送是否成功
print(res)
return HttpResponse(res)
# 啟動worker,訪問http://127.0.0.1:8000/test/?mobile=1111,帶着參數,就訪問成功了,后台的終端就會顯示 短信發送成功,手機號是:1111,驗證碼是:9999
1、django集成celery實現定時任務
1-1、定時更新首頁輪播圖
我們設想這樣一個場景,一個網站的首頁可能會有 海報輪播圖,這個輪播圖通常很長時間都不變,如果用戶每次訪問首頁,網站就得去查詢一次數據庫,對數據庫的壓力非常大
這時候我們可以使用Redis緩存,以后用戶訪問首頁,只需要第一次訪問時從數據庫中獲取數據放入Redis中,之后的訪問都是直接從Redis中獲取數據,減少了數據庫查詢的次數
class BannerView(ViewSetMixin, ListAPIView):
queryset = models.Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
:settings.BANNER_COUNT]
serializer_class = serializer.BannerSerializer
def list(self, request, *args, **kwargs):
# 先去緩存中獲取,如果緩存有,直接返回,如果沒有,去數據庫查詢,放到緩存
# 先去緩存中獲取
banner_list = cache.get('banner_list_cache')
if not banner_list: # 去數據庫中獲取
# 沒有走緩存
print('查了數據庫')
res = super().list(request, *args, **kwargs)
banner_list = res.data # res是Response對象
# 放入到緩存中
cache.set('banner_list_cache', banner_list)
return Response(data=banner_list)
1-2、解決雙寫一致性問題
雖然用Redis緩存很方便,可以有效解決數據庫壓力,但這種方法也會伴隨着一定問題,比如說:雙寫一致性問題(redis緩存和mysql數據不同步),或者緩存穿透,緩存擊穿,緩存雪崩等問題,不過我們也有相應的解決辦法
# 雙寫一致性問題:是緩存數據庫更新的策略,數據庫的數據更新了,但是Redis緩存沒更新,確怎么解決
# 緩存更新策略
-先刪除緩存,在更新數據庫
-先更新數據庫。再更新緩存(可靠性高)
-定時更新(對實時性要求不高)
-可以設置為某個時間段,自動更新緩存
1-2-1、home_task.py
from home import serializer
from .celery import app
from apps.home import models
from django.conf import settings
from django.core.cache import cache
@app.task()
def update_banner():
# 從mysql中取出輪播圖數據
queryset = models.Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
:settings.BANNER_COUNT]
# 2 序列化
ser = serializer.BannerSerializer(instance=queryset, many=True)
# 3 獲取到字典,手動拼上前面的地址
banner_list = ser.data
for banner in banner_list:
banner['image'] = settings.BACKEND_URL % str(banner['image'])
# 4 放到緩存中
cache.set('banner_list_cache', banner_list)
return True
1-2-2、celery.py
from celery import Celery
# 由於celery和django 是獨立的兩個服務,要想在celery服務中使用django,必須加這兩句
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")
# 由於celery和django 是獨立的兩個服務,要想在celery服務中使用django,必須加這兩句
import os
# os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# include 是一個列表,放被管理的task 的py文件
app = Celery(__name__, backend=backend, broker=broker, include=[
'celery_task.gagaluansha',
'celery_task.user_task',
])
# 時區
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任務的定時配置
from datetime import timedelta
app.conf.beat_schedule = {
'update_banner_every_3_seconds': { # 給配置起名字
'task': 'celery_task.home_task.update_banner', # 指定執行的任務
'schedule': timedelta(seconds=3), # 執行任務內容,例如這個是每三秒發送一次短信
}
}
1-3-3、啟動djagno,啟動beat,啟動worker
python manage.py runserver
celery -A celery_task beat -l info
celery -A celery_task worker -l info -P eventlet
