1. Multithreaded network IO requests:

```python
#!/usr/bin/env python3
# coding: utf-8
from concurrent.futures import ThreadPoolExecutor
import requests

# Thread pool without a callback:
# def get_page(url):
#     response = requests.get(url)
#     print(response.url)
#     return response
#
# urllist = ["https://www.baidu.com/", "https://www.jianshu.com/", "https://i.cnblogs.com/"]
# pool = ThreadPoolExecutor(5)  # at most 5 worker threads
# for url in urllist:
#     pool.submit(get_page, url)  # submit the function and its argument to the pool
# pool.shutdown(wait=True)


# Thread pool with a callback
def get_page(url):
    response = requests.get(url)
    print(response.url)
    return response


def callback(future):
    # future.result() is the value returned by get_page (the Response object)
    print(future.result())


urllist = ["https://www.baidu.com/", "https://www.jianshu.com/", "https://i.cnblogs.com/"]
pool = ThreadPoolExecutor(5)  # at most 5 worker threads
for url in urllist:
    future = pool.submit(get_page, url)  # submit to the pool; returns a Future
    future.add_done_callback(callback)   # called when the Future finishes
pool.shutdown(wait=True)
```
2. Multiprocess network IO requests:

```python
#!/usr/bin/env python3
# coding: utf-8
from concurrent.futures import ProcessPoolExecutor
import requests

# Process pool without a callback:
# def get_page(url):
#     response = requests.get(url)
#     print(response.url)
#     return response
#
# urllist = ["https://www.baidu.com/", "https://www.jianshu.com/", "https://i.cnblogs.com/"]
# pool = ProcessPoolExecutor(5)
# for url in urllist:
#     pool.submit(get_page, url)
# pool.shutdown(wait=True)


# Process pool with a callback
def get_page(url):
    response = requests.get(url)
    print(response.url)
    # the return value is pickled and sent back to the parent (requests.Response supports this)
    return response


def callback(future):
    # runs in the parent process once the result is back
    print(future.result())


if __name__ == '__main__':  # required where worker processes are spawned (Windows, macOS)
    urllist = ["https://www.baidu.com/", "https://www.jianshu.com/", "https://i.cnblogs.com/"]
    pool = ProcessPoolExecutor(5)
    for url in urllist:
        future = pool.submit(get_page, url)
        future.add_done_callback(callback)
    pool.shutdown(wait=True)
```
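In both versions, each request blocks while it waits for its result, so a whole thread or process sits idle for the duration of the wait, which wastes resources. Asynchronous IO solves this better: after sending a request the program does not wait for the result but carries on with other work, and handles each page once its response comes back.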
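To make that cost concrete, here is a small sketch that replaces the real HTTP call with a hypothetical `fake_get` that only sleeps (an assumption standing in for network latency, so the example runs offline). It records which pool thread served each "request":

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def fake_get(url):
    # hypothetical stand-in for requests.get: sleeping simulates waiting on the network
    time.sleep(0.2)
    return url, threading.current_thread().name


urls = ["https://www.baidu.com/", "https://www.jianshu.com/", "https://i.cnblogs.com/"]

start = time.perf_counter()
with ThreadPoolExecutor(5) as pool:
    results = list(pool.map(fake_get, urls))  # map keeps the input order
elapsed = time.perf_counter() - start

threads_used = {name for _, name in results}
print("elapsed: %.2fs, threads used: %d" % (elapsed, len(threads_used)))
```

The three waits overlap, but only because three threads each sit blocked for the whole wait; with thousands of pending requests you would need thousands of mostly idle threads, which is the waste asynchronous IO avoids.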
3. Asynchronous IO requests:
3.1 The asyncio module: asyncio is a standard-library module introduced in Python 3.4 with built-in support for asynchronous IO.
https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001432090954004980bd351f2cd4cc18c9e6c06d855c498000
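asyncio is built around an event loop (EventLoop): the @asyncio.coroutine decorator marks a generator (the task to be done) as a coroutine, and running several coroutines in the EventLoop gives you asynchronous IO. Since Python 3.5 the same thing is written with async def and await; @asyncio.coroutine was deprecated in Python 3.8 and removed in Python 3.11.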

```python
#!/usr/bin/env python3
# coding: utf-8
import asyncio


# Written with async/await: the @asyncio.coroutine + yield from style
# was removed in Python 3.11.
async def get_page(host, url='/'):
    print(host)
    reader, writer = await asyncio.open_connection(host, 80)
    request_header = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host)
    writer.write(request_header.encode('utf-8'))
    await writer.drain()        # wait until the send buffer has been flushed
    text = await reader.read()  # read until the server closes the connection (HTTP/1.0)
    print(host, url, text)
    writer.close()
    await writer.wait_closed()


async def main():
    await asyncio.gather(
        get_page("www.cnblogs.com", "/silence-cho"),
        get_page("www.sina.com.cn"),
        get_page("www.163.com"),
    )


asyncio.run(main())
```
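When run, the three requests (coroutines) execute in a single thread, yet concurrently: when the first coroutine hits network IO, the loop switches to the second; when the second hits IO, it switches to the third; and whenever one of the three IO operations returns, execution switches back to that coroutine. This is how asynchronous IO achieves concurrency.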
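The switching can be observed without any network access. In this sketch `asyncio.sleep` plays the role of waiting on a socket (an assumption), and `fake_get` is a hypothetical name; it records when each coroutine starts and finishes, and on which thread:

```python
import asyncio
import threading

events = []


async def fake_get(name, delay):
    # hypothetical stand-in for a network request
    events.append(("start", name, threading.get_ident()))
    await asyncio.sleep(delay)  # "waiting on IO": the loop switches to another coroutine
    events.append(("done", name, threading.get_ident()))
    return name


async def main():
    # gather runs the coroutines concurrently and returns results in argument order
    return await asyncio.gather(
        fake_get("cnblogs", 0.3),
        fake_get("sina", 0.2),
        fake_get("163", 0.1),
    )


results = asyncio.run(main())
print(results)
for kind, name, _ in events:
    print(kind, name)
```

All three coroutines start before any of them finishes, they finish in order of their delays, and every event is recorded on the same thread.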
3.2 asyncio + aiohttp: asyncio implements TCP, UDP, SSL and other protocols; aiohttp is an HTTP framework built on asyncio that wraps both the web (server) side and the request (client) side.
aiohttp documentation: https://aiohttp.readthedocs.io/en/stable/client_quickstart.html#make-a-request
3.3 asyncio + requests:
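requests is a blocking library, so its calls cannot be awaited directly. One common way to combine it with asyncio (a sketch, not necessarily what this section originally contained) is `loop.run_in_executor`, which runs a blocking call in a thread pool and returns an awaitable. Here a hypothetical `blocking_get` stands in for `requests.get` so the example runs offline; with real requests you would pass `requests.get` instead:

```python
import asyncio
import time


def blocking_get(url):
    # hypothetical stand-in for requests.get(url): a blocking call that holds its thread
    time.sleep(0.2)
    return "response from " + url


async def main(urls):
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor
    futures = [loop.run_in_executor(None, blocking_get, url) for url in urls]
    return await asyncio.gather(*futures)


urls = ["www.baidu.com", "www.sina.com.cn", "www.163.com"]
start = time.perf_counter()
pages = asyncio.run(main(urls))
elapsed = time.perf_counter() - start
print(pages, "%.2fs" % elapsed)
```

The event loop itself stays single-threaded, but each blocking call still occupies a pool thread, so this is a bridge for blocking libraries rather than true non-blocking IO.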
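3.4 gevent + requests: gevent is a third-party Python library with full coroutine support, implemented on top of greenlet. When a greenlet hits an IO operation, such as a network access, gevent automatically switches to another greenlet, and switches back at a suitable moment once the IO has completed. Because IO is slow and often leaves a program waiting, this automatic switching ensures that some greenlet is always running instead of waiting on IO. The switching only happens in calls gevent has patched, which is why monkey.patch_all() must run before requests (and the socket module it uses) is imported.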

```python
#!/usr/bin/env python3
# coding: utf-8
from gevent import monkey
monkey.patch_all()  # patch socket etc. so that requests yields to other greenlets on IO

import gevent
import requests


def get_page(method, url, kwargs):
    print(url, kwargs)
    response = requests.request(method=method, url=url, **kwargs)
    print(response.url, response.content)


# Send the requests
gevent.joinall([
    gevent.spawn(get_page, method='get', url="https://www.baidu.com/", kwargs={}),
    gevent.spawn(get_page, method='get', url="https://www.sina.com.cn/", kwargs={}),
])

# Send the requests with a cap on the number of concurrent greenlets
# from gevent.pool import Pool
# pool = Pool(5)  # at most 5 greenlets at a time; Pool(None) means no limit
# gevent.joinall([
#     pool.spawn(get_page, method='get', url="https://www.baidu.com/", kwargs={}),
#     pool.spawn(get_page, method='get', url="https://www.sina.com.cn/", kwargs={}),
# ])
```
3.5 grequests

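```python
#!/usr/bin/env python3
# coding: utf-8
# grequests can be thought of simply as gevent + requests
import grequests

request_list = [
    grequests.get('http://www.baidu.com/', timeout=0.001),  # tiny timeout: expected to fail
    grequests.get('http://sina.com.cn/'),
]

# Send all requests concurrently; failed requests appear as None in the list
response_list = grequests.map(request_list)
print(response_list)
```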
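3.6 The Twisted framework
3.7 The Tornado framework
Recommended order of use: gevent > Twisted > Tornado > asyncio
4. A custom asynchronous IO framework:
At their core, all of the asynchronous IO frameworks above are built on non-blocking sockets plus IO multiplexing (select, or more scalable relatives such as epoll).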
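Non-blocking sockets:
A network request is essentially communication between a client socket and a server socket. The client-side flow is shown in the code below: establish a connection, send the request, and receive the data. Both connect() and recv() block until the server responds. After setblocking(False), connect() and recv() no longer block: if the operation cannot complete immediately, they raise an exception (BlockingIOError) right away. The program can catch that exception and do other work, while monitoring the socket's state; once the connection has been established, it goes on to send the request. select is what monitors the socket state.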
```python
import socket

# Blocking version
soc = socket.socket()
soc.connect(("www.baidu.com", 80))  # establish a connection with the server socket
request = "GET / HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n"
soc.send(request.encode('utf-8'))   # send the request headers to the server
response = soc.recv(8096)           # receive data returned by the server (first chunk only)
print(response)

# Non-blocking version
# soc = socket.socket()
# soc.setblocking(False)  # non-blocking: connect() raises BlockingIOError immediately
# try:
#     soc.connect(("www.baidu.com", 80))
# except BlockingIOError:
#     pass
```
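The commented-out non-blocking version stops right after connect(). Below is a sketch of the next step, assuming a local listening socket so it runs offline: start the connect, let select report when the socket becomes writable, confirm success through SO_ERROR, and only then send. `nonblocking_connect_and_send` is a hypothetical helper name, and `connect_ex` is used instead of a try/except around connect() purely for brevity:

```python
import errno
import select
import socket


def nonblocking_connect_and_send(host, port, payload, timeout=2.0):
    """Hypothetical helper: non-blocking connect, wait with select, then send."""
    soc = socket.socket()
    soc.setblocking(False)
    # connect_ex returns an error code instead of raising BlockingIOError;
    # normally the handshake is still in progress at this point.
    err = soc.connect_ex((host, port))
    if err not in (0, errno.EINPROGRESS, errno.EWOULDBLOCK):
        soc.close()
        raise OSError(err, "connect failed")
    # ... the program is free to do other work here instead of waiting ...
    _, writable, _ = select.select([], [soc], [], timeout)
    if not writable:
        soc.close()
        raise TimeoutError("connect timed out")
    # writable only means the attempt finished; SO_ERROR tells success from failure
    err = soc.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err:
        soc.close()
        raise OSError(err, "connect failed")
    soc.sendall(payload)
    return soc


if __name__ == "__main__":
    server = socket.socket()
    server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    server.listen(1)
    client = nonblocking_connect_and_send("127.0.0.1", server.getsockname()[1], b"hello")
    conn, _ = server.accept()
    print(conn.recv(1024))
    for s in (conn, client, server):
        s.close()
```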
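IO multiplexing (select):
select can monitor the state of several file descriptors or socket objects at once. It takes four arguments, rlist, wlist, xlist and an optional timeout, and returns three lists: r, w and e. Only the socket case is discussed here:
r, w, e = select.select(rlist, wlist, xlist[, timeout])
Parameters:
rlist: a list of socket objects; select watches whether they are readable (data has arrived)
wlist: a list of socket objects; select watches whether they are writable (for a non-blocking connect, the connection attempt has completed)
xlist: a list of socket objects; select watches whether an exceptional condition occurs on them
timeout: the maximum time to wait, in seconds; if omitted, select blocks until at least one socket is ready
Return values:
r: the subset of rlist whose sockets have received data from the server (ready to read)
w: the subset of wlist whose sockets are connected to the server and can send the request (ready to write)
e: the subset of xlist whose sockets have an exceptional condition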
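A small way to see these return values, using only the standard library: `socket.socketpair()` supplies two already-connected sockets, so readability and writability can be triggered locally without any server:

```python
import select
import socket

a, b = socket.socketpair()  # two connected sockets, no server needed

# A connected socket with an empty send buffer is writable; nothing to read yet.
r1, w1, e1 = select.select([a], [a], [a], 0)
print(r1, w1, e1)

b.send(b"ping")  # data now arrives on a, so a becomes readable
r2, _, _ = select.select([a], [], [], 1.0)
data = a.recv(4)
print(r2, data)

# Nothing ready: select returns three empty lists once the timeout expires.
r3, w3, e3 = select.select([a], [], [], 0.1)
print(r3, w3, e3)
```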
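Besides file descriptors and sockets, the watched lists may contain any object that implements fileno() (which must return a file descriptor), such as the HttpContext object below.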
```python
class HttpContext(object):
    def __init__(self, soc):
        self.soc = soc

    def fileno(self):
        # select() calls this to obtain the underlying file descriptor
        return self.soc.fileno()
```
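A quick check of that behavior, assuming a socketpair in place of a real connection; `SocketWrapper` is a hypothetical class mirroring HttpContext:

```python
import select
import socket


class SocketWrapper(object):
    # hypothetical class mirroring HttpContext: any object with fileno() works
    def __init__(self, soc, name):
        self.soc = soc
        self.name = name  # per-request state carried alongside the socket

    def fileno(self):
        return self.soc.fileno()


a, b = socket.socketpair()
wrapped = SocketWrapper(a, "request-1")
b.send(b"data")
r, _, _ = select.select([wrapped], [], [], 1.0)
# select returns the wrapper objects themselves, not raw sockets
print([obj.name for obj in r])
```

Because select hands back the very objects it was given, a client can keep the host, callback and receive buffer on the wrapper and get them straight back when the socket is ready.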
An asynchronous IO client:

```python
#!/usr/bin/env python3
# coding: utf-8
import select
import socket


class HttpResponse(object):
    def __init__(self, recv_data):
        self.data = recv_data
        self.headers = []
        self.body = None
        self.initialize()

    def initialize(self):
        headers, _, self.body = self.data.partition('\r\n\r\n')
        self.headers = headers.split('\r\n')


class HttpContext(object):
    """One request; select() can watch it because it implements fileno()."""
    def __init__(self, soc, task):
        self.host = task['host']
        self.port = task['port']
        self.url = task['url']
        self.method = task['method']
        self.data = task['data']
        self.callback = task['callback']
        self.soc = soc
        self.buffer = []

    def fileno(self):
        return self.soc.fileno()

    def send_request(self):
        request = "%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s" % (self.method, self.url, self.host, self.data)
        self.soc.sendall(request.encode('utf-8'))

    def receive(self, size):
        return self.soc.recv(size)

    def finish(self):
        recv_data = b''.join(self.buffer).decode('utf-8', errors='replace')
        self.soc.close()
        self.callback(self.host, HttpResponse(recv_data))


class AsyncRequest(object):
    def __init__(self):
        self.connection = []  # requests waiting to connect and send
        self.connected = []   # requests waiting for (more) response data

    def set_request(self, task):
        soc = socket.socket()
        soc.setblocking(False)
        try:
            # the hostname lookup still blocks; only the TCP handshake is non-blocking
            soc.connect((task['host'], task['port']))
        except BlockingIOError:  # must be caught ([Errno 10035] on Windows)
            pass
        request = HttpContext(soc, task)
        print(request.host)
        self.connected.append(request)
        self.connection.append(request)

    def run(self):
        while self.connected:
            r, w, e = select.select(self.connected, self.connection, self.connected, 0.05)
            # sockets in w have connected to the server: send the request
            for request_obj in w:
                request_obj.send_request()
                self.connection.remove(request_obj)
            # sockets in r have received data from the server
            for request_obj in r:
                try:
                    chunk = request_obj.receive(8096)
                except BlockingIOError:
                    continue  # nothing to read yet; wait for the next select()
                if chunk:
                    request_obj.buffer.append(chunk)
                else:  # empty read: the server closed the connection, response complete
                    request_obj.finish()
                    self.connected.remove(request_obj)


def result(host, response):
    print(host)
    print(response.headers)
    # print(response.body)


if __name__ == '__main__':
    loop = AsyncRequest()
    url_list = [  # 'timeout' is not used by this simple client
        {'host': 'cn.bing.com', 'port': 80, 'url': '/', 'method': 'GET', 'data': '', 'timeout': 5, 'callback': result},
        {'host': 'www.baidu.com', 'port': 80, 'url': '/', 'method': 'GET', 'data': '', 'timeout': 5, 'callback': result},
        {'host': 'www.sina.com', 'port': 80, 'url': '/', 'method': 'GET', 'data': '', 'timeout': 5, 'callback': result},
    ]
    for item in url_list:
        loop.set_request(item)
    loop.run()
```
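When run, this achieves concurrency within a single thread: the three requests are sent together, and whenever a response comes back, the corresponding callback processes it.
An asynchronous IO server: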

```python
#!/usr/bin/env python3
# coding: utf-8
# A simple asynchronous IO server
import select
import socket

soc = socket.socket()
soc.bind(("127.0.0.1", 8080))
soc.listen(5)
soc.setblocking(False)

inputs = [soc]
outputs = []
while True:
    r, w, e = select.select(inputs, outputs, inputs)
    for sk in r:
        if sk is soc:
            # the listening socket is readable: a client is connecting
            ck, address = soc.accept()
            print("New connection:", address)
            msg = 'Connected...'
            ck.send(msg.encode('utf-8'))
            ck.setblocking(False)
            inputs.append(ck)  # add the client socket to the watch list
        else:
            # a client socket is readable: the client sent data
            buffer = []
            while True:
                try:
                    data = sk.recv(1024)
                except BlockingIOError:  # must be caught ([Errno 10035] on Windows)
                    data = None
                if not data:
                    break
                buffer.append(data)
            recv_data = b''.join(buffer).decode('utf-8')
            print("Received data: %s" % recv_data)
            inputs.remove(sk)
            sk.close()
```

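```python
# Test client 1: open all three connections first, then send on each
import socket

soc1 = socket.socket()
soc1.connect(('127.0.0.1', 8080))
soc2 = socket.socket()
soc2.connect(('127.0.0.1', 8080))
soc3 = socket.socket()
soc3.connect(('127.0.0.1', 8080))
soc1.send(('I am socket--%s' % 1).encode('utf-8'))
soc2.send(('I am socket--%s' % 2).encode('utf-8'))
soc3.send(('I am socket--%s' % 3).encode('utf-8'))
```
With test client 1, all three client sockets connect first and the data they then send is received successfully, so the server does handle the connections concurrently to some degree.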

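```python
# Test client 2: connect, wait for the greeting, then send; one socket at a time
import socket

for i in range(3):
    soc = socket.socket()
    soc.connect(('127.0.0.1', 8080))
    response = soc.recv(8096)  # blocks until the server's greeting arrives
    print(i, response.decode('utf-8'))
    soc.send(('I am socket--%s' % i).encode('utf-8'))
```
With test client 2, all three sockets connect and their data is received, but the server handles the clients strictly one after another rather than concurrently. That is because the client itself blocks during each exchange: it waits in recv() for the server's greeting, sends its data, and only then opens the next connection.
A Tornado-like asynchronous non-blocking web framework (with the browser as the client)
1. Version 1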

```python
#!/usr/bin/env python3
# coding: utf-8
# A simple asynchronous IO web server (version 1)
import re
import select
import socket
import time

# minimal status line so that browsers accept the response
HEADER = 'HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n\r\n'


class HttpRequest(object):
    def __init__(self, content):
        self.content = content
        self.body = None
        self.method = ''
        self.url = ''
        self.protocol = ''
        self.headers = {}
        self.initialize()

    def initialize(self):
        header, _, self.body = self.content.partition('\r\n\r\n')
        # e.g. ['GET / HTTP/1.1', 'Host: 127.0.0.1:8080', 'Connection: keep-alive', ...]
        header_list = header.split('\r\n')
        self.method, self.url, self.protocol = header_list[0].split(' ')  # request line
        for item in header_list[1:]:
            key, _, value = item.partition(':')  # split once: values like Host contain ':'
            self.headers[key] = value.strip()


def main(request):
    time.sleep(10)  # a slow handler: blocks the whole server
    return 'main'


def page(request):
    return 'page'


routers = [
    (r'/main', main),
    (r'/page', page),
]

soc = socket.socket()
soc.bind(("127.0.0.1", 8080))
soc.listen(5)
soc.setblocking(False)
inputs = [soc]
outputs = []

while True:
    r, w, e = select.select(inputs, outputs, inputs)
    for sk in r:
        if sk is soc:
            # the server received a client connection
            ck, address = soc.accept()
            print("New connection:", address)
            ck.setblocking(False)
            inputs.append(ck)  # add the client socket to the watch list
            continue
        # the client sent data
        buffer = []
        while True:
            try:
                data = sk.recv(8096)
            except BlockingIOError:  # must be caught ([Errno 10035] on Windows)
                data = None
            if not data:
                break
            buffer.append(data)
        inputs.remove(sk)
        recv_data = b''.join(buffer).decode('utf-8', errors='replace')
        if not recv_data:  # the client closed without sending a request
            sk.close()
            continue
        request = HttpRequest(recv_data)
        func = None
        for pattern, handler in routers:
            if re.match(pattern, request.url):
                func = handler
                break
        result = func(request) if func else '404'
        print(result)
        sk.sendall((HEADER + result).encode('utf-8'))
        sk.close()
```
When this code runs, if the browser first visits '/main' and then '/page', the '/page' request blocks: the server is stuck in time.sleep inside the '/main' handler and cannot handle '/page' at the same time.
The next two versions introduce a Future class with a result attribute. A handler that would block returns a Future object instead: while the future's result is None, that request stays pending, and once result is set to a value other than None, the response is sent. Meanwhile, all other requests are served normally.
2. Version 2

```python
#!/usr/bin/env python3
# coding: utf-8
# A simple asynchronous IO web server (version 2)
import re
import select
import socket
import time

HEADER = 'HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n\r\n'


class HttpRequest(object):
    def __init__(self, content):
        self.content = content
        self.body = None
        self.method = ''
        self.url = ''
        self.protocol = ''
        self.headers = {}
        self.initialize()

    def initialize(self):
        header, _, self.body = self.content.partition('\r\n\r\n')  # body is '' if absent
        header_list = header.split('\r\n')
        self.method, self.url, self.protocol = header_list[0].split(' ')
        for item in header_list[1:]:
            key, _, value = item.partition(':')
            self.headers[key] = value.strip()


class Future(object):
    def __init__(self, timeout=None):
        self.result = None      # None means "not ready yet"
        self.timeout = timeout  # seconds, or None for no timeout
        self.starttime = time.time()

    def set_result(self, result):
        self.result = result


delay_socket = {}  # client socket -> the Future it is waiting on


def main(request):
    return Future(10)  # answered by the event loop after 10 seconds


def page(request):
    return 'page'


routers = [
    (r'/main', main),
    (r'/page', page),
]


def run():
    soc = socket.socket()
    soc.bind(("127.0.0.1", 8080))
    soc.listen(5)
    soc.setblocking(False)
    # Each socket in r must be removed from inputs once its data has been read;
    # otherwise every later pass selects it again and reads only empty data.
    inputs = [soc]
    while True:
        # A timeout is required: without it select() blocks until a socket is ready
        # and the pending futures below are never checked.
        r, w, e = select.select(inputs, [], inputs, 0.05)
        for sk in r:
            if sk is soc:
                ck, address = soc.accept()
                print("New connection:", address)
                ck.setblocking(False)
                inputs.append(ck)
                continue
            buffer = []
            while True:
                try:
                    data = sk.recv(8096)
                except BlockingIOError:  # must be caught ([Errno 10035] on Windows)
                    data = None
                if not data:
                    break
                buffer.append(data)
            inputs.remove(sk)  # the request has been read; stop watching this socket
            recv_data = b''.join(buffer).decode('utf-8', errors='replace')
            if not recv_data:
                sk.close()
                continue
            request = HttpRequest(recv_data)
            func = None
            for pattern, handler in routers:
                if re.match(pattern, request.url):
                    func = handler
                    break
            # other URLs, e.g. GET /favicon.ico, which browsers request automatically
            result = func(request) if func else '404'
            print(result)
            if isinstance(result, Future):
                delay_socket[sk] = result  # reply later, once the future has a result
            else:
                sk.sendall((HEADER + result).encode('utf-8'))
                sk.close()
        # check the pending futures on every pass of the loop
        for sk in list(delay_socket):  # copy the keys: entries are deleted while iterating
            future = delay_socket[sk]
            if future.timeout is not None and future.starttime + future.timeout <= time.time():
                future.set_result('main')  # the timeout has expired
            if future.result:
                sk.sendall((HEADER + future.result).encode('utf-8'))
                sk.close()
                del delay_socket[sk]


if __name__ == '__main__':
    run()
```
When this code runs, if the browser first visits '/main' and then '/page', '/page' no longer blocks but returns immediately, and the '/main' request returns when its ten-second timeout expires.
3. Version 3

```python
#!/usr/bin/env python3
# coding: utf-8
# A simple asynchronous IO web server (version 3)
import re
import select
import socket
import time

HEADER = 'HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n\r\n'


class HttpRequest(object):
    def __init__(self, content):
        self.content = content
        self.body = None
        self.method = ''
        self.url = ''
        self.protocol = ''
        self.headers = {}
        self.initialize()

    def initialize(self):
        header, _, self.body = self.content.partition('\r\n\r\n')  # body is '' if absent
        header_list = header.split('\r\n')
        self.method, self.url, self.protocol = header_list[0].split(' ')
        for item in header_list[1:]:
            key, _, value = item.partition(':')
            self.headers[key] = value.strip()


class Future(object):
    def __init__(self, timeout=None):
        self.result = None  # None means "not ready yet"
        self.timeout = timeout
        self.starttime = time.time()

    def set_result(self, result):
        self.result = result


delay_socket = {}  # client socket -> the Future it is waiting on
future_list = []   # futures waiting to be released by /stopindex


def index(request):
    future = Future()  # no timeout: waits until another request sets a result
    future_list.append(future)
    return future


def stopindex(request):
    if future_list:
        future_list.pop().set_result('stop')  # release the most recent /index request
    return 'stopindex'


routers = [
    (r'/index', index),
    (r'/stopindex', stopindex),
]


def run():
    soc = socket.socket()
    soc.bind(("127.0.0.1", 8080))
    soc.listen(5)
    soc.setblocking(False)
    # Each socket in r must be removed from inputs once its data has been read;
    # otherwise every later pass selects it again and reads only empty data.
    inputs = [soc]
    while True:
        # A timeout is required: without it select() blocks until a socket is ready
        # and the pending futures below are never checked.
        r, w, e = select.select(inputs, [], inputs, 0.05)
        for sk in r:
            if sk is soc:
                ck, address = soc.accept()
                print("New connection:", address)
                ck.setblocking(False)
                inputs.append(ck)
                continue
            buffer = []
            while True:
                try:
                    data = sk.recv(8096)
                except BlockingIOError:  # must be caught ([Errno 10035] on Windows)
                    data = None
                if not data:
                    break
                buffer.append(data)
            inputs.remove(sk)  # the request has been read; stop watching this socket
            recv_data = b''.join(buffer).decode('utf-8', errors='replace')
            if not recv_data:
                sk.close()
                continue
            request = HttpRequest(recv_data)
            func = None
            for pattern, handler in routers:
                if re.match(pattern, request.url):
                    func = handler
                    break
            # other URLs, e.g. GET /favicon.ico, which browsers request automatically
            result = func(request) if func else '404'
            print(result)
            if isinstance(result, Future):
                delay_socket[sk] = result
            else:
                sk.sendall((HEADER + result).encode('utf-8'))
                sk.close()
        for sk in list(delay_socket):  # copy the keys: entries are deleted while iterating
            future = delay_socket[sk]
            if future.result:
                sk.sendall((HEADER + future.result).encode('utf-8'))
                sk.close()
                del delay_socket[sk]


if __name__ == '__main__':
    run()
```
When this code runs, if the browser first visits '/index', that request hangs; when '/stopindex' is then visited, both requests complete immediately.
References:
http://www.cnblogs.com/wupeiqi/p/6536518.html
http://www.cnblogs.com/wupeiqi/articles/6229292.html
https://www.cnblogs.com/aylin/p/5572104.html