Python Tornado框架(TCP層)


Tornado在TCP層里的工作機制

上一節是關於應用層的協議 HTTP,它依賴於傳輸層協議 TCP,例如服務器是如何綁定端口的?HTTP 服務器的 handle_stream 是在什么時候被調用的呢?本節聚焦在 TCP 層次的實現,以便和上節的程序流程銜接起來。

首先是關於 TCP 協議。這是一個面向連接的可靠交付的協議。由於是面向連接,所以在服務器端需要分配內存來記憶客戶端連接,同樣客戶端也需要記錄服務器。由於保證可靠交付,所以引入了很多保證可靠性的機制,比如定時重傳機制,SYN/ACK 機制等,相當復雜。所以,在每個系統里的 TCP 協議軟件都是相當復雜的,本文不打算深入談這些(我也談不了多少,呵呵)。但我們還是得對 TCP 有個了解。先上一張圖(UNIX 網絡編程)-- 狀態轉換圖。

 

除外,來一段TCP服務器端編程經典三段式代碼(C實現):

// 創建監聽socket
int sfd = socket(AF_INET, SOCK_STREAM, 0);  
// 綁定socket到地址-端口, 並在該socket上開始監聽。listen的第二個參數叫backlog,和連接隊列有關
bind(sfd,(struct sockaddr *)(&s_addr), sizeof(struct sockaddr)) && listen(sfd, 10); 
while(1) cfd = accept(sfd, (struct sockaddr *)(&cli_addr), &addr_size);

以上,忽略所有錯誤處理和變量聲明,顧名思義吧…… 更多詳細,可以搜 Linux TCP 服務器編程。所以,對於 TCP 編程的總結就是:創建一個監聽 socket,然后把它綁定到端口和地址上並開始監聽,然后不停 accept。這也是 tornado 的 TCPServer 要做的工作。

TCPServer 類的定義在 tcpserver.py。它有兩種用法:bind+start 或者 listen。

第一種用法可用於多線程,但在 TCP 方面兩者是一樣的。就以 listen 為例吧。TCPServer 的__init__沒什么注意的,就是記住了 ioloop 這個單例,這個下節再分析(它是tornado異步性能的關鍵)。listen 方法接收兩個參數端口和地址,代碼如下

def listen(self, port, address=""):
	"""Starts accepting connections on the given port.

	This method may be called more than once to listen on multiple ports.
	`listen` takes effect immediately; it is not necessary to call
	`TCPServer.start` afterwards.  It is, however, necessary to start
	the `.IOLoop`.
	"""
	sockets = bind_sockets(port, address=address)
	self.add_sockets(sockets)

 

以上。首先 bind_sockets 方法接收地址和端口創建 sockets 列表並綁定地址端口並監聽(完成了TCP三部曲的前兩部),add_sockets 在這些 sockets 上注冊 read/timeout 事件。有關高性能並發服務器編程可以參照UNIX網絡編程里給的幾種編程模型,tornado 可以看作是單線程事件驅動模式的服務器,TCP 三部曲中的第三部就被分隔到了事件回調里,因此肯定要在所有的文件 fd(包括sockets)上監聽事件。在做完這些事情后就可以安心的調用 ioloop 單例的 start 方法開始循環監聽事件了。具體細節可以參照現代高性能 web 服務器(nginx/lightttpd等)的事件模型,后面也會涉及一點。

簡言之,基於事件驅動的服務器(tornado)要干的事就是:創建 socket,綁定到端口並 listen,然后注冊事件和對應的回調,在回調里accept 新請求。

bind_sockets 方法在 netutil 里被定義,沒什么難的,創建監聽 socket 后為了異步,設置 socket 為非阻塞(這樣由它 accept 派生的socket 也是非阻塞的),然后綁定並監聽之。add_sockets 方法接收 socket 列表,對於列表中的 socket,用 fd 作鍵記錄下來,並調用add_accept_handler 方法。它也是在 netutil 里定義的,代碼如下:

def add_accept_handler(sock, callback, io_loop=None):
    """Adds an `.IOLoop` event handler to accept new connections on ``sock``.

    When a connection is accepted, ``callback(connection, address)`` will
    be run (``connection`` is a socket object, and ``address`` is the
    address of the other end of the connection).  Note that this signature
    is different from the ``callback(fd, events)`` signature used for
    `.IOLoop` handlers.
    """
    if io_loop is None:
        io_loop = IOLoop.current()

    def accept_handler(fd, events):
        while True:
            try:
                connection, address = sock.accept()
            except socket.error as e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                raise
            callback(connection, address)
    io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)

 

需要注意的一個參數是 callback,現在指向的是 TCPServer 的 _handle_connection 方法。add_accept_handler 方法的流程:首先是確保ioloop對象。然后調用 add_handler 向 loloop 對象注冊在fd上的read事件和回調函數accept_handler。該回調函數是現成定義的,屬於IOLoop層次的回調,每當事件發生時就會調用。回調內容也就是accept得到新socket和客戶端地址,然后調用callback向上層傳遞事件。從上面的分析可知,當read事件發生時,accept_handler被調用,進而callback=_handle_connection被調用。

_handle_connection就比較簡單了,跳過那些ssl的處理,簡化為兩句stream = IOStream(connection, io_loop=self.io_loop)和self.handle_stream()。這里IOStream代表了IO層,以后再說,反正讀寫是不愁了。接着是調用handle_stream。我們可以看到,不論應用層是什么協議(或者自定義協議),當有新連接到來時走的流程是差不多的,都要經歷一番上訴的回調,不同之處就在於這個handle_stream方法。這個方法是由子類自定義覆蓋的,它的HTTP實現已經在上一節看過了。

到此,和上節的代碼流程接上軌了。當事件發生時是如何回調的呢?app.py里的IOLoop.instance().start()又是怎樣的流程呢?明天繼續,看tornado異步高性能的根本所在

 

 

Tornado TCPServer類的設計解讀

前文已經說過,HTTPServer是派生自TCPServer,從協議層次上講,這再自然不過。

從TCPServer的實現上看,它是一個通用的server框架,基本是按照BSD socket的思想設計的。create-bind-listen三段式一個都不少。

從helloworld.py往下追,可以看到:

  1. helloworld.py中的main函數創建了HTTPServer.
  2. HTTPServer繼承自TCPServer,在HTTPServer的構造函數中直接調用了TCPServer的構造函數。

接下來我們就去看看TCPServer這個類的實現,它的代碼放在tornado/tcpserver.py中。tcpserver.py只有兩百多行,不算多。所有代碼都是在實現TCPServer這個類。

TCPServer

在TCPServer類的注釋中,首先強調了它是一個non-blocking, single-threaded TCP Server。

怎么理解呢? 

non-blocking,就是說,這個服務器沒有使用阻塞式API。

什么是阻塞式設計?舉個例子,在BSD Socket里,recv函數默認是阻塞式的。使用recv讀取客戶端數據時,如果對方並未發送數據,則這個API就會一直阻塞那里不返回。這樣服務器的設計不得不使用多線程或者多進程方式,避免因為一個API的阻塞導致服務器沒法做其它事。阻塞式API是很常見的,我們可以簡單認為,阻塞式設計就是“不管有沒有數據,服務器都派API去讀,讀不到,API就不會回來交差”。

非阻塞,對recv來說,區別在於沒有數據可讀時,它不會在那死等,它直接就返回了。你可能會認為這辦法比阻塞式還要矬,因為服務器無法預知有沒有數據可讀,不得不反復派recv函數去讀。這不是浪費大量的CPU資源么?

當然不會這么傻。tornado這里說的非阻塞要高級得多,基本上是另一種思路:服務器並不主動讀取數據,它和操作系統合作,實現了一種“監視器”,TCP連接就是它的監視對象。當某個連接上有數據到來時,操作系統會按事先的約定通知服務器:某某號連接上有數據到來,你去處理一下。服務器這時候才派API去取數據。服務器不用創建大量線程來阻塞式的處理每個連接,也不用不停派API去檢查連接上有沒有數據,它只需要坐那里等操作系統的通知,這保證了recv API出手就不會落空。

tornado另一個被強調的特征是single-threaded,這是因為我們的“監視器”非常高效,可以在一個線程里監視成千上萬個連接的狀態,基本上不需要再動用線程來分流。實測表明,它比阻塞式多線程或者多進程設計更加高效——當然,這依賴於操作系統的大力配合,現在主流操作系統都提供了非常高端大氣上檔次的“監視器”機制,比如epoll、kqueue。

作者提到這個類一般不直接被實例化,而是由它派生出子類,再用子類實例化。

為了強化這個設計思想,作者定義了一個未直接實現的接口,叫handle_stream()。

 

def handle_stream(self, stream, address):
    """Override to handle a new `.IOStream` from an incoming connection."""
    raise NotImplementedError()

這倒是個不錯的技巧,強制讓子類覆蓋本方法,不然就報錯給你看!

TCPServer是支持SSL的。由於Python的強大,支持SSL一點都不費事。要啟動一個支持SSL的TCPServer,只需要告訴它你的certifile和keyfile就行。

TCPServer(ssl_options={"certfile": os.path.join(data_dir, "mydomain.crt"),
	"keyfile": os.path.join(data_dir, "mydomain.key"),})

關於這兩個文件的來龍去脈,可以去Google“數字證書原理”這篇文章。

TCPServer的三種形式

TCPServer的初始化有三種形式。

1. 單進程形式

server = TCPServer()
server.listen(8888)
IOLoop.instance().start()

 我們在helloworld.py中看到的就是這種用法,不再贅述。

2. 多進程形式。

server = TCPServer()
server.bind(8888)
server.start(0)  # Forks multiple sub-processes
IOLoop.instance().start(

區別主要在server.start(0)這里。后面分析listen()與start()兩個成員函數時,就會看到它們是怎么跟進程結合的。

注意:這種模式啟動時,不能把IOLoop對象傳遞給TCPServer的構造函數,這樣會導致TCPServer直接按單進程啟動。

3. 高級多進程形式。

sockets = bind_sockets(8888)
tornado.process.fork_processes(0)
server = TCPServer()
server.add_sockets(sockets)
IOLoop.instance().start()

高級意味着復雜。從上面代碼看,雖然只多了一兩行,實際里面的流程有比較大的差別。

這種方式的主要優點就是 tornado.process.fork_processes(0)這句,它為進程的創建提供了更多的靈活性。當然現在說了也是糊塗,后面鑽進這些代碼后,我們再來驗證這里的說法。

以上內容都是TCPServer類的doc string中提到的。后面小節開始看code。

 

從代碼分析TCPServer類的機制

TCPServer的__init__函數很簡單,僅保存了參數而已。

唯一要注意的是,它可以接受一個io_loop為參數。實際上io_loop對TCPServer來說並不是可有可無,它是必須的。不過TCPServer提供了多種渠道來與一個io_loop綁定,初始化參數只是其中一種綁定方式而已。

listen

接下來我們看一下listen函數,在helloworld.py中,httpserver實例創建之后,它被第一個調用。

TCPServer類的listen函數是開始接受指定端口上的連接。注意,這個listen與BSD Socket中的listen並不等價,它做的事比BSD socket()+bind()+listen()還要多。

注意在函數注釋中提到的一句話:你可以在一個server的實例中多次調用listen,以實現一個server偵聽多個端口。

怎么理解?在BSD Socket架構里,我們不可能在一個socket上同時偵聽多個端口。反推之,不難想到,TCPServer的listen函數內部一定是執行了全套的BSD Socket三段式(create socket->bind->listen),使得每調用一次listen實際上是創建了一個新的socket。

代碼很好的符合了我們的猜想:

def listen(self, port, address=""):
	sockets = bind_sockets(port, address=address)
	self.add_sockets(sockets)

 

兩步走,先創建了一個socket,然后把它加到自己的偵聽隊列里。

bind_socket

bind_socket函數並不是TCPServer的成員,它定義在netutil.py中,原型:

def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=128, flags=None):

它也有大段的注釋。

bind_socket完成的工作包括:創建socket,綁定socket到指定的地址和端口,開啟偵聽。

解釋一下參數:

  • port不用說,端口號嘛。
  • address可以是IP地址,如“192.168.1.100”,也可以是hostname,比如“localhost”。如果是hostname,則可以監聽該hostname對應的所有IP。如果address是空字符串(“”)或者None,則會監聽主機上的所有接口。
  • family是指網絡層協議類型。可以選AF_INET和AF_INET6,默認情況下則兩者都會被啟用。這個參數就是在BSD Socket創建時的那個sockaddr_in.sin_family參數哈。
  • backlog就是指偵聽隊列的長度,即BSD listen(n)中的那個n。
  • flags參數是一些位標志,它是用來傳遞給socket.getaddrinfo()函數的。比如socket.AI_PASSIVE等。

另外要注意,在IPV6和IPV4混用的情況下,這個函數的返回值可以是一個socket列表,因為這時候一個address參數可能對應一個IPv4地址和一個IPv6地址,它們的socket是不通用的,會各自獨立創建。

現在來一行一行看下bind_socket的代碼

sockets = []
if address == "":
	address = None
if not socket.has_ipv6 and family == socket.AF_UNSPEC:
	# Python can be compiled with --disable-ipv6, which causes
	# operations on AF_INET6 sockets to fail, but does not
	# automatically exclude those results from getaddrinfo
	# results.
	# http://bugs.python.org/issue16208
	family = socket.AF_INET
if flags is None:
	flags = socket.AI_PASSIVE

這一段平淡無奇,基本上都是前面講到的參數賦值。

接下來就是一個大的循環:

for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,0, flags)):

 

鬧半天,前面解釋的參數全都被socket.getaddrinfo()這個函數吃下去了。

socket.getaddrinfo()是python標准庫中的函數,它的作用是將所接收的參數重組為一個結構res,res的類型將可以直接作為socket.socket()的參數。跟BSD Socket中的getaddrinfo差不多嘛。

之所以用了一個循環,正如前面講到的,因為IPv6和IPv4混用的情況下,getaddrinfo會返回多個地址的信息。參見python文檔中的說明和示例:

The function returns a list of 5-tuples with the following structure: (family, type, proto, canonname, sockaddr)

>>> socket.getaddrinfo("www.python.org", 80, proto=socket.SOL_TCP)
[(2, 1, 6, '', ('82.94.164.162', 80)),
 (10, 1, 6, '', ('2001:888:2000:d::a2', 80, 0, 0))]

 接下來的代碼在循環體中,是針對單個地址的。循環體內一開始就如我們猜想,直接拿getaddrinfo的返回值來創建socket。

af, socktype, proto, canonname, sockaddr = res
try:
	sock = socket.socket(af, socktype, proto)
except socket.error as e:
	if e.args[0] == errno.EAFNOSUPPORT:
		continue
raise

 先從tuple中拆出5個參數,然后揀需要的來創建socket。

set_close_exec(sock.fileno())

這行是設置進程退出時對sock的操作。lose_on_exec 是一個進程所有文件描述符(文件句柄)的位圖標志,每個比特位代表一個打開的文件描述符,用於確定在調用系統調用execve()時需要關閉的文件句柄(參見include/fcntl.h)。當一個程序使用fork()函數創建了一個子進程時,通常會在該子進程中調用execve()函數加載執行另一個新程序。此時子進程將完全被新程序替換掉,並在子進程中開始執行新程序。若一個文件描述符在close_on_exec中的對應比特位被設置,那么在執行execve()時該描述符將被關閉,否則該描述符將始終處於打開狀態。

當打開一個文件時,默認情況下文件句柄在子進程中也處於打開狀態。因此sys_open()中要復位對應比特位

if os.name != 'nt':
	sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

 對非NT的內核,需要額外設置一個SO_REUSEADDR參數。有些系統的設計里,服務器進程結束后端口也會被內核保持一段時間,若我們迅速的重啟服務器,可能會遇到“端口已經被占用”的情況。這個標志就是通知內核不要保持了,進程一關,立馬放手,便於后來者重用。

if af == socket.AF_INET6:
	 # On linux, ipv6 sockets accept ipv4 too by default,
	 # but this makes it impossible to bind to both
	 # 0.0.0.0 in ipv4 and :: in ipv6.  On other systems,
	 # separate sockets *must* be used to listen for both ipv4
	 # and ipv6.  For consistency, always disable ipv4 on our
	 # ipv6 sockets and use a separate ipv4 socket when needed.
	 #
	 # Python 2.x on windows doesn't have IPPROTO_IPV6.
	 if hasattr(socket, "IPPROTO_IPV6"):
		 sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)

 這段代碼的說明已經很清楚了

sock.setblocking(0)
sock.bind(sockaddr)
sock.listen(backlog)
sockets.append(sock)

 

前面經常提BSD Socket的這幾個家伙,現在它們終於出現了。“非阻塞”性質也是在這里決定的。

每創建一個socket都將它加入到前面定義的列表里,最后函數結束時,將列表返回。其實這個函數蠻簡單的。為什么它不是TCPServer的成員函數?

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM