在 tornado 中異步無阻塞的執行耗時任務
在 linux 上 tornado 是基於 epoll 的事件驅動框架,在網絡事件上是無阻塞的。但是因為 tornado 自身是單線程的,所以如果我們在某一個時刻執行了一個耗時的任務,那么就會阻塞在這里,無法響應其他的任務請求,這個和 tornado 的高性能服務器稱號不符,所以我們要想辦法把耗時的任務轉換為不阻塞主線程,讓耗時的任務不影響對其他請求的響應。
在 python 3.2 上,增加了一個並行庫 concurrent.futures,這個庫提供了更簡單的異步執行函數的方法。
如果是在 2.7 之類的 python 版本上,可以使用 pip install futures
來安裝這個庫。
關於這個庫的具體使用,這里就不詳細展開了,可以去看官方文檔,需要注意的是,前兩個例子是示例錯誤的用法,可能會產生死鎖。
下面說說如何在 tornado 中結合使用 futures 庫,最好的參考莫過於有文檔+代碼。正好, tornado 中解析 ip 使用的 dns 解析服務是多線程無阻塞的。(netutils.ThreadedResolver
)
我們來看看它的實現,看看如何應用到我們的程序中來。
tornado 中使用多線程無阻塞來處理 dns 請求
# 刪除了注釋
class ThreadedResolver(ExecutorResolver):
_threadpool = None
_threadpool_pid = None
def initialize(self, io_loop=None, num_threads=10):
threadpool = ThreadedResolver._create_threadpool(num_threads)
super(ThreadedResolver, self).initialize(
io_loop=io_loop, executor=threadpool, close_executor=False)
@classmethod
def _create_threadpool(cls, num_threads):
pid = os.getpid()
if cls._threadpool_pid != pid:
# Threads cannot survive after a fork, so if our pid isn't what it
# was when we created the pool then delete it.
cls._threadpool = None
if cls._threadpool is None:
from concurrent.futures import ThreadPoolExecutor
cls._threadpool = ThreadPoolExecutor(num_threads)
cls._threadpool_pid = pid
return cls._threadpool
ThreadedResolver
是 ExecutorEesolver
的子類,看看它的是實現。
class ExecutorResolver(Resolver):
def initialize(self, io_loop=None, executor=None, close_executor=True):
self.io_loop = io_loop or IOLoop.current()
if executor is not None:
self.executor = executor
self.close_executor = close_executor
else:
self.executor = dummy_executor
self.close_executor = False
def close(self):
if self.close_executor:
self.executor.shutdown()
self.executor = None
@run_on_executor
def resolve(self, host, port, family=socket.AF_UNSPEC):
addrinfo = socket.getaddrinfo(host, port, family, socket.SOCK_STREAM)
results = []
for family, socktype, proto, canonname, address in addrinfo:
results.append((family, address))
return results
從 ExecutorResolver
的實現可以看出來,它的關鍵參數是 ioloop
和 executor
,干活的 resolve
函數被@run_on_executor
修飾,結合起來看 ThreadedResolver
的實現,那么這里的 executor
就是from concurrent.futures import ThreadPoolExecutor
再來看看 @run_on_executor
的實現。
run_on_executor
的實現在 concurrent.py
文件中,它的源碼如下:
def run_on_executor(fn):
@functools.wraps(fn)
def wrapper(self, *args, **kwargs):
callback = kwargs.pop("callback", None)
future = self.executor.submit(fn, self, *args, **kwargs)
if callback:
self.io_loop.add_future(future,
lambda future: callback(future.result()))
return future
return wrapper
關於 functions.wraps()
的介紹可以參考官方文檔 functools — Higher-order functions and operations on callable objects
簡單的說,這里對傳遞進來的函數進行了封裝,並用 self.executor.submit()
對包裝的函數進行了執行,並判斷是否有回調,如果有,就加入到 ioloop 的 callback 里面。
對比官方的 concurrent.futures.Executor
的接口,里面有個 submit()
方法,從頭至尾看看ThreadedResolver
的實現,就是使用了 concurrent.futures.ThreadPoolExecutor
這個 Executor
的子類。
所以 tornado 中解析 dns 使用的多線程無阻塞的方法的實質就是使用了 concurrent.futures
提供的ThreadPoolExecutor
功能。
使用多線程無阻塞方法來執行耗時的任務
借鑒 tornado 的使用方法,在我們自己的程序中也使用這種方法來處理耗時的任務。
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
class LongTimeTask(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(10)
@run_on_executor()
def get(self, data):
long_time_task(data)
上面就是一個基本的使用方法,下面展示一個使用 sleep() 來模擬耗時的完整程序。
#!/usr/bin/env python
#-*-coding:utf-8-*-
import tornado.ioloop
import tornado.web
import tornado.httpserver
from concurrent.futures import ThreadPoolExecutor
from tornado.concurrent import run_on_executor
import time
class App(tornado.web.Application):
def __init__(self):
handlers = [
(r'/', IndexHandler),
(r'/sleep/(\d+)', SleepHandler),
]
settings = dict()
tornado.web.Application.__init__(self, handlers, **settings)
class BaseHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(10)
class IndexHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world %s" % time.time())
class SleepHandler(BaseHandler):
@run_on_executor
def get(self, n):
time.sleep(float(n))
self._callback()
def _callback(self):
self.write("after sleep, now I'm back %s" % time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
if __name__ == "__main__":
app = App()
server = tornado.httpserver.HTTPServer(app, xheaders=True)
server.listen(8888)
tornado.ioloop.IOLoop.instance().start()
此時先調用 127.0.0.1:8888/sleep/10 不會阻塞 127.0.0.1:8888/ 了。
以上,就是完整的在 tornado 中利用多線程來執行耗時的任務。
結語
epoll 的好處確實很多,事件就緒通知后,上層任務函數執行任務,如果任務本身需要較耗時,那么就可以考慮這個方法了,
當然也有其他的方法,比如使用 celery 來調度執行耗時太多的任務,比如頻繁的需要寫入數據到不同的文件中,我公司的一個項目中,需要把數據寫入四千多個文件中,每天產生幾億條數據,就是使用了 tornado + redis + celery 的方法來高效的執行寫文件任務。
完。