Tornado在TCP層里的工作機制
上一節是關於應用層的協議 HTTP,它依賴於傳輸層協議 TCP,例如服務器是如何綁定端口的?HTTP 服務器的 handle_stream 是在什么時候被調用的呢?本節聚焦在 TCP 層次的實現,以便和上節的程序流程銜接起來。
首先是關於 TCP 協議。這是一個面向連接的可靠交付的協議。由於是面向連接,所以在服務器端需要分配內存來記憶客戶端連接,同樣客戶端也需要記錄服務器。由於保證可靠交付,所以引入了很多保證可靠性的機制,比如定時重傳機制,SYN/ACK 機制等,相當復雜。所以,在每個系統里的 TCP 協議軟件都是相當復雜的,本文不打算深入談這些(我也談不了多少,呵呵)。但我們還是得對 TCP 有個了解。先上一張圖(UNIX 網絡編程)-- 狀態轉換圖。
除外,來一段TCP服務器端編程經典三段式代碼(C實現):
// 創建監聽socket int sfd = socket(AF_INET, SOCK_STREAM, 0); // 綁定socket到地址-端口, 並在該socket上開始監聽。listen的第二個參數叫backlog,和連接隊列有關 bind(sfd,(struct sockaddr *)(&s_addr), sizeof(struct sockaddr)) && listen(sfd, 10); while(1) cfd = accept(sfd, (struct sockaddr *)(&cli_addr), &addr_size);
以上,忽略所有錯誤處理和變量聲明,顧名思義吧…… 更多詳細,可以搜 Linux TCP 服務器編程。所以,對於 TCP 編程的總結就是:創建一個監聽 socket,然后把它綁定到端口和地址上並開始監聽,然后不停 accept。這也是 tornado 的 TCPServer 要做的工作。
TCPServer 類的定義在 tcpserver.py。它有兩種用法:bind+start 或者 listen。
第一種用法可用於多線程,但在 TCP 方面兩者是一樣的。就以 listen 為例吧。TCPServer 的__init__沒什么注意的,就是記住了 ioloop 這個單例,這個下節再分析(它是tornado異步性能的關鍵)。listen 方法接收兩個參數端口和地址,代碼如下
def listen(self, port, address=""): """Starts accepting connections on the given port. This method may be called more than once to listen on multiple ports. `listen` takes effect immediately; it is not necessary to call `TCPServer.start` afterwards. It is, however, necessary to start the `.IOLoop`. """ sockets = bind_sockets(port, address=address) self.add_sockets(sockets)
以上。首先 bind_sockets 方法接收地址和端口創建 sockets 列表並綁定地址端口並監聽(完成了TCP三部曲的前兩部),add_sockets 在這些 sockets 上注冊 read/timeout 事件。有關高性能並發服務器編程可以參照UNIX網絡編程里給的幾種編程模型,tornado 可以看作是單線程事件驅動模式的服務器,TCP 三部曲中的第三部就被分隔到了事件回調里,因此肯定要在所有的文件 fd(包括sockets)上監聽事件。在做完這些事情后就可以安心的調用 ioloop 單例的 start 方法開始循環監聽事件了。具體細節可以參照現代高性能 web 服務器(nginx/lightttpd等)的事件模型,后面也會涉及一點。
簡言之,基於事件驅動的服務器(tornado)要干的事就是:創建 socket,綁定到端口並 listen,然后注冊事件和對應的回調,在回調里accept 新請求。
bind_sockets 方法在 netutil 里被定義,沒什么難的,創建監聽 socket 后為了異步,設置 socket 為非阻塞(這樣由它 accept 派生的socket 也是非阻塞的),然后綁定並監聽之。add_sockets 方法接收 socket 列表,對於列表中的 socket,用 fd 作鍵記錄下來,並調用add_accept_handler 方法。它也是在 netutil 里定義的,代碼如下:
def add_accept_handler(sock, callback, io_loop=None): """Adds an `.IOLoop` event handler to accept new connections on ``sock``. When a connection is accepted, ``callback(connection, address)`` will be run (``connection`` is a socket object, and ``address`` is the address of the other end of the connection). Note that this signature is different from the ``callback(fd, events)`` signature used for `.IOLoop` handlers. """ if io_loop is None: io_loop = IOLoop.current() def accept_handler(fd, events): while True: try: connection, address = sock.accept() except socket.error as e: if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN): return raise callback(connection, address) io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)
需要注意的一個參數是 callback,現在指向的是 TCPServer 的 _handle_connection 方法。add_accept_handler 方法的流程:首先是確保ioloop對象。然后調用 add_handler 向 loloop 對象注冊在fd上的read事件和回調函數accept_handler。該回調函數是現成定義的,屬於IOLoop層次的回調,每當事件發生時就會調用。回調內容也就是accept得到新socket和客戶端地址,然后調用callback向上層傳遞事件。從上面的分析可知,當read事件發生時,accept_handler被調用,進而callback=_handle_connection被調用。
_handle_connection就比較簡單了,跳過那些ssl的處理,簡化為兩句stream = IOStream(connection, io_loop=self.io_loop)和self.handle_stream()。這里IOStream代表了IO層,以后再說,反正讀寫是不愁了。接着是調用handle_stream。我們可以看到,不論應用層是什么協議(或者自定義協議),當有新連接到來時走的流程是差不多的,都要經歷一番上訴的回調,不同之處就在於這個handle_stream方法。這個方法是由子類自定義覆蓋的,它的HTTP實現已經在上一節看過了。
到此,和上節的代碼流程接上軌了。當事件發生時是如何回調的呢?app.py里的IOLoop.instance().start()又是怎樣的流程呢?明天繼續,看tornado異步高性能的根本所在
Tornado TCPServer類的設計解讀
前文已經說過,HTTPServer是派生自TCPServer,從協議層次上講,這再自然不過。
從TCPServer的實現上看,它是一個通用的server框架,基本是按照BSD socket的思想設計的。create-bind-listen三段式一個都不少。
從helloworld.py往下追,可以看到:
- helloworld.py中的main函數創建了HTTPServer.
- HTTPServer繼承自TCPServer,在HTTPServer的構造函數中直接調用了TCPServer的構造函數。
接下來我們就去看看TCPServer這個類的實現,它的代碼放在tornado/tcpserver.py中。tcpserver.py只有兩百多行,不算多。所有代碼都是在實現TCPServer這個類。
TCPServer
在TCPServer類的注釋中,首先強調了它是一個non-blocking, single-threaded TCP Server。
怎么理解呢?
non-blocking,就是說,這個服務器沒有使用阻塞式API。
什么是阻塞式設計?舉個例子,在BSD Socket里,recv函數默認是阻塞式的。使用recv讀取客戶端數據時,如果對方並未發送數據,則這個API就會一直阻塞那里不返回。這樣服務器的設計不得不使用多線程或者多進程方式,避免因為一個API的阻塞導致服務器沒法做其它事。阻塞式API是很常見的,我們可以簡單認為,阻塞式設計就是“不管有沒有數據,服務器都派API去讀,讀不到,API就不會回來交差”。
而非阻塞,對recv來說,區別在於沒有數據可讀時,它不會在那死等,它直接就返回了。你可能會認為這辦法比阻塞式還要矬,因為服務器無法預知有沒有數據可讀,不得不反復派recv函數去讀。這不是浪費大量的CPU資源么?
當然不會這么傻。tornado這里說的非阻塞要高級得多,基本上是另一種思路:服務器並不主動讀取數據,它和操作系統合作,實現了一種“監視器”,TCP連接就是它的監視對象。當某個連接上有數據到來時,操作系統會按事先的約定通知服務器:某某號連接上有數據到來,你去處理一下。服務器這時候才派API去取數據。服務器不用創建大量線程來阻塞式的處理每個連接,也不用不停派API去檢查連接上有沒有數據,它只需要坐那里等操作系統的通知,這保證了recv API出手就不會落空。
tornado另一個被強調的特征是single-threaded,這是因為我們的“監視器”非常高效,可以在一個線程里監視成千上萬個連接的狀態,基本上不需要再動用線程來分流。實測表明,它比阻塞式多線程或者多進程設計更加高效——當然,這依賴於操作系統的大力配合,現在主流操作系統都提供了非常高端大氣上檔次的“監視器”機制,比如epoll、kqueue。
作者提到這個類一般不直接被實例化,而是由它派生出子類,再用子類實例化。
為了強化這個設計思想,作者定義了一個未直接實現的接口,叫handle_stream()。
def handle_stream(self, stream, address): """Override to handle a new `.IOStream` from an incoming connection.""" raise NotImplementedError()
這倒是個不錯的技巧,強制讓子類覆蓋本方法,不然就報錯給你看!
TCPServer是支持SSL的。由於Python的強大,支持SSL一點都不費事。要啟動一個支持SSL的TCPServer,只需要告訴它你的certifile和keyfile就行。
TCPServer(ssl_options={"certfile": os.path.join(data_dir, "mydomain.crt"), "keyfile": os.path.join(data_dir, "mydomain.key"),})
關於這兩個文件的來龍去脈,可以去Google“數字證書原理”這篇文章。
TCPServer的三種形式
TCPServer的初始化有三種形式。
1. 單進程形式
server = TCPServer() server.listen(8888) IOLoop.instance().start()
我們在helloworld.py中看到的就是這種用法,不再贅述。
2. 多進程形式。
server = TCPServer() server.bind(8888) server.start(0) # Forks multiple sub-processes IOLoop.instance().start(
區別主要在server.start(0)這里。后面分析listen()與start()兩個成員函數時,就會看到它們是怎么跟進程結合的。
注意:這種模式啟動時,不能把IOLoop對象傳遞給TCPServer的構造函數,這樣會導致TCPServer直接按單進程啟動。
3. 高級多進程形式。
sockets = bind_sockets(8888) tornado.process.fork_processes(0) server = TCPServer() server.add_sockets(sockets) IOLoop.instance().start()
高級意味着復雜。從上面代碼看,雖然只多了一兩行,實際里面的流程有比較大的差別。
這種方式的主要優點就是 tornado.process.fork_processes(0)這句,它為進程的創建提供了更多的靈活性。當然現在說了也是糊塗,后面鑽進這些代碼后,我們再來驗證這里的說法。
以上內容都是TCPServer類的doc string中提到的。后面小節開始看code。
從代碼分析TCPServer類的機制
TCPServer的__init__函數很簡單,僅保存了參數而已。
唯一要注意的是,它可以接受一個io_loop為參數。實際上io_loop對TCPServer來說並不是可有可無,它是必須的。不過TCPServer提供了多種渠道來與一個io_loop綁定,初始化參數只是其中一種綁定方式而已。
listen
接下來我們看一下listen函數,在helloworld.py中,httpserver實例創建之后,它被第一個調用。
TCPServer類的listen函數是開始接受指定端口上的連接。注意,這個listen與BSD Socket中的listen並不等價,它做的事比BSD socket()+bind()+listen()還要多。
注意在函數注釋中提到的一句話:你可以在一個server的實例中多次調用listen,以實現一個server偵聽多個端口。
怎么理解?在BSD Socket架構里,我們不可能在一個socket上同時偵聽多個端口。反推之,不難想到,TCPServer的listen函數內部一定是執行了全套的BSD Socket三段式(create socket->bind->listen),使得每調用一次listen實際上是創建了一個新的socket。
代碼很好的符合了我們的猜想:
def listen(self, port, address=""): sockets = bind_sockets(port, address=address) self.add_sockets(sockets)
兩步走,先創建了一個socket,然后把它加到自己的偵聽隊列里。
bind_socket
bind_socket函數並不是TCPServer的成員,它定義在netutil.py中,原型:
def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=128, flags=None):
它也有大段的注釋。
bind_socket完成的工作包括:創建socket,綁定socket到指定的地址和端口,開啟偵聽。
解釋一下參數:
- port不用說,端口號嘛。
- address可以是IP地址,如“192.168.1.100”,也可以是hostname,比如“localhost”。如果是hostname,則可以監聽該hostname對應的所有IP。如果address是空字符串(“”)或者None,則會監聽主機上的所有接口。
- family是指網絡層協議類型。可以選AF_INET和AF_INET6,默認情況下則兩者都會被啟用。這個參數就是在BSD Socket創建時的那個sockaddr_in.sin_family參數哈。
- backlog就是指偵聽隊列的長度,即BSD listen(n)中的那個n。
- flags參數是一些位標志,它是用來傳遞給socket.getaddrinfo()函數的。比如socket.AI_PASSIVE等。
另外要注意,在IPV6和IPV4混用的情況下,這個函數的返回值可以是一個socket列表,因為這時候一個address參數可能對應一個IPv4地址和一個IPv6地址,它們的socket是不通用的,會各自獨立創建。
現在來一行一行看下bind_socket的代碼
sockets = [] if address == "": address = None if not socket.has_ipv6 and family == socket.AF_UNSPEC: # Python can be compiled with --disable-ipv6, which causes # operations on AF_INET6 sockets to fail, but does not # automatically exclude those results from getaddrinfo # results. # http://bugs.python.org/issue16208 family = socket.AF_INET if flags is None: flags = socket.AI_PASSIVE
這一段平淡無奇,基本上都是前面講到的參數賦值。
接下來就是一個大的循環:
for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,0, flags)):
鬧半天,前面解釋的參數全都被socket.getaddrinfo()這個函數吃下去了。
socket.getaddrinfo()是python標准庫中的函數,它的作用是將所接收的參數重組為一個結構res,res的類型將可以直接作為socket.socket()的參數。跟BSD Socket中的getaddrinfo差不多嘛。
之所以用了一個循環,正如前面講到的,因為IPv6和IPv4混用的情況下,getaddrinfo會返回多個地址的信息。參見python文檔中的說明和示例:
The function returns a list of 5-tuples with the following structure: (family, type, proto, canonname, sockaddr)
>>> socket.getaddrinfo("www.python.org", 80, proto=socket.SOL_TCP) [(2, 1, 6, '', ('82.94.164.162', 80)), (10, 1, 6, '', ('2001:888:2000:d::a2', 80, 0, 0))]
接下來的代碼在循環體中,是針對單個地址的。循環體內一開始就如我們猜想,直接拿getaddrinfo的返回值來創建socket。
af, socktype, proto, canonname, sockaddr = res try: sock = socket.socket(af, socktype, proto) except socket.error as e: if e.args[0] == errno.EAFNOSUPPORT: continue raise
先從tuple中拆出5個參數,然后揀需要的來創建socket。
set_close_exec(sock.fileno())
這行是設置進程退出時對sock的操作。lose_on_exec 是一個進程所有文件描述符(文件句柄)的位圖標志,每個比特位代表一個打開的文件描述符,用於確定在調用系統調用execve()時需要關閉的文件句柄(參見include/fcntl.h)。當一個程序使用fork()函數創建了一個子進程時,通常會在該子進程中調用execve()函數加載執行另一個新程序。此時子進程將完全被新程序替換掉,並在子進程中開始執行新程序。若一個文件描述符在close_on_exec中的對應比特位被設置,那么在執行execve()時該描述符將被關閉,否則該描述符將始終處於打開狀態。
當打開一個文件時,默認情況下文件句柄在子進程中也處於打開狀態。因此sys_open()中要復位對應比特位
if os.name != 'nt': sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
對非NT的內核,需要額外設置一個SO_REUSEADDR參數。有些系統的設計里,服務器進程結束后端口也會被內核保持一段時間,若我們迅速的重啟服務器,可能會遇到“端口已經被占用”的情況。這個標志就是通知內核不要保持了,進程一關,立馬放手,便於后來者重用。
if af == socket.AF_INET6: # On linux, ipv6 sockets accept ipv4 too by default, # but this makes it impossible to bind to both # 0.0.0.0 in ipv4 and :: in ipv6. On other systems, # separate sockets *must* be used to listen for both ipv4 # and ipv6. For consistency, always disable ipv4 on our # ipv6 sockets and use a separate ipv4 socket when needed. # # Python 2.x on windows doesn't have IPPROTO_IPV6. if hasattr(socket, "IPPROTO_IPV6"): sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
這段代碼的說明已經很清楚了
sock.setblocking(0) sock.bind(sockaddr) sock.listen(backlog) sockets.append(sock)
前面經常提BSD Socket的這幾個家伙,現在它們終於出現了。“非阻塞”性質也是在這里決定的。
每創建一個socket都將它加入到前面定義的列表里,最后函數結束時,將列表返回。其實這個函數蠻簡單的。為什么它不是TCPServer的成員函數?