Python WSGI 協議


0. WSGI協議

0.1 WSGI概念

出自python的增強性建議書:PEP-3333,由PEP-333發展而來(為了支持python3)全稱Web Server Gateway Interface

在python中有各種web應用框架,不同的應用框架會限制使用他們的web服務器,相比於JAVA,它雖然也有眾多的web開發框架,但自從servlet API出現之后,JAVA web框架都可以在支持servlet API的web服務器上運行。WSGI協議也就是充當了srvlet API這樣的一個角色,它定義了應用或框架和web服務器之間通信的接口,使得python的web框架可以在任何支持WSGI協議的web服務器上運行。

那到底開發遵循WSGI協議去開發框架、應用、web服務器等帶給我們什么好處呢

至少有了一個明確的領域划分,我們不需要在開發一個web應用或框架的同時還要去想着去實現一遍web服務器的功能,專注各自的領域,減少重復造輪子。

在就是移植性強,項目靈活,我們不要再去考慮說只能在項目里使用一種web框架,當我們的web服務器遵守了WSGI協議,應用層的框架選擇不在是問題。

0.2 WSGI協議內容

WSGI協議把整個web服務端分為三個部分,ServerApplicationMiddleware

0.2.1. Server

Server端每次從http客戶端收到一個請求,就調用一次應用對象。需要實現的是一個將請求中包含的參數、請求頭、元數據寫入到一個字典中,和一個返回數據給客戶端的函數,一並傳入到Application中。

下面用python實現了一個cgi進程,通過環境變量獲取一個請求的參數,並用cgi進程處理請求輸出到標准輸出。

 
 
 
 
 
 
 
import os, sys
enc, esc = sys.getfilesystemencoding(), 'surrogateescape'
def unicode_to_wsgi(u):
    # Convert an environment variable to a WSGI "bytes-as-unicode" string
    return u.encode(enc, esc).decode('iso-8859-1')
def wsgi_to_bytes(s):
    return s.encode('iso-8859-1')
def run_with_cgi(application):
    environ = {k: unicode_to_wsgi(v) for k,v in os.environ.items()}
    environ['wsgi.input']       = sys.stdin.buffer
    environ['wsgi.errors']       = sys.stderr
    environ['wsgi.version']     = (1, 0)
    environ['wsgi.multithread'] = False
    environ['wsgi.multiprocess'] = True
    environ['wsgi.run_once']     = True
    if environ.get('HTTPS', 'off') in ('on', '1'):
        environ['wsgi.url_scheme'] = 'https'
    else:
        environ['wsgi.url_scheme'] = 'http'
    headers_set = []
    headers_sent = []
    def write(data):
        out = sys.stdout.buffer
        if not headers_set:
             raise AssertionError("write() before start_response()")
        elif not headers_sent:
             # Before the first output, send the stored headers
             status, response_headers = headers_sent[:] = headers_set
             out.write(wsgi_to_bytes('Status: %s\r\n' % status))
             for header in response_headers:
                 out.write(wsgi_to_bytes('%s: %s\r\n' % header))
             out.write(wsgi_to_bytes('\r\n'))
        out.write(data)
        out.flush()
    def start_response(status, response_headers, exc_info=None):
        if exc_info:
            try:
                if headers_sent:
                    # Re-raise original exception if headers sent
                    raise exc_info[1].with_traceback(exc_info[2])
            finally:
                exc_info = None     # avoid dangling circular ref
        elif headers_set:
            raise AssertionError("Headers already set!")
        headers_set[:] = [status, response_headers]
        return write
    result = application(environ, start_response)
    try:
        for data in result:
            if data:    # don't send headers until body appears
                write(data)
        if not headers_sent:
            write('')   # send headers now if body was empty
    finally:
        if hasattr(result, 'close'):
            result.close()
 

0.2.2. Application

應用就是一個簡單的接受兩個參數的可調用對象,可以是函數,方法,類,實現了call的實例,該應用對象必須可以被多次調用,web服務器會重復的調用它。

基本結構:

 
 
 
 
 
 
 
def application(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return ['Hello World!']
 app = application()
 

先用回調函數start_response返回狀態碼響應頭,最后返回響應正文,響應正文是可迭代對象。

下面是Application兩種實現:函數和類

 
 
 
 
 
 
 
def simple_app(environ, start_response):
    """最簡單的應用對象"""
    status = '200 OK'
    response_headers = [('Content-type', 'text/plain')]
    start_response(status, response_headers)
    return b"Hello world!\n"
    
class AppClass:
  """
   生成一個AppClass的實例對象,那是一個生成器對象,當我們遍歷這個對象,就會執行
   __iter__方法,來達到重復執行的效果。當然我們想通過執行這個實例對象來執行,可以去  實現__call__方法。
   """
  def __init__(self, environ, start_response):
        self.environ = environ
        self.start = start_response
    def __iter__(self):
        status = '200 OK'
        response_headers = [('Content-type', 'text/plain')]
        self.start(status, response_headers)
        yield  b"Hello world!\n"
 

0.2.3. Middleware

中間件是是一個可以與兩端交互的組件,也可看做是一個Application,它接受一個Application作為參數,並返回一個Application,這正是利用了Application的可嵌套性,用法類似app = mw1(mw2(app)),常見用法

  • 重寫environ,然后基於 URL,將請求對象路由給不同的應用對象

  • 支持多個應用或者框架順序地運行於同一個進程中

  • 通過轉發請求和響應,支持負載均衡和遠程處理

  • 支持對內容做后處理(postprocessing)

0.3. WSGI協議的一些延伸

0.3.1 關於uWSGI和uwsgi

uWSGI是一個web服務器,它實現了WSGI接口。

而uwsgi是一種二進制協議,用於兩個web服務器用來通信,常見是使用nginx和uWSGI一起部署,nginx是uWSGI之間通訊使用uwsgi協議,而nginx就負責把http協議包轉換成uwsgi協議包。

當然uWSGI是可以不依賴於nginx的,但客戶端到服務端通常用的都是http協議,那么在uWSGI服務器就有兩種選擇:自己將http協議解析成uwsgi協議,它起了一個http進程接受客戶端請求並解析然后用uwsgi協議傳遞到每個uWSGI服務器的work;另外一種就是整個過程都用http協議流通。

 
 
 
 
 
 
 
 uwsgi --http :9000 --wsgi-file flask_test.py --processes 1 --threads 1 --callable app
 

中文翻譯文檔:https://uwsgi-docs-zh.readthedocs.io/zh_CN/latest/Options.html

0.3.1 什么是envrion

文檔https://www.python.org/dev/peps/pep-3333/#environ-variables

 
 
 
 
 
 
 
{'wsgi.errors': <gunicorn.http.wsgi.WSGIErrorsWrapper object at 0x7f8416243a58>, 'wsgi.version': (1, 0), 'wsgi.multithread': False, 'wsgi.multiprocess': False, 'wsgi.run_once': False, 'wsgi.file_wrapper': <class 'gunicorn.http.wsgi.FileWrapper'>, 'SERVER_SOFTWARE': 'gunicorn/19.9.0', 'wsgi.input': <gunicorn.http.body.Body object at 0x7f84162434a8>, 'gunicorn.socket': <socket.socket fd=17, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8001), raddr=('127.0.0.1', 59571)>, 'REQUEST_METHOD': 'GET', 'QUERY_STRING': 'dd=1', 'RAW_URI': '/v1/frontend/competition?dd=1', 'SERVER_PROTOCOL': 'HTTP/1.1', 'HTTP_AUTHORIZATION': '123', 'HTTP_USER': '123', 'CONTENT_TYPE': 'application/json', 'HTTP_USER_AGENT': 'PostmanRuntime/7.26.1', 'HTTP_ACCEPT': '*/*', 'HTTP_POSTMAN_TOKEN': '992ee3de-4eba-4866-b32a-9705ddb742fd', 'HTTP_HOST': 'localhost:8001', 'HTTP_ACCEPT_ENCODING': 'gzip, deflate, br', 'HTTP_CONNECTION': 'keep-alive', 'CONTENT_LENGTH': '572', 'HTTP_COOKIE': 'session=835faf17-25e5-432c-9c2a-fe6dfa4f0d99', 'wsgi.url_scheme': 'http', 'REMOTE_ADDR': '127.0.0.1', 'REMOTE_PORT': '59571', 'SERVER_NAME': '0.0.0.0', 'SERVER_PORT': '8001', 'PATH_INFO': '/v1/frontend/competition', 'SCRIPT_NAME': ''}
 

 

1. 介紹

Werkzeug是一個WSGI的工具庫,你可以使用他來構建應用或框架。

2. 實現一個簡單的web應用

從WSGI協議來看,一個WSGI應用應該是這樣的

 
 
 
 
 
 
 
def application(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return ['Hello World!']
 

我們可以使用Werkzeug封裝好的類Response,來幫我們簡化上述代碼

 
 
 
 
 
 
 
from werkzeug.wrappers import Response
def application(environ, start_response):
    response = Response('Hello World!', mimetype='text/plain')
    return response(environ, start_response)
 

2.1 開始我們的應用

我們用類的方式來實現一個application,在python的魔法函數__call__下執行start_response,這樣我們可以用執行對象的方式app()來處理請求。

 
 
 
 
 
 
 
class MyApp(object):
    def __init__(self):
        print("創建app")
    def dispatch_request(self, request):
        return Response("hello world")
    def wsgi_app(self, environ, start_response):
        request = Request(environ)
        response = self.dispatch_request(request)
        return response(environ, start_response)
    def __call__(self, environ, start_response):
        return self.wsgi_app(environ, start_response)
 

函數wsgi_app是我們整個應用的核心,這樣的寫法容易讓我們去擴展WSGI中間件,里面創建了一個Request對象,傳遞給dispatch_request並返回一個WSGI應用Response

現在我們可以構建一個統一的生成函數來生成一個MyApp對象,方便管理。

 
 
 
 
 
 
 
def create_app():
    app = MyApp()
    return app
 

最后我們通過WerkZeug來實現一個簡單的本地web服務器,傳入我們寫的應用。

 
 
 
 
 
 
 
if __name__ == '__main__':
  from werkzeug.serving import run_simple
    app = create_app()
    run_simple('127.0.0.1', 5000, app)
 

2.1.1 flask應用的是怎么啟動的

我們看一下我們DSP項目的app_runner

 
 
 
 
 
 
 
def run():
    app = create_app()
    # 啟動celery worker
    if config.RUN_MODEL in ('ALL', 'WORKER'):
        worker_thread = Thread(target=run_worker, args=(app,))
        worker_thread.daemon = True
        worker_thread.start()
    # 啟動flask項目
    if config.RUN_MODEL in ('ALL', 'CONTROLLER'):
        try:
            app.run('0.0.0.0', 8001, debug=app.config["DEBUG"], threaded=True)
        except Exception as e:
            LOG.info("Program exit unexpectly because an error {}".format(e))
 

我們進入到app.run()發現里面有這么一句

from werkzeug.serving import run_simple
try:
		run_simple(host, port, self, **options)
finally:
		self._got_first_request = False

所以我們知道,我們的app_runner其實本質上用的還是werkzeug的run_simple.

2.1.2 那到底這個run_simple干了些什么

def run_simple(
    hostname,
    port,
    application,
    use_reloader=False,
    use_debugger=False,
    use_evalex=True,
    extra_files=None,
    reloader_interval=1,
    reloader_type="auto",
    threaded=False,
    processes=1,
    request_handler=None,
    static_files=None,
    passthrough_errors=False,
    ssl_context=None,
):
    """Start a WSGI application. Optional features include a reloader,
    multithreading and fork support.
    ……

啟動一個WSGI的應用,可選特性包括 自動加載、多線程、多進程。這其中我們注意兩個參數,threadedprocesses。在整個run_simple里核心是這個函數的內置函數inner

 
 
 
 
 
 
 
    def inner():
        try:
            fd = int(os.environ["WERKZEUG_SERVER_FD"])
        except (LookupError, ValueError):
            fd = None
        srv = make_server(
            hostname,
            port,
            application,
            threaded,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )
        if fd is None:
            log_startup(srv.socket)
        srv.serve_forever()
 

這里的fd其實就是一個文件描述符,讓socket可以從文件描述符來創建對象。

進入make_server我們查看下代碼

 
 
 
 
 
 
 
def make_server(
    host=None,
    port=None,
    app=None,
    threaded=False,
    processes=1,
    request_handler=None,
    passthrough_errors=False,
    ssl_context=None,
    fd=None,
):
    """Create a new server instance that is either threaded, or forks
   or just processes one request after another.
   """
    if threaded and processes > 1:
        raise ValueError("cannot have a multithreaded and multi process server.")
    elif threaded:
        return ThreadedWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )
    elif processes > 1:
        return ForkingWSGIServer(
            host,
            port,
            app,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )
    else:
        return BaseWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )
 

整個過程的邏輯很清晰,根據參數threadedprocesses來判斷最后返回的是那個WSGIServer,其實這個函數的注釋也說明了實現的功能:創建一個服務支持對請求的多線程處理、或者多進程處理、或者一個請求接一個請求處理的單線程,但是不支持多進程多線程的方式。

到此我們也知道,Flask原生支持的並發模式,我們可以再往里挖一點:

2.1.3 WSGIServer又是如何工作的

我們找到BaseWSGIServer,它繼承自HTTPServer

 
 
 
 
 
 
 
class BaseWSGIServer(HTTPServer, object):
    """Simple single-threaded, single-process WSGI server."""
 

告訴我們它是個單線程的WSGI server,在回顧run_simple下的inner函數最后執行了srv.serve_forever() 看這個名字貌似就是啟動了一個永久的服務。

 
 
 
 
 
 
 
    def serve_forever(self):
        self.shutdown_signal = False
        try:
            HTTPServer.serve_forever(self)
        except KeyboardInterrupt:
            pass
        finally:
            self.server_close()
 

你能看到就是擴展了HTTPServer.serve_forever函數,最后執行server_close

 
 
 
 
 
 
 
  if hasattr(selectors, 'PollSelector'):
      _ServerSelector = selectors.PollSelector
  else:
      _ServerSelector = selectors.SelectSelector
    ……    
   def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.
       Polls for shutdown every poll_interval seconds. Ignores
       self.timeout. If you need to do periodic tasks, do them in
       another thread.
       """
        self.__is_shut_down.clear()
        try:
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)
                while not self.__shutdown_request:
                    ready = selector.select(poll_interval)
                    # bpo-35017: shutdown() called during select(), exit immediately.
                    if self.__shutdown_request:
                        break
                    if ready:
                        self._handle_request_noblock()
                    self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()
 

我們去尋找_ServerSelector函數,最后指向的是標准庫selectors.py ,實現IO模型selectpollepollkqueue(大部分unix系統上都存在,包括OS X)的封裝。這里用的是 poll,監控一組文件句柄,返回當有活躍的文件描述符活躍,去執行_handle_request_noblock

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that selector.select() has returned that the socket is
        readable before this function was called, so there should be no risk of
        blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except OSError:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except Exception:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
            except:
                self.shutdown_request(request)
                raise
        else:
            self.shutdown_request(request)

注釋和函數名都表明了,只要在selector返回的ready為真情況下,執行過程就不可能被阻塞。

request, client_address = self.get_request()

這個get_request函數在HTTPServer的父類TCPServer里定義了

    def get_request(self):
        """Get the request and client address from the socket.

        May be overridden.

        """
        return self.socket.accept()

最后走到了socketaccept. 返回新的socket(這里是連接套接字, 而前面的socket則是監聽套接字),和請求方的地址,在socket編程里,我們執行accept函數會阻塞,一直到客戶端有消息過來,但其實這里是根本不會阻塞的,這就是pollio模型的特點,遍歷所有的連接,直到找到一個有新消息的連接就返回真,通知服務端的socket的去accept,這時候是必定能收到值。

再回到上面的_handle_request_noblock 我們看到了一個 process_request,這個名字也告訴了我們,是請求到來時執行的操作,這個里面其實是在同步的執行指令。

def process_request(self, request, client_address):
    """Call finish_request.

    Overridden by ForkingMixIn and ThreadingMixIn.

    """
    self.finish_request(request, client_address)
    self.shutdown_request(request)

注釋也告訴我們這是可以被ForkingMixInThreadingMixIn改寫,所以在werkzeug的make_server函數中另外兩個WSGIServer其實就是去繼承這兩個類來改寫這個process_request來實現多線程多進程.

Server_forever中我們是循環的去調用selector.select(poll_interval)來獲取活躍的socket,然后執行我們的處理函數,在里面或是用os.fork實現多進程或是thread實現多線程。

 

2.2 添加路由

現在我們已經有了一個基本的wsgi應用,接下來我們需要完善一下應用的路由規則。

Werkzeug 提供了一個靈活的集成路由。 你需要創建一個 Map 實例並添加一系列 Rule 對象。每一個Rule我們可以傳入兩個參數,一個是url, 一個是endpoint

我們在MyApp中維護一個Map, 並在create_app函數里注冊我們的路由

# 添加路由
app.url_map = Map(
  [Rule('/', endpoint="new_url")]
)

endpoint在這里其實就是路由對應的函數的名稱,接下來我們實現的就是通過endpoint去指向一個函數

def dispatch_request(self, request):
  	# 根據請求路由找出匹配的endpoint,value是一個字典,代表的是路由的位置參數
    adapter = self.url_map.bind_to_environ(request.environ)
    try:
      endpoint, values = adapter.match()
      # 通過 endpoint + _handler 找到對應的函數
      return getattr(self, endpoint + '_handler')(request, **values)
    except HTTPException as e:
      print(repr(e))
      return Response("hello world")

2.2.1 flask是如何添加路由的呢

裝飾器@route(“/home”, methods=["GET"])添加路由,我們可以從源碼分析下

def route(self, rule, **options):
    def decorator(f):
    		endpoint = options.pop("endpoint", None)
    		self.add_url_rule(rule, endpoint, f, **options)
    		return f
    return decorator

rule指的就是路由,這個endpoint其實和上面的是一個意思,主要邏輯都在add_url_rule

 
 
 
 
 
 
 
def add_url_rule(
        self,
        rule,
        endpoint=None,
        view_func=None,
        provide_automatic_options=None,
        **options,
    ):
      # 沒有傳入endpoint就默認為view_func的函數名
        if endpoint is None:
            endpoint = _endpoint_from_view_func(view_func)
        options["endpoint"] = endpoint
        
        # 獲取當前view_func支持的所有函數
        methods = options.pop("methods", None)
        
        methods = {item.upper() for item in methods}
        required_methods = set(getattr(view_func, "required_methods", ()))
        methods |= required_methods
        
        # 生成werkzeug Rule對象 綁定endpoint和路由地址
        rule = self.url_rule_class(rule, methods=methods, **options)
        
        # werkzeug Map對象中添加Rule對象
        self.url_map.add(rule)
        
        # 通過endpoint來尋找綁定的視圖函數,如果已經綁定了且和當前函數不同就拋錯
        if view_func is not None:
            old_func = self.view_functions.get(endpoint)
            if old_func is not None and old_func != view_func:
                raise AssertionError(
                    "View function mapping is overwriting an existing"
                    f" endpoint function: {endpoint}"
                )
            # 存取view_func到屬性view_functions里
            self.view_functions[endpoint] = view_func
 

整個路由添加的流程是先用路由地址和endpoint生成Rule對象綁定到Map對象--也就是self.url_map,最后檢查endpoint關聯的view_func是否是被更新了,更新了則拋出異常,否則就添加endpointview_func的鍵值對到self.view_functions。如果是flask的藍圖添加的路由,則endpoint變為:藍圖名稱 + “.” + endpoint

2.2 添加WSGI中間件

從WSGI協議中我們得知可以通過app=mw1(app)的方式添加中間件

我們新建一個中間件類,作用是記錄整個響應過程用時。結構和我們的MyApp類很像,實現了一個__call__這也是一個WSGI應用,不同的是我們先保存了一個application,然后再執行中間件是實例時去處理保存的該application

 
 
 
 
 
 
 
class MyMiddleWare(object):
    """
   wsgi中間件
   """
    def __init__(self, application):
        self.application = application
        print("創建middleware")
    def __call__(self, environ, start_response):
        b = time.time()
        result = self.application(environ, start_response)
        duration = (time.time() - b)/1000
        print("duration: %f" % duration)
        return result
 

現在我們在create_app中添加該中間件, 我們可以直接改寫app的wsgi_app函數為中間件的對象(實現了__call__使它可以像函數樣調用)

app.wsgi_app = MyMiddleWare(app.wsgi_app)

2.2.1 Flask中如何添加中間件呢

在dsp項目的app_runner文件中

def create_app():
    flask_app = Flask('csp-controller')
 	  with flask_app.app_context():
        i18n(flask_app)
        create_db(flask_app)
        configure_models()
        configure_blueprints(flask_app)
        init_monitor(flask_app)
        setup_default_data()
        add_app_hook(flask_app)
    return flask_app
  
def run_worker(app=None):
    from app.scheduling.celery_app import make_celery
    if not app:
        app = create_app()
    celery_app = make_celery(app)

我們能看到在初始化了flask_app這個wsgi應用后,傳入到下面的幾個

 

2.3 最終代碼

 
 
 
 
 
 
 
from werkzeug.wrappers import Response
class MyMiddleWare(object):
    """
   wsgi中間件
   """
    def __init__(self, application):
        self.application = application
        print("創建middleware")
    def __call__(self, environ, start_response):
        b = time.time()
        result = self.application(environ, start_response)
        duration = (time.time() - b)/1000
        print("duration: %f" % duration)
        return result
class MyApp(object):
    def __init__(self):
        self.url_map = None
        print("創建app")
    def url_adapter(self):
        pass
    # handler方法需要返回Response對象(werkzeug封裝的實現wsgi application)
    def new_url_handler(self, request):
        return Response('{"code": 0}', status=404)
    def dispatch_request(self, request):
        adapter = self.url_map.bind_to_environ(request.environ)
        try:
            # 根據請求路由找出匹配的endpoint,value是一個字典,代表的是路由的位置參數
            endpoint, values = adapter.match()
            # 通過 endpoint + _handler 找到對應的函數
            return getattr(self, endpoint + '_handler')(request, **values)
        except HTTPException as e:
            print(repr(e))
            return Response("hello world")
    def wsgi_app(self, environ, start_response):
        request = Request(environ)
        response = self.dispatch_request(request)
        return response(environ, start_response)
    def __call__(self, environ, start_response):
        return self.wsgi_app(environ, start_response)
def create_app():
    app = MyApp()
    # 加入中間件
    app.wsgi_app = MyMiddleWare(app.wsgi_app)
    # 添加路由,endpoint指向的是一個函數,通過路由地址綁定到該endpoint上
    app.url_map = Map(
        [Rule('/', endpoint="new_url")]
    )
    return app
    
if __name__ == '__main__'
  app = create_app()
  run_simple('127.0.0.1', 5000, app)
 

3. 源碼解析

2.1 開始

2.2 werkzeug reloader 機制

2.2.1 介紹

我們在本地開發flask應用時,常常會用到debug模式,類似這樣app.run(debug=True)他為我們調試代碼帶來了很多方便,其中就包括代碼修改后本地服務的自動reload。但這個功能並不是flask提供的,而是werkzeug。整個過程大概的流程是這樣的

當我們在主進程中使用debug模式啟動flask,這時不在是簡單的通過run_simple()中的inner()函數啟動server,而是我們的werkzeug主進程派生出一個子進程,這個子進程負責運行我們的flask應用,還有啟用reloader`來監控代碼文件的變化,一旦代碼發生改變,退出子進程,而我們的父進程獲知到子進程退出,則重新的去創建子進程,如此循環達到一次服務的reload。

2.3.2 源碼分析

在上面提到的werkzeug的run_simple()有這么一段,是用來啟動werkzeug的reloader

 
 
 
 
 
 
 
    if use_reloader:
        # 判斷當前進程是否是reloder生成的子進程
        if not is_running_from_reloader():
            if port == 0 and not can_open_by_fd:
                raise ValueError(
                    "Cannot bind to a random port with enabled "
                    "reloader if the Python interpreter does "
                    "not support socket opening by fd."
                )
      # 先創建socket,監聽服務端口,如果當前操作系統允許根據文件操作符打開socket,就把fd
            # 保存到環境變量,否則先關閉連接
            address_family = select_address_family(hostname, port)
            server_address = get_sockaddr(hostname, port, address_family)
            s = socket.socket(address_family, socket.SOCK_STREAM)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind(server_address)
            s.set_inheritable(True)
            if can_open_by_fd:
                os.environ["WERKZEUG_SERVER_FD"] = str(s.fileno())
                s.listen(LISTEN_QUEUE)
                log_startup(s)
            else:
                s.close()
                if address_family == af_unix:
                    _log("info", "Unlinking %s", server_address)
                    os.unlink(server_address)
        from ._reloader import run_with_reloader
        run_with_reloader(inner, extra_files, reloader_interval, reloader_type)
 

inner()函數傳入到run_with_reloader()

reloader = reloader_loops[reloader_type](extra_files, interval)
    signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))
    try:
        if os.environ.get("WERKZEUG_RUN_MAIN") == "true":
            ensure_echo_on()
            t = threading.Thread(target=main_func, args=())
            t.setDaemon(True)
            t.start()
            reloader.run()
        else:
            sys.exit(reloader.restart_with_reloader())
    except KeyboardInterrupt:
        pass

這一段代碼的是根據環境變量WERKZEUG_RUN_MAIN來判斷當前進程是不是子進程,如果是子進程,那么環境變量的值就為true,后面我們會看到在主進程去生成進程的時候會設置這個變量。如果是子進程,其實就是開了一個線程用於跑我們的inner(),然后在子進程的主線程去啟動我們在上方選擇的reloader

werkzeug有兩種reloaderStatReloaderLoopWatchdogReloaderLoop,默認是StatReloaderLoop通過遍歷整個項目下的文件mtime和上次reload的存的進行比較。而WatchdogReloaderLoop則是借助第三方包

watchdog實現。

現在我們還是在主進程,且環境變量並沒有設置,進入restart_with_reloader()

 
 
 
 
 
 
 
    def restart_with_reloader(self):
        """Spawn a new Python interpreter with the same arguments as the
       current one, but running the reloader thread.
       """
        while 1:
            _log("info", f" * Restarting with {self.name}")
            args = _get_args_for_reloading()
            new_environ = os.environ.copy()
            new_environ["WERKZEUG_RUN_MAIN"] = "true"
            exit_code = subprocess.call(args, env=new_environ, close_fds=False)
            if exit_code != 3:
                return exit_code
 

到此就是整個reloader的實現的核心部分,

  • _get_args_for_reloading()這個是獲取的是運行當前進程的執行參數,也就是我們啟動flask的完整命令["python", "xxx.py"]類似這樣。

  • new_environ = os.environ.copy()是復制當前的環境變量,並添加環境變量WERKZEUG_RUN_MAIN

  • 標准庫的subprocess.call()這是阻塞式的系統命令執行方式,並指定運行環境變量,需要注意的是,這個subprocess.call()此時是阻塞的,且如果當前主進程異常退出,是會為我們kill掉生成的子進程。

  • exit_code != 3:這是父進程用來判斷子進程是否需要重啟的重要判斷,因為如果是文件改動,子進程的reloader將會調用sys.exit(3)退出子進程。

其實到此我們也清楚了整個werkzeug reloader的具體實現方式:父進程輪詢創建子進程的步驟,並監控子進程的退出碼來判斷是否要退出輪詢,而子進程就是父進程的所有運行環境的copy,只是通過環境變量WERKZEUG_RUN_MAIN來控制代碼要走的邏輯分支。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM