一、Tornado異步非阻塞
一般的web框架,可以分為兩類:
阻塞式:(Django,Flask,Tornado,Bottle)
一個請求到來未處理完成,后續一直等待
解決方案:多線程或多進程
異步非阻塞(存在IO請求):Tornado (單進程+單線程)
- 使用
- @gen.coroutine
- yield Future對象
1.簡單的異步例子
import tornado.ioloop
import tornado.web
from tornado.web import RequestHandler
from tornado import gen
from tornado.concurrent import Future
import time
class IndexHandler(RequestHandler):
@gen.coroutine
def get(self):
print('開始')
future = Future()
#從當前時間開始夯住10S,當時間結束執行 self.doing方法
tornado.ioloop.IOLoop.current().add_timeout(time.time() + 10, self.doing)
yield future
def doing(self, *args, **kwargs):
self.write('async')
#關閉連接
self.finish()
application = tornado.web.Application([
(r"/index", IndexHandler),
])
if __name__ == "__main__":
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
在內部中,Select循環檢測Future,當有返回值時,Future內部會作出變化,此時即可釋放向自己發送請求的鏈接
2.作為中轉,接收用戶對自己的請求,而自己需要向另一個API發送請求處理,在發送請求的過程中,占用大量時間,這時候就可以使用異步非阻塞的形式。
比如一個購物網站,用戶向自己發來購買訂單,自己再向預定的后台處理API發送處理請求,在這段時間內可以接收其它用戶發來的訂單請求,這將大大提升處理的效率。
import tornado.ioloop
import tornado.web
from tornado.web import RequestHandler
from tornado import gen
from tornado.concurrent import Future
import time
from tornado import httpclient
class IndexHandler(RequestHandler):
@gen.coroutine
def get(self):
print('收到訂單')
#開啟一個非阻塞HTTP客戶端對象
http = httpclient.AsyncHTTPClient()
# 執行請求,異步返回HTTPResponse。在向另一url請求的過程中,可以繼續接收其它對自己的請求
yield http.fetch("http://www.github.com", self.done,)
def done(self, response): #異步方法接收HTTPResponse,並可以繼續執行其它代碼
print(response)
self.write('訂單成功')
#關閉此連接
self.finish()
application = tornado.web.Application([
(r"/index", IndexHandler),
])
if __name__ == "__main__":
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
fetch(request,callback = None,raise_error = True,** kwargs )[source]
執行請求,異步返回HTTPResponse。
請求可以是字符串URL或HTTPRequest對象。如果它是一個字符串,我們構造一個HTTPRequest使用任何額外的kwargs:HTTPRequest(request, **kwargs)
該方法return一個Future結果是一個 HTTPResponse。默認情況下,Future將引發一個HTTPError,如果該請求返回一個非200響應代碼(其他錯誤也可以得到提升,如果服務器無法聯系)。相反,如果raise_error設置為False,則無論響應代碼如何,都將始終返回響應。
如果callback給出一個,它將被調用HTTPResponse。在回調界面中,HTTPError不會自動提出。相反,您必須檢查響應的error屬性或調用其rethrow方法。
3.模擬異步非阻塞執行的流程:
import tornado.ioloop
import tornado.web
from tornado.web import RequestHandler
from tornado import gen
from tornado.concurrent import Future
import time
from tornado import httpclient
fu = None
class IndexHandler(RequestHandler):
@gen.coroutine
def get(self):
#將fu設置為全局變量
global fu
print('瘋狂的追求')
fu = Future()
#添加一個回調,如果完成運行並且其結果可用時,它將作為其參數被調用
fu.add_done_callback(self.done)
yield fu
def done(self, response):
self.write('終於等到你')
self.finish()
class TestHandler(RequestHandler):
def get(self):
#手動設置Future結果
fu.set_result(666)
self.write('我只能幫你到這里了')
application = tornado.web.Application([
(r"/index", IndexHandler),
(r"/test", TestHandler),
])
if __name__ == "__main__":
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
當return一個Future時,可以添加一個回調函數,而這個回調函數,只有在請求成功接收到結果時才會執行,否則將一直夯住,而Future內部則是通過set_result()方法使其得到一個成功返回的信號從而執行回調。正常情況下set_result()的值是正確的返回結果,這里通過偽造結果達到異步的效果。done()方法中的response即set_result()的值。
3.1 多線程自動執行:
import tornado.ioloop
import tornado.web
from tornado.web import RequestHandler
from tornado import gen
from tornado.concurrent import Future
import time
from threading import Thread
def waiting(future):
import time
time.sleep(10)
#10S后執行set_result使回調生效。
future.set_result(666)
class IndexHandler(RequestHandler):
@gen.coroutine
def get(self):
global fu
print('瘋狂的追求')
fu = Future()
fu.add_done_callback(self.done)
#開啟線程執行waiting函數,
thread = Thread(target=waiting,args=(fu,))
thread.start()
yield fu
def done(self, response):
self.write('終於等到你')
self.finish()
application = tornado.web.Application([
(r"/index", IndexHandler),
])
if __name__ == "__main__":
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
4.使用Tornado異步非阻塞模擬用戶登錄
import tornado.web
from tornado import gen
import tornado_mysql
#必須要使用支持的類庫,如tornado_mysql ,sqlalchemy以及pymysql都不支持
@gen.coroutine
def get_user(user):
#連接時會有IO耗時
conn = yield tornado_mysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='cmdb',
charset='utf8')
cur = conn.cursor()
#查詢時也會有IO耗時
# yield cur.execute("SELECT name,email FROM web_models_userprofile where name=%s", (user,))
yield cur.execute("select sleep(10)")
row = cur.fetchone()
cur.close()
conn.close()
#gen.Task特有的return方式,通過捕捉異常獲取數據庫查詢的值
raise gen.Return(row)
class LoginHandler(tornado.web.RequestHandler):
def get(self, *args, **kwargs):
self.render('login.html')
@gen.coroutine
def post(self, *args, **kwargs):
user = self.get_argument('user')
#yield一個Task任務,封裝數據庫查詢的函數,當查詢遇到IO操作時不會阻塞,可以接收其它請求。
data = yield gen.Task(get_user, user)
if data:
print(data)
self.redirect('http://www.baidu.com')
else:
self.render('login.html')
二、自定義簡易非阻塞框架
1.socket服務端基本(阻塞式):
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("127.0.0.1", 9999,))
sock.listen(5)
while True:
client, address = sock.accept()
data = client.recv(8096)
print(data)
# /r/n/r/n
# 請求頭中URL
# 請求體中數據
# URL去路由關系中匹配 (請求體中數據)
# response = func()
import time
time.sleep(10)
client.sendall(b'uuuuuuuuuuuuuu')
client.close()
2.應用select模擬非阻塞web框架的基本流程:
import socket
import select
def f1(request):
return "內容1"
def f2(request):
return "內容2"
#路由匹配系統
urls = [
('/index.html',f1),
('/home.html',f2),
]
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
#設置非阻塞
sock.setblocking(False)
sock.bind(("127.0.0.1", 9999,))
sock.listen(5)
input_list = [sock,]
while True:
"""
客戶端第一次建立連接以及后續的請求發來數據都會對rlist有變動,對rlist進行循環,
如果等於sock即是建立連接的請求,否則就是客戶端后續發來數據,因此進行不同的操作
"""
rlist,wlist,elist = select.select(input_list,[],[],0.005)
for sk in rlist:
# 有新人來連接:sock 進行accept,接收連接,等待客戶來連接
if sk == sock:
client, address = sock.accept()
client.setblocking(False)
input_list.append(client) #將客戶端連接添加到循環列表中
# 老人發來數據: client 建立連接並接收數據
else:
data = sk.recv(8096)
# /r/n/r/n (分割請求頭跟請求體)
# 請求頭中URL
# 請求體中數據 (提取URL跟數據)
# URL去路由關系中匹配 (請求體中數據)
# response = func()
#根據數據匹配URL,服務器做相應的視圖處理,處理后的數據返回給客戶端
response = f1(data)
sk.sendall(response.encode('utf-8'))
sk.close()
3.簡單的web框架基本實現原理
import re
import socket
import select
import time
class Future(object):
"""
異步非阻塞模式時封裝回調函數以及是否准備就緒
"""
def __init__(self, callback):
self.callback = callback
self._ready = False
self.value = None
def set_result(self, value=None):
self.value = value
self._ready = True
@property
def ready(self):
return self._ready
class HttpResponse(object):
"""
封裝響應信息
"""
def __init__(self, content=''):
self.content = content
self.headers = {}
self.cookies = {}
def response(self):
return bytes(self.content, encoding='utf-8')
class HttpNotFound(HttpResponse):
"""
404時的錯誤提示
"""
def __init__(self):
super(HttpNotFound, self).__init__('404 Not Found')
class HttpRequest(object):
"""
用戶封裝用戶請求信息
"""
def __init__(self, conn):
"""
:param conn: 客戶端的連接
"""
self.conn = conn
self.header_bytes = bytes()
self.body_bytes = bytes()
self.header_dict = {}
self.method = ""
self.url = ""
self.protocol = ""
self.initialize()
self.initialize_headers()
def initialize(self):
header_flag = False
while True:
try:
received = self.conn.recv(8096)
except Exception as e:
received = None
if not received:
break
if header_flag:
self.body_bytes += received
continue
temp = received.split(b'\r\n\r\n', 1)
if len(temp) == 1:
self.header_bytes += temp
else:
h, b = temp
self.header_bytes += h
self.body_bytes += b
header_flag = True
@property
def header_str(self):
return str(self.header_bytes, encoding='utf-8')
def initialize_headers(self):
headers = self.header_str.split('\r\n')
first_line = headers[0].split(' ')
if len(first_line) == 3:
self.method, self.url, self.protocol = headers[0].split(' ')
for line in headers:
kv = line.split(':')
if len(kv) == 2:
k, v = kv
self.header_dict[k] = v
class Snow(object):
"""
微型Web框架類,
"""
def __init__(self, routes):
"""
:param routes: url路由匹配
"""
self.routes = routes
self.inputs = set()
self.request = None
self.async_request_handler = {}
def run(self, host='localhost', port=9999):
"""
事件循環
:param host: IP地址
:param port: 端口
:return:
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port,))
sock.setblocking(False)
sock.listen(128)
sock.setblocking(0)
self.inputs.add(sock)
try:
while True:
readable_list, writeable_list, error_list = select.select(self.inputs, [], self.inputs,0.005)
for conn in readable_list:
if sock == conn:
# 連接
client, address = conn.accept()
client.setblocking(False)
self.inputs.add(client)
else:
# 發送請求
# HttpResponse
# 生成器
"""
gen=HttpResponse對象,執行了三件事,
1.HttpRequest類,將客戶端拿到,分割請求頭與請求體,將請求頭信息處理成header_dict {'method':xxx,'url':xxx}
2.循環路由,實例化HttpRequest類, .url與其進行匹配,匹配成功將self.fun 賦值稱為路由系統填寫的func函數。
3.處理func視圖,返回2種類型,HttpResponse對象或者Future對象。如果是HttpResponse()對象,封裝了將字符串轉換
成字節的方法,即還是一種阻塞式,即一般的如Django類web框架的處理流程,
"""
gen = self.process(conn)
if isinstance(gen,HttpResponse):
conn.sendall(gen.response())
self.inputs.remove(conn)
conn.close()
else:
yielded = next(gen)
self.async_request_handler[conn] = yielded
for conn,fur in self.async_request_handler.items():
if not fur.ready:
continue
response = fur.callback(fur.value)
conn.sendall(response)
conn.close()
del self.async_request_handler[conn]
except Exception as e:
pass
finally:
sock.close()
def process(self, conn):
"""
處理路由系統以及執行函數
:param conn:
:return:
"""
self.request = HttpRequest(conn)
func = None
for route in self.routes:
if re.match(route[0], self.request.url):
func = route[1]
break
if not func:
return HttpNotFound()
else:
return func(self.request)
def index(request):
return HttpResponse('OK')
def asynccccc(request):
fur = Future(callback=done)
yield fur
def done(arg):
pass
routes = [
(r'/index/', index),
(r'/async/', asynccccc),
]
if __name__ == '__main__':
app = Snow(routes)
app.run(port=8012)
gen對象中,如果返回的是一個Future對象,則內部會將封裝的Future對象yield給回調函數,或者說是交給中轉站,由中轉站向基礎平台的API發送IO請求,這時不會阻塞,會在循環中繼續yield新的請求,並生成一個字典,{conn:yield Future的對象},而在外層的循環中監聽這個對象,一旦對象發送的IO請求處理完成,將執行set_result方法,將ready設置成True,並拿到請求接收的值即賦值給value,再把value傳遞給回調函數中進行相應處理, 獲得一個最后的值,將conn從字典中移除,將處理后的值返回給用戶。
4.最后整理后的簡單異步非阻塞web框架
import re
import socket
import select
import time
class HttpResponse(object):
"""
封裝響應信息
"""
def __init__(self, content=''):
self.content = content
self.headers = {}
self.cookies = {}
def response(self):
return bytes(self.content, encoding='utf-8')
class HttpNotFound(HttpResponse):
"""
404時的錯誤提示
"""
def __init__(self):
super(HttpNotFound, self).__init__('404 Not Found')
class HttpRequest(object):
"""
用戶封裝用戶請求信息
"""
def __init__(self, conn):
self.conn = conn
self.header_bytes = bytes()
self.header_dict = {}
self.body_bytes = bytes()
self.method = ""
self.url = ""
self.protocol = ""
self.initialize()
self.initialize_headers()
def initialize(self):
header_flag = False
while True:
try:
received = self.conn.recv(8096)
except Exception as e:
received = None
if not received:
break
if header_flag:
self.body_bytes += received
continue
temp = received.split(b'\r\n\r\n', 1)
if len(temp) == 1:
self.header_bytes += temp
else:
h, b = temp
self.header_bytes += h
self.body_bytes += b
header_flag = True
@property
def header_str(self):
return str(self.header_bytes, encoding='utf-8')
def initialize_headers(self):
headers = self.header_str.split('\r\n')
first_line = headers[0].split(' ')
if len(first_line) == 3:
self.method, self.url, self.protocol = headers[0].split(' ')
for line in headers:
kv = line.split(':')
if len(kv) == 2:
k, v = kv
self.header_dict[k] = v
class Future(object):
"""
異步非阻塞模式時封裝回調函數以及是否准備就緒
"""
def __init__(self, callback):
self.callback = callback
self._ready = False
self.value = None
def set_result(self, value=None):
self.value = value
self._ready = True
@property
def ready(self):
return self._ready
class TimeoutFuture(Future):
"""
異步非阻塞超時
"""
def __init__(self, timeout):
super(TimeoutFuture, self).__init__(callback=None)
self.timeout = timeout
self.start_time = time.time()
@property
def ready(self):
current_time = time.time()
if current_time > self.start_time + self.timeout:
self._ready = True
return self._ready
class Snow(object):
"""
微型Web框架類
"""
def __init__(self, routes):
self.routes = routes
self.inputs = set()
self.request = None
self.async_request_handler = {}
def run(self, host='localhost', port=9999):
"""
事件循環
:param host:
:param port:
:return:
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port,))
sock.setblocking(False)
sock.listen(128)
sock.setblocking(0)
self.inputs.add(sock)
try:
while True:
readable_list, writeable_list, error_list = select.select(self.inputs, [], self.inputs,0.005)
for conn in readable_list:
if sock == conn:
client, address = conn.accept()
client.setblocking(False)
self.inputs.add(client)
else:
gen = self.process(conn)
if isinstance(gen, HttpResponse):
conn.sendall(gen.response())
self.inputs.remove(conn)
conn.close()
else:
yielded = next(gen)
self.async_request_handler[conn] = yielded
self.polling_callback()
except Exception as e:
pass
finally:
sock.close()
def polling_callback(self):
"""
遍歷觸發異步非阻塞的回調函數
:return:
"""
for conn in list(self.async_request_handler.keys()):
yielded = self.async_request_handler[conn]
if not yielded.ready:
continue
if yielded.callback:
ret = yielded.callback(self.request, yielded)
conn.sendall(ret.response())
self.inputs.remove(conn)
del self.async_request_handler[conn]
conn.close()
def process(self, conn):
"""
處理路由系統以及執行函數
:param conn:
:return:
"""
self.request = HttpRequest(conn)
func = None
for route in self.routes:
if re.match(route[0], self.request.url):
func = route[1]
break
if not func:
return HttpNotFound()
else:
return func(self.request)
使用:
基本使用(沒有使用異步非阻塞):
from s4 import Snow
from s4 import HttpResponse
def index(request):
return HttpResponse('OK')
routes = [
(r'/index/', index),
]
app = Snow(routes)
app.run(port=8012)
手動的異步非阻塞使用
from s4 import Snow
from s4 import HttpResponse
from s4 import Future
request_list = []
def callback(request, future):
return HttpResponse(future.value)
def req(request):
print('請求到來')
obj = Future(callback=callback)
request_list.append(obj)
yield obj
def stop(request):
obj = request_list[0]
obj.set_result('done')
del request_list[0]
return HttpResponse('stop')
routes = [
(r'/req/', req),
(r'/stop/', stop),
]
app = Snow(routes)
app.run(port=8012)
由於我們在最外層循環了一個{conn:future}的字典,所以需要手動去對stop視圖函數發送請求,由它進行set_result,如果是自動的方法,就是將這個conn添加到select中,由select進行監聽並set_result,
達到程序自動返回的效果。
