初識Celery


本系列文章的開發環境:

window 7 + python2.7 + pycharm5 + celery3.1.25 + django1.9.4

在我們日常的開發工作中,經常會遇到這幾種情況:

1、在web應用中,用戶觸發一個操作,執行后台處理程序,這個程序需要執行很長時間才能返回結果。怎樣才能不阻塞http請求,不讓用戶等待從而提高用戶體驗呢?
2、定時任務腳本:生產環境經常會跑一些定時任務腳本,假如你有上千台的服務器、上千種任務,定時任務的管理很困難,如何對job進行有效的管理?
3、異步需求:比如發送短信/郵件、推送消息、清理/設置緩存?

如果你有以上的需求,那么Celery可能對你很有用。

Celery - 分布式任務隊列系統

Celery是一個可以處理大量消息的分布式任務系統,它憑借簡單、靈活、可靠的特性被廣泛使用。Celery聚焦於實時處理任務,同時也支持定時的任務調度。

1、特性:

  • 查看定時任務的執行情況,比如執行是否成功、當前狀態、執行任務花費的時間等。
  • 易於其他框架集成,如使用django管理后台添加、更新、刪除任務。
  • 方便把任務和配置管理相關聯。
  • 可選多進程、Eventlet和Gevent三種模式並發執行。
  • 提供錯誤處理機制。

2、架構圖

 

從上圖中可以知道Celery包含如下組件:

  • Producer:凡是調用了Celery API、函數或裝飾器而產生任務並交給任務隊列處理的都是任務生產者。
  • 任務調度組件:Beat進程會讀取配置文件的內容,周期性地將配置中到期需要執行的任務發送給任務隊列。
  • Celery Worker:負責執行任務的線程,可以在多台服務器運行提高執行效率。
  • Broker:消息中間件,負責接受任務生產者的任務,並且轉發work進行執行。Celery目前支持RabbitMQ、Redis、MongoDB、Beanstalk、SQLAlchemy、Zookeeper等作為消息中間件,官方推薦使用RabbitMQ。
  • Result Backend:任務處理完后保存狀態信息和結果,以供查詢。Celery默認已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

開始使用Celery 

這里Broker和Result Backend都選擇RabbitMQ。

1、安裝

     1) 安裝RabbitMQ

     2)  安裝Celery 3.1.25

         為什么選擇這個低版本?請見最下面的問題列表。

pip install celery==3.1.25

2、一個簡單例子

  2.1) 在一個目錄中創建tasks.py文件,內容如下:

from celery import Celery

#創建celery實例,其中backend表示采用rpc瞬態信息,不保存到數據庫;broker表示連接RabbitMQ URL app
= Celery('tasks',backend='rpc://',broker='pyamqp://guest@localhost//') @app.task def hello(): return "hello celery"

     2.2) 啟動celery worker

              到tasks.py文件那層目錄,執行以下命令:

celery -A tasks worker --loglevel=info

啟動輸出信息如下:

E:\workdir\test_pro>celery -A tasks worker --loglevel=info
[2017-05-10 18:26:18,298: WARNING/MainProcess] c:\python27\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarnin
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.


  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@507B9D97E083 v3.1.25 (Cipater)
---- **** -----
--- * ***  * -- Windows-7-6.1.7601-SP1
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x35fc240
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.hello

[2017-05-10 18:26:18,433: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2017-05-10 18:26:18,483: INFO/MainProcess] mingle: searching for neighbors
[2017-05-10 18:26:19,497: INFO/MainProcess] mingle: all alone
[2017-05-10 18:26:19,523: WARNING/MainProcess] celery@507B9D97E083 ready.
View Code

     2.3) 測試結果

           另起一個終端,還是到tasks.py那層目錄,進入python命令行:

 

這時celery worker會有提示信息:

 

 到此為止,celery入門就介紹到這里了,下一節介紹如何在django中使用celery。

 

 

遇到的問題列表: 

 1、啟動celery worker時報錯

ERROR:
`[2016-11-05 20:23:25,759: CRITICAL/MainProcess] Unrecoverable error: TypeError('must be integer, not _subprocess_handle',)
Traceback (most recent call last):
File "c:\development\env\zillow2\lib\site-packages\celery\worker\worker.py", line 203, in start
self.blueprint.start(self)
File "c:\development\env\zillow2\lib\site-packages\celery\bootsteps.py", line 119, in start
step.start(parent)
File "c:\development\env\zillow2\lib\site-packages\celery\bootsteps.py", line 370, in start
return self.obj.start()
File "c:\development\env\zillow2\lib\site-packages\celery\concurrency\base.py", line 131, in start
self.on_start()
File "c:\development\env\zillow2\lib\site-packages\celery\concurrency\prefork.py", line 112, in on_start
**self.options)
File "c:\development\env\zillow2\lib\site-packages\billiard\pool.py", line 1008, in init
self._create_worker_process(i)
File "c:\development\env\zillow2\lib\site-packages\billiard\pool.py", line 1117, in _create_worker_process
w.start()
File "c:\development\env\zillow2\lib\site-packages\billiard\process.py", line 122, in start
self._popen = self._Popen(self)
File "c:\development\env\zillow2\lib\site-packages\billiard\context.py", line 383, in _Popen
return Popen(process_obj)
File "c:\development\env\zillow2\lib\site-packages\billiard\popen_spawn_win32.py", line 64, in init
_winapi.CloseHandle(ht)
TypeError: must be integer, not _subprocess_handle

(zillow2) C:\DEVELOPMENT\zillow2\project>Traceback (most recent call last):
File "", line 1, in
File "c:\development\env\zillow2\lib\site-packages\billiard\spawn.py", line 159, in spawn_main
new_handle = steal_handle(parent_pid, pipe_handle)
File "c:\development\env\zillow2\lib\site-packages\billiard\reduction.py", line 121, in steal_handle
_winapi.PROCESS_DUP_HANDLE, False, source_pid)
WindowsError: [Error 87] The parameter is incorrect

解決方案:這是因為windows系統不支持celery4以上版本,請降級到3.xx即可解決該問題

 

 

 

 

 

 參考文檔:https://zhuanlan.zhihu.com/p/22304455

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM