介紹
之前部門開發一個項目我們需要實現一個定時任務用於收集每天DUBBO接口、域名以及TOMCAT(核心應用)的訪問量,這個后面的邏輯就是使用定時任務去ES接口抓取數據存儲在數據庫中然后前台進行展示。
點開以后的詳情
在這個項目中使用的定時任務是python-crontab這個東西,它很簡單但是使用起來有些不方便,雖然程序后來也沒有進行修改,但是還是想看看有沒有更好的定時任務框架,后來就發現了Celery這個項目。下面我們看看Celery的架構,讓大家有個整體認識:
下面先來認識一下它的一些組件以及這些組件或者叫做角色的是干什么的
Task:任務(Task)就是你要做的事情,例如一個注冊流程里面有很多任務,給用戶發驗證郵件就是一個任務,這種耗時的任務就可以交給Celery去處理,還有一種任務是定時任務,比如每天定時統計網站的注冊人數,這個也可以交給Celery周期性的處理。我們在tasks.py中寫的就是worker要可以執行的任務。
Broker: 在Celery中這個角色相當於數據結構中的隊列,介於生產者和消費者之間經紀人。例如一個Web系統中,生產者是主程序,它生產任務,將任務發送給 Broker,消費者是 Worker,是專門用於執行任務的后台服務。Celery本身不提供隊列服務,一般用Redis或者RabbitMQ來實現隊列服務。
Worker: Worker 就是那個一直在后台執行任務的人,也成為任務的消費者,它會實時地監控隊列中有沒有任務,如果有就立即取出來執行。
Beat: Beat 是一個定時任務調度器,它會根據配置定時將任務發送給 Broker,等待 Worker 來消費。
Backend: Backend 用於保存任務的執行結果,每個任務都有返回值,比如發送郵件的服務會告訴我們有沒有發送成功,這個結果就是存在Backend中,當然我們並不總是要關心任務的執行結果。
Exchanges:交換器,用於把不同消息放到不同的消息隊列中
Queues:消息隊列
為什么需要這兩個worker會監控特定的隊列同時也有一個默認的交換器,通常會有多個worker處理不同任務,那如何區分不同消息屬於哪個worker處理呢這就需要交換器和隊列。通常不需要指定隊列和交換器因為有一個自動路由功能,如果你需要配置更加復雜的路由就需要使用這兩個。默認的queue/exchange/binding的鍵是celery,exchange的類型是direct
安裝
我的環境是Python3.6
# 安裝celery pip install celery # 因為我這里用到redis做后端所以需要安裝redis,但是需要注意雖然我的celery版本是最新的,但是redis驅動你不能用最新的 # 否則任務執行會失敗 pip install redis==2.10.6 # flower組件不是必須的,它是用來對celery進行監控的 pip install flower
Celery入門
第一個任務
#!/usr/bin/env python # -*- coding: utf-8 -*- import time from celery import Celery # 第一個參數是當前模塊名稱 # celery -A mytasks worker --loglevel=info 通過這個運行的時候-A后面的參數要和模塊名稱一致,這樣就啟動了一個worker來執行任務 # broker是任務隊列放在哪里 backend是任務執行結果放在哪里 app = Celery("mytasks", broker="redis://172.16.48.171:6379/3", backend="redis://172.16.48.171:6379/3") # 這個裝飾器讓add變成一個異步任務 @app.task def add(x, y): return x + y
Celery在被使用之前一定要進行實例化,一個實例叫做一個Application也稱作一個app,就像上面定義的app = Celery()。一個實例化的app是線程安全的。注意上面的目錄結構,下面啟動一個worker。-A參數是指定celery對象的位置,也就是celery實例的py文件,默認它會使用celery.py,如果使用了其他名字比如我們這里就用了mytasks.py,那么你就要指定具體的這個文件名稱。下面我們啟動任務
celery -A Chatper01.mytasks worker --loglevel=info
下面看看如何調度任務也就是執行任務
在代碼中如何執行任務呢?
#!/usr/bin/env python # -*- coding: utf-8 -*- from Chatper01.mytasks import add task_id = add.delay(10, 4) # 立即返回一個任務ID,它將任務序列化后發送到你指定的broker try: while True: if task_id.ready(): # 如果任務結束這里返回True print("Result is: ", task_id.get()) break except Exception as err: print(err)
調用add.delay()函數后會返回一個AsyncReult的對象,通過這個對象可以獲取如下內容:
state | 返回任務狀態 |
task_id | 返回任務ID |
result | 返回任務執行結果,等同於調用get()方法 |
ready() | 判斷任務是否完成 |
info() | 獲取任務信息 |
wait(seconds) | 等待N秒后獲取結果 |
successful() | 判斷任務是否成功 |
對比命令執行和代碼執行的結果
命令行啟動任務后如何結束呢?Ctrl+C。無論是啟動的任務還是執行任務,只要是命令行啟動的都可以這樣來結束,官方文檔也是這樣說的。
調用一個任務一個任務其實就是發送一個消息到隊列中,worker收到消息就會進行處理,也就是真正執行消息對應的函數。Worker如果沒有確認這個消息被消費或者說確認那么這個消息將不會被移除
如何把任務代碼拆出來呢?
目的是為了單獨一個文件寫task,一個文件做celery的初始化工作。
初始化celery
#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import absolute_import import sys from celery import Celery # 這里多了一個 include參數,這個就是引入你的具體任務的py文件 app = Celery("Chatper02", broker="redis://172.16.48.171:6379/3", backend="redis://172.16.48.171:6379/3", include=["Chatper02.tasks"]) if __name__ == "__main__": try: app.start() finally: sys.exit()
在Celery初始化時候,最后一個參數這里用來include來實現載入task,這里要注意寫的是tasks.py文件的路徑。而且需要注意這里的第一個引用是使用了絕對引用在Python2中必須要這么寫,但是在Python3中絕對引用是默認設置可以不用寫。絕對路徑導入后你就可以在代碼里使用相對名稱。
任務文件
#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import absolute_import from Chatper02.celery import app @app.task def add(x, y): return x + y
這里就實現了配置和任務分離,啟動方式和之前一樣
如何做配置分離呢?
目的是單獨一個文件寫Celery的初始化配置信息,因為它的配置項還是不少的,雖然你可能用不了那么多因為默認設置基本夠用,但是通常來講結構上還是要做到相對規范。這里我還是先展示一下目錄結構吧
配置文件 config.py 它其實就是一個Python文件
#!/usr/bin/env python # -*- coding: utf-8 -*- """ http://docs.celeryproject.org/en/latest/userguide/configuration.html#cache-backend-settings https://blog.csdn.net/libing_thinking/article/details/78812472 """ # 如果密碼連接就是這樣的 'redis://:password@host:port/db' BROKER_URL = 'redis://172.16.48.171:6379/3' # 是否自動重連,默認是 True BROKER_CONNECTION_RETRY = True # 重連最大次數,默認是100 BROKER_CONNECTION_MAX_RETRIES = 100 CELERY_RESULT_BACKEND = 'redis://172.16.48.171:6379/3' # 導入task 如果不在這里就需要在 Celery(__name__, include=["Chatper03.tasks"]) CELERY_INCLUDE = "Chatper03.tasks" # 任務序列化方式 Default: "json" CELERY_TASK_SERIALIZER = 'json' # 結果序列化方式 Default: json CELERY_RESULT_SERIALIZER = 'json' # 結果過期時間 默認1天,單位秒 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 指定任務接受的內容類型 Default: {'json'} (set, list, or tuple). CELERY_ACCEPT_CONTENT = ['json'] # 設置時區 Default: "UTC". # CELERY_TIMEZONE = 'Asia/Shanghai' # 是否啟用UTC時間 CELERY_ENABLE_UTC = True
初始化文件 celery.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import absolute_import import sys from celery import Celery # 這里使用了 __name__ 來表示模塊文件名稱,可讀性好 app = Celery(__name__) # 加載配置 app.config_from_object("Chatper03.config") if __name__ == "__main__": try: app.start() finally: sys.exit()
還有另外一種寫法
#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import absolute_import import sys from celery import Celery import Chatper03.config # 這里使用了 __name__ 來表示模塊文件名稱,可讀性好 app = Celery(__name__) # 加載配置 # app.config_from_object("Chatper03.config") app.config_from_object(Chatper03.config) if __name__ == "__main__": try: app.start() finally: sys.exit()
這個celery.py也就是app啟動文件,這里通過app.config_from_object()載入配置,如果是通過真實的模塊加載就提前需要導入就像第二種寫法,如果是字符串形式就不需要導入就像第一種寫法。
配置參數說明:http://docs.celeryproject.org/en/latest/userguide/configuration.html#cache-backend-settings
任務文件
#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import absolute_import from Chatper03.celery import app @app.task def add(x, y): return x + y
啟動方式還是和之前一樣
為什么要把程序入口文件叫做celery.py呢?
看下面這張圖,例子1和例子2程序入口文件不同
我們啟動任務2
是不是發現有區別呢?
如果你的入口文件叫做celery.py那么你就可以不指定任務文件,因為它會先加載celery的配置,然后注冊任務。如果按照上例啟動Chatper01你就不能省略.mytasks 因為這里有配置。說白了就是它默認找的文件就是celery.py,通過這個來初始化Celery,初始化Celery靠的是配置信息,如果你使用了其他名稱就需要指定去哪里讀取配置信息。
如何后台啟動和關閉worker呢
celery multi start WORKER_NAME –A APP_NAME
啟動需要給worker起一個名字,因為同一個APP也就是任務可以啟動多個,啟動多個的意義就在於分布式。
停止
定時任務
Celery 3.x版本的寫法
入口文件celery.py
#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import absolute_import import sys from celery import Celery import Chatper04.config # 這里使用率 __name__ 來表示模塊文件名稱,可讀性好 app = Celery(__name__) # 加載配置 app.config_from_object(Chatper04.config) if __name__ == "__main__": try: app.start() finally: sys.exit()
配置文件config.py
#!/usr/bin/env python # -*- coding: utf-8 -*- """ http://docs.celeryproject.org/en/latest/userguide/configuration.html#cache-backend-settings """ # 如果密碼連接就是這樣的 'redis://:password@host:port/db' BROKER_URL = 'redis://172.16.48.171:6379/3' # 是否自動重連,默認是 True BROKER_CONNECTION_RETRY = True # 重連最大次數,默認是100 BROKER_CONNECTION_MAX_RETRIES = 100 CELERY_RESULT_BACKEND = 'redis://172.16.48.171:6379/3' # 導入task 如果不在這里就需要在 Celery(__name__, include=["Chatper03.tasks"]) CELERY_INCLUDE = "Chatper04.tasks" # 任務序列化方式 Default: "json" CELERY_TASK_SERIALIZER = 'json' # 結果序列化方式 Default: json CELERY_RESULT_SERIALIZER = 'json' # 結果過期時間 默認1天,單位秒 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 指定任務接受的內容類型 Default: {'json'} (set, list, or tuple). CELERY_ACCEPT_CONTENT = ['json'] # 設置時區 Default: "UTC". # CELERY_TIMEZONE = 'Asia/Beijing' # 是否啟用UTC時間 CELERY_ENABLE_UTC = True
任務文件tasks.py
在配置中CELERYBEAT_SCHEDULE定義了beat定時服務的屬性,當設置好了以后需要更新配置。
啟動worker
啟動beat服務,它用於定期去執行指定的任務
查看worker這邊的日志輸出
Celery 4.x版本的寫法
在4.x中也可以使用上面的寫法。下面唯一有變化的就是tasks.py文件
啟動方式和之前以后,只是升級到最新4.2.1版本后啟動界面有些變化
啟動beat服務的命令還是一樣的,這里要加上任務文件的名稱
Celery 4.x的另外一種寫法
執行work和beat的方法還是一樣的
task裝飾器的一些屬性
bind屬性
在程序里就可以通過.來獲取比如self.request.task等。當bind=True時,被調用的函數傳遞進去的第一個參數就是task實例也就是這里的self。
base屬性
定義基類,可以用詞來定義回調函數
調用成功會有這樣的動作
關於路由
producer發出調用請求(message包含所調用任務的相關信息)—>celery服務啟動時,會產生一個或多個交換機(exchanges),對應的交換機 接收請求message—>交換機根據message內容,將message分發到一個或多個符合條件的隊列(queue)—>每個隊列上都有一個或多個worker在監聽,在監聽到符合條件的message到達后,worker負責進行任務處理,任務處理完被確認后,隊列中的message將被刪除。
啟動worker時它會監聽在默認的隊列是celery,鍵是celery。我們的worker就監聽在這個默認的隊列celery上,生產者調用任務時由於也都使用默認值所以exchange根據key來路由消息,就把消息路由到其對應的隊里上,這樣在這里監聽的worker就會捕捉的該消息。我們修改一下之前的定時任務的tasks.py文件,其他不變,這些隊列和路由信息也可以直接寫到config.py中去。
分別打開2個終端運行worker
通過上面的截圖可以看到不同的worker監聽在不同的隊列中,下面啟動beat服務
觀察worker結果
在代碼中如何調用呢?這種場景是注冊任務,通過代碼執行來觸發執行具體任務,而不是通過定時任務的形式。tasks.py代碼我們只去掉定時任務部分
執行的程序
delay()方法是apply_sync()方法的別名,后者可以接受更多參數。
# countdown : 設置該任務等待一段時間再執行,單位為s; # eta : 定義任務的開始時間;eta=time.time()+10; # expires : 設置任務時間,任務在過期時間后還沒有執行則被丟棄; # retry : 如果任務失敗后, 是否重試;使用true或false,默認為true # shadow:重新指定任務的名字str,覆蓋其在日志中使用的任務名稱; # retry_policy : 重試策略. # max_retries : 最大重試次數, 默認為 3 次. # interval_start : 重試等待的時間間隔秒數, 默認為 0 , 表示直接重試不等待. # interval_step : 每次重試讓重試間隔增加的秒數, 可以是數字或浮點數, 默認為 0.2 # interval_max : 重試間隔最大的秒數, 即 通過 interval_step 增大到多少秒之后, 就不在增加了, 可以是數字或者浮點數, 默認為 0.2 . # routing_key:自定義路由鍵; # queue:指定發送到哪個隊列; # exchange:指定發送到哪個交換機; # priority:任務隊列的優先級,0-9之間; # serializer:任務序列化方法;通常不設置; # compression:壓縮方案,通常有zlib, bzip2 # headers:為任務添加額外的消息; # link:任務成功執行后的回調方法;是一個signature對象;可以用作關聯任務 task.apply_async((2,2), compression='zlib', serialize='json', queue='priority.high', routing_key='web.add', priority=0, exchange='web_exchange')