使用tornado實現的一個簡單http服務器:只需要定義自己的處理方法,其他的東西全部交給tornado完成.
#coding:utf-8 import tornado.httpserver import tornado.ioloop def handle_request(request): message = "Hello World from Tornado Http Server" request.write("HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\n%s" % (len(message), message)) request.finish() http_server = tornado.httpserver.HTTPServer(handle_request) http_server.bind(8080)
http_server.start() #啟動事件循環,開始監聽網絡事件,主要是socket的讀和寫 tornado.ioloop.IOLoop.instance().start()
1.socket、bind及listen函數(httpserver中實現)
#getaddrinfo返回服務器的所有網卡信息, 每塊網卡上都要創建監聽客戶端的socket. for res in socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,0, socket.AI_PASSIVE | socket.AI_ADDRCONFIG): af, socktype, proto, canonname, sockaddr = res # 創建listen socket sock = socket.socket(af, socktype, proto) # 設置socket的屬性 flags = fcntl.fcntl(sock.fileno(), fcntl.F_GETFD) flags |= fcntl.FD_CLOEXEC fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, flags) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) if af == socket.AF_INET6: if hasattr(socket, "IPPROTO_IPV6"): sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) sock.setblocking(0) # bind 和 listen sock.bind(sockaddr) sock.listen(128) # 加入ioloop #ioloop可以理解為一個容器,用戶把socket和回調函數注冊到容器中, 容器內部會輪詢socket, 一旦某個socket可以讀寫, 就調用回調函數來處理socket的讀寫事件. self._sockets[sock.fileno()] = sock if self._started: #監聽listen_socket的讀事件, 回調函數為_handle_events,一旦listen socket可讀, 說明客戶端請求到來, 然后調用_handle_events接受客戶端的請求 self.io_loop.add_handler(sock.fileno(), self._handle_events,ioloop.IOLoop.READ)
2.accept函數(httpserver中實現)
def _handle_events(self, fd, events): while True: try: #accept方法返回客戶端的socket(注意connection的類型是socket), 以及客戶端的地址 connection, address = self._sockets[fd].accept() except socket.error, e: if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN): return raise try: #創建IOStream對象, 用來處理socket的異步讀寫. 這一步會調用ioloop.add_handler把client socket加入ioloop,然后創建HTTPConnection, 處理用戶的請求. stream = iostream.IOStream(connection, io_loop=self.io_loop) HTTPConnection(stream, address, self.request_callback, self.no_keep_alive, self.xheaders) except: logging.error("Error in connection callback", exc_info=True)
3.IOStream
IOStream對socket的讀寫做了一層封裝, 通過使用兩個緩沖區, 實現對socket的異步讀寫.
為了實現對client socket的異步讀寫, 我們為client socket創建兩個緩沖區: _read_buffer和_write_buffer, 寫: 先寫到_write_buffer, 讀: 從_read_buffer讀. 這樣我們就不用直接讀寫socket, 進而實現異步讀寫. 這些操作都封裝在IOStream類中
IOStream的初始化:IOStream與socket是一一對應的, 初始化主要做4個工作
(1) 初始化IOStream對應的socket
(2) 分配輸入緩沖區_write_buffer
(3) 分配輸出緩沖區_read_buffer
(4) 把socket加入ioloop, 這樣當socket可讀寫的時候, 調用回調函數_handle_events把數據從socket讀入buffer, 或者把數據從buffer發送給socket
#IOStream的__init__方法 self.socket = socket self.io_loop = io_loop or ioloop.IOLoop.instance() self._read_buffer = collections.deque() self._write_buffer = collections.deque() self.io_loop.add_handler(self.socket.fileno(),self._handle_events,self._state)
IOStream提供的接口:
(1) write(data):把數據寫入IOStream的_write_buffer
(2) read_until(delimiter, callback):從_read_buffer讀取數據, delimiter作為讀取結束符, 完了調用callback
(3) read_bytes(num_of_bytes, callback):從_read_buffer讀取指定大小的數據, 完了調用callback
異步IO的例子:一系列調用都是通過回調函數實現的,這就是異步的處理方式
#!/usr/bin/env python # -*- coding:utf-8 -*- from tornado import ioloop from tornado import iostream import socket def send_request(): stream.write("GET /index.html HTTP/1.0\r\nHost: nginx.net\r\n\r\n") #回調on_headers解析協議頭 stream.read_until("\r\n\r\n", on_headers) def on_headers(data): headers = {} for line in data.split("\r\n"): parts = line.split(":") if len(parts) == 2: headers[parts[0].strip()] = parts[1].strip() #回調on_body把數據打印出來 stream.read_bytes(int(headers["Content-Length"]), on_body) def on_body(data): print data stream.close() ioloop.IOLoop.instance().stop() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) stream = iostream.IOStream(s) #調用connect連接服務器, 完成后回調send_request發出請求, 並讀取服務器返回的http協議頭 stream.connect(("nginx.net", 80), send_request) ioloop.IOLoop.instance().start()
4.處理請求:HTTPConnection
HttpConnection類專門用來處理http請求, 處理http請求的一般流程是:
HTTPConnection實現了一系列的函數用來處理這些流程, 參見下圖:
5.IOLoop:基於epoll
在Tornado服務器中, IOLoop是調度的核心模塊, Tornado服務器回把所有的socket都注冊到IOLoop, 注冊的時候指明回調處理函數, IOLoop內部不斷的監聽IO事件, 一旦發現某個socket可讀寫, 就調用其注冊時指定的回調函數.
IOLoop的結構圖如下所示:
使用IOLoop的一個例子:
from tornado import ioloop from tornado import iostream import socket import errno import functools def handle_connection(client, address): client.send("Hello World from A Simple TCP Server") client.close() //該函數作用是接受客戶端新的連接 def connection_ready(sock, fd, events): while True: try: connection, address = sock.accept() except socket.error, e: if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN): raise return connection.setblocking(0)
//此處其實應該設置為異步的,將相應的套接字加入到事件循環並注冊相應的回調函數,只是沒這樣做 handle_connection(connection, address)
//下面的代碼可以直接調用httpserver或者tcpserver實現 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
//一般來說,一個端口釋放后會等待兩分鍾之后才能再被使用,SO_REUSEADDR是讓端口釋放后立即就可以被再次使用
//server程序總是應該在調用bind()之前設置SO_REUSEADDR套接字選項 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
//啟用套接字的非阻塞模式 sock.setblocking(0) sock.bind(("localhost", 8080))
//128是連接隊列的最大長度 sock.listen(128)
//創建一個ioloop實例 io_loop = ioloop.IOLoop.instance()
callback = functools.partial(connection_ready, sock)
//fileno將sock轉換為標准的描述符,add_handler向ioloop中注冊socket以及對應的回調函數、監聽事件類型 io_loop.add_handler(sock.fileno(), callback, io_loop.READ) io_loop.start()
IOLoop的單例模式:
#coding:utf-8 import os class IOLoop(object): @classmethod #cls和self一樣,是python的built-in變量,self表示類的實例,cls表示類 #cls一般用於static method,因為static method無須實例化就可以調用,所以傳遞cls給static method,然后調用cls()可以創建對象 def instance(cls): if not hasattr(cls,"_instance"): cls._instance = cls() return cls._instance @classmethod def initialized(cls): return hasattr(cls,"_instance") def service(self): print "hello world" print IOLoop.initialized(), ioloop = IOLoop.instance() ioloop.service() if os.fork() == 0: print IOLoop.initialized(), ioloop = IOLoop.instance() ioloop.service()
Always use 'self' for the first argument to instance methods.
Always use 'cls' for the first argument to class methods.