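Flask is one of the more mainstream frameworks for Python web development, and it is the main framework I use at work. I had always been curious about how it guarantees thread safety, so I did a bit of digging. Since I only skimmed the source rather than studying it in depth, what follows is just my own understanding and may not be entirely correct.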
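First, many articles claim that Flask starts a thread for every request, so that each request is handled in its own thread, and that this is what guarantees thread safety. So I ran a simple test. I started by writing a trivial Flask program (it only needs the bare minimum of functionality for testing). We know that once a Flask application starts, it effectively acts as a WSGI application, and every incoming request is handled by Flask's wsgi_app(self, environ, start_response) method. So let's look at that method (comments removed):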
```python
def wsgi_app(self, environ, start_response):
    ctx = self.request_context(environ)
    ctx.push()
    error = None
    try:
        try:
            response = self.full_dispatch_request()
        except Exception as e:
            error = e
            response = self.handle_exception(e)
        return response(environ, start_response)
    finally:
        if self.should_ignore_error(error):
            error = None
        ctx.auto_pop(error)
```
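To make this lifecycle concrete, here is a minimal sketch of the same shape as a plain WSGI callable: build a context from environ, push it, dispatch, and always pop it in a finally block. `MiniApp` and `MiniRequestContext` are hypothetical names, and a plain list stands in for Flask's thread-local context stack; this is an illustration of the pattern, not Flask's implementation.

```python
class MiniRequestContext:
    """Hypothetical stand-in for RequestContext: per-request data read from environ."""

    def __init__(self, environ):
        self.method = environ.get("REQUEST_METHOD", "GET")
        self.path = environ.get("PATH_INFO", "/")


class MiniApp:
    """Hypothetical WSGI app mirroring the push / dispatch / pop shape of wsgi_app."""

    def __init__(self):
        # Assumption: a plain list; Flask uses a thread-local LocalStack here.
        self.ctx_stack = []

    def dispatch(self):
        # Handlers read "the current request" from the top of the stack.
        ctx = self.ctx_stack[-1]
        return f"{ctx.method} {ctx.path}".encode("utf-8")

    def __call__(self, environ, start_response):
        ctx = MiniRequestContext(environ)
        self.ctx_stack.append(ctx)  # analogous to ctx.push()
        try:
            body = self.dispatch()
            start_response("200 OK", [("Content-Type", "text/plain")])
            return [body]
        finally:
            self.ctx_stack.pop()  # analogous to ctx.auto_pop(error)
```

Calling `MiniApp()` with `{"REQUEST_METHOD": "GET", "PATH_INFO": "/hello"}` returns `[b"GET /hello"]` and leaves the stack empty again, which is the invariant the push/pop pair in wsgi_app maintains.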
So what is this request_context? It is a RequestContext object, and the documentation describes it as follows:
The request context contains all request relevant information. It is created at the beginning of the request and pushed to the `_request_ctx_stack` and removed at the end of it. It will create the URL adapter and request object for the WSGI environment provided.
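That is quite clear: this object holds all the request-related information. In other words, whenever a request arrives, Flask creates a new RequestContext object for it and pushes it onto the global _request_ctx_stack. Before that push it checks _app_ctx_stack: if the stack is empty or its top element belongs to a different application, it first pushes an application context onto _app_ctx_stack and only then pushes the request context. The source is: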
```python
def push(self):
    top = _request_ctx_stack.top
    if top is not None and top.preserved:
        top.pop(top._preserved_exc)

    # Before we push the request context we have to ensure that there
    # is an application context.
    app_ctx = _app_ctx_stack.top
    if app_ctx is None or app_ctx.app != self.app:
        app_ctx = self.app.app_context()
        app_ctx.push()
        self._implicit_app_ctx_stack.append(app_ctx)
    else:
        self._implicit_app_ctx_stack.append(None)

    if hasattr(sys, 'exc_clear'):
        sys.exc_clear()

    _request_ctx_stack.push(self)

    # Open the session at the moment that the request context is
    # available. This allows a custom open_session method to use the
    # request context (e.g. code that access database information
    # stored on `g` instead of the appcontext).
    self.session = self.app.open_session(self.request)
    if self.session is None:
        self.session = self.app.make_null_session()
```
After these two steps, each request's application context and request context have been pushed onto the global _app_ctx_stack and _request_ctx_stack, respectively.
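The rule in push() (create an application context only when the stack is empty or its top belongs to another app, and remember whether this push created one) can be sketched as follows. `AppCtx`, `ReqCtx` and the two module-level lists are hypothetical simplifications; the real code uses LocalStack objects and also opens the session at this point.

```python
app_ctx_stack = []      # hypothetical stand-in for _app_ctx_stack
request_ctx_stack = []  # hypothetical stand-in for _request_ctx_stack


class AppCtx:
    def __init__(self, app):
        self.app = app


class ReqCtx:
    def __init__(self, app):
        self.app = app
        # Records, per push, the app context created implicitly (or None).
        self._implicit_app_ctx_stack = []

    def push(self):
        top = app_ctx_stack[-1] if app_ctx_stack else None
        if top is None or top.app != self.app:
            app_ctx = AppCtx(self.app)
            app_ctx_stack.append(app_ctx)
            self._implicit_app_ctx_stack.append(app_ctx)
        else:
            self._implicit_app_ctx_stack.append(None)
        request_ctx_stack.append(self)

    def pop(self):
        request_ctx_stack.pop()
        # Only pop the app context if this push created it.
        if self._implicit_app_ctx_stack.pop() is not None:
            app_ctx_stack.pop()
```

Pushing a second request context for the same app reuses the existing application context, so the application stack only grows when the app changes.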
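We now know when _request_ctx_stack and _app_ctx_stack are pushed: every incoming request causes a new RequestContext (and, when needed, a new AppContext) to be created and pushed, and they are popped as soon as the request has been handled. Both _app_ctx_stack and _request_ctx_stack are LocalStack objects, a class from werkzeug. Let's see what's inside: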
```python
class LocalStack(object):

    def __init__(self):
        self._local = Local()

    def __release_local__(self):
        self._local.__release_local__()

    def _get__ident_func__(self):
        return self._local.__ident_func__

    def _set__ident_func__(self, value):
        object.__setattr__(self._local, '__ident_func__', value)
    __ident_func__ = property(_get__ident_func__, _set__ident_func__)
    del _get__ident_func__, _set__ident_func__

    def __call__(self):
        def _lookup():
            rv = self.top
            if rv is None:
                raise RuntimeError('object unbound')
            return rv
        return LocalProxy(_lookup)

    def push(self, obj):
        """Pushes a new item to the stack"""
        rv = getattr(self._local, 'stack', None)
        if rv is None:
            self._local.stack = rv = []
        rv.append(obj)
        return rv

    def pop(self):
        """Removes the topmost item from the stack, will return the
        old value or `None` if the stack was already empty.
        """
        stack = getattr(self._local, 'stack', None)
        if stack is None:
            return None
        elif len(stack) == 1:
            release_local(self._local)
            return stack[-1]
        else:
            return stack.pop()

    @property
    def top(self):
        """The topmost item on the stack.  If the stack is empty,
        `None` is returned.
        """
        try:
            return self._local.stack[-1]
        except (AttributeError, IndexError):
            return None
```
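The push/pop/top behaviour above (push creates the list lazily, pop on the last item releases the storage and returns that item, top returns None when empty) can be reproduced in a few lines. To keep the sketch self-contained it swaps in the standard library's `threading.local` for werkzeug's `Local`, which is an assumption; `SimpleLocalStack` is a hypothetical name.

```python
import threading


class SimpleLocalStack:
    """Sketch of LocalStack's semantics on top of threading.local (an assumption)."""

    def __init__(self):
        self._local = threading.local()

    def push(self, obj):
        rv = getattr(self._local, "stack", None)
        if rv is None:
            self._local.stack = rv = []  # created lazily, per thread
        rv.append(obj)
        return rv

    def pop(self):
        stack = getattr(self._local, "stack", None)
        if stack is None:
            return None
        if len(stack) == 1:
            # Mirror release_local(): drop this thread's storage entirely.
            del self._local.stack
            return stack[-1]
        return stack.pop()

    @property
    def top(self):
        try:
            return self._local.stack[-1]
        except (AttributeError, IndexError):
            return None
```

Because the storage is thread-local, an item pushed in one thread is invisible to `top` in another thread, which is the property the context stacks depend on.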
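As you can see, nearly all of this object's important state lives in its _local attribute, which is a Local object. Interestingly, if you look at Local's constructor, you'll find it sets an important attribute, __ident_func__: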
```python
def __init__(self):
    object.__setattr__(self, '__storage__', {})
    object.__setattr__(self, '__ident_func__', get_ident)
```
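This attribute is provided by the get_ident function, which returns the id of the current thread and is used to tell apart multiple threads that exist at the same time: "Return a non-zero integer that uniquely identifies the current thread amongst other threads that exist simultaneously." Local keeps its attributes in __storage__ keyed by this id, so each thread sees its own copy of the stack.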
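Here is a minimal sketch of how a Local built on `__storage__` and `__ident_func__` can give each thread its own attributes. It is modelled on the constructor above, but `MiniLocal` is a hypothetical simplification (werkzeug's real Local has more methods, such as `__release_local__`), and it uses the standard library's `threading.get_ident` as the ident function.

```python
import threading


class MiniLocal:
    """Sketch of a thread-keyed Local: one attribute dict per thread id."""

    def __init__(self):
        # Bypass our own __setattr__, as werkzeug's constructor does.
        object.__setattr__(self, "__storage__", {})
        object.__setattr__(self, "__ident_func__", threading.get_ident)

    def __getattr__(self, name):
        # Only called when normal lookup fails, i.e. for per-thread attributes.
        try:
            return self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        storage = self.__storage__.setdefault(self.__ident_func__(), {})
        storage[name] = value

    def release(self):
        # Drop the current thread's entry; thread ids can be reused later.
        self.__storage__.pop(self.__ident_func__(), None)
```

Python's documentation notes that thread identifiers may be recycled when a thread exits and another is created, which is presumably one reason LocalStack.pop() calls release_local once the last item is removed.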
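So far, then, the global _request_ctx_stack and _app_ctx_stack appear to be used by only one thread; I found no code anywhere that starts a thread per request. Testing confirms that all requests really do run on a single thread. (PyCharm's debug mode shows all threads started by the program; in this situation there is only one Thread-6 besides the main thread, no matter how many requests are sent.)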
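The observation that every request lands on the same thread can be reproduced without Flask or PyCharm. This sketch uses the standard library's `wsgiref.simple_server` as a stand-in for werkzeug's BaseWSGIServer (an assumption; both handle one request after another) and records which thread runs the WSGI app for each request. `QuietHandler` and `serve_sequentially` are hypothetical helpers.

```python
import http.client
import threading
from wsgiref.simple_server import WSGIRequestHandler, make_server


class QuietHandler(WSGIRequestHandler):
    def log_message(self, format, *args):
        pass  # suppress the per-request access log


def serve_sequentially(n_requests):
    """Serve n_requests on a single-threaded WSGI server; return handling threads."""
    seen = []

    def app(environ, start_response):
        seen.append(threading.current_thread())
        start_response("200 OK", [("Content-Type", "text/plain")])
        return [b"ok"]

    server = make_server("127.0.0.1", 0, app, handler_class=QuietHandler)
    port = server.server_address[1]

    def loop():
        for _ in range(n_requests):
            server.handle_request()  # one request at a time, on this thread

    serving = threading.Thread(target=loop)
    serving.start()
    for _ in range(n_requests):
        conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
        conn.request("GET", "/")
        assert conn.getresponse().read() == b"ok"
        conn.close()
    serving.join()
    server.server_close()
    return seen, serving
```

With a server like this, every entry in `seen` is the same thread object: the one running the accept loop.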
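Now things get interesting: the fabled one-thread-per-request never shows up, so how does Flask guarantee thread safety? If you print the environ that accompanies each request (the environ parameter of wsgi_app), you'll find that each environ carries all of the request's context information. When a request arrives, Flask rebuilds the context from that environ, pushes it onto the stack, handles the request immediately, and pops the context once it's done. Because a single thread processes the requests one after another, the stacks never hold the contexts of two requests at the same time.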
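So where is the one-thread-per-request that so many articles talk about actually created? For that we need to look closely at the run method in flask.app: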
```python
def run(self, host=None, port=None, debug=None, **options):
    from werkzeug.serving import run_simple
    if host is None:
        host = '127.0.0.1'
    if port is None:
        server_name = self.config['SERVER_NAME']
        if server_name and ':' in server_name:
            port = int(server_name.rsplit(':', 1)[1])
        else:
            port = 5000
    if debug is not None:
        self.debug = bool(debug)
    options.setdefault('use_reloader', self.debug)
    options.setdefault('use_debugger', self.debug)
    try:
        run_simple(host, port, self, **options)
    finally:
        # reset the first request information if the development server
        # reset normally.  This makes it possible to restart the server
        # without reloader and that stuff from an interactive shell.
        self._got_first_request = False
```
This method is really just a thin wrapper around werkzeug's run_simple function, which is considerably more interesting (this time I've kept the comments):
```python
def run_simple(hostname, port, application, use_reloader=False,
               use_debugger=False, use_evalex=True,
               extra_files=None, reloader_interval=1,
               reloader_type='auto', threaded=False,
               processes=1, request_handler=None, static_files=None,
               passthrough_errors=False, ssl_context=None):
    """Start a WSGI application. Optional features include a reloader,
    multithreading and fork support.

    This function has a command-line interface too::

        python -m werkzeug.serving --help

    .. versionadded:: 0.5
       `static_files` was added to simplify serving of static files as well
       as `passthrough_errors`.

    .. versionadded:: 0.6
       support for SSL was added.

    .. versionadded:: 0.8
       Added support for automatically loading a SSL context from certificate
       file and private key.

    .. versionadded:: 0.9
       Added command-line interface.

    .. versionadded:: 0.10
       Improved the reloader and added support for changing the backend
       through the `reloader_type` parameter.  See :ref:`reloader`
       for more information.

    :param hostname: The host for the application.  eg: ``'localhost'``
    :param port: The port for the server.  eg: ``8080``
    :param application: the WSGI application to execute
    :param use_reloader: should the server automatically restart the python
                         process if modules were changed?
    :param use_debugger: should the werkzeug debugging system be used?
    :param use_evalex: should the exception evaluation feature be enabled?
    :param extra_files: a list of files the reloader should watch
                        additionally to the modules.  For example configuration
                        files.
    :param reloader_interval: the interval for the reloader in seconds.
    :param reloader_type: the type of reloader to use.  The default is
                          auto detection.  Valid values are ``'stat'`` and
                          ``'watchdog'``. See :ref:`reloader` for more
                          information.
    :param threaded: should the process handle each request in a separate
                     thread?
    :param processes: if greater than 1 then handle each request in a new process
                      up to this maximum number of concurrent processes.
    :param request_handler: optional parameter that can be used to replace
                            the default one.  You can use this to replace it
                            with a different
                            :class:`~BaseHTTPServer.BaseHTTPRequestHandler`
                            subclass.
    :param static_files: a dict of paths for static files.  This works exactly
                         like :class:`SharedDataMiddleware`, it's actually
                         just wrapping the application in that middleware before
                         serving.
    :param passthrough_errors: set this to `True` to disable the error catching.
                               This means that the server will die on errors but
                               it can be useful to hook debuggers in (pdb etc.)
    :param ssl_context: an SSL context for the connection. Either an
                        :class:`ssl.SSLContext`, a tuple in the form
                        ``(cert_file, pkey_file)``, the string ``'adhoc'`` if
                        the server should automatically create one, or ``None``
                        to disable SSL (which is the default).
    """
    if use_debugger:
        from werkzeug.debug import DebuggedApplication
        application = DebuggedApplication(application, use_evalex)
    if static_files:
        from werkzeug.wsgi import SharedDataMiddleware
        application = SharedDataMiddleware(application, static_files)

    def log_startup(sock):
        display_hostname = hostname not in ('', '*') and hostname or 'localhost'
        if ':' in display_hostname:
            display_hostname = '[%s]' % display_hostname
        quit_msg = '(Press CTRL+C to quit)'
        port = sock.getsockname()[1]
        _log('info', ' * Running on %s://%s:%d/ %s',
             ssl_context is None and 'http' or 'https',
             display_hostname, port, quit_msg)

    def inner():
        try:
            fd = int(os.environ['WERKZEUG_SERVER_FD'])
        except (LookupError, ValueError):
            fd = None
        srv = make_server(hostname, port, application, threaded,
                          processes, request_handler,
                          passthrough_errors, ssl_context,
                          fd=fd)
        if fd is None:
            log_startup(srv.socket)
        srv.serve_forever()

    if use_reloader:
        # If we're not running already in the subprocess that is the
        # reloader we want to open up a socket early to make sure the
        # port is actually available.
        if os.environ.get('WERKZEUG_RUN_MAIN') != 'true':
            if port == 0 and not can_open_by_fd:
                raise ValueError('Cannot bind to a random port with enabled '
                                 'reloader if the Python interpreter does '
                                 'not support socket opening by fd.')

            # Create and destroy a socket so that any exceptions are
            # raised before we spawn a separate Python interpreter and
            # lose this ability.
            address_family = select_ip_version(hostname, port)
            s = socket.socket(address_family, socket.SOCK_STREAM)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((hostname, port))
            if hasattr(s, 'set_inheritable'):
                s.set_inheritable(True)

            # If we can open the socket by file descriptor, then we can just
            # reuse this one and our socket will survive the restarts.
            if can_open_by_fd:
                os.environ['WERKZEUG_SERVER_FD'] = str(s.fileno())
                s.listen(LISTEN_QUEUE)
                log_startup(s)
            else:
                s.close()

        from ._reloader import run_with_reloader
        run_with_reloader(inner, extra_files, reloader_interval,
                          reloader_type)
    else:
        inner()
```
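By default (with the reloader disabled), run_simple simply calls inner, which creates a server and starts it with serve_forever(); at that point the Flask application is truly up and running. So the secret must lie in make_server: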
```python
def make_server(host=None, port=None, app=None, threaded=False, processes=1,
                request_handler=None, passthrough_errors=False,
                ssl_context=None, fd=None):
    """Create a new server instance that is either threaded, or forks
    or just processes one request after another.
    """
    if threaded and processes > 1:
        raise ValueError("cannot have a multithreaded and "
                         "multi process server.")
    elif threaded:
        return ThreadedWSGIServer(host, port, app, request_handler,
                                  passthrough_errors, ssl_context, fd=fd)
    elif processes > 1:
        return ForkingWSGIServer(host, port, app, processes, request_handler,
                                 passthrough_errors, ssl_context, fd=fd)
    else:
        return BaseWSGIServer(host, port, app, request_handler,
                              passthrough_errors, ssl_context, fd=fd)
```
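This finally answers the question we've had all along. A Flask application's server is not of one fixed type; it is configurable. By default a BaseWSGIServer is created; if the threaded argument is set, a ThreadedWSGIServer is started instead; and if processes > 1, a ForkingWSGIServer is started (setting both raises a ValueError). Since Flask.run forwards its extra keyword arguments to run_simple, these are set with app.run(threaded=True) or app.run(processes=N).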
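The branching in make_server can be mirrored with standard-library building blocks. In this sketch (hypothetical `pick_server_class`), BaseWSGIServer maps to `wsgiref.simple_server.WSGIServer`, ThreadedWSGIServer to a `socketserver.ThreadingMixIn` subclass and ForkingWSGIServer to a `socketserver.ForkingMixIn` subclass. These stand-ins are assumptions, not werkzeug's classes, and `ForkingMixIn` exists only on platforms with `os.fork`.

```python
import socketserver
from wsgiref.simple_server import WSGIServer


class ThreadingWSGIServer(socketserver.ThreadingMixIn, WSGIServer):
    """Stand-in for werkzeug's ThreadedWSGIServer: one thread per request."""
    daemon_threads = True


if hasattr(socketserver, "ForkingMixIn"):  # POSIX only
    class ForkingWSGIServer(socketserver.ForkingMixIn, WSGIServer):
        """Stand-in for werkzeug's ForkingWSGIServer: one process per request."""
else:
    ForkingWSGIServer = None


def pick_server_class(threaded=False, processes=1):
    """Mirror make_server's branching and return a server class."""
    if threaded and processes > 1:
        raise ValueError("cannot have a multithreaded and multi process server.")
    if threaded:
        return ThreadingWSGIServer
    if processes > 1:
        if ForkingWSGIServer is None:
            raise ValueError("forking is not supported on this platform")
        return ForkingWSGIServer
    return WSGIServer
```

Only classes are selected here, so no socket is bound; passing the chosen class as `server_class` to `wsgiref.simple_server.make_server` would actually start serving with it.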
With that, all that remains is to trace things back to their source:
```python
class ThreadedWSGIServer(ThreadingMixIn, BaseWSGIServer):

    """A WSGI server that does threading."""
    multithread = True
```
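ThreadedWSGIServer is a subclass of both ThreadingMixIn and BaseWSGIServer. ThreadingMixIn comes from the Python standard library's SocketServer module (socketserver in Python 3):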
```python
class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()
```
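The source couldn't be clearer: it is ThreadingMixIn that handles each request in its own thread, because its process_request starts a new threading.Thread for every request. So from the developer's point of view, Flask truly handles each request in a separate thread only when the app is started with threaded set to True. In that case, the Local storage keyed by get_ident is what keeps the threads apart: each thread sees only the contexts it pushed itself. (This describes the Flask version examined here; since Flask 1.0, app.run() enables threaded by default.)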
Testing it confirms this: once threaded is set to True, every request is indeed handled in its own separate thread.
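The same experiment can be sketched with the standard library alone: mixing `socketserver.ThreadingMixIn` into wsgiref's `WSGIServer` mirrors how ThreadedWSGIServer combines ThreadingMixIn with BaseWSGIServer. wsgiref stands in for werkzeug here (an assumption), and `ThreadingWSGIServer`, `QuietHandler` and `serve_threaded` are hypothetical names. Each request records the `threading.Thread` object that ran the app.

```python
import http.client
import socketserver
import threading
from wsgiref.simple_server import WSGIRequestHandler, WSGIServer, make_server


class ThreadingWSGIServer(socketserver.ThreadingMixIn, WSGIServer):
    daemon_threads = True  # don't keep the process alive for handler threads


class QuietHandler(WSGIRequestHandler):
    def log_message(self, format, *args):
        pass  # suppress the per-request access log


def serve_threaded(n_requests):
    """Serve n_requests with one thread per request; return the handler threads."""
    seen = []

    def app(environ, start_response):
        seen.append(threading.current_thread())
        start_response("200 OK", [("Content-Type", "text/plain")])
        return [b"ok"]

    server = make_server("127.0.0.1", 0, app,
                         server_class=ThreadingWSGIServer,
                         handler_class=QuietHandler)
    port = server.server_address[1]
    serving = threading.Thread(target=server.serve_forever)
    serving.start()
    try:
        for _ in range(n_requests):
            conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
            conn.request("GET", "/")
            assert conn.getresponse().read() == b"ok"
            conn.close()
    finally:
        server.shutdown()
        serving.join()
        server.server_close()
    return seen, serving
```

Thread objects are compared by identity rather than by `get_ident()`, since a finished thread's id may be reused by the next request's thread.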