celery 原理理解


這里有一篇寫的不錯的:http://www.jianshu.com/p/1840035cb510

 

自己的“格式化”后的內容備忘下:

  我們總在說c10k的問題, 也做了不少優化, 然后優化總是不夠的。

  其中的一個瓶頸就是一些耗時的操作(網絡請求/文件操作--含耗時的數據庫操作)。

  如果我們不關心他們的返回值,則可以將其做成異步任務,保證執行成功即可。

  

  開始闡述之前約定一些概念:

    1. web請求處理進程(簡稱:消息生產者,記做P), 這是我們c10k問題注意的焦點

    2. 消息的處理者(簡稱:消費者,記做C), 在成功“男人”后面默默無聞工作的“女人”  

    3. 消息存放的地方(簡稱: 消息隊列, 記做Q)

    4. 消息/任務, 記做T

 

     基本處理過程:

    1. P將T保存到Q

    2. C從Q中取出一個T實例, 處理, 若處理失敗則將T示例退回到Q(務必保證T得到成功處理)。

  

  最簡單的實現方案:

    redis 消息隊列(利用redis list類型)的lpush/rpop(brpop)來處理。python代碼如下:

    TaskServer.py

# -*- coding:utf-8 -*-
import traceback
import simplejson
import redis
import uuid
from functools import wraps

class TaskExecutor(object):
	def __init__(self, task_name ,  *args, **kwargs):
		self.queue =  redis.StrictRedis()#host='localhost', port=6378, db=0, password='xxx_tasks')
		self.task_name = task_name

	def _publish_task(self, task_id , func, *args, **kwargs):
		self.queue.lpush(self.task_name,
			simplejson.dumps({'id':task_id, 'func':func, 'args':args, 'kwargs':kwargs})
		)

	def task(self, func):#decorator
		setattr(func,'delay',lambda *args, **kwargs:self._publish_task(uuid.uuid4().hex, func.__name__, *args, **kwargs))
		@wraps(func)
		def _w(*args, **kwargs):
			return func(*args, **kwargs)
		return _w

	def run(self):
		print 'waiting for tasks...'
		while True:
			if self.queue.llen(self.task_name):
				msg_data = simplejson.loads( self.queue.rpop(self.task_name))#這里可以用StrictRedis實例的brpop改善,去掉llen輪詢。
				
				print 'handling task(id:{0})...'.format(msg_data['id'])
				try:
					if msg_data.get('func',None):
						func = eval(msg_data.get('func'))
						if callable(func):
							#print msg_data['args'], msg_data['kwargs']
							ret = func(*msg_data['args'], **msg_data['kwargs'])
							msg_data.update({'result':ret})
							self.queue.lpush(self.task_name+'.response.success', simplejson.dumps(msg_data) )
				except:
					msg_data.update({'failed_times':msg_data.get('failed_times',0)+1, 'failed_reason':traceback.format_exc()})
					if msg_data.get('failed_times',0)<10:#最多失敗10次,避免死循環
						self.queue.rpush(self.task_name,simplejson.dumps(msg_data))
					else:
						self.queue.lpush(self.task_name+'.response.failure', simplejson.dumps(msg_data) )
					print traceback.format_exc()


PingTask = TaskExecutor('PingTask')

@PingTask.task
def ping_url(url):
	import os
	os.system('ping -c 2 '+url)

if __name__=='__main__':
	PingTask.run()

運行服務:python TaskServer.py  

ps:

    1. TaskExecutor類是一個輕量級的celery.Celery實現。提供了 task修飾器。對被修飾的函數添加delay 方法(將原任務方法名/參數保存到redis的list中--FIFO--實際上celery也是類似的處理

    2. 客戶端只要定義自己的TaskExecutor實例以及用此實例的task修飾對應的任務處理函數func。並在代碼中待用 func.delay(...)實現異步調用(為了保證成功,最多調用10次); 成功的記錄會保存在 redis的 "任務名.response.success" 隊列中, 超過10次仍然失敗的保存在 “任務名.response.failure"隊列中。

    3. 待改進的地方是很多的, 比如多線程, 負載均衡。(尚未閱讀celery源碼)

 

 

  TaskClient.py

# -*- coding:utf-8 -*-
import sys
sys.path.append('./')
from my_tasks import ping_url
ping_url.delay('www.baidu.com')

           

ps: 客戶端和服務器文件在統一linux目錄下。

 

 

 

  celery

  試驗證明, celery目測大體上跟上面的“基本處理過程”基本一致。即:

  P將T保存在Q中。

  C從Q中取出T處理(保證成功--會不會死循環?執行一個注定失敗的任務--就沒有驗證了)。

 

  celery的運用比較簡單:

    1.安裝celery   

    2.編寫需要異步執行的任務函數,並用celery實例的task修飾器修飾

    3.調用異步任務時, 用函數名.delay(參數)形式調用為異步調用。 函數名(參數)方式為同步調用。

    4.執行celery監聽服務

            demo 這里有:http://www.jianshu.com/p/1840035cb510。 再來一個極簡的:

    tasks.py   

# -*- coding:utf-8 -*-
from celery import Celery
brokers = 'redis://127.0.0.1:6379/5'
backend = 'redis://127.0.0.1:6379/6'

import time

app = Celery('tasks', backend=backend, broker=brokers)

@app.task
def add(x,y):
    time.sleep(10)
    return x+y

運行celery監聽服務:celery -A tasks worker -l error

    

順便附上測試代碼:tasks_test.py(跟tasks.py同一路徑,linux環境)

# -*- coding:utf-8 -*-
import sys
sys.path.append('./')
def test():
        from tasks import add
        for i in range(1000):
                add.delay(i,i+1)

if __name__=='__main__':
        test()

執行之 : python tasks_test.py

(可以1秒內跑完, 證明的確異步處理了)

 

 順便查看了下進程,發現celery自動開了一個主進程, 與cpu核數相同的子線程。看了下官方文檔,有web監控用的插件(flower)。

安裝: sudo pip install flower

運行之(跟tasks.py先同目錄): celery -A tasks flower --port=5555

效果圖如下(木有發現失敗任務--"Failed tasks"---很遺憾):

 

 

 flower的基本原理推測是直接查詢Q, 並基於結果輸出圖表等。 

 

 

   ref: https://abhishek-tiwari.com/post/amqp-rabbitmq-and-celery-a-visual-guide-for-dummies 

  轉載請注明來源:http://www.cnblogs.com/Tommy-Yu/p/5955294.html

  謝謝!

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM