一、使用celery的原因
分布式任務調度框架celery及其監控工具flower,Linux進程管理工具supervisor
項目痛點:
1、代碼上線及運維困難,新代碼上線必須保證系統中沒有正在運行的異步任務,等待任務結束期間無法保證系統不在接收新任務。(項目中進程多是以multiprocessing方式啟動)
2、重啟困難,重啟后不知道是否啟動成功,必須手動curl測試接口保證系統重啟成功,缺少重啟監控機制。
痛點解決:
1、celery解決中斷任務痛點,所有異步任務均由celery下發。可單獨重啟一個worker或所有worker。重啟worker時保證當前worker正在消費的任務重新回到隊列,等待處於工作狀態的worker消費。不同worker可運行不同版本的代碼。
2、supervisor解決重啟痛點,新架構中一個節點會啟多worker以及flower和后端服務,具有大量進程需要管理,手動管理已然不現實。supervisor可對啟動異常的進程自動重啟也可對異常退出的進程進行拉起,並且提供客戶端和web界面。
二、架構圖
三、調度框架celery
celery中的幾個概念
1、broker 消息傳輸中間件,可以簡單理解為隊列,支持RabbitMQ,Redis,SQS(某些博客說支持sqlalchemy,官網未找到,實驗也未成功)。celery對Redis Cluster類型的redis集群支持不是很好,目前正在尋找解決方案。
2、exchange 路由,可將特定任務路由到指定隊列。
3、worker 消費者。會在多節點啟多worker
4、task 異步任務。某些任務需要指定消費節點。所以觸發任務時需要顯式指定該任務的存放的隊列,task.apply_async(queue='q1')。未指定的將會放到default隊列,由三個節點競爭。
5、backend 結果存儲。可使用mq,redis,nosql、mysql等。存放任務執行的結果。
1、使用方案
一、異步任務
設置default,q1,q2,q3四個隊列,各節點會監聽各自的隊列,並且所有節點都監聽default隊列。
編寫異步任務和正常寫函數是一樣的,最后只需要對該函數使用裝飾器@celery.task將該任務注冊為異步任務。如果有多個裝飾器進行組合使用時,必須確保 task()
裝飾器被放置在首位:
@app.task
@decorator2
@decorator1
def add(x, y):
return x + y
觸發任務
簡單觸發時可使用 delay
,但是該方法無法指定存放的隊列,因此該任務會被放到默認隊列
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
如果需要設置額外的行參數,必須用 apply_async
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x'}, queue='q1')
啟動worker
celery -A app.celery worker -l info -Q default,q1 --concurrency=10 -n node1-worker-1@%%h
二、beat任務(定時任務)
對於celery產生的定時任務如果放到一個隊列里,該任務被一個worker拿到后其他worker將獲取不到該任務。這樣會產生一個現象即該任務只在一個節點執行了,但業務上需要的是該任務在各個節點都執行。
對此現象的解決方案是各節點需要定制各節點的定時任務並放到各自的隊列里。對於任務a生成者會將其產生三份對下發到三個節點。對於任務b,node1並不需要執行,因此會產生兩份並將其下發到node2和node3上。
這並不意味同一個任務需要編寫三份,任務編寫完后只需要將其注冊各節點對應的配置里(需要自己實現)。
為不影響其他異步任務執行,beat將會由各節點單獨的worker進行消費。
產生beat任務
celery -A app.celery beat -l info
消費beat任務
celery -A app.celery worker -l info -Q node1-crontab --concurrency=10 -n node1-worker-crontab@%%h
這里的node1-crontab為新的隊列,專門存放node1節點需要消費的定時任務。
三、task是如何工作的
這里會說明為什么不同worker可以運行不同版本的代碼,甚至生產者和消費者之間也可以運行不同版本的代碼。
celery的任務是注冊在注冊表中,該表中注冊了任務名和任務類。說人話就是celery會在隊列中傳遞任務的模塊,例如proj模塊中有一個task.py,該文件中編寫了一個叫add的異步任務(函數),那么celery傳遞的就是proj.task.add,只要保證消費該任務的worker中有該模塊該文件該函數就行,worker並不關心該函數里是怎樣執行的,是否和生產者一致。
任務狀態
- PENDING 任務正在等待執行或未知。任何未知的任務 ID 都默認處於掛起狀態。
- STARTED 任務開始執行
- SUCCESS 任務執行成功
- FAILURE 任務執行失敗
- RETRY 任務處於重試狀態,這里指在task中捕獲到異常並顯式調用celery使其重試
- REVOKED 任務被撤銷
@celery.task(bind=True)
def send_twitter_status(self, oauth, tweet):
try:
twitter = Twitter(oauth)
twitter.update_status(tweet)
except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
raise self.retry(exc=exc)
2、celery監控工具flower
Flower是基於web的監控和管理celery工具
flower可以
- 用Celery事件實時監控,顯示任務的詳細信息,圖形化和統計
- 查看worker狀態和統計,查看當前正在運行的tasks
- Broker monitoring(中間人監控),查看所有Celery 隊列的統計,隊列長度圖
flower只需啟動在生產者端即可
截圖展示
四、進程管理工具supervisor
粗略估計在node1上會啟后端服務,celery worker三個,定時任務消費worker一個,celery beat一個,flower進程。這么多進程用手工一個個啟動肯定要花費大量時間,於是用supervisor管理這些進程。
supervisor會已啟動自己子進程的方式開啟進程,可以對異常退出的進程進行重啟操作。
supervisor可以分為三個部分
- supervisord 服務端,主要負責啟動與管理進程,響應客戶端的請求
- supervisorctl 客戶端,提供一個命令行來使用supervisord提供的服務
- web界面 用來查看與管理子進程
1、子進程配置
[program:worker]
command=celery -A app.celery worker -l info -Q default,q1 --concurrency=10 -n node1-worker-%(process_num)s@%%h ; 啟動命令
process_name=%(program_name)s-%(process_num)d ; 進程名
numprocs=3 ; 進程數量
directory=/Users/aaa/PycharmProjects/flask_test ; 工作路徑
;umask=022 ; umask for process (default None)
priority=999 ; 優先級。優先級低,最先啟動,關閉的時候最后關閉
autostart=true ; supervisor啟動后自動啟動
startsecs=1 ; 啟動多少秒后是running認為啟動成功
;startretries=3 ; 最大啟動重試次數 (default 3)
autorestart=true ; 子進程掛掉自動重啟 (def: unexpected)
;exitcodes=0 ; 'expected' exit codes used with autorestart (default 0)
stopsignal=TERM ; 進程停止信號,停止celery worker時使用TERM, (TERM, HUP, INT, QUIT, KILL, USR1, or USR2)
stopwaitsecs=30 ; 等待停止最大時間,超過此時間會強制kill (default 10)
stopasgroup=true ; 停掉子進程的子進程(保證不會出現孤兒進程)
;killasgroup=true ; kill進程及其子進程,直接發送KILL信號不會等待進程退出
;user=chrism ; 管理子進程的用戶
redirect_stderr=true ; redirect 日志 stderr to stdout
stdout_logfile=/Users/aaa/PycharmProjects/flask_test/log/node1/celery-worker-1.log ; 日志
stdout_logfile_maxbytes=50MB ; 單個日志文件最大大小 (default 50MB)
stdout_logfile_backups=20 ; 日志文件數量 (default 10)
;stdout_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0)
;stdout_events_enabled=false ; emit events on stdout writes (default false)
;stdout_syslog=false ; send stdout to syslog with process name (default false)
;stderr_logfile=/Users/aaa/PycharmProjects/flask_test/log/default/celery_err.log ; 錯誤日志
;stderr_logfile_maxbytes=10MB ; max # logfile bytes b4 rotation (default 50MB)
;stderr_logfile_backups=10 ; # of stderr logfile backups (0 means none, default 10)
;stderr_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0)
;stderr_events_enabled=false ; emit events on stderpidr writes (default false)
;stderr_syslog=false ; send stderr to syslog with process name (default false)
environment=PATH="/Users/aaa/anaconda3/envs/flask_test/bin" ; 環境變量,子進程間不共享
;serverurl=AUTO ; override serverurl computation (childutils)
supervisor配置文件放在supervisord.conf中
啟動supervisord
supervisord -c supervisord.conf
對於celery worker節點進程退出信號使用TERM,TERM信號會使worker進行熱關機,worker會將未消費完的任務放回到隊列。
發現丟任務的情況:
假設worker1正在消費任務3個,worker2正在消費任務4個。將worker1關機,3個任務會進入到worker2,再將work2關機后打開worker3,這時會發現少了兩個任務。
丟任務的解決方案:
方案一:啟動一個worker然后將其關機后未消費完的任務可以全部回到隊列,需要重啟時可以先將未有消費任務的worker進行重啟,然后再停掉正在消費的worker。或者只停掉一個worker。
方案二:supervisor進程組的概念,直接將進程組重啟
代碼更新后只需重啟子進程,不需要重啟supervisord
進程組配置
[group:celery-worker]
programs=worker ;上面實例三個進程會默認分配到名為worker的進程組,這里定義進程組會覆蓋默認的
priority=999 ; the relative start priority (default 999)
對進程組進行操作等同於對進程組下所有的進程操作
對於進程組操作在進程組名后需要加上冒號即 celery-worker:
2、進程數說明
問題:celery啟動命令中已經指定了 --concurrency=10
參數配置worker中開啟的進程數量,為什么在supervisord中還要指定 numprocs=3
進程數呢?
答:這兩個參數指定的進程數量是不同的意義。在celery中指定進程數即意味着單個worker中可開啟的最大進程數據量。在supervisord指定的進程數會直接開啟三個worker,相當將定義的cmd執行了三次。
supervisord中如果指定numprocs
的同時也需要指定 process_name=%(program_name)s-%(process_num)d
,原因在於如果多個進程使用相同的進程名會報錯,所以需要指定不同的進程名。program_name
為 [program:worker]
中定義的名字,即worker。process_num
為進程的序號,從1開始,注意它不是pid。
使用numprocs=3
創建的三個worker默認會被放到一個名為worker(在哪里定義)的進程組里,如果在后面定義一個新的進程組並將worker放進去則這三個worker會的默認進程組會被替換為新的進程組,同時新的進程組里也可以放一個在其它program里定義的進程。
[group:node1-celery-worker]
programs=worker,crontab-worker # crontab-worker為在其它program里定義的進程
priority=999
3、supervisorctl命令
supervisorctl start ${program} # 啟動進程
supervisorctl stop ${program} # 停止進程
supervisorctl restart ${program} # 重啟進程
supervisorctl status ${program} # 查看進程狀態
supervisorctl update # 重新載入配置文件
supervisorctl shutdown # 關閉supervisord服務
supervisorctl reload # 重啟supervisord服務
supervisorctl stop all # 停止所有進程
對於已經配置好的supervisor並不需要進行supervisord級別的重啟以及重新載入配置。代碼更新后只需重啟子進程即可加載最新代碼。
4、supervisor界面
將supervisorctl的命令可視化,可以直接點點點,此外還可以展示子進程的日志。實則感覺有supervisorctl就可以了