Tornado 高並發源碼分析之六---異步編程的幾種實現方式


 

方式一:通過線程池或者進程池
導入庫futures是python3自帶的庫,如果是python2,需要pip安裝future這個庫
備注:進程池和線程池寫法相同
 1 from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
 2 from tornado.concurrent import run_on_executor
 3 
 4 def doing(s):
 5     print('xiumian--{}'.format(s))
 6     time.sleep(s)
 7     return s
 8 
 9 class MyMainHandler(RequestHandler):
10     executor = ProcessPoolExecutor(2)   #新建一個進程池,靜態變量,屬於類,所以全程只有這個幾個進程,不需要關閉,如果放在__init__中,則屬於對象,每次請求都會新建pool,當請求增多的時候,會導致今天變得非常多,這個方法不可取
11 
12     @gen.coroutine
13     def get(self, *args, **kwargs):
14         print('開始{}'.format(self.pool_temp))
15         a = yield self.executor.submit(doing, 20)   
16         print('進程 %s'  % self.executor._processes)
17         self.write(str(a))
18         print('執行完畢{}'.format(a))
19 
20    @run_on_executor       #tornado 另外一種寫法,需要在靜態變量中有executor的進程池變量
21     def post(self, *args, **kwargs):
22 a = yield doing(20)

 

 

方式二:Tornado + Celery + RabbitMQ 實現
使用Celery任務隊列,Celery 只是一個任務隊列,需要一個broker媒介,將耗時的任務傳遞給Celery任務隊列執行,執行完畢將結果通過broker媒介返回。官方推薦使用RabbitMQ作為消息傳遞,redis也可以
 
一、Celery 介紹:
1.1、注意:
1、當使用RabbitMQ時,需要按照pika第三方庫,pika0.10.0存在bug,無法獲得回調信息,需要按照0.9.14版本即可
2、tornado-celery 庫比較舊,無法適應Celery的最新版,會導致報無法導入task Producter包錯誤,只需要將celery版本按照在3.0.25就可以了
 
1.2、關於配置:
單個參數配置:
  1 app.conf.CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' 
多個參數配置:
1 app.conf.update(
2     CELERY_BROKER_URL = 'amqp://guest@localhost//',
3     CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
4 )
從配置文件中獲取:(將配置參數寫在文件app.py中)
1 BROKER_URL='amqp://guest@localhost//'
2 CELERY_RESULT_BACKEND='redis://localhost:6379/0'
3 app.config_from_object('celeryconfig')
 
 
二、案例
2.1、啟動一個Celery 任務隊列,也就是消費者:
1 from celery import Celery
2 celery = Celery('tasks', broker='amqp://guest:guest@119.29.151.45:5672', backend='amqp')  使用RabbitMQ作為載體, 回調也是使用rabbit作為載體
3 
4 @celery.task(name='doing')   #異步任務,需要命一個獨一無二的名字
5 def doing(s, b):
6     print('開始任務')
7     logging.warning('開始任務--{}'.format(s))
8     time.sleep(s)
9     return s+b
命令行啟動任務隊列守護進程,當隊列中有任務時,自動執行 (命令行可以放在supervisor中管理)
--loglevel=info --concurrency=5
記錄等級,默認是concurrency:指定工作進程數量,默認是CPU核心數
 
2.2、啟動任務生產者
 1 import tcelery
 2 tcelery.setup_nonblocking_producer()  #設置為非阻塞生產者,否則無法獲取回調信息
 3 
 4 class MyMainHandler(RequestHandler):
 5 
 6     @web.asynchronous
 7     @gen.coroutine
 8     def get(self, *args, **kwargs):
 9         print('begin')
10         result = yield gen.Task(sleep.apply_async, args=[10])   #使用yield 獲取異步返回值,會一直等待但是不阻塞其他請求
11         print('ok--{}'.format(result.result))     #返回值結果
12        
13        # sleep.apply_async((10, ), callback=self.on_success)   
14        # print('ok -- {}'.format(result.get(timeout=100)))#使用回調的方式獲取返回值,發送任務之后,請求結束,所以不能放在處理tornado的請求任務當中,因為請求已經結束了,與客戶端已經斷開連接,無法再在獲取返回值的回調中繼續向客戶端返回數據
15       
16         # result = sleep.delay(10)    #delay方法只是對apply_async方法的封裝而已
17         # data = result.get(timeout=100)  #使用get方法獲取返回值,會導致阻塞,相當於同步執行
18         
19 
20     def on_success(self, response):    #回調函數
21         print ('Ok-- {}'.format(response))

 

 
 
 
 
 
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM