tornado 啟動WSGI應用(Flask)使用多線程將同步代碼變成異步


1、tornado是單線程的,同時WSGI應用又是同步的,如果我們使用Tornado啟動WSGI應用,理論上每次只能處理一個請求都是,任何一個請求有阻塞,都會導致tornado的整個IOLOOP阻塞。如下所示,我們同時發出兩個GET請求向http://127.0.0.1:5000/

會發現第一個發出的請求會在大約5s之后返回,而另一個請求會在10s左右返回,我們可以判斷,這兩個請求是順序執行的。

from tornado.wsgi import WSGIContainer
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoopfrom flask import Flask
import time



app = Flask(__name__)

@app.route('/')
def index():
    time.sleep(5)
    return 'OK'
if __name__ == '__main__':
    http_server = HTTPServer(WSGIContainer(app))
    http_server.listen(5000)
    IOLoop.instance().start()

2、我們知道,tornado實現異步運行同步函數,我們只能使用線程來運行,如下所示:

import tornado.web
import tornado.ioloop
import time
import tornado

class IndexHandler(tornado.web.RequestHandler):
    """主路由處理類"""
    @tornado.gen.coroutine
    def get(self):
        """對應http的get請求方式"""
        loop = tornado.ioloop.IOLoop.instance()
        yield loop.run_in_executor(None,self.sleep)
        self.write("Hello You!")

    def sleep(self):
        time.sleep(5)
        self.write('sleep OK')


if __name__ == "__main__":
    app = tornado.web.Application([
        (r"/", IndexHandler),
    ])
    app.listen(8000)
    tornado.ioloop.IOLoop.current().start()

3、對於這種(使用tornado運行Flask的情況)情況,我們如何做呢,查看 WSGIContainer 的代碼我們發現

我們只需要重寫整個方法,將紅圈中的部分變為異步運行即可。

代碼如下  

loop.run_in_executor 的第一個參數可以為一個ThreadPoolExecutor對象
from flask import Flask
import time
from tornado.wsgi import WSGIContainer
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop

app = Flask(__name__)


@app.route('/')
def index():
    time.sleep(5)
    return 'OK'


import tornado
from tornado import escape
from tornado import httputil
from typing import List, Tuple, Optional, Callable, Any, Dict
from types import TracebackType


class WSGIContainer_With_Thread(WSGIContainer):
    @tornado.gen.coroutine
    def __call__(self, request):
        data = {}  # type: Dict[str, Any]
        response = []  # type: List[bytes]

        def start_response(
                status: str,
                headers: List[Tuple[str, str]],
                exc_info: Optional[
                    Tuple[
                        "Optional[Type[BaseException]]",
                        Optional[BaseException],
                        Optional[TracebackType],
                    ]
                ] = None,
        ) -> Callable[[bytes], Any]:
            data["status"] = status
            data["headers"] = headers
            return response.append

        loop = tornado.ioloop.IOLoop.instance()
        app_response = yield loop.run_in_executor(None, self.wsgi_application, WSGIContainer.environ(request),
                                                  start_response)
        # app_response = self.wsgi_application(
        #     WSGIContainer.environ(request), start_response
        # )
        try:
            response.extend(app_response)
            body = b"".join(response)
        finally:
            if hasattr(app_response, "close"):
                app_response.close()  # type: ignore
        if not data:
            raise Exception("WSGI app did not call start_response")

        status_code_str, reason = data["status"].split(" ", 1)
        status_code = int(status_code_str)
        headers = data["headers"]  # type: List[Tuple[str, str]]
        header_set = set(k.lower() for (k, v) in headers)
        body = escape.utf8(body)
        if status_code != 304:
            if "content-length" not in header_set:
                headers.append(("Content-Length", str(len(body))))
            if "content-type" not in header_set:
                headers.append(("Content-Type", "text/html; charset=UTF-8"))
        if "server" not in header_set:
            headers.append(("Server", "TornadoServer/%s" % tornado.version))

        start_line = httputil.ResponseStartLine("HTTP/1.1", status_code, reason)
        header_obj = httputil.HTTPHeaders()
        for key, value in headers:
            header_obj.add(key, value)
        assert request.connection is not None
        request.connection.write_headers(start_line, header_obj, chunk=body)
        request.connection.finish()
        self._log(status_code, request)


if __name__ == '__main__':
    http_server = HTTPServer(WSGIContainer_With_Thread(app))
    http_server.listen(5000)
    IOLoop.instance().start()

注意:

  1 、這種方法實際上並沒有提高性能,說到底還是使用多線程來運行的,所以推薦如果使用tornado還是和tornado的web框架聯合起來寫出真正的異步代碼,這樣才會達到tornado異步IO的高性能目的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM