Python Celery隊列


Celery隊列簡介:

Celery 是一個 基於python開發的分布式異步消息任務隊列,通過它可以輕松的實現任務的異步處理, 如果你的業務場景中需要用到異步任務,就可以考慮使用celery.

使用場景:

1.你想對100台機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿着這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情。

2.你想做一個定時任務,比如每天檢測一下你們所有客戶的資料,如果發現今天 是客戶的生日,就給他發個短信祝福

Celery原理:

Celery 在執行任務時需要通過一個消息中間件來接收和發送任務消息,以及存儲任務結果, 一般使用rabbitMQ or Redis 或者是數據庫來存放消息的中間結果

Celery優點:

  1. 簡單:一單熟悉了celery的工作流程后,配置和使用還是比較簡單的
  2. 高可用:當任務執行失敗或執行過程中發生連接中斷,celery 會自動嘗試重新執行任務
  3. 快速:一個單進程的celery每分鍾可處理上百萬個任務
  4. 靈活: 幾乎celery的各個組件都可以被擴展及自定制

Celery缺點:

    1.目前只能在Linux系統上有較好的支持

Celery工作流程圖:

  

  在傳統的web應用中,Django的web頁面通過url的映射到view,view再執行方法,如果方法需要調用大量的腳本,執行大量的任務,頁面就會阻塞,如果在項目中使用Celery隊列.首先用戶的任務會被celery放到broker中進行中轉,然后將任務分為一個個的task來執行,由於celery是異步機制,所以會直接給用戶返回task_id,頁面拿到task_id就可以執行后續的操作,比如查看任務進度,暫停任務,而無需等待所有任務全部執行完畢,才能看到頁面

Celery的安裝與使用

1.安裝:

  1.在linux(ubuntu)系統上首先安裝Celery隊列

    pip3 install Celery

       2.在linux安裝redis

    sudo apt-get install redis-server

       3.在linux上安裝redis-celery中間件

    pip3 install -U "celery[redis]"

       4.啟動redis

             sudo /etc/init.d/redis-server start

2.創建並執行一個簡單的task

   命名為tasks.py

 1 from celery import Celery
 2  
 3 app = Celery('tasks',
 4              broker='redis://localhost',
 5              backend='redis://localhost')
 6  
 7 @app.task
 8 def add(x,y):
 9     print("running...",x,y)
10     return x+y
View Code

  啟動監聽並開始執行該服務

 celery -A tasks worker -l debug

  在開啟一個終端進行測試任務

    進入python環境

1 from tasks import add
2 t = add.delay(3,3) #此時worker會生成一個任務和任務id
3 t.get() #獲取任務執行的結果
4 t.get(propagate=False) #如果任務執行中出現異常,在client端不會異常退出
5 t.ready()#查看任務是否執行完畢
6 t.traceback #打印異常詳細信息
View Code

 

3.在項目中創建celery

在當前的目錄下創建文件夾celery_pro

 mkdir celery_pro

在此目錄下創建兩個文件

目錄結構:

1 celery_proj
2     /__init__.py
3     /celery.py
4     /tasks.py
View Code

 celery.py(定義了celery的一些元信息)

 1 rom __future__ import absolute_import, unicode_literals
 2 from celery import Celery
 3  
 4 app = Celery('proj',
 5              broker='redis://localhost',   #消息中間接收
 6              backend='redis://localhost', #消息結果存放 
 7              include=['proj.tasks'])          #執行任務的文件   
 8  
 9 # Optional configuration, see the application user guide.
10 app.conf.update(
11     result_expires=3600,
12 )
13  
14 if __name__ == '__main__':
15     app.start()
View Code

tasks.py (定義任務執行的具體邏輯和調用的具體方法)

 1 from __future__ import absolute_import, unicode_literals
 2 from .celery import app
 3 
 4 
 5 @app.task
 6 def add(x, y):
 7     return x + y
 8 
 9 
10 @app.task
11 def mul(x, y):
12     return x * y
13 
14 
15 @app.task
16 def xsum(numbers):
17     return sum(numbers)
View Code

啟動worker

 celery -A celery_pro worker -l debug

再另一個窗口打開python命令模式進行測試

1 from celery_pro import tasks
2 
3 t = tasks.add.delay(3,4)
4 t.get()
View Code

 Celery的分布式:多啟動worker就可以自動實現負載均衡,無需手動管理

 Celery永駐后台(開啟&重啟&關閉)

1 celery multi start w1 -A celery_pro -l info  #開啟后台celery任務
2 celery  multi restart w1 -A proj -l info #重啟該服務
3 celery multi stop w1 -A proj -l info #關閉該服務
View Code

 Celery定時任務

在celery_pro文件夾下創建periodic_tasks.py

目錄結構:

1  celery_proj
2      /__init__.py
3      /celery.py
4      /tasks.py
5      /periodic_tasks.py
View Code

文件內容如下:

 1 from __future__ import absolute_import, unicode_literals
 2 from .celery import app
 3 from celery.schedules import crontab
 4 
 5 
 6 @app.on_after_configure.connect
 7 def setup_periodic_tasks(sender, **kwargs):
 8     # Calls test('hello') every 10 seconds.
 9     sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
10 
11     # Calls test('world') every 30 seconds
12     sender.add_periodic_task(30.0, test.s('world'), expires=10)
13 
14     # Executes every Monday morning at 7:30 a.m.
15     sender.add_periodic_task(
16         crontab(hour=21, minute=42, day_of_week=5),
17         test.s('Happy Mondays!'),
18     )
19 
20 @app.task
21 def test(arg):
22     print(arg)
View Code

修改celery.py,加入periodic_task.py

 1 from __future__ import absolute_import, unicode_literals
 2 from celery import Celery
 3 
 4 app = Celery('proj',
 5              broker='redis://localhost',
 6              backend='redis://localhost',
 7              include=['celery_pro.tasks','celery_pro.periodic_tasks'])
 8 
 9 # Optional configuration, see the application user guide.
10 app.conf.update(
11     result_expires=3600,
12 )
13 
14 if __name__ == '__main__':
15     app.start()
16 ~                                                                                       
17 ~                      
View Code

在服務端啟動 celery -A celery_pro worker -l debug

在客戶端啟動 celery -A celery_pro.periodic_tasks beat -l debug

在服務端如果看到打印的hell ,world說明定時任務配置成功

上面是通過調用函數添加定時任務,也可以像寫配置文件 一樣的形式添加, 下面是每30s執行的任務

在celery.py中添加

1 app.conf.beat_schedule = {
2     'add-every-30-seconds': {
3         'task': 'cerely_pro.tasks.add', #執行的具體方法
4         'schedule': 5.5,  #每秒鍾執行
5         'args': (16, 16)   #執行的具體動作的參數
6     },
7 }
8 app.conf.timezone = 'UTC'
View Code

更多定制

上面的定時任務比較簡單,但如果你想要每周一三五的早上8點給你發郵件怎么辦呢?用crontab功能,跟linux自帶的crontab功能是一樣的,可以個性化定制任務執行時間

1 rom celery.schedules import crontab
2  
3 app.conf.beat_schedule = {
4     #在每周一早上7:30執行
5     'add-every-monday-morning': {
6         'task': 'celery_pro.tasks.add',
7         'schedule': crontab(hour=7, minute=30, day_of_week=1),
8         'args': (16, 16),
9     },
View Code

還有更多定時配置方式如下:

Example Meaning
crontab() Execute every minute.
crontab(minute=0, hour=0) Execute daily at midnight.
crontab(minute=0, hour='*/3') Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
crontab(minute=0,
hour='0,3,6,9,12,15,18,21')
Same as previous.
crontab(minute='*/15') Execute every 15 minutes.
crontab(day_of_week='sunday') Execute every minute (!) at Sundays.
crontab(minute='*',
hour='*', day_of_week='sun')
Same as previous.
crontab(minute='*/10',
hour='3,17,22', day_of_week='thu,fri')
Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays.
crontab(minute=0,hour='*/2,*/3') Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm
crontab(minute=0, hour='*/5') Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5).
crontab(minute=0, hour='*/3,8-17') Execute every hour divisible by 3, and every hour during office hours (8am-5pm).
crontab(0, 0,day_of_month='2') Execute on the second day of every month.
crontab(0, 0,
day_of_month='2-30/3')
Execute on every even numbered day.
crontab(0, 0,
day_of_month='1-7,15-21')
Execute on the first and third weeks of the month.
crontab(0, 0,day_of_month='11',
month_of_year='5')
Execute on the eleventh of May every year.
crontab(0, 0,
month_of_year='*/3')
Execute on the first month of every quarter.

 

 Celery+Django實現異步任務分發

1.在setting.py的文件同一級別創建celery.py

 1 from __future__ import absolute_import, unicode_literals
 2 import os
 3 from celery import Celery
 4 
 5 # 設置Django的環境變量
 6 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'PerfectCRM.settings')
 7 
 8 #設置app的默認處理方式,如果不設置默認是rabbitMQ
 9 app = Celery('proj',
10              broker='redis://localhost',
11              backend='redis://localhost'
12 )
13 
14 #配置前綴
15 app.config_from_object('django.conf:settings', namespace='CELERY')
16 
17 #自動掃描app下的tasks文件
18 app.autodiscover_tasks()
19 
20 
21 @app.task(bind=True)
22 def debug_task(self):
23     print('Request: {0!r}'.format(self.request))
24                                             
View Code

2.修改當前目錄下的__init__文件

1 from __future__ import absolute_import, unicode_literals
2  
3 #啟動時檢測celery文件
4 from .celery import app as celery_app
5  
6 __all__ = ['celery_app']
View Code

3.在app下新增tasks文件,寫要執行的任務

 1 from __future__ import absolute_import, unicode_literals
 2 from celery import shared_task
 3 
 4 
 5 @shared_task
 6 def add(x, y):
 7     return x + y
 8 
 9 
10 @shared_task
11 def mul(x, y):
12     return x * y
13 
14 
15 @shared_task
16 def xsum(numbers):
17     return sum(numbers)
18                             
View Code

在另一個app下新增tasks文件

1 from __future__ import absolute_import, unicode_literals
2 from celery import shared_task
3 import time,random
4 
5 @shared_task
6 def randnum(start, end):
7     time.sleep(3)
8     return random.ranint(start,end)
View Code

在app下的urls.py文件中增加映射

1 url(r'celery_call', views.celery_call),
2 url(r'celery_result', views.celery_result),
View Code

在views下增加處理邏輯

 1 from crm import tasks
 2 from celery.result import AsyncResult
 3 import random
 4 #計算結果
 5 def celery_call(request):
 6     randnum =random.randint(0,1000)
 7     t = tasks.add.delay(randnum,6)
 8     print('randum',randnum)
 9     return HttpResponse(t.id)
10 
11 #獲取結果
12 def celery_result(request):
13     task_id = request.GET.get('id')
14     res = AsyncResult(id=task_id)
15     if res.ready():
16         return HttpResponse(res.get())
17     else:
18         return HttpResponse(res.ready())
View Code

測試

首先啟動Django,從web端輸入url調用celery_call方法

例:http://192.168.17.133:9000/crm/celery_call,此方法會返回一個task_id(41177118-3647-4830-b8c8-7be76d9819d7)

帶着這個task_id 訪問http://192.168.17.133:9000/crm/celery_result?id=41177118-3647-4830-b8c8-7be76d9819d7如果可以看到結果說明配置成功

 Dnango+Celery實現定時任務

 1.安裝Django,Celery中間件

   pip3 install django-celery-beat

2.在Django的settings文件中,新增app,名稱如下

    INSTALLED_APPS = (

  .....,

  'django_celery_beat', #新增的app

)

3.輸入命令

python manage.py migrate #創建與Django有關定時計划任務的新表

4.通過celery beat開啟定時任務

celery -A PrefectCRM beat -l info -S django

5.啟動Django服務,進入admin配置頁面

python3 manager.py runserver 0.0.0.0:9000

並設置settings.py中的

ALLOW_HOSTS=['*']

6.可以在原有業務表的基礎之上看到新的三張表

 

 

 

最后配置計划任務表,在此表中將定時任務和執行的頻率相關聯

 

后記:經測試,每添加或修改一個任務,celery beat都需要重啟一次,要不然新的配置不會被celery beat進程讀到

 

 

 

 

 

 

 

 

 

 

 

 


 

'django_celery_beat


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM