項目所用知識點
- tornado
- socket
- tcpserver
- 協程
- 異步
tornado TCPServer 源碼剖析
在 tornado 的 tcpserver.py 文件中實現了 TCPServer 這個類,它是一個單線程、非阻塞的 TCP 服務。
為了與上層協議(在 tornado 中就是 HTTPServer)交互,TCPServer 提供了一個接口 handle_stream,要求其子類必須實現該方法,該方法主要用來處理應用層邏輯。
我們可以通過下面的代碼導入模塊,查看源碼:
from tornado.tcpserver import TCPServer
源碼中給了好多解釋,先把源碼注釋貼進來
class TCPServer(object):
    # Author's notes on the upstream docstring below:
    #   1. To use TCPServer, subclass it and implement the handle_stream
    #      interface it provides.
    #   2. The docstring already includes a complete example (EchoServer).
    #   3. It then lists several ways to start the server; this article
    #      uses listen(), which is the simplest.
    r"""A non-blocking, single-threaded TCP server.

    To use `TCPServer`, define a subclass which overrides the `handle_stream`
    method. For example, a simple echo server could be defined like this::

      from tornado.tcpserver import TCPServer
      from tornado.iostream import StreamClosedError
      from tornado import gen

      class EchoServer(TCPServer):
          @gen.coroutine
          def handle_stream(self, stream, address):
              while True:
                  try:
                      data = yield stream.read_until(b"\n")
                      yield stream.write(data)
                  except StreamClosedError:
                      break

    To make this server serve SSL traffic, send the ``ssl_options`` keyword
    argument with an `ssl.SSLContext` object. For compatibility with older
    versions of Python ``ssl_options`` may also be a dictionary of keyword
    arguments for the `ssl.wrap_socket` method.::

       ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
       ssl_ctx.load_cert_chain(os.path.join(data_dir, "mydomain.crt"),
                               os.path.join(data_dir, "mydomain.key"))
       TCPServer(ssl_options=ssl_ctx)

    `TCPServer` initialization follows one of three patterns:

    1. `listen`: simple single-process::

            server = TCPServer()
            server.listen(8888)
            IOLoop.current().start()

    2. `bind`/`start`: simple multi-process::

            server = TCPServer()
            server.bind(8888)
            server.start(0)  # Forks multiple sub-processes
            IOLoop.current().start()

       When using this interface, an `.IOLoop` must *not* be passed
       to the `TCPServer` constructor.  `start` will always start
       the server on the default singleton `.IOLoop`.

    3. `add_sockets`: advanced multi-process::

            sockets = bind_sockets(8888)
            tornado.process.fork_processes(0)
            server = TCPServer()
            server.add_sockets(sockets)
            IOLoop.current().start()

       The `add_sockets` interface is more complicated, but it can be
       used with `tornado.process.fork_processes` to give you more
       flexibility in when the fork happens.  `add_sockets` can
       also be used in single-process servers if you want to create
       your listening sockets in some way other than
       `~tornado.netutil.bind_sockets`.

    .. versionadded:: 3.1
       The ``max_buffer_size`` argument.

    .. versionchanged:: 5.0
       The ``io_loop`` argument has been removed.
    """
下面是該類的其他方法,可以自己仔細看一看:
# NOTE(review): excerpt of tornado 5.x TCPServer methods, shown at column 0;
# in tornado they are defined inside ``class TCPServer``.
def listen(self, port, address=""):
    """Starts accepting connections on the given port.

    This method may be called more than once to listen on multiple ports.
    `listen` takes effect immediately; it is not necessary to call
    `TCPServer.start` afterwards.  It is, however, necessary to start
    the `.IOLoop`.
    """
    # Create and bind listening socket(s), then register them right away.
    sockets = bind_sockets(port, address=address)
    self.add_sockets(sockets)

def add_sockets(self, sockets):
    """Makes this server start accepting connections on the given sockets.

    The ``sockets`` parameter is a list of socket objects such as
    those returned by `~tornado.netutil.bind_sockets`.
    `add_sockets` is typically used in combination with that
    method and `tornado.process.fork_processes` to provide greater
    control over the initialization of a multi-process server.
    """
    for sock in sockets:
        # Remember fd -> socket, and fd -> the value add_accept_handler
        # returns; stop() later calls that value to unregister the socket.
        self._sockets[sock.fileno()] = sock
        self._handlers[sock.fileno()] = add_accept_handler(
            sock, self._handle_connection)

def add_socket(self, socket):
    """Singular version of `add_sockets`.  Takes a single socket object."""
    self.add_sockets([socket])

def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128,
         reuse_port=False):
    """Binds this server to the given port on the given address.

    To start the server, call `start`. If you want to run this server
    in a single process, you can call `listen` as a shortcut to the
    sequence of `bind` and `start` calls.

    Address may be either an IP address or hostname.  If it's a hostname,
    the server will listen on all IP addresses associated with the
    name.  Address may be an empty string or None to listen on all
    available interfaces.  Family may be set to either `socket.AF_INET`
    or `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise
    both will be used if available.

    The ``backlog`` argument has the same meaning as for
    `socket.listen <socket.socket.listen>`. The ``reuse_port`` argument
    has the same meaning as for `.bind_sockets`.

    This method may be called multiple times prior to `start` to listen
    on multiple ports or interfaces.

    .. versionchanged:: 4.4
       Added the ``reuse_port`` argument.
    """
    sockets = bind_sockets(port, address=address, family=family,
                           backlog=backlog, reuse_port=reuse_port)
    if self._started:
        # start() already ran: register the new sockets immediately.
        self.add_sockets(sockets)
    else:
        # Otherwise queue them; start() registers them (after any fork).
        self._pending_sockets.extend(sockets)

def start(self, num_processes=1):
    """Starts this server in the `.IOLoop`.

    By default, we run the server in this process and do not fork any
    additional child process.

    If num_processes is ``None`` or <= 0, we detect the number of cores
    available on this machine and fork that number of child
    processes. If num_processes is given and > 1, we fork that
    specific number of sub-processes.

    Since we use processes and not threads, there is no shared memory
    between any server code.

    Note that multiple processes are not compatible with the autoreload
    module (or the ``autoreload=True`` option to `tornado.web.Application`
    which defaults to True when ``debug=True``).
    When using multiple processes, no IOLoops can be created or
    referenced until after the call to ``TCPServer.start(n)``.
    """
    # start() may only be called once.
    assert not self._started
    self._started = True
    if num_processes != 1:
        # Fork first, so each child registers the inherited sockets itself.
        process.fork_processes(num_processes)
    # Hand the sockets queued by bind() over to add_sockets().
    sockets = self._pending_sockets
    self._pending_sockets = []
    self.add_sockets(sockets)

def stop(self):
    """Stops listening for new connections.

    Requests currently in progress may still continue after the
    server is stopped.
    """
    # Idempotent: a second call is a no-op.
    if self._stopped:
        return
    self._stopped = True
    for fd, sock in self._sockets.items():
        assert sock.fileno() == fd
        # Unregister socket from IOLoop
        self._handlers.pop(fd)()
        sock.close()

def handle_stream(self, stream, address):
    """Override to handle a new `.IOStream` from an incoming connection.

    This method may be a coroutine; if so any exceptions it raises
    asynchronously will be logged. Accepting of incoming connections
    will not be blocked by this coroutine.

    If this `TCPServer` is configured for SSL, ``handle_stream``
    may be called before the SSL handshake has completed. Use
    `.SSLIOStream.wait_for_handshake` if you need to verify the client's
    certificate or use NPN/ALPN.

    .. versionchanged:: 4.2
       Added the option for this method to be a coroutine.
    """
    raise NotImplementedError()

def _handle_connection(self, connection, address):
    """Wrap a newly accepted connection in a stream and pass it to handle_stream.

    Registered (via add_sockets) as the accept callback for every listening
    socket.
    """
    if self.ssl_options is not None:
        assert ssl, "Python 2.6+ and OpenSSL required for SSL"
        try:
            # The handshake is deferred (do_handshake_on_connect=False), so
            # handle_stream may run before it completes.
            connection = ssl_wrap_socket(connection,
                                         self.ssl_options,
                                         server_side=True,
                                         do_handshake_on_connect=False)
        except ssl.SSLError as err:
            if err.args[0] == ssl.SSL_ERROR_EOF:
                return connection.close()
            else:
                raise
        except socket.error as err:
            # If the connection is closed immediately after it is created
            # (as in a port scan), we can get one of several errors.
            # wrap_socket makes an internal call to getpeername,
            # which may return either EINVAL (Mac OS X) or ENOTCONN
            # (Linux).  If it returns ENOTCONN, this error is
            # silently swallowed by the ssl module, so we need to
            # catch another error later on (AttributeError in
            # SSLIOStream._do_ssl_handshake).
            # To test this behavior, try nmap with the -sT flag.
            # https://github.com/tornadoweb/tornado/pull/750
            if errno_from_exception(err) in (errno.ECONNABORTED, errno.EINVAL):
                return connection.close()
            else:
                raise
    try:
        if self.ssl_options is not None:
            stream = SSLIOStream(connection,
                                 max_buffer_size=self.max_buffer_size,
                                 read_chunk_size=self.read_chunk_size)
        else:
            stream = IOStream(connection,
                              max_buffer_size=self.max_buffer_size,
                              read_chunk_size=self.read_chunk_size)

        future = self.handle_stream(stream, address)
        if future is not None:
            # handle_stream was a coroutine: attach a callback that calls
            # f.result(), so an exception it raised is re-raised in the
            # IOLoop instead of being silently dropped.
            IOLoop.current().add_future(gen.convert_yielded(future),
                                        lambda f: f.result())
    except Exception:
        # Errors building the stream or raised synchronously by
        # handle_stream are logged, not propagated into the accept loop.
        app_log.error("Error in connection callback", exc_info=True)
通過方法名就能看出各個方法的作用,而且類的文檔開頭已經給出了使用示例,所以這裡就不一一解釋了。我自己的用法如下:
import struct

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream, StreamClosedError
from tornado.tcpserver import TCPServer


class ProxyServer(TCPServer):
    """Skeleton of the proxy server; connection handling is left empty."""

    def __init__(self, *args, **kwargs):
        super(ProxyServer, self).__init__(*args, **kwargs)
        # Empty dict reserved for per-device state (unused in this skeleton).
        self.devices = {}

    @gen.coroutine
    def handle_stream(self, stream, address):
        """Handle one accepted connection -- intentionally a no-op for now."""


if __name__ == "__main__":
    server = ProxyServer()
    server.listen(1234)
    IOLoop.current().start()
下面具體分析一下各個步驟。
TCPServer執行過程(注意:下面各步驟所貼的源碼來自 tornado 4.x,構造函數和 add_sockets 等仍帶有 io_loop 參數;上面貼出的 5.x 源碼中該參數已被移除,兩者略有差異)
1.server = ProxyServer()創建tcpserver對象,該步驟僅僅做了一個初始化操作
def __init__(self, io_loop=None, ssl_options=None, max_buffer_size=None,
             read_chunk_size=None):
    """Store the server configuration; no sockets are created here.

    NOTE(review): this is the tornado 4.x constructor (it still accepts
    ``io_loop``). It does not set ``self._handlers`` or ``self._stopped``,
    which the tornado 5.x ``add_sockets``/``stop`` methods quoted earlier
    in this article rely on.

    Raises:
        KeyError: dict-style ``ssl_options`` without a "certfile" key.
        ValueError: the configured certfile/keyfile does not exist.
    """
    self.io_loop = io_loop
    self.ssl_options = ssl_options
    self._sockets = {}  # fd -> socket object (maps file descriptor to socket)
    self._pending_sockets = []
    self._started = False
    self.max_buffer_size = max_buffer_size  # passed to each IOStream created
    self.read_chunk_size = read_chunk_size  # passed to each IOStream created

    # Validate dict-style ssl_options eagerly, at construction time.
    if self.ssl_options is not None and isinstance(self.ssl_options, dict):
        if 'certfile' not in self.ssl_options:
            raise KeyError('missing key "certfile" in ssl_options')

        if not os.path.exists(self.ssl_options['certfile']):
            raise ValueError('certfile "%s" does not exist' %
                             self.ssl_options['certfile'])
        if ('keyfile' in self.ssl_options and
                not os.path.exists(self.ssl_options['keyfile'])):
            raise ValueError('keyfile "%s" does not exist' %
                             self.ssl_options['keyfile'])
2.想都不要想肯定是開啟socket
這一步發生在執行 server.listen(1234) 的時候:
def listen(self, port, address=""):
    """Starts accepting connections on the given port.

    This method may be called more than once to listen on multiple ports.
    `listen` takes effect immediately; it is not necessary to call
    `TCPServer.start` afterwards.  It is, however, necessary to start
    the `.IOLoop`.
    """
    # Step 1: create, bind and listen on the socket(s) for this port.
    sockets = bind_sockets(port, address=address)
    # Step 2: register them so accepted connections reach this server.
    self.add_sockets(sockets)
3.可以看到 listen 裡面調用了 bind_sockets() 函數,來看下這個函數:
def bind_sockets(port, address=None, family=socket.AF_UNSPEC,
                 backlog=_DEFAULT_BACKLOG, flags=None, reuse_port=False):
    """Create listening, non-blocking TCP sockets for ``port``/``address``.

    One socket is created per distinct ``getaddrinfo`` result that can be
    opened, so e.g. separate IPv4 and IPv6 sockets may be returned.

    Returns:
        list of bound, listening sockets.

    Raises:
        ValueError: ``reuse_port`` requested but SO_REUSEPORT is unavailable.
    """
    if reuse_port and not hasattr(socket, "SO_REUSEPORT"):
        raise ValueError("the platform doesn't support SO_REUSEPORT")

    sockets = []
    # An empty address means "all interfaces", which getaddrinfo spells None.
    if address == "":
        address = None
    # ``family`` tells getaddrinfo which kind of address structures the caller
    # expects: AF_UNIX, AF_INET, AF_INET6 or AF_UNSPEC.
    #   AF_UNIX is for inter-process communication on the same machine.
    #   AF_INET returns no IPv6 addresses; AF_INET6 returns no IPv4 addresses.
    #   AF_UNSPEC returns addresses of any family suitable for the given host
    #   and service. If a host has both AAAA (IPv6) and A (IPv4) records, the
    #   AAAA record comes back as a sockaddr_in6 and the A record as a
    #   sockaddr_in.
    if not socket.has_ipv6 and family == socket.AF_UNSPEC:
        # Python has no IPv6 support on this system: restrict to IPv4.
        family = socket.AF_INET
    if flags is None:
        flags = socket.AI_PASSIVE
    bound_port = None
    # set() removes duplicate getaddrinfo entries.
    for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,
                                      0, flags)):
        af, socktype, proto, canonname, sockaddr = res
        if (sys.platform == 'darwin' and address == 'localhost' and
                af == socket.AF_INET6 and sockaddr[3] != 0):
            # Mac OS X includes a link-local address fe80::1%lo0 in the
            # getaddrinfo results for 'localhost'. However, the firewall does
            # not understand that this is a local address and will prompt
            # for access, so skip these addresses.
            continue
        try:
            sock = socket.socket(af, socktype, proto)
        except socket.error as e:
            # Skip address families this system cannot create sockets for.
            if errno_from_exception(e) == errno.EAFNOSUPPORT:
                continue
            raise
        # Set the FD_CLOEXEC flag on the socket's file descriptor.
        set_close_exec(sock.fileno())
        if os.name != 'nt':
            # Not Windows: enable SO_REUSEADDR.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if reuse_port:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        if af == socket.AF_INET6:
            # On linux, ipv6 sockets accept ipv4 too by default,
            # but this makes it impossible to bind to both
            # 0.0.0.0 in ipv4 and :: in ipv6.  On other systems,
            # separate sockets *must* be used to listen for both ipv4
            # and ipv6.  For consistency, always disable ipv4 on our
            # ipv6 sockets and use a separate ipv4 socket when needed.
            #
            # Python 2.x on windows doesn't have IPPROTO_IPV6.
            if hasattr(socket, "IPPROTO_IPV6"):
                sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)

        # Automatic port allocation: when the requested port is 0 (the
        # original comment says port=None), the first socket bound gets an
        # OS-chosen port, and later IPv4/IPv6 sockets reuse that same port.
        host, requested_port = sockaddr[:2]
        if requested_port == 0 and bound_port is not None:
            sockaddr = tuple([host, bound_port] + list(sockaddr[2:]))

        # Make the socket non-blocking.
        sock.setblocking(0)
        sock.bind(sockaddr)
        bound_port = sock.getsockname()[1]
        sock.listen(backlog)
        sockets.append(sock)
    return sockets
4.接下來執行的是add_sockets()方法
def add_sockets(self, sockets):
    """Register each listening socket so new connections reach this server.

    This is the tornado 4.x version: it resolves ``self.io_loop`` lazily and
    passes it to add_accept_handler explicitly.
    """
    if self.io_loop is None:
        self.io_loop = IOLoop.current()  # get the current IOLoop instance
    for sock in sockets:
        # fd -> socket mapping.
        self._sockets[sock.fileno()] = sock
        # Accepted connections are delivered to self._handle_connection.
        add_accept_handler(sock, self._handle_connection,
                           io_loop=self.io_loop)
可以看到裡面調用了 add_accept_handler 函數,我們進去看看它做了什麼。
5.探析add_accept_handler方法
def add_accept_handler(sock, callback, io_loop=None):
    """Call ``callback(connection, address)`` for connections accepted on ``sock``.

    Registers an internal ``accept_handler`` on ``io_loop`` (default: the
    current IOLoop) that runs whenever ``sock`` becomes readable.
    """
    if io_loop is None:
        # Get the current IOLoop instance.
        io_loop = IOLoop.current()

    def accept_handler(fd, events):
        # Many connections may be waiting while we are processing callbacks.
        # To avoid starving other tasks, limit how many are accepted in one
        # call. Ideally we'd accept exactly the number that were waiting when
        # we started, but that can hurt load behaviour, so the listen backlog
        # is used as a reasonable upper bound instead.
        # NOTE(review): the original comment says _DEFAULT_BACKLOG is 128;
        # its value and the py3 ``xrange`` alias are defined outside this excerpt.
        for i in xrange(_DEFAULT_BACKLOG):
            try:
                connection, address = sock.accept()
            except socket.error as e:
                # _ERRNO_WOULDBLOCK (same meaning as EAGAIN): nothing left to
                # accept right now. When several processes share the
                # listening socket, another process may already have accepted
                # the connection that woke us up.
                if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
                    return
                # ECONNABORTED: a pending connection was closed by the peer
                # while it was waiting to be accepted; try the next one.
                if errno_from_exception(e) == errno.ECONNABORTED:
                    continue
                raise
            callback(connection, address)
    # Run accept_handler whenever the listening socket has a READ event.
    io_loop.add_handler(sock, accept_handler, IOLoop.READ)
欣欣然我們來到了最后一步
6.IOLoop.current().start(),讓我們看下源碼:
def start(self):
    """Run the event loop (simplified excerpt of tornado 4.x PollIOLoop.start).

    Each iteration does the following:

    1. Runs the queued callbacks and any due timeouts.
    2. Exits if ``self._running`` has been cleared.
    3. Otherwise polls for I/O events and dispatches each fd's registered
       handler.

    Parts marked "omitted" are elided in this excerpt.
    """
    try:
        while True:
            # Swap out the queue so callbacks added while running wait for
            # the next iteration.
            callbacks = self._callbacks
            self._callbacks = []
            due_timeouts = []
            # Move timers whose deadline has passed into due_timeouts (omitted).
            for callback in callbacks:
                self._run_callback(callback)
            for timeout in due_timeouts:
                # Run each due timed task.
                if timeout.callback is not None:
                    self._run_callback(timeout.callback)
            # Drop references so the callbacks can be garbage-collected.
            callbacks = callback = due_timeouts = timeout = None

            # Compute poll_timeout from pending callbacks/timeouts (omitted).

            if not self._running:
                # The loop is being stopped: exit only after this round of
                # callbacks has run.
                break

            try:
                event_pairs = self._impl.poll(poll_timeout)
            except Exception as e:
                if errno_from_exception(e) == errno.EINTR:
                    # The system call was interrupted by a signal handler;
                    # go round the loop again.
                    continue
                else:
                    raise

            self._events.update(event_pairs)
            while self._events:
                # Take one fd and its pending events.
                fd, events = self._events.popitem()
                try:
                    # Look up and run the handler registered for this fd.
                    fd_obj, handler_func = self._handlers[fd]
                    handler_func(fd_obj, events)
                except (OSError, IOError) as e:
                    if errno_from_exception(e) == errno.EPIPE:
                        # Happens when the client closes the connection.
                        pass
                    # Other exception handling omitted.
            # Release references to the last handler.
            fd_obj = handler_func = None
    finally:
        # The full implementation resets loop state here (omitted).
        pass
想進一步了解這一步,可以參考這篇文章:http://www.cnblogs.com/MnCu8261/p/6730691.html
代碼實例
目前公司有這樣一個需求:iphonex -- server -- ue4。服務端同時面對兩個客戶端,需要把 iphonex 的數據轉發給 ue4,server 起代理作用。需求大概就是這樣,具體實現代碼如下:
from tornado.tcpserver import TCPServer
from tornado.iostream import IOStream, StreamClosedError
from tornado import gen
from tornado.ioloop import IOLoop
import struct

# First byte a client sends to identify itself.
IPHONEX_TAG = b"\x0a"
UE4_TAG = b"\x0b"
# One-byte handshake replies.
ACK = b"\x00"
NAK = b"\x01"


class ProxyServer(TCPServer):
    """Relay float frames from an iPhone X client to a UE4 client.

    Wire protocol, as implemented below. Strings are sent as a 4-byte
    big-endian length followed by that many bytes.

    * Every client first sends one tag byte: 0x0A for iPhone X, 0x0B for UE4.
      Any other tag closes the connection.
    * iPhone X: the server ACKs the tag, reads the device uuid (ACK), then
      reads a comma-separated key list (ACK). After that it reads frames of
      one native-order ``float`` per key and forwards each frame verbatim to
      the UE4 stream registered for the same uuid, if any.
    * UE4: the server ACKs the tag and reads a uuid.
      - If no iPhone X has registered that uuid, it replies NAK and closes
        the connection.
      - Otherwise it replies ACK, sends the key list, and reads one byte:
        NAK (0x01) rejects the keys and closes the connection; any other
        byte registers the stream as that uuid's receiver.
    """

    def __init__(self, *args, **kwargs):
        super(ProxyServer, self).__init__(*args, **kwargs)
        # uuid -> {'keys': comma-separated key string,
        #          'ue4': receiving IOStream, or None after a failed write}
        self.devices = dict()

    @gen.coroutine
    def handle_stream(self, stream, address):
        """Dispatch a new connection on its leading tag byte."""
        try:
            device = yield stream.read_bytes(1)
            if device == IPHONEX_TAG:
                # Yield the sub-handler so its exceptions surface in this
                # coroutine (which TCPServer logs) rather than being lost
                # with an un-awaited future.
                yield self.handle_iphonex_stream(stream, address)
            elif device == UE4_TAG:
                yield self.handle_ue4_stream(stream, address)
            else:
                print("protocol error.")
                stream.close()
        except StreamClosedError:
            # The peer disconnected part-way through the handshake.
            print("connection %s closed" % (address,))

    @gen.coroutine
    def _read_string(self, stream):
        """Read a 4-byte big-endian length prefix plus payload; return it decoded."""
        header = yield stream.read_bytes(4)
        length = struct.unpack(">I", header)[0]
        payload = yield stream.read_bytes(length)
        # gen.Return keeps this coroutine valid on Python 2 as well.
        raise gen.Return(payload.decode())

    @gen.coroutine
    def handle_iphonex_stream(self, stream, address):
        """Register an iPhone X device, then forward its frames to UE4."""
        yield stream.write(ACK)
        print("iphonex")

        uuid = yield self._read_string(stream)
        yield stream.write(ACK)
        print(uuid)

        keys = yield self._read_string(stream)
        yield stream.write(ACK)
        print(keys)

        self.devices[uuid] = {'keys': keys}

        # One native float per key; the frame size is loop-invariant.
        frame = struct.Struct("%df" % len(keys.split(',')))
        while True:
            try:
                data = yield stream.read_bytes(frame.size)
            except StreamClosedError:
                print('iphonex is closed')
                break
            print(frame.unpack(data))

            ue4stream = self.devices[uuid].get('ue4')
            if ue4stream:
                try:
                    yield ue4stream.write(data)
                except StreamClosedError:
                    # Receiver went away; stop forwarding until a new UE4
                    # client registers for this uuid.
                    self.devices[uuid]['ue4'] = None
                    print('request for %s closed' % uuid)

    @gen.coroutine
    def handle_ue4_stream(self, stream, address):
        """Attach a UE4 receiver to an already-registered iPhone X uuid."""
        yield stream.write(ACK)
        print("ue4")

        uuid = yield self._read_string(stream)
        print(uuid)

        if not self.devices.get(uuid):
            # Unknown device: reject and close instead of leaking the socket.
            yield stream.write(NAK)
            stream.close()
            return
        yield stream.write(ACK)

        # The length prefix must count encoded bytes, not characters.
        keys = self.devices[uuid].get('keys').encode()
        yield stream.write(struct.pack(">I", len(keys)) + keys)

        valid = yield stream.read_bytes(1)
        if valid == NAK:
            print('keys not support.')
            stream.close()
            return
        self.devices[uuid]['ue4'] = stream


if __name__ == "__main__":
    server = ProxyServer()
    server.listen(1234)
    IOLoop.current().start()
請點贊轉發幫助身邊更多的人