IOStream對tornado的高效起了很大的作用,他封裝了socket的非阻塞IO的讀寫操作。大體上可以這么說,當連接建立后,服務端與客戶端的請求響應的讀寫都是基於IOStream的,也就是說:IOStream是用來處理對連接的讀寫,當然IOStream是異步的讀寫而且可以有很多花樣的讀寫。
接下來說一下有關接收請求的大體流程:
當連接建立,服務器端會產生一個對應該連接的socket,同時將該socket封裝至IOStream實例中(這代表着IOStream的初始化)。
我們知道tornado是基於IO多路復用的(就拿epoll來說),此時將socket進行register,事件為READABLE,這一步與IOStream沒有多大關系。
當該socket事件發生時,也就是意味着有數據從連接發送到了系統緩沖區中,這時就需要將chunk讀入到我們在內存中為其開辟的_read_buffer中,在IOStream中使用deque作為buffer。_read_buffer表示讀緩沖,當然也有_write_buffer,並且在讀的過程中也會檢測總尺寸是否大於我們設定的最大緩沖尺寸。不管是讀緩沖還是寫緩沖本質上就是tornado進程開辟的一段用來存儲數據的內存。
而這些chunk一般都是客戶端發送的請求了,但是我們還需要對這些chunk作進一步操作,比如這個chunk中可能包含了多個請求,如何把請求分離?(每個請求首部的結束符是b'\r\n\r\n'),這里就用到read_until來分離請求並設置callback了。同時會將被分離的請求數據從_read_buffer中移除。
然后就是將callback以及他的參數(被分離的請求數據)添加至IOLoop._callbacks中,等待下一次IOLoop的執行,屆時會迭代_callbacks並執行回調函數。
補充: tornado是水平觸發,所以假如讀完一次chunk后系統緩存區中依然還有數據,那么下一次的epoll.poll()依然會返回該socket。
在iostream中有一個類叫做:IOStream
有幾個較為重要的屬性:
def __init__(): self.socket = socket # 封裝socket self.socket.setblocking(False) # 設置socket為非阻塞 self.io_loop = io_loop or ioloop.IOLoop.current() self._read_buffer = deque() # 讀緩沖 self._write_buffer = deque() # 寫緩沖 self._read_callback = None # 讀到指定字節數據時,或是指定標志字符串時,需要執行的回調函數 self._write_callback = None # 發送完_write_buffer的數據時,需要執行的回調函數
有幾個較為重要的方法
class IOStream(object): def read_until(self, delimiter, callback): def read_bytes(self, num_bytes, callback, streaming_callback=None): def read_until_regex(self, regex, callback): def read_until_close(self, callback, streaming_callback=None): def write(self, data, callback=None):
以上所有的方法都需要一個可選的callback參數,如果該參數為None則該方法會返回一個Future對象。
以上所有的讀方法本質上都是讀取該socket所發送來的數據,然后當讀到指定分隔符或者標記或者條件觸發的時候,停止讀,然后將該分隔符以及其前面的數據作為callback(如果沒有callback,則將數據設置為Future對象的result)的參數,然后將callback添加至IOLoop._callbacks中。當然其中所有的"讀"操作是非阻塞的!
像read_until read_until_regex 這兩個方法相差不大,原理都是差不多的,都是在buffer中找指定的字符或者字符樣式。
而read_bytes則是設置讀取字節數,達到這些字節就會觸發並運行回調函數(當然這些回調函數不是立刻運行,而是被送到ioloop中的_callbacks中),該方法主要是用來讀取包含content-length或者分塊傳輸編碼的具有主體信息的請求或者響應。
而read_until_close則是主要被用在非持久連接上,因為非持久連接響應的結束標志就是連接關閉。
read_bytes和read_until_close這兩個方法都有streaming_callback這個參數,假如指定了該參數,那么只要read_buffer中有數據,則將數據作為參數調用該函數
就拿比較常見的read_until方法來說,下面是代碼簡化版:
def read_until(self, delimiter, callback=None, max_bytes=None): future = self._set_read_callback(callback) # 可能是Future對象,也可能是None self._read_delimiter = delimiter # 設置分隔符 self._read_max_bytes = max_bytes # 設置最大讀字節數 self._try_inline_read() return future
其中_set_read_callback會根據callback是否存在返回None或者Future對象(存在返回None,否則返回一個Future實例對象)
如果我們
再來看_try_inline_read方法的簡化版:
def _try_inline_read(self): """ 嘗試從_read_buffer中讀取所需數據 """ # 查看是否我們已經在之前的讀操作中得到了數據 self._run_streaming_callback() # 檢查字符流回調,如果調用read_bytes和read_until_close並指定了streaming_callback參數就會造成這個回調 pos = self._find_read_pos() # 嘗試在_read_buffer中找到分隔符的位置。找到則返回分隔符末尾所處的位置,如果不能,則返回None。 if pos is not None: self._read_from_buffer(pos) return
self._check_closed() # 檢查當前IOStream是否關閉 pos = self._read_to_buffer_loop() # 從系統緩沖中讀取一個chunk,檢查是否含有分隔符,沒有則繼續讀取一個chunk,合並兩個chunk,再次檢查是否函數分隔符…… 如果找到了分隔符,會返回分隔符末尾在_read_buffer中所處的位置 if pos is not None: # 如果找到了分隔符, self._read_from_buffer(pos) # 將所需的數據從_read_buffer中移除,並將其作為callback的參數,然后將callback封裝后添加至IOLoop._callbacks中 return
# 沒找到分隔符,要么關閉IOStream,要么為該socket在IOLoop中注冊事件 if self.closed(): self._maybe_run_close_callback() else: self._add_io_state(ioloop.IOLoop.READ)
上面的代碼被我用空行分為了三部分,每一部分順序的對應下面每一句話
分析該方法:
1 首先在_read_buffer第一項中找分隔符,找到了就將分隔符以及其前的數據從_read_buffer中移除並將其作為參數傳入回調函數,沒找到就將第二項與第一項合並然后繼續找……;
2 如果在_read_buffer所有項中都沒找到的話就把系統緩存中的數據讀取至_read_buffer,然后合並再次查找,
3 如果把系統緩存中的數據都取完了都還沒找到,那么就等待下一次該socket發生READ事件后再找,這時的找則就是:將系統緩存中的數據讀取到_read_buffer中然后找,也就是執行第2步。
來看一看這三部分分別調用了什么方法:
第一部分中的_find_read_pos以及_read_from_buffer
前者主要是在_read_buffer中查找分隔符,並返回分隔符的位置,后者則是將分隔符以及分隔符前面的所有數據從_read_buffer中取出並將其作為callback的參數,然后將callback封裝后添加至IOLoop._callbacks中
來看_find_read_pos方法的簡化版:

def _find_read_pos(self): # 嘗試在_read_buffer中尋找分隔符。找到則返回分隔符末尾所處的位置,如果不能,則返回None。 if self._read_delimiter is not None: if self._read_buffer: # 查看_read_buffer中是否有之前未處理的數據 while True: loc = self._read_buffer[0].find(self._read_delimiter) # 查找分隔符所出現的首部位置 if loc != -1: # 在_read_buffer的首項中找到了 delimiter_len = len(self._read_delimiter) self._check_max_bytes(self._read_delimiter, loc + delimiter_len) return loc + delimiter_len # 分隔符末尾的位置 if len(self._read_buffer) == 1: break _double_prefix(self._read_buffer) self._check_max_bytes(self._read_delimiter, len(self._read_buffer[0])) return None

def _read_from_buffer(self, pos): # 將所需的數據從_read_buffer中移除,並將其作為callback的參數,然后將callback封裝后添加至IOLoop._callbacks中 self._read_bytes = self._read_delimiter = self._read_regex = None self._read_partial = False self._run_read_callback(pos, False)

來看_run_read_callback源碼簡化版: def _run_read_callback(self, size, streaming): if streaming: callback = self._streaming_callback else: callback = self._read_callback self._read_callback = self._streaming_callback = None if self._read_future is not None: # 這里將_read_future進行set_result assert callback is None future = self._read_future self._read_future = None future.set_result(self._consume(size)) if callback is not None: assert (self._read_future is None) or streaming self._run_callback(callback, self._consume(size)) # 將后者作為前者的參數,然后將前者進行封裝后添加至IOLoop._callbacks中 來看_consume的源碼: def _consume(self, loc): # 將self._read_buffer 的首項改為 原首項[loc:] ,然后返回 原首項[:loc] if loc == 0: return b"" _merge_prefix(self._read_buffer, loc) # 將雙端隊列(deque)的首項調整為指定大小。 self._read_buffer_size -= loc return self._read_buffer.popleft() 來看_run_callback源碼簡化版: def _run_callback(self, callback, *args):# 將callback封裝后添加至ioloop._callbacks中 def wrapper(): self._pending_callbacks -= 1 try: return callback(*args) finally: self._maybe_add_error_listener() with stack_context.NullContext(): self._pending_callbacks += 1 self.io_loop.add_callback(wrapper) # 將callback添加至IOLoop._callbacks中
這里面還用到一個很有意思的函數:_merge_prefix ,這個函數的作用就是將deque的首項調整為指定大小

def _merge_prefix(deque, size): """Replace the first entries in a deque of strings with a single string of up to size bytes. >>> d = collections.deque(['abc', 'de', 'fghi', 'j']) >>> _merge_prefix(d, 5); print(d) deque(['abcde', 'fghi', 'j']) Strings will be split as necessary to reach the desired size. >>> _merge_prefix(d, 7); print(d) deque(['abcdefg', 'hi', 'j']) >>> _merge_prefix(d, 3); print(d) deque(['abc', 'defg', 'hi', 'j']) >>> _merge_prefix(d, 100); print(d) deque(['abcdefghij']) """ if len(deque) == 1 and len(deque[0]) <= size: return prefix = [] remaining = size while deque and remaining > 0: chunk = deque.popleft() if len(chunk) > remaining: deque.appendleft(chunk[remaining:]) chunk = chunk[:remaining] prefix.append(chunk) remaining -= len(chunk) if prefix: deque.appendleft(type(prefix[0])().join(prefix)) if not deque: deque.appendleft(b"")
第二部分的_read_to_buffer_loop

來看_read_to_buffer_loop簡化版: 系統緩沖中的data可能十分長,為了查找指定的字符,我們應該先讀一個chunk,檢查其中是否有指定的字符,若有則返回分隔符末尾所處的位置 若沒有則繼續讀第二個chunk,然后將這兩個chunk合並(多字節分隔符(例如“\ r \ n”)可能跨讀取緩沖區中的兩個塊),重復查找過程 def _read_to_buffer_loop(self): try: next_find_pos = 0 self._pending_callbacks += 1 while not self.closed(): if self._read_to_buffer() == 0: # 從系統緩沖中讀一個chunk並將其添加至_read_buffer中,然后返回chunk的大小,如果無數據則返回0 break self._run_streaming_callback() if self._read_buffer_size >= next_find_pos: # _read_buffer_size 表示_read_buffer的大小 pos = self._find_read_pos() # 嘗試在_read_buffer中尋找分隔符。找到則返回分隔符末尾所處的位置,如果不能,則返回None。 if pos is not None: return pos next_find_pos = self._read_buffer_size * 2 return self._find_read_pos() finally: self._pending_callbacks -= 1
第三部分_add_io_state,該函數和ioloop異步相關

def _add_io_state(self, state): if self.closed(): # 連接已經關閉 return if self._state is None: self._state = ioloop.IOLoop.ERROR | state with stack_context.NullContext(): self.io_loop.add_handler(self.fileno(), self._handle_events, self._state) # 為對應socket的文件描述符添加事件及其處理函數, elif not self._state & state: self._state = self._state | state self.io_loop.update_handler(self.fileno(), self._state) # self._handle_events 是根據events選擇對應的處理函數,在這里我們假設處理函數是_handle_read def _handle_read(self): try: pos = self._read_to_buffer_loop() except UnsatisfiableReadError: raise except Exception as e: gen_log.warning("error on read: %s" % e) self.close(exc_info=True) return if pos is not None: self._read_from_buffer(pos) return else: self._maybe_run_close_callback()
參考: