深入tornado中的http1connection


前言

  tornado中http1connection文件的作用極其重要,他實現了http1.x協議。

  本模塊基於gen模塊和iostream模塊實現異步的處理請求或者響應。

  閱讀本文需要一些基礎的http知識。

 

正文:

  http協議是建立在tcp基礎上的應用層協議,tcp層由TCPServer,IOStream負責,對http報文的讀取與解析則由http1connection負責。當http報文被解析完成再交由給某個delegate類實例負責進行后續處理。

接下來說一下大體的過程(tornado作為服務器端):

  如果客戶端要向服務端發送http請求,首先要建立tcp連接,

  tornado在連接建立完成后會將連接封裝為一個IOStream對象,這個對象可以異步的從連接中讀寫數據

  tornado中又實現了HTTP1ServerConnection與HTTP1Connection兩個類,他們依賴於底層的IOStream從套接字中讀寫,並共同合作完成了http1.x協議。

  HTTP1Connection實際主要是用來處理http事務(http權威指南:http事務是由一條請求以及對應該請求的響應組成),當然他自己實現了前半部分,也就是對報文起始行、首部、主體進行讀,解析;后半部分需要配合HTTPMessageDelegate進行工作。

  HTTPMessageDelegate對經過HTTP1Connection解析后的報文進行分配,然后由其他類(比如說RequestHandler)執行具體的業務邏輯。

  其他類生成響應,並且把響應發送到IOStream中,這時就表示着這條http事務已經完成,我們需要根據情況判斷是否關閉連接。

  HTTP1ServerConnection則是不斷的生成HTTP1Connection實例,也就是不斷的處理http事務,直到連接關閉。

HTTP1Connection類:

  先看HTTP1Connection這個類。這個類實際上主要完成的工作可以概括為兩個:

    1  讀取並解析報文消息

    2  寫入報文

1 讀取並解析報文

  當請求或者響應到來的時候,read_response是解析消息的入口,盡管該方法讀起來好像僅僅是針對響應,但因為不管是請求或者是響應格式是相差不大的,所以不管是請求或者是響應他都是可以處理的。read_response中主要調用了_read_message方法,解析報文的邏輯也都是在_read_message方法中,另外,本文主要是對其作為服務端時進行分析

先來說一下_read_mssage的大體邏輯:

  首先HTTP1Connection基於iostream讀取請求報文,並對請求報文進行解析,分離出起始行 請求首部,並根據請求首部判定是否讀取消息主體以及消息主體的長度。

  在這個過程中,分析起始行的信息然后委托給代理(HTTPMessageDelegate)獲取對應的RequestHandler(這一步主要是_RequestDispatcher類實現的),並實例化,根據起始行的method調用相關方法,在調用方法執行業務邏輯時可能會用到模板語言,cookie,csrf等等其他東西,但最終會產生響應,並將響應發送到IOStream中。這些工作都是delegate干的。

  最后HTTP1Connection等待響應發送完成(這一步操作是異步的),根據是否支持keep-alive決定是否處理完后關閉連接

    def read_response(self, delegate):
        """Read a single HTTP response.
        """
        if self.params.decompress:
            delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
        return self._read_message(delegate)

    @gen.coroutine
    def _read_message(self, delegate):    # self是HTTP1Connection實例對象,delegate是_ServerRequestAdapter實例對象
        need_delegate_close = False
        try:
            header_future = self.stream.read_until_regex(b"\r?\n\r?\n", max_bytes=self.params.max_header_size)
            # 兩種方式來等待請求頭(在服務器模式下是請求,客戶端模式下是響應)的讀,第一種是什么時候發送過來什么時候讀,
            # 第二種是設置超時時間時長的定時任務,如果這段時間內沒有發送過來那么就關閉連接
            if self.params.header_timeout is None: # 第一種
                header_data = yield header_future  # 獲取起始行以及頭部信息的bytes流
            else:    # 第二種
                try:
                    header_data = yield gen.with_timeout(
                        self.stream.io_loop.time() + self.params.header_timeout,
                        header_future,
                        io_loop=self.stream.io_loop,
                        quiet_exceptions=iostream.StreamClosedError)
                except gen.TimeoutError:
                    self.close()
                    raise gen.Return(False)
            start_line, headers = self._parse_headers(header_data)    # 獲取起始行以及頭部信息
            if self.is_client:
                start_line = httputil.parse_response_start_line(start_line)
                self._response_start_line = start_line
            else:
                start_line = httputil.parse_request_start_line(start_line)
                self._request_start_line = start_line
                self._request_headers = headers

            self._disconnect_on_finish = not self._can_keep_alive(start_line, headers) # 如果不是keep alive在響應結束后關閉連接
            need_delegate_close = True
            with _ExceptionLoggingContext(app_log):
                # 這一步會做了很多東西,如果是服務器端,這一步會設置request對象,並根據請求中的url選擇對應handler
                header_future = delegate.headers_received(start_line, headers)
                if header_future is not None:
                    yield header_future
            if self.stream is None:
                # We've been detached.
                need_delegate_close = False
                raise gen.Return(False)
            skip_body = False
            if self.is_client:  # 作為client
                if (self._request_start_line is not None and
                        self._request_start_line.method == 'HEAD'):  # 如果方法是HEAD,那么默認是沒有主體的。即使有主體也會被忽略掉
                    skip_body = True
                code = start_line.code
                # 如果客戶端發送了一個帶條件的GET 請求且該請求已被允許,而文檔的內容(自上次訪問以來或者根據請求的條件)並沒有改變,則服務器應當返回這個304狀態碼
                if code == 304:  
                    # 304報文可能會包含content-length首部屬性,但實際上是沒有消息主體的
                    # http://tools.ietf.org/html/rfc7230#section-3.3
                    skip_body = True
                if code >= 100 and code < 200: # 臨時的響應。客戶端在收到常規響應之前,應准備接收一個或多個1XX響應
                    # 1xx 報文是不能包含主體信息的
                    if ('Content-Length' in headers or
                            'Transfer-Encoding' in headers):
                        raise httputil.HTTPInputError(
                            "Response code %d cannot have body" % code)
                    # 我們所需要的真正的響應還沒有接收到,所以繼續接收
                    yield self._read_message(delegate)
            else:
                # 1、Expect:100-continue 用於客戶端在發送POST數據給服務器前,征詢服務器情況,看服務器是否處理POST的數據,
                # 如果不處理,客戶端則不上傳POST數據,如果處理,則POST上傳數據。在現實應用中,通過在POST大數據時,才會使用100-continue協議。
                # http://www.cnblogs.com/tekkaman/archive/2013/04/03/2997781.html
                if (headers.get("Expect") == "100-continue" and not self._write_finished):
                    # 默認是支持的,所以收到請求后,返回100。
                    self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
            if not skip_body:
# 這一步讀取消息主體 body_future
= self._read_body(start_line.code if self.is_client else 0, headers, delegate) if body_future is not None: if self._body_timeout is None: yield body_future else: try: yield gen.with_timeout( self.stream.io_loop.time() + self._body_timeout, body_future, self.stream.io_loop, quiet_exceptions=iostream.StreamClosedError) except gen.TimeoutError: gen_log.info("Timeout reading body from %s", self.context) self.stream.close() raise gen.Return(False) self._read_finished = True if not self._write_finished or self.is_client: need_delegate_close = False with _ExceptionLoggingContext(app_log): # 如果是服務器端,這一步會生成對應的handler實例,然后執行業務邏輯,最后將響應寫入IOStream中 delegate.finish() # If we're waiting for the application to produce an asynchronous # response, and we're not detached, register a close callback # on the stream (we didn't need one while we were reading) # 等待異步響應完成,所有數據都寫入 fd,才繼續后續處理,詳細見 _finish_request/finish 方法實現。 # 當異步寫完成,在HTTPServerRequest中調用當前對象的finish方法,finish方法則會調用_finisth_request方法,該方法內部會對_finish_future對象set_result if (not self._finish_future.done() and self.stream is not None and not self.stream.closed()): self.stream.set_close_callback(self._on_connection_close) yield self._finish_future # 判定是否關閉連接,服務器端一般等待客戶端主動關閉,而如果是客戶端則根據是否持久連接進行關閉 if self.is_client and self._disconnect_on_finish: self.close() if self.stream is None: raise gen.Return(False) except httputil.HTTPInputError as e: gen_log.info("Malformed HTTP message from %s: %s", self.context, e) self.close() raise gen.Return(False) finally: if need_delegate_close: with _ExceptionLoggingContext(app_log): delegate.on_connection_close() self._clear_callbacks() raise gen.Return(True)

值得注意的是,該方法中讀取報文主體的幾種方式:
  1 假設沒有開啟keep-alive,那么我們將連接結束作為報文終止的標志

  2 而如果開啟了keep-alive,那么我們根據Content-Length確定當前報文的終止位置

  3 如果開啟了分塊傳輸編碼(Transfer-Encoding:chunked,這時候Content-Length就不起作用了,實際上在tornado中如果Content-Length以及分塊傳輸編碼都指定則會返回錯誤)那么就會根據分塊傳輸編碼的格式一直讀取,直到讀取到b"0\r\n"時就可以確定當前報文已終止

_read_body方法則根據情況選擇讀取報文主體的方式,以上三種選擇分別對應於以下三種方法:

  1 _read_body_until_close

  2 _read_fixed_body

  3 _read_chunked_body

 來看一下代碼:

    def _read_body(self, code, headers, delegate):
        # https://imququ.com/post/transfer-encoding-header-in-http.html
        if "Content-Length" in headers:
            if "Transfer-Encoding" in headers:
                # Response cannot contain both Content-Length and
                # Transfer-Encoding headers.
                # If a message is received with both a Transfer-Encoding and a
                # Content-Length header field, the Transfer-Encoding overrides the
                # Content-Length.  
                # http://tools.ietf.org/html/rfc7230#section-3.3.3
                raise httputil.HTTPInputError("Response with both Transfer-Encoding and Content-Length")
            if "," in headers["Content-Length"]:
                # Proxies sometimes cause Content-Length headers to get
                # duplicated.  If all the values are identical then we can
                # use them but if they differ it's an error.
                pieces = re.split(r',\s*', headers["Content-Length"])
                if any(i != pieces[0] for i in pieces):
                    raise httputil.HTTPInputError(
                        "Multiple unequal Content-Lengths: %r" %
                        headers["Content-Length"])
                headers["Content-Length"] = pieces[0]

            try:
                content_length = int(headers["Content-Length"])
            except ValueError:
                # Handles non-integer Content-Length value.
                raise httputil.HTTPInputError(
                    "Only integer Content-Length is allowed: %s" % headers["Content-Length"])

            if content_length > self._max_body_size:
                raise httputil.HTTPInputError("Content-Length too long")
        else:
            content_length = None

        if code == 204:        # 狀態碼204(無內容)
            # This response code is not allowed to have a non-empty body,
            # and has an implicit length of zero instead of read-until-close.
            # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3
            if ("Transfer-Encoding" in headers or content_length not in (None, 0)):
                raise httputil.HTTPInputError(
                    "Response with code %d should not have body" % code)
            content_length = 0

        if content_length is not None: # 而如果開啟了keep-alive,那么我們根據Content-Length確定當前報文的終止位置
            return self._read_fixed_body(content_length, delegate)
        if headers.get("Transfer-Encoding") == "chunked": # 開啟了分塊傳輸編碼
            return self._read_chunked_body(delegate)
        if self.is_client: # 非持久連接
            return self._read_body_until_close(delegate)
        return None

    @gen.coroutine
    def _read_fixed_body(self, content_length, delegate):
        while content_length > 0:
            body = yield self.stream.read_bytes(min(self.params.chunk_size, content_length), partial=True)
            content_length -= len(body)
            if not self._write_finished or self.is_client:
                with _ExceptionLoggingContext(app_log):
                    ret = delegate.data_received(body)
                    if ret is not None:
                        yield ret

    @gen.coroutine
    def _read_chunked_body(self, delegate):
        # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
        total_size = 0
        while True:
            # 先讀取chunk長度
            chunk_len = yield self.stream.read_until(b"\r\n", max_bytes=64)
            chunk_len = int(chunk_len.strip(), 16)
            # 如果chunk長度為0,表示分塊傳輸的終止
            if chunk_len == 0:
                return
            total_size += chunk_len
            # 檢測長度是否過大
            if total_size > self._max_body_size:
                raise httputil.HTTPInputError("chunked body too large")
            bytes_to_read = chunk_len
            while bytes_to_read:
                # 讀取該長度對應的data
                chunk = yield self.stream.read_bytes(
                    min(bytes_to_read, self.params.chunk_size), partial=True)
                bytes_to_read -= len(chunk)
                if not self._write_finished or self.is_client:
                    with _ExceptionLoggingContext(app_log):
                        # 讀取的消息主體要交給代理(HTTPMessageDelegate)處理
                        ret = delegate.data_received(chunk)
                        if ret is not None:
                            yield ret
            # chunk ends with \r\n
            # 每一個data后面都有一個CRLF
            crlf = yield self.stream.read_bytes(2)
            assert crlf == b"\r\n"

    @gen.coroutine
    def _read_body_until_close(self, delegate):
        body = yield self.stream.read_until_close()
        if not self._write_finished or self.is_client:
            with _ExceptionLoggingContext(app_log):
                delegate.data_received(body)
View Code

 2 寫入報文

  寫入報文主要可以分兩步:

    1  寫入報文起始行以及頭部

    2  寫入報文主體

與之相關的方法主要有三個,來看源碼:

    def write_headers(self, start_line, headers, chunk=None, callback=None):
        """Implements `.HTTPConnection.write_headers`.寫入起始行和消息頭"""
        lines = []
        if self.is_client:    # 客戶端,那就是發送請求了
            self._request_start_line = start_line
            lines.append(utf8('%s %s HTTP/1.1' % (start_line[0], start_line[1])))
            # Client requests with a non-empty body must have either a
            # Content-Length or a Transfer-Encoding.
            self._chunking_output = (
                start_line.method in ('POST', 'PUT', 'PATCH') and
                'Content-Length' not in headers and
                'Transfer-Encoding' not in headers)
        else:    # 服務端,那就是發送響應了
            self._response_start_line = start_line
            lines.append(utf8('HTTP/1.1 %d %s' % (start_line[1], start_line[2])))
            self._chunking_output = (
                # TODO: should this use
                # self._request_start_line.version or
                # start_line.version?
                self._request_start_line.version == 'HTTP/1.1' and
                # 304 responses have no body (not even a zero-length body), and so
                # should not have either Content-Length or Transfer-Encoding.
                # headers.
                start_line.code not in (204, 304) and
                # No need to chunk the output if a Content-Length is specified.
                'Content-Length' not in headers and
                # Applications are discouraged from touching Transfer-Encoding,
                # but if they do, leave it alone.
                'Transfer-Encoding' not in headers)
            # If a 1.0 client asked for keep-alive, add the header.
            if (self._request_start_line.version == 'HTTP/1.0' and
                (self._request_headers.get('Connection', '').lower() ==
                 'keep-alive')):
                headers['Connection'] = 'Keep-Alive'
        # tornado無論作為客戶端還是服務端默認是支持分塊傳輸的
        if self._chunking_output:
            headers['Transfer-Encoding'] = 'chunked'
        # 這下這種情況消息主體應為空
        if (not self.is_client and
            (self._request_start_line.method == 'HEAD' or
             start_line.code == 304)):
            self._expected_content_remaining = 0
        elif 'Content-Length' in headers:
            self._expected_content_remaining = int(headers['Content-Length'])
        else:
            self._expected_content_remaining = None
        # TODO: headers are supposed to be of type str, but we still have some
        # cases that let bytes slip through. Remove these native_str calls when those
        # are fixed.
        header_lines = (native_str(n) + ": " + native_str(v) for n, v in headers.get_all())
        if PY3:
            lines.extend(l.encode('latin1') for l in header_lines)
        else:
            lines.extend(header_lines)
        for line in lines:
            if b'\n' in line:
                raise ValueError('Newline in header: ' + repr(line))
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            future.set_exception(iostream.StreamClosedError())
            future.exception()
        else:
            if callback is not None:
                self._write_callback = stack_context.wrap(callback)
            else:
                future = self._write_future = Future()
            data = b"\r\n".join(lines) + b"\r\n\r\n"
            if chunk:
                data += self._format_chunk(chunk)
            self._pending_write = self.stream.write(data)
            self._pending_write.add_done_callback(self._on_write_complete)
        return future

    def _format_chunk(self, chunk):
        '''如果采用了分塊傳輸編碼,則將參數chunk轉換為http中規定的格式。如果沒有使用分塊傳輸編碼則原樣返回'''
        if self._expected_content_remaining is not None:
            self._expected_content_remaining -= len(chunk)
            if self._expected_content_remaining < 0:
                # Close the stream now to stop further framing errors.
                self.stream.close()
                raise httputil.HTTPOutputError(
                    "Tried to write more data than Content-Length")
        if self._chunking_output and chunk:
            # Don't write out empty chunks because that means END-OF-STREAM
            # with chunked encoding
            return utf8("%x" % len(chunk)) + b"\r\n" + chunk + b"\r\n"
        else:
            return chunk

    def write(self, chunk, callback=None):
        """Implements `.HTTPConnection.write`. 寫入報文主體

        For backwards compatibility is is allowed but deprecated to
        skip `write_headers` and instead call `write()` with a
        pre-encoded header block.
        """
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            self._write_future.set_exception(iostream.StreamClosedError())
            self._write_future.exception()
        else:
            if callback is not None:
                self._write_callback = stack_context.wrap(callback)
            else:
                future = self._write_future = Future()
            self._pending_write = self.stream.write(self._format_chunk(chunk))
            self._pending_write.add_done_callback(self._on_write_complete)
        return future
View Code

 

HTTP1ServerConnection類

HTTP1ServerConnection比較簡單,主要實現了服務端處理邏輯:

  在本條tcp連接上,不停的處理http事務(當然也有可能客戶端不支持持久連接所以處理完一條http事務后,tcp連接被關閉)

  當發生異常時,關閉連接。

其中start_serving方法是入口,其內部調用了_server_request_loop,來看代碼

def start_serving(self, delegate):
        """
            Starts serving requests on this connection.

            :arg delegate: a `.HTTPServerConnectionDelegate`
        """
        assert isinstance(delegate, httputil.HTTPServerConnectionDelegate) 
        self._serving_future = self._server_request_loop(delegate)
        # Register the future on the IOLoop so its errors get logged.
        self.stream.io_loop.add_future(self._serving_future, lambda f: f.result())

    @gen.coroutine
    def _server_request_loop(self, delegate):
        try:
            while True:
                # 不斷處理http事務,知道連接關閉或者出現異常
                conn = HTTP1Connection(self.stream, False, self.params, self.context)
                request_delegate = delegate.start_request(self, conn)    
                try:
                    ret = yield conn.read_response(request_delegate)
                except (iostream.StreamClosedError, iostream.UnsatisfiableReadError): # 連接關閉
                    return
                except _QuietException:
                    # This exception was already logged.
                    conn.close()
                    return
                except Exception:
                    gen_log.error("Uncaught exception", exc_info=True)
                    conn.close()
                    return
                if not ret:
                    return
                yield gen.moment
        finally:
            delegate.on_close(self)
View Code

 

參考:

  http://www.cnblogs.com/tekkaman/archive/2013/04/03/2997781.html

  http://strawhatfy.github.io/2015/11/02/tornado.http1connection.HTTP1Connection/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM