Celery + Redis 的探究


Celery + Redis 的探究

文本嘗試研究,使用 redis 作為 celery 的 broker 時,celery 的交互操作同 redis 中數據記錄的關聯關系。

不在乎過程的,可以直接看最后的結論

測試代碼:

# a.py
from celery import Celery

celery_app = Celery('a', broker='redis://localhost:6379/0')

@celery_app.task
def test_task(n):
    open('test.txt', 'a').write(n + '\n')
    print n

if __name__ == '__main__':
    test_task.delay('==== ttttt1 =====')

 

先將 redis 部署於本機的 6379 默認端口 不要設置密碼,使用 celery 版本 3.1.23


[1]
先直接發起一個 task

python a.py

執行后可看到 redis 上生成了兩個 key

 

celery:表示當前正在隊列中的 task,等待被 worker 所接收
_kombu.binding.celery:這個不用管(celery 使用 kombu 維護消息隊列,這個是 kombu 生成的對邏輯影響不大)

然后啟動一個 worker

celery worker -A a --loglevel=debug

 

執行后可看到 celery 這個 key 消失了,同時新增了 2 個 key
 

celery 消失說明任務已經被剛啟動的 worker 接收了,worker 會自己去執行這個 task,當前沒有等待被接收的任務
_kombu.binding.celery.pidbox:這個也不用管(也是 kombu 維護的)
_kombu.binding.celeryev:這個也不用管(也是 kombu 維護的,用來記下當前連接的 worker)


[2]
下面我們試一下延時任務
將代碼中的

test_task.delay('==== ttttt1 =====')

改成

test_task.apply_async(('==== ttttt2 =====', ), countdown=60)

然后啟動腳本,發起一個60秒后執行任務,並且啟動 celery worker 准備執行任務

python a.py
celery worker -A a --loglevel=debug

在 60 秒內查看 redis,可以看到沒有出現 celery 這個 key,但多出了另外兩個 key

 

unacked:可以理解為這個是被 worker 接收了但是還沒開始執行的 task 列表(因為60秒后才會開始執行)
unacked_index:用戶標記上面 unacked 的任務的 id,理論上應該與 unacked 一一對應的

60 秒后再次查看 redis,可以看到又回到了無任務的狀態

 

這表示被 worker 領取的任務確實在 60 秒后執行了


[3]
這里在嘗試一種異常的情況,worker 領取任務后還沒到 60 秒,突然遇到問題退出了

python a.py
celery worker -A a --loglevel=debug

 

等大約 10 秒后,ctrl+c 中斷 worker
可以看到 redis 中有 celery 這個 key,其中有一條等待領取的任務

image.png

再次啟動 worker

celery worker -A a --loglevel=debug

可以發現任務被再次正常領取和執行


結論,由此可以推測出 celery 和 redis 之間交互的基本原理:

1、當發起一個 task 時,會向 redis 的 celery key 中插入一條記錄。
2、如果這時有正在待命的空閑 worker,這個 task 會立即被 worker 領取。
3、如果這時沒有空閑的 worker,這個 task 的記錄會保留在 celery key 中。
4、這時會將這個 task 的記錄從 key celery 中移除,並添加相關信息到 unackedunacked_index 中。
5、worker 根據 task 設定的期望執行時間執行任務,如果接到的不是延時任務或者已經超過了期望時間,則立刻執行。
6、worker 開始執行任務時,通知 redis。(如果設置了 CELERY_ACKS_LATE = True 那么會在任務執行結束時再通知)
7、redis 接到通知后,將 unackedunacked_index 中相關記錄移除。
8、如果在接到通知前,worker 中斷了,這時 redis 中的 unacked 和 unacked_index 記錄會重新回到 celery key 中。(這個回寫的操作是由 worker 在 “臨死” 前自己完成的,所以在關閉 worker 時為防止任務丟失,請務必使用正確的方法停止它,如: celery multi stop w1 -A proj1)
9、在 celery key 中的 task 可以再次重復上述 2 以下的流程。
10、celery 只是利用 redis 的 list 類型,當作個簡單的 Queue,並沒有使用消息訂閱等功能


題外話:

1、啟動 celery worker 時可以加上 -B 參數使得 schedule 定時任務生效,但要注意如果為同一個項目啟動多個 worker 時,只需要其中一個啟動命令中加上 -B,否則 schedule 會被多次執行。
2、上面的 1 同時也說明了 schedule task 的執行是由 celery 發起的。也就是說,如果在 django 中使用了 CELERYBEAT_SCHEDULE,那么只要 celery worker -B 啟動了,即使 django web 服務沒有啟動,定時任務也一樣會被發起。(推薦使用專門的 celery beat 方法)
3、使用 flower 時,在上述的 “worker 領取任務后突然遇到問題退出了然后又重新啟動執行” 這種情況下可能會出現顯示不正常的問題,這個是否是 flower 的 bug 還是有其他原因,可能下篇再探究。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM