Celery和Rabbitmq自學


異步消息隊列,也能用於定時和周期性任務。每次修改的task代碼還要重啟worker,這個有點麻煩

所有帶task()裝飾器的可調用對象(usertask)都是celery.app.task.Task類的子類,也就是說task()裝飾器會將usertask標識符變成Task子類的引用。

另外,celery允許用自定義Task類,不過該類要繼承於celery.app.task.Task,Task類在task狀態轉換動作時提供了接口,如任務執行失敗時調用接口on_failure,

這樣就非常方便我們在自定義Task類中重定義。見http://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-task-classes

task()裝飾器可以接收很多參數,比如序列化類、是否保存task結果、所使用的back_end等等,見

http://docs.celeryproject.org/en/latest/userguide/tasks.html#list-of-options。比如,我們可以設置只在出錯的情況下保存運行結果。即

ignore_result=False且store_errors_even_if_ignored=True,不過,ignore_result可以在全局配置文件中設置CELERY_IGNORE_RESULT

當發異步消息時調用usertask.delay()或usertask.apply_async(),它其實是將usertask的信息,如名稱,入參,id等序列化后保存在broker中。

如果不限制task的處理速度,那應該設置CELERY_DISABLE_RATE_LIMITS = True,這算是celery的優化部分

task不要嵌套,如果希望多個task順序執行(同步),那可以用回調函數,在celery中是用chain()方法實現,見

http://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks

http://docs.celeryproject.org/en/latest/userguide/canvas.html

celery在兩個地方有retry,一個是在task的代碼執行過程中出現異常時可以retry,不過這個需要用戶在task代碼中自己寫,如果想

retry那就要我們自己捕獲異常,並拋出Retry exception,參見http://docs.celeryproject.org/en/latest/userguide/tasks.html#retrying

另外一個地方是在調用delay()/apply_async()時,這個retry可能是指向rabbitmq發送消息時如果失敗,可以重試如果想鏈式調用task,那要設置link參數,見

http://docs.celeryproject.org/en/latest/userguide/canvas.html#callbacks

http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives

開發環境中,work的啟動、結束、重啟可以用celery multi xxx命令

生產環境中,用http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing

celery中的一個worker其實是代表一個進程池,一個進程池是由一個父進程和多個子進程組成, 貌似父進程不干事,只用於分配task,子進程數默認是CPU核數

一台主機上可以啟多個worker,但這種方法貌似和一個worker中啟多個子進程區別不大啊,兩種方式都是多進程。

在啟動worker時,可以設置worker的很多參數,如是否允許自動伸縮pool的容量,最大和最小容量,見

http://docs.celeryproject.org/en/latest/userguide/workers.html#autoscaling

celery有很多設置,其中在配置文件celeryconfig中的設置是全局的,另外,我們還可以單獨設置task(可以在task定義的地方,也可以在delay(), apply_async()中設置),

worker啟動時可以設置參數

celery的broker中的task隊列可以有多個,用不同的名字命名,還可以有不同的優先級。我們啟動worker時可以設置該worker只處理指定隊列的task

celery中的log使用的是python的log模塊,是線程安全的,而不是進程安全的。我們可以給一個worker中的每個process定義其logfile,見

http://docs.celeryproject.org/en/latest/userguide/workers.html#variables-in-file-paths

一個node是指一個worker

celery的每個worker的每個進程可以一次從broker中取出多個task

查看所有node中的活動的task: celery -A proj inspect active

查看所有node中注冊的所有task:celery -A proj inspect registered

查看所有node的信息:celery -A proj inspect stats

調用異步消息接口delay() ,apply_async()時,其實只是應用程序與broker通信而已,發消息給broker,消息中包含調用的task名稱,參數等,broker收到后保存。這個過程中不會與celery worker打交道

因此,這個時候即使沒有啟動celery worker也沒有任何關系。當celery worker啟動后,會主動連接broker,並從broker那里取數據來consume。同理,我們可以在任何平滑停止celery worker,而不用管

broker中是否還有沒consume的消息,當celery worker重啟后,它會繼續處理broker中的消息。這里的啟動順序貌似只能是先啟動broker,再啟動celery,因為啟動celery時會與broker建立連接,broker

是被動連接的。

broker自己應該是有辦法保證一個消息只能被一個worker取走,而不會同時被兩個worker取走。

有一個問題:rabbitmq有內存占用限制嗎?如果有,那當達到這個限制時,新來的消息怎么處理?像reddis那樣把舊的消息寫到磁盤?還是直接拋棄舊的消息?或不再接收新消息?

Rabbitmq有監控,默認的,Rabbitmq_management是關閉的,需要打開,執行sudo rabbitmq-plugins enable rabbitmq_management,打開這個plugin,

然后重啟Rabbitmq,在瀏覽器輸入http://localhost:15672,用戶名和密碼都是guest,就可以看到Rabbitmq的監控信息了。

Rabbitmq相當於一個郵箱,用於接收和發送消息,而且它里面有很多隊列,發送消息時可以指定用哪個隊列,一個消息中包含一個task

(1) 發送者將消息發給rabbitmq的指定隊列,消息中包含消息處理函數

(2) rabbitmq接收並保存消息(無數量限制)

(3) 接收者聯系rabbitmq,從隊列中取走消息(可以指定接收者只消費某個指定隊列的消息)

(4) 接收者取回消息后,調用消息中的處理函數處理消息

其實這個過程中我們要考慮消息執行的可靠性。

第一點:若消息被worker接收后,處理過程中worker死掉了怎么辦?Rabbitmq考慮到了這點,在默認情況下(是一個配置項,我們可以修改),每個消息其實是被worker拷貝了一份取走的,

當然被拷貝的消息會打上標記,當worker處理完消息后會給Rabbitmq一個確認(acknowledgment),Rabbitmq收到確認后才刪除消息,這個過程沒有限定超時時間,只有當worker與

Rabbitmq之間的TCP連接斷開后(Rabbitmq應該是會檢測worker的心跳),Rabbitmq才會將沒有確認的消息重新加入到隊列。我們可以使用命令:rabbitmqctl list_queues name messages_ready

messages_unacknowledged來查看有多少消息沒有收到確認。它是一個默認項,我們關閉這個可靠性設置,這樣,消息處理完不用再給Rabbitmq發acknowledgment了。在celery配置文件中

對應於CELERY_ACKS_LATE項

第二點:若Rabbitmq掛掉,消息也會丟失。因為要做消息的持久化,Rabbitmq這樣做了,不過它的持久化並不絕對可靠,因為從接收到消息到持久化之間有時間間隔,如果想絕對可靠,見

http://www.rabbitmq.com/tutorials/tutorial-two-python.html中的Note on message persistence部分。

還有些可靠性是worker自己來保證的,如task執行過程中拋出異常,這種情況是否要重新執行該task,celery的處理如下:

在celery中,task在被worker執行過程中如果拋出異常(Exception),celery會捕獲並給Rabbitmq發acknowledgment,如果我們想讓該task重新執行,也可以,那就要自己手動捕獲異常,

並拋出Retry exception,這樣worker會再次執行該task,這個過程中不會與Rabbitmq交互。

1、celery中的基本概念

 

2、Rabbitmq中的基本概念

publish:即生產者將消息發送給Rabbitmq的過程,更准確的說,是發送給Rabbitmq exchange的過程,在Rabbitmq的控制台網頁中的overview子頁的Message rates中有publish一項,它指

的是指消息進入Rabbitmq的速率。

3、celery中的配置項

CELERYD_PREFETCH_MULTIPLIER

乘子,worker的進程數*CELERYD_PREFETCH_MULTIPLIER=worker每次從Rabbitmq一次性取走多的消息數

4、Rabbitmq配置項和查詢命令

首先幾乎所有的控制命令都通過rabbitmqctl執行,很多東西在網頁管理頁可以看到

rabbitmqctl list_queues

查看queue相關的信息,后面可加參數name(名稱),messages(消息數量), consumers(消息者數量,該值與worker數不相等,一般是worker數的三倍,貌似每個worker順帶兩個額外的consumer,這兩

個consumer我們不用關心)

rabbitmqctl list_exchanges

查看所有exchange,如下,第一列是名稱,第二列是exchang type。Rabbitmq共有四種類型:direct,fanout,headers,topic。

direct是指將消息直接發到指定隊列,fanout是指廣播,發給綁定到該exchange的所有隊列。我們一般是用direct。

在celery中,當broker是Rabbitmq時,會調用Rabbitmq創建默認的exchange和默認的queue,它們的名稱都是celery,並且該queue是沒有綁定任何exchange的,給Rabbitmq發消息時會指定隊列,exchange

將消息發到該指定的隊列。貌似基本上不會用到綁定功能

C:\Work\hera\src>rabbitmqctl list_exchanges
Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
celery  direct
celery.pidbox   fanout
celeryev        topic
reply.celery.pidbox     direct
...done.

C:\Work\hera\src>

service rabbitmq-server start

啟動rabbitmq


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM