Tornado異步阻塞解決方案


在 tornado 中異步無阻塞的執行耗時任務


在 linux 上 tornado 是基於 epoll 的事件驅動框架,在網絡事件上是無阻塞的。但是因為 tornado 自身是單線程的,所以如果我們在某一個時刻執行了一個耗時的任務,那么就會阻塞在這里,無法響應其他的任務請求,這個和 tornado 的高性能服務器稱號不符,所以我們要想辦法把耗時的任務轉換為不阻塞主線程,讓耗時的任務不影響對其他請求的響應。

在 python 3.2 上,增加了一個並行庫 concurrent.futures,這個庫提供了更簡單的異步執行函數的方法。

如果是在 2.7 之類的 python 版本上,可以使用 pip install futures 來安裝這個庫。

關於這個庫的具體使用,這里就不詳細展開了,可以去看官方文檔,需要注意的是,前兩個例子是示例錯誤的用法,可能會產生死鎖。

下面說說如何在 tornado 中結合使用 futures 庫,最好的參考莫過於有文檔+代碼。正好, tornado 中解析 ip 使用的 dns 解析服務是多線程無阻塞的。(netutils.ThreadedResolver)

我們來看看它的實現,看看如何應用到我們的程序中來。

tornado 中使用多線程無阻塞來處理 dns 請求

# 刪除了注釋 
class ThreadedResolver(ExecutorResolver):
    _threadpool = None
    _threadpool_pid = None
    def initialize(self, io_loop=None, num_threads=10):
        threadpool = ThreadedResolver._create_threadpool(num_threads)
        super(ThreadedResolver, self).initialize(
            io_loop=io_loop, executor=threadpool, close_executor=False)
    @classmethod
    def _create_threadpool(cls, num_threads):
        pid = os.getpid()
        if cls._threadpool_pid != pid:
            # Threads cannot survive after a fork, so if our pid isn't what it
            # was when we created the pool then delete it.
            cls._threadpool = None
        if cls._threadpool is None:
            from concurrent.futures import ThreadPoolExecutor
            cls._threadpool = ThreadPoolExecutor(num_threads)
            cls._threadpool_pid = pid
        return cls._threadpool

ThreadedResolver 是 ExecutorEesolver 的子類,看看它的是實現。

class ExecutorResolver(Resolver):
    def initialize(self, io_loop=None, executor=None, close_executor=True):
        self.io_loop = io_loop or IOLoop.current()
        if executor is not None:
            self.executor = executor
            self.close_executor = close_executor
        else:
            self.executor = dummy_executor
            self.close_executor = False
    def close(self):
        if self.close_executor:
            self.executor.shutdown()
        self.executor = None
    @run_on_executor
    def resolve(self, host, port, family=socket.AF_UNSPEC):
        addrinfo = socket.getaddrinfo(host, port, family, socket.SOCK_STREAM)
        results = []
        for family, socktype, proto, canonname, address in addrinfo:
            results.append((family, address))
        return results

從 ExecutorResolver 的實現可以看出來,它的關鍵參數是 ioloop 和 executor,干活的 resolve 函數被@run_on_executor 修飾,結合起來看 ThreadedResolver 的實現,那么這里的 executor 就是from concurrent.futures import ThreadPoolExecutor

再來看看 @run_on_executor 的實現。

run_on_executor 的實現在 concurrent.py 文件中,它的源碼如下:

def run_on_executor(fn):
    @functools.wraps(fn)
    def wrapper(self, *args, **kwargs):
        callback = kwargs.pop("callback", None)
        future = self.executor.submit(fn, self, *args, **kwargs)
        if callback:
            self.io_loop.add_future(future,
                                    lambda future: callback(future.result()))
        return future
    return wrapper

關於 functions.wraps() 的介紹可以參考官方文檔 functools — Higher-order functions and operations on callable objects

簡單的說,這里對傳遞進來的函數進行了封裝,並用 self.executor.submit() 對包裝的函數進行了執行,並判斷是否有回調,如果有,就加入到 ioloop 的 callback 里面。

對比官方的 concurrent.futures.Executor 的接口,里面有個 submit() 方法,從頭至尾看看ThreadedResolver 的實現,就是使用了 concurrent.futures.ThreadPoolExecutor 這個 Executor 的子類。

所以 tornado 中解析 dns 使用的多線程無阻塞的方法的實質就是使用了 concurrent.futures 提供的ThreadPoolExecutor 功能。


使用多線程無阻塞方法來執行耗時的任務

借鑒 tornado 的使用方法,在我們自己的程序中也使用這種方法來處理耗時的任務。

from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
class LongTimeTask(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(10)
    @run_on_executor()
    def get(self, data):
        long_time_task(data)

上面就是一個基本的使用方法,下面展示一個使用 sleep() 來模擬耗時的完整程序。

#!/usr/bin/env python
#-*-coding:utf-8-*-
import tornado.ioloop
import tornado.web
import tornado.httpserver
from concurrent.futures import ThreadPoolExecutor
from tornado.concurrent import run_on_executor
import time
class App(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r'/', IndexHandler),
            (r'/sleep/(\d+)', SleepHandler),
        ]
        settings = dict()
        tornado.web.Application.__init__(self, handlers, **settings)
class BaseHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(10)
class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world %s" % time.time())
class SleepHandler(BaseHandler):
    @run_on_executor
    def get(self, n):
        time.sleep(float(n))
        self._callback()
    def _callback(self):
        self.write("after sleep, now I'm back %s" % time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
if __name__ == "__main__":
    app = App()
    server = tornado.httpserver.HTTPServer(app, xheaders=True)
    server.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

此時先調用 127.0.0.1:8888/sleep/10 不會阻塞 127.0.0.1:8888/ 了。

以上,就是完整的在 tornado 中利用多線程來執行耗時的任務。


結語

epoll 的好處確實很多,事件就緒通知后,上層任務函數執行任務,如果任務本身需要較耗時,那么就可以考慮這個方法了,
當然也有其他的方法,比如使用 celery 來調度執行耗時太多的任務,比如頻繁的需要寫入數據到不同的文件中,我公司的一個項目中,需要把數據寫入四千多個文件中,每天產生幾億條數據,就是使用了 tornado + redis + celery 的方法來高效的執行寫文件任務。

完。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM