異步消息隊列,也能用於定時和周期性任務。每次修改的task代碼還要重啟worker,這個有點麻煩
所有帶task()裝飾器的可調用對象(usertask)都是celery.app.task.Task類的子類,也就是說task()裝飾器會將usertask標識符變成Task子類的引用。
另外,celery允許用自定義Task類,不過該類要繼承於celery.app.task.Task,Task類在task狀態轉換動作時提供了接口,如任務執行失敗時調用接口on_failure,
這樣就非常方便我們在自定義Task類中重定義。見http://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-task-classes
task()裝飾器可以接收很多參數,比如序列化類、是否保存task結果、所使用的back_end等等,見
http://docs.celeryproject.org/en/latest/userguide/tasks.html#list-of-options。比如,我們可以設置只在出錯的情況下保存運行結果。即
ignore_result=False且store_errors_even_if_ignored=True,不過,ignore_result可以在全局配置文件中設置CELERY_IGNORE_RESULT
當發異步消息時調用usertask.delay()或usertask.apply_async(),它其實是將usertask的信息,如名稱,入參,id等序列化后保存在broker中。
如果不限制task的處理速度,那應該設置CELERY_DISABLE_RATE_LIMITS = True,這算是celery的優化部分
task不要嵌套,如果希望多個task順序執行(同步),那可以用回調函數,在celery中是用chain()方法實現,見
http://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks
http://docs.celeryproject.org/en/latest/userguide/canvas.html
celery在兩個地方有retry,一個是在task的代碼執行過程中出現異常時可以retry,不過這個需要用戶在task代碼中自己寫,如果想
retry那就要我們自己捕獲異常,並拋出Retry exception,參見http://docs.celeryproject.org/en/latest/userguide/tasks.html#retrying
另外一個地方是在調用delay()/apply_async()時,這個retry可能是指向rabbitmq發送消息時如果失敗,可以重試如果想鏈式調用task,那要設置link參數,見
http://docs.celeryproject.org/en/latest/userguide/canvas.html#callbacks
http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives
開發環境中,work的啟動、結束、重啟可以用celery multi xxx命令
生產環境中,用http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing
celery中的一個worker其實是代表一個進程池,一個進程池是由一個父進程和多個子進程組成, 貌似父進程不干事,只用於分配task,子進程數默認是CPU核數
一台主機上可以啟多個worker,但這種方法貌似和一個worker中啟多個子進程區別不大啊,兩種方式都是多進程。
在啟動worker時,可以設置worker的很多參數,如是否允許自動伸縮pool的容量,最大和最小容量,見
http://docs.celeryproject.org/en/latest/userguide/workers.html#autoscaling
celery有很多設置,其中在配置文件celeryconfig中的設置是全局的,另外,我們還可以單獨設置task(可以在task定義的地方,也可以在delay(), apply_async()中設置),
worker啟動時可以設置參數
celery的broker中的task隊列可以有多個,用不同的名字命名,還可以有不同的優先級。我們啟動worker時可以設置該worker只處理指定隊列的task
celery中的log使用的是python的log模塊,是線程安全的,而不是進程安全的。我們可以給一個worker中的每個process定義其logfile,見
http://docs.celeryproject.org/en/latest/userguide/workers.html#variables-in-file-paths
一個node是指一個worker
celery的每個worker的每個進程可以一次從broker中取出多個task
查看所有node中的活動的task: celery -A proj inspect active
查看所有node中注冊的所有task:celery -A proj inspect registered
查看所有node的信息:celery -A proj inspect stats
調用異步消息接口delay() ,apply_async()時,其實只是應用程序與broker通信而已,發消息給broker,消息中包含調用的task名稱,參數等,broker收到后保存。這個過程中不會與celery worker打交道
因此,這個時候即使沒有啟動celery worker也沒有任何關系。當celery worker啟動后,會主動連接broker,並從broker那里取數據來consume。同理,我們可以在任何平滑停止celery worker,而不用管
broker中是否還有沒consume的消息,當celery worker重啟后,它會繼續處理broker中的消息。這里的啟動順序貌似只能是先啟動broker,再啟動celery,因為啟動celery時會與broker建立連接,broker
是被動連接的。
broker自己應該是有辦法保證一個消息只能被一個worker取走,而不會同時被兩個worker取走。
有一個問題:rabbitmq有內存占用限制嗎?如果有,那當達到這個限制時,新來的消息怎么處理?像reddis那樣把舊的消息寫到磁盤?還是直接拋棄舊的消息?或不再接收新消息?
Rabbitmq有監控,默認的,Rabbitmq_management是關閉的,需要打開,執行sudo rabbitmq-plugins enable rabbitmq_management,打開這個plugin,
然后重啟Rabbitmq,在瀏覽器輸入http://localhost:15672,用戶名和密碼都是guest,就可以看到Rabbitmq的監控信息了。
Rabbitmq相當於一個郵箱,用於接收和發送消息,而且它里面有很多隊列,發送消息時可以指定用哪個隊列,一個消息中包含一個task
(1) 發送者將消息發給rabbitmq的指定隊列,消息中包含消息處理函數
(2) rabbitmq接收並保存消息(無數量限制)
(3) 接收者聯系rabbitmq,從隊列中取走消息(可以指定接收者只消費某個指定隊列的消息)
(4) 接收者取回消息后,調用消息中的處理函數處理消息
其實這個過程中我們要考慮消息執行的可靠性。
第一點:若消息被worker接收后,處理過程中worker死掉了怎么辦?Rabbitmq考慮到了這點,在默認情況下(是一個配置項,我們可以修改),每個消息其實是被worker拷貝了一份取走的,
當然被拷貝的消息會打上標記,當worker處理完消息后會給Rabbitmq一個確認(acknowledgment),Rabbitmq收到確認后才刪除消息,這個過程沒有限定超時時間,只有當worker與
Rabbitmq之間的TCP連接斷開后(Rabbitmq應該是會檢測worker的心跳),Rabbitmq才會將沒有確認的消息重新加入到隊列。我們可以使用命令:rabbitmqctl list_queues name messages_ready
messages_unacknowledged來查看有多少消息沒有收到確認。它是一個默認項,我們關閉這個可靠性設置,這樣,消息處理完不用再給Rabbitmq發acknowledgment了。在celery配置文件中
對應於CELERY_ACKS_LATE項
第二點:若Rabbitmq掛掉,消息也會丟失。因為要做消息的持久化,Rabbitmq這樣做了,不過它的持久化並不絕對可靠,因為從接收到消息到持久化之間有時間間隔,如果想絕對可靠,見
http://www.rabbitmq.com/tutorials/tutorial-two-python.html中的Note on message persistence部分。
還有些可靠性是worker自己來保證的,如task執行過程中拋出異常,這種情況是否要重新執行該task,celery的處理如下:
在celery中,task在被worker執行過程中如果拋出異常(Exception),celery會捕獲並給Rabbitmq發acknowledgment,如果我們想讓該task重新執行,也可以,那就要自己手動捕獲異常,
並拋出Retry exception,這樣worker會再次執行該task,這個過程中不會與Rabbitmq交互。
1、celery中的基本概念
2、Rabbitmq中的基本概念
publish:即生產者將消息發送給Rabbitmq的過程,更准確的說,是發送給Rabbitmq exchange的過程,在Rabbitmq的控制台網頁中的overview子頁的Message rates中有publish一項,它指
的是指消息進入Rabbitmq的速率。
3、celery中的配置項
CELERYD_PREFETCH_MULTIPLIER
乘子,worker的進程數*CELERYD_PREFETCH_MULTIPLIER=worker每次從Rabbitmq一次性取走多的消息數
4、Rabbitmq配置項和查詢命令
首先幾乎所有的控制命令都通過rabbitmqctl執行,很多東西在網頁管理頁可以看到
rabbitmqctl list_queues
查看queue相關的信息,后面可加參數name(名稱),messages(消息數量), consumers(消息者數量,該值與worker數不相等,一般是worker數的三倍,貌似每個worker順帶兩個額外的consumer,這兩
個consumer我們不用關心)
rabbitmqctl list_exchanges
查看所有exchange,如下,第一列是名稱,第二列是exchang type。Rabbitmq共有四種類型:direct,fanout,headers,topic。
direct是指將消息直接發到指定隊列,fanout是指廣播,發給綁定到該exchange的所有隊列。我們一般是用direct。
在celery中,當broker是Rabbitmq時,會調用Rabbitmq創建默認的exchange和默認的queue,它們的名稱都是celery,並且該queue是沒有綁定任何exchange的,給Rabbitmq發消息時會指定隊列,exchange
將消息發到該指定的隊列。貌似基本上不會用到綁定功能
C:\Work\hera\src>rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
celery direct
celery.pidbox fanout
celeryev topic
reply.celery.pidbox direct
...done.
C:\Work\hera\src>
service rabbitmq-server start
啟動rabbitmq