Celery + Redis 的探究
不在乎過程的,可以直接看最后的結論。
測試代碼:
# a.py from celery import Celery celery_app = Celery('a', broker='redis://localhost:6379/0') @celery_app.task def test_task(n): open('test.txt', 'a').write(n + '\n') print n if __name__ == '__main__': test_task.delay('==== ttttt1 =====')
先將 redis 部署於本機的 6379 默認端口 不要設置密碼,使用 celery 版本 3.1.23
[1]
先直接發起一個 task
python a.py
執行后可看到 redis 上生成了兩個 key

celery
:表示當前正在隊列中的 task,等待被 worker 所接收
_kombu.binding.celery
:這個不用管(celery 使用 kombu 維護消息隊列,這個是 kombu 生成的對邏輯影響不大)
然后啟動一個 worker
celery worker -A a --loglevel=debug
執行后可看到 celery 這個 key 消失了,同時新增了 2 個 key

celery
消失說明任務已經被剛啟動的 worker 接收了,worker 會自己去執行這個 task,當前沒有等待被接收的任務
_kombu.binding.celery.pidbox
:這個也不用管(也是 kombu 維護的)
_kombu.binding.celeryev
:這個也不用管(也是 kombu 維護的,用來記下當前連接的 worker)
[2]
下面我們試一下延時任務
將代碼中的
test_task.delay('==== ttttt1 =====')
改成
test_task.apply_async(('==== ttttt2 =====', ), countdown=60)
然后啟動腳本,發起一個60秒后執行任務,並且啟動 celery worker 准備執行任務
python a.py
celery worker -A a --loglevel=debug
在 60 秒內查看 redis,可以看到沒有出現 celery
這個 key,但多出了另外兩個 key

unacked
:可以理解為這個是被 worker 接收了但是還沒開始執行的 task 列表(因為60秒后才會開始執行)
unacked_index
:用戶標記上面 unacked 的任務的 id,理論上應該與 unacked
一一對應的
60 秒后再次查看 redis,可以看到又回到了無任務的狀態

這表示被 worker 領取的任務確實在 60 秒后執行了
[3]
這里在嘗試一種異常的情況,worker 領取任務后還沒到 60 秒,突然遇到問題退出了
python a.py
celery worker -A a --loglevel=debug
等大約 10 秒后,ctrl+c 中斷 worker
可以看到 redis 中有 celery
這個 key,其中有一條等待領取的任務

再次啟動 worker
celery worker -A a --loglevel=debug
可以發現任務被再次正常領取和執行
結論,由此可以推測出 celery 和 redis 之間交互的基本原理:
1、當發起一個 task 時,會向 redis 的 celery
key 中插入一條記錄。
2、如果這時有正在待命的空閑 worker,這個 task 會立即被 worker 領取。
3、如果這時沒有空閑的 worker,這個 task 的記錄會保留在 celery
key 中。
4、這時會將這個 task 的記錄從 key celery 中移除,並添加相關信息到 unacked
和 unacked_index
中。
5、worker 根據 task 設定的期望執行時間執行任務,如果接到的不是延時任務或者已經超過了期望時間,則立刻執行。
6、worker 開始執行任務時,通知 redis。(如果設置了 CELERY_ACKS_LATE = True 那么會在任務執行結束時再通知)
7、redis 接到通知后,將 unacked
和 unacked_index
中相關記錄移除。
8、如果在接到通知前,worker 中斷了,這時 redis 中的 unacked 和 unacked_index 記錄會重新回到 celery
key 中。(這個回寫的操作是由 worker 在 “臨死” 前自己完成的,所以在關閉 worker 時為防止任務丟失,請務必使用正確的方法停止它,如: celery multi stop w1 -A proj1
)
9、在 celery
key 中的 task 可以再次重復上述 2 以下的流程。
10、celery 只是利用 redis 的 list 類型,當作個簡單的 Queue,並沒有使用消息訂閱等功能
題外話:
1、啟動 celery worker
時可以加上 -B
參數使得 schedule 定時任務生效,但要注意如果為同一個項目啟動多個 worker 時,只需要其中一個啟動命令中加上 -B
,否則 schedule 會被多次執行。
2、上面的 1 同時也說明了 schedule task 的執行是由 celery 發起的。也就是說,如果在 django 中使用了 CELERYBEAT_SCHEDULE
,那么只要 celery worker -B
啟動了,即使 django web 服務沒有啟動,定時任務也一樣會被發起。(推薦使用專門的 celery beat
方法)
3、使用 flower 時,在上述的 “worker 領取任務后突然遇到問題退出了然后又重新啟動執行” 這種情況下可能會出現顯示不正常的問題,這個是否是 flower 的 bug 還是有其他原因,可能下篇再探究。