模擬tornado兩個socket請求
同步執行
按部就班的依次執行,知道上一個步驟執行完才執行下一步。
# coding:utf-8 import time def long_io(): # 長io操作 print("開始執行IO操作") time.sleep(2) print("完成IO操作") def req_a(): # 模擬請求a print('開始處理請求req_a') long_io() # 執行一個長io操作 print('完成處理請求req_a') def req_b(): # 模擬請求b print('開始處理請求req_b') print('完成處理請求req_b') def main(): # 模擬tornado框架,處理兩個請求 req_a() req_b() if __name__ == "__main__": main()
結果:
開始處理請求req_a
開始執行IO操作
完成IO操作
完成處理請求req_a
開始處理請求req_b
完成處理請求req_b
異步執行
對於耗時的過程,我們將其交給別人(如其另外一個線程)去執行,而我們繼續往下處理,當別人執行完耗時操作后再將結果反饋給我們,這就是我們所說的異步。
1)引入線程和回調函數
# coding:utf-8 import time,threading # 引入多線程 thread_list = [] def long_io(callback): # 長io操作 def fun(cb): print("開始執行IO操作") time.sleep(5) print("完成IO操作") cb("IO操作結束") # 當線程結束時,執行回調函數 threading.Thread(target=fun,args=(callback,)).start() # 將長的io操作交個另一個線程來處理,此時long_io執行完畢(python多線程不用join) def callback(ret): print(ret) def req_a(): # 模擬請求a print('開始處理請求req_a') long_io(callback) # 執行一個長io操作 print('完成處理請求req_a') def req_b(): # 模擬請求b print('開始處理請求req_b') print('完成處理請求req_b') def main(): # 模擬tornado框架,處理兩個請求 req_a() req_b() if __name__ == "__main__": main()
結果:
開始處理請求req_a
開始執行IO操作
完成處理請求req_a
開始處理請求req_b
完成處理請求req_b
完成IO操作
IO操作結束
2)引入協程
yield關鍵字:將函數/循環...變成生成器,使用__next__執行到下一個yield關鍵字的位置,使用send賦值並喚醒。
正常版本:
# coding:utf-8 import time,threading # 引入多線程 thread_list = [] def long_io(): # 長io操作 def fun(): print("開始執行IO操作") time.sleep(5) print("完成IO操作") try: gen.send("============ IO操作結束 ============") # 使用send返回結果並喚醒程序繼續執行 except: # 捕獲生成器完成迭代的異常,防止程序退出 pass threading.Thread(target=fun).start() # 將長的io操作交個另一個線程來處理,python多進程不用join def req_a(): # 模擬請求a print('開始處理請求req_a') ret = yield long_io() # 執行一個長io操作,接收結果 print(ret) print('完成處理請求req_a') def req_b(): # 模擬請求b print('開始處理請求req_b') print('完成處理請求req_b') def main(): # 模擬tornado框架,處理兩個請求 global gen gen = req_a() # 初始化生成器 gen.__next__() # 執行到第一個yield位置 req_b() if __name__ == "__main__": main()
結果:
開始處理請求req_a
開始執行IO操作
開始處理請求req_b
完成處理請求req_b
完成IO操作
============ IO操作結束 ============
完成處理請求req_a
裝飾器版本:
# coding:utf-8 import time,threading # 引入多線程 thread_list = [] def long_io(): # 長io操作 def fun(): print("開始執行IO操作") time.sleep(5) print("完成IO操作") try: gen.send("============ IO操作結束 ============") # 使用send返回結果並喚醒程序繼續執行 except: # 捕獲生成器完成迭代的異常,防止程序退出 pass threading.Thread(target=fun).start() # 將長的io操作交個另一個線程來處理,python多進程不用join def gen_coroutine(f): def inner(*args,**kwargs): global gen gen = f() # 執行req_a,初始化生成器 gen.__next__() # 執行到第一個yield位置 return inner @gen_coroutine def req_a(): # 模擬請求a print('開始處理請求req_a') ret = yield long_io() # 執行一個長io操作,接收結果 print(ret) print('完成處理請求req_a') def req_b(): # 模擬請求b print('開始處理請求req_b') print('完成處理請求req_b') def main(): # 模擬tornado框架,處理兩個請求 req_a() # 下面不修改 req_b() if __name__ == "__main__": main()
結果:
開始處理請求req_a
開始執行IO操作
開始處理請求req_b
完成處理請求req_b
完成IO操作
============ IO操作結束 ============
完成處理請求req_a
這個版本就是理解Tornado異步編程原理的最簡易模型,但是,Tornado實現異步的機制不是線程,而是ioloop,即將異步過程交給ioloop執行並進行監視回調。
需要注意的一點是,我們實現的版本嚴格意義上來說不能算是協程,因為兩個程序的掛起與喚醒是在兩個線程上實現的,而Tornado利用ioloop來實現異步,程序的掛起與喚醒始終在一個線程上,由Tornado自己來調度,屬於真正意義上的協程。雖如此,並不妨礙我們理解Tornado異步編程的原理。
Tornado的異步
因為epoll主要是用來解決網絡IO的並發問題,所以Tornado的異步編程也主要體現在網絡IO的異步上,即異步Web請求。
將每個建立起來的socket送入epoll池中監聽,當有請求來了的時候,epoll會捕捉信號並將生成器,斷點...全部打包給IOLoop,讓IOLoop來進行調度。
經過路由映射交給視圖函數處理,當遇到堵塞的時候會將生成器,斷點...全部打包還給IOLoop,等待下一次調度。
tornado.httpclient.AsyncHTTPClient
Tornado提供了一個異步Web請求客戶端tornado.httpclient.AsyncHTTPClient用來進行異步Web請求。
fetch(request, callback=None)
用於執行一個web請求request,並異步返回一個tornado.httpclient.HTTPResponse響應。
request可以是一個url,也可以是一個tornado.httpclient.HTTPRequest對象。如果是url,fetch會自己構造一個HTTPRequest對象。
HTTPRequest
HTTP請求類,HTTPRequest的構造函數可以接收眾多構造參數,最常用的如下:
- url (string) – 要訪問的url,此參數必傳,除此之外均為可選參數
- method (string) – HTTP訪問方式,如“GET”或“POST”,默認為GET方式
- headers (HTTPHeaders or dict) – 附加的HTTP協議頭
- body – HTTP請求的請求體
HTTPResponse
HTTP響應類,其常用屬性如下:
- code: HTTP狀態碼,如 200 或 404
- reason: 狀態碼描述信息
- body: 響應體字符串
- error: 異常(可有可無)
裝飾器(tornado.web.asynchronous)
此裝飾器用於回調形式的異步方法,並且應該僅用於HTTP的方法上(如get、post等)。
此裝飾器不會讓被裝飾的方法變為異步,而只是告訴框架被裝飾的方法是異步的,當方法返回時響應尚未完成。只有在request handler調用了finish方法后,才會結束本次請求處理,發送響應。
不帶此裝飾器的請求在get、post等方法返回時自動完成結束請求處理。
callback異步:
# coding:utf-8 import tornado.web import tornado.ioloop import tornado.httpserver import tornado.options import json from tornado.web import url,RequestHandler from tornado.options import define,options from tornado.httpclient import AsyncHTTPClient # 引入異步Web請求客戶端 tornado.options.define("port",default=8001,type=int,help="給個端口號唄") class IndexHandler(RequestHandler): @tornado.web.asynchronous # 不關閉連接,也不發送響應(因為在get,post方法執行后會執行on_finish方法,會關閉管道) def get(self): http = AsyncHTTPClient() # 實例化異步客戶端 http.fetch("http://int.dpool.sina.com.cn/iplookup/iplookup.php?format=json&ip=14.130.112.24",callback=self.on_response) # 發送一個http請求 def on_response(self, response): if response.error: self.send_error(500) else: data = json.loads(response.body) # 獲取響應體 if 1 == data["ret"]: self.write(u"國家:%s 省份: %s 城市: %s" % (data["country"], data["province"], data["city"])) else: self.write("查詢IP信息錯誤") self.finish() # 發送響應信息,結束請求處理 if __name__ == "__main__": tornado.options.parse_command_line() app = tornado.web.Application( [ (r"/",IndexHandler), ], debug = True ) http_server = tornado.httpserver.HTTPServer(app) # 創建httpserver實例 http_server.listen(options.port) tornado.ioloop.IOLoop.current().start()
結果:
國家:中國 省份: 北京 城市: 北京
協程異步:
# coding:utf-8 import tornado.web import tornado.ioloop import tornado.httpserver import tornado.options import json from tornado.web import url,RequestHandler from tornado.options import define,options from tornado.httpclient import AsyncHTTPClient # 引入異步Web請求客戶端 import tornado.gen tornado.options.define("port",default=8001,type=int,help="給個端口號唄") class IndexHandler(RequestHandler): class IndexHandler(tornado.web.RequestHandler): @tornado.gen.coroutine def get(self): http = AsyncHTTPClient() response = yield http.fetch( "http://int.dpool.sina.com.cn/iplookup/iplookup.php?format=json&ip=14.130.112.24") if response.error: self.send_error(500) else: data = json.loads(response.body) if 1 == data["ret"]: self.write(u"國家:%s 省份: %s 城市: %s" % (data["country"], data["province"], data["city"])) else: self.write("查詢IP信息錯誤") if __name__ == "__main__": tornado.options.parse_command_line() app = tornado.web.Application( [ (r"/",IndexHandler), ], debug = True ) http_server = tornado.httpserver.HTTPServer(app) # 創建httpserver實例 http_server.listen(options.port) tornado.ioloop.IOLoop.current().start()
也可以將異步Web請求單獨出來:
class IndexHandler(tornado.web.RequestHandler): @tornado.gen.coroutine def get(self): rep = yield self.get_ip_info("14.130.112.24") if 1 == rep["ret"]: self.write(u"國家:%s 省份: %s 城市: %s" % (rep["country"], rep["province"], rep["city"])) else: self.write("查詢IP信息錯誤") @tornado.gen.coroutine def get_ip_info(self, ip): http = tornado.httpclient.AsyncHTTPClient() response = yield http.fetch("http://int.dpool.sina.com.cn/iplookup/iplookup.php?format=json&ip=" + ip) if response.error: rep = {"ret:0"} else: rep = json.loads(response.body) return(rep) # 此處需要注意,python2中使用raise tornado.gen.Return
代碼中我們需要注意的地方是get_ip_info返回值的方式,在python 2中,使用了yield的生成器可以使用不返回任何值的return,但不能return value,因此Tornado為我們封裝了用於在生成器中返回值的特殊異常tornado.gen.Return,並用raise來返回此返回值。
並行協程:
Tornado可以同時執行多個異步,並發的異步可以使用列表或字典,如下:
class IndexHandler(tornado.web.RequestHandler): @tornado.gen.coroutine def get(self): ips = ["14.130.112.24", "15.130.112.24", "16.130.112.24", "17.130.112.24"] rep1, rep2 = yield [self.get_ip_info(ips[0]), self.get_ip_info(ips[1])] rep34_dict = yield dict(rep3=self.get_ip_info(ips[2]), rep4=self.get_ip_info(ips[3])) self.write_response(ips[0], rep1) self.write_response(ips[1], rep2) self.write_response(ips[2], rep34_dict['rep3']) self.write_response(ips[3], rep34_dict['rep4']) def write_response(self, ip, response): self.write(ip) self.write(":<br/>") if 1 == response["ret"]: self.write(u"國家:%s 省份: %s 城市: %s<br/>" % (response["country"], response["province"], response["city"])) else: self.write("查詢IP信息錯誤<br/>") @tornado.gen.coroutine def get_ip_info(self, ip): http = tornado.httpclient.AsyncHTTPClient() response = yield http.fetch("http://int.dpool.sina.com.cn/iplookup/iplookup.php?format=json&ip=" + ip) if response.error: rep = {"ret:1"} else: rep = json.loads(response.body) raise tornado.gen.Return(rep)
關於數據庫的異步說明
網站基本都會有數據庫操作,而Tornado是單線程的,這意味着如果數據庫查詢返回過慢,整個服務器響應會被堵塞。
數據庫查詢,實質上也是遠程的網絡調用;理想情況下,是將這些操作也封裝成為異步的;但Tornado對此並沒有提供任何支持。
這是Tornado的設計,而不是缺陷。
一個系統,要滿足高流量;是必須解決數據庫查詢速度問題的!
數據庫若存在查詢性能問題,整個系統無論如何優化,數據庫都會是瓶頸,拖慢整個系統!
異步並不能從本質上提到系統的性能;它僅僅是避免多余的網絡響應等待,以及切換線程的CPU耗費。
如果數據庫查詢響應太慢,需要解決的是數據庫的性能問題;而不是調用數據庫的前端Web應用。
對於實時返回的數據查詢,理想情況下需要確保所有數據都在內存中,數據庫硬盤IO應該為0;這樣的查詢才能足夠快;而如果數據庫查詢足夠快,那么前端web應用也就無將數據查詢封裝為異步的必要。
就算是使用協程,異步程序對於同步程序始終還是會提高復雜性;需要衡量的是處理這些額外復雜性是否值得。
如果后端有查詢實在是太慢,無法繞過,Tornaod的建議是將這些查詢在后端封裝獨立封裝成為HTTP接口,然后使用Tornado內置的異步HTTP客戶端進行調用。
websocket
WebSocket是HTML5規范中新提出的客戶端-服務器通訊協議,協議本身使用新的ws://URL格式。
WebSocket 是獨立的、創建在 TCP 上的協議,和 HTTP 的唯一關聯是使用 HTTP 協議的101狀態碼進行協議切換,使用的 TCP 端口是80,可以用於繞過大多數防火牆的限制。
WebSocket 使得客戶端和服務器之間的數據交換變得更加簡單,允許服務端直接向客戶端推送數據而不需要客戶端進行請求,兩者之間可以創建持久性的連接,並允許數據進行雙向傳送。
目前常見的瀏覽器如 Chrome、IE、Firefox、Safari、Opera 等都支持 WebSocket,同時需要服務端程序支持 WebSocket。
1. Tornado的WebSocket模塊
Tornado提供支持WebSocket的模塊是tornado.websocket,其中提供了一個WebSocketHandler類用來處理通訊。
WebSocketHandler.open()
當一個WebSocket連接建立后被調用。
WebSocketHandler.on_message(message)
當客戶端發送消息message過來時被調用,注意此方法必須被重寫。
WebSocketHandler.on_close()
當WebSocket連接關閉后被調用。
WebSocketHandler.write_message(message, binary=False)
向客戶端發送消息messagea,message可以是字符串或字典(字典會被轉為json字符串)。若binary為False,則message以utf8編碼發送;二進制模式(binary=True)時,可發送任何字節碼。
WebSocketHandler.close()
關閉WebSocket連接。
WebSocketHandler.check_origin(origin)
判斷源origin,對於符合條件(返回判斷結果為True)的請求源origin允許其連接,否則返回403。可以重寫此方法來解決WebSocket的跨域請求(如始終return True)。
2. 前端JavaScript編寫
在前端JS中使用WebSocket與服務器通訊的常用方法如下:
var ws = new WebSocket("ws://127.0.0.1:8888/websocket"); // 新建一個ws連接 ws.onopen = function() { // 連接建立好后的回調 ws.send("Hello, world"); // 向建立的連接發送消息 }; ws.onmessage = function (evt) { // 收到服務器發送的消息后執行的回調 alert(evt.data); // 接收的消息內容在事件參數evt的data屬性中 };
在線聊天室的小Demo

1 # coding:utf-8 2 3 import tornado.web 4 import tornado.ioloop 5 import tornado.httpserver 6 import tornado.options 7 import os 8 import datetime 9 10 from tornado.web import RequestHandler 11 from tornado.options import define, options 12 from tornado.websocket import WebSocketHandler 13 14 define("port", default=8000, type=int) 15 16 class IndexHandler(RequestHandler): 17 def get(self): 18 self.render("index.html") 19 20 class ChatHandler(WebSocketHandler): 21 22 users = set() # 用來存放在線用戶的容器 23 24 def open(self): 25 self.users.add(self) # 建立連接后添加用戶到容器中 26 for u in self.users: # 向已在線用戶發送消息 27 u.write_message(u"[%s]-[%s]-進入聊天室" % (self.request.remote_ip, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) 28 29 def on_message(self, message): 30 for u in self.users: # 向在線用戶廣播消息 31 u.write_message(u"[%s]-[%s]-說:%s" % (self.request.remote_ip, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), message)) 32 33 def on_close(self): 34 self.users.remove(self) # 用戶關閉連接后從容器中移除用戶 35 for u in self.users: 36 u.write_message(u"[%s]-[%s]-離開聊天室" % (self.request.remote_ip, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) 37 38 def check_origin(self, origin): 39 return True # 允許WebSocket的跨域請求 40 41 if __name__ == '__main__': 42 tornado.options.parse_command_line() 43 app = tornado.web.Application([ 44 (r"/", IndexHandler), 45 (r"/chat", ChatHandler), 46 ], 47 static_path = os.path.join(os.path.dirname(__file__), "static"), 48 template_path = os.path.join(os.path.dirname(__file__), "template"), 49 debug = True 50 ) 51 http_server = tornado.httpserver.HTTPServer(app) 52 http_server.listen(options.port) 53 tornado.ioloop.IOLoop.current().start()

1 <!DOCTYPE html> 2 <html> 3 <head> 4 <meta charset="utf-8"> 5 <title>聊天室</title> 6 </head> 7 <body> 8 <div id="contents" style="height:500px;overflow:auto;"></div> 9 <div> 10 <textarea id="msg"></textarea> 11 <a href="javascript:;" onclick="sendMsg()">發送</a> 12 </div> 13 <script src="{{static_url('js/jquery.min.js')}}"></script> 14 <script type="text/javascript"> 15 var ws = new WebSocket("ws://192.168.114.177:8000/chat"); 16 ws.onmessage = function(e) { 17 $("#contents").append("<p>" + e.data + "</p>"); 18 } 19 function sendMsg() { 20 var msg = $("#msg").val(); 21 ws.send(msg); 22 $("#msg").val(""); 23 } 24 </script> 25 </body> 26 </html>