主要內容
- Gevent協程
- Select\Poll\Epoll異步IO與事件驅動
- selectors 模塊 多並發演示
協程
協程,又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協程是一種用戶態的輕量級線程。
協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:
協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
協程的好處:
- 無需線程上下文切換的開銷
- 無需原子操作鎖定及同步的開銷
- "原子操作(atomic operation)是不需要synchronized",所謂原子操作是指不會被線程調度機制打斷的操作;這種操作一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另一個線程)。原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉只執行部分。視作整體是原子性的核心。
- 方便切換控制流,簡化編程模型
- 高並發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用於高並發處理。
缺點:
- 無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
- 進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序
協程一個標准定義,即符合什么條件就能稱之為協程:
- 必須在只有一個單線程里實現並發
- 修改共享數據不需加鎖
- 用戶程序里自己保存多個控制流的上下文棧
- 一個協程遇到IO操作自動切換到其它協程
Greenlet
greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可以使你在任意函數之間隨意切換,而不需把這個函數先聲明為generator
1 from greenlet import greenlet 2 def test1(): 3 print(12) 4 gr2.switch() 5 print(34) 6 gr2.switch() 7 8 def test2(): 9 print(56) 10 gr1.switch() 11 print(78) 12 13 gr1 = greenlet(test1) 14 gr2 = greenlet(test2) 15 gr1.switch()
感覺確實用着比generator還簡單了呢,但好像還沒有解決一個問題,就是遇到IO操作,自動切換,對不對?
Gevent
Gevent 是一個第三方庫,可以輕松通過gevent實現並發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。
1 import gevent 2 3 def func1(): 4 print('我在吃西瓜...') 5 gevent.sleep(2) 6 print('我回來繼續吃西瓜...') 7 8 def func2(): 9 print('我去吃芒果...') 10 gevent.sleep(1) 11 print('我吃完西瓜,回去繼續吃芒果...') 12 13 gevent.joinall([ 14 gevent.spawn(func1), 15 gevent.spawn(func2), 16 ])
輸出:
我在吃西瓜...
我去吃芒果...
我回來繼續吃西瓜...
我吃完西瓜,回去繼續吃芒果...
同步與異步的性能區別
1 # _*_coding:utf-8_*_ 2 # Author:Jaye He 3 4 import gevent, time 5 from urllib.request import urlopen 6 from gevent import monkey 7 8 monkey.patch_all() # 把當前程序的所有的IO操作給我單獨的做標記觸發gevent的遇到IO自動切換線程 9 10 11 def func(url): 12 print('[*] Get %s' % url) 13 res = urlopen(url) 14 data = res.read() 15 print('%d bytes received from %s' % (len(data), url)) 16 17 url = [ 18 'https://www.python.org/', 19 'https://www.yahoo.com/', 20 'https://github.com/' 21 ] 22 23 time_start = time.time() 24 25 for i in url: 26 func(i) 27 print('同步cost', time.time() - time_start) 28 29 async_time_start = time.time() 30 gevent.joinall( 31 [ 32 gevent.spawn(func, 'https://www.python.org/'), 33 gevent.spawn(func, 'https://www.yahoo.com/'), 34 gevent.spawn(func, 'https://github.com/') 35 ] 36 ) 37 print('異步cost', time.time() - async_time_start)
輸出:
[*] Get https://www.python.org/
48708 bytes received from https://www.python.org/
[*] Get https://www.yahoo.com/
478886 bytes received from https://www.yahoo.com/
[*] Get https://github.com/
55867 bytes received from https://github.com/
同步cost 3.859987258911133
[*] Get https://www.python.org/
[*] Get https://www.yahoo.com/
[*] Get https://github.com/
48708 bytes received from https://www.python.org/
55867 bytes received from https://github.com/
475663 bytes received from https://www.yahoo.com/
異步cost 1.8283183574676514
效果 很明顯
通過gevent實現單線程下的多socket並發
server side
1 import socket 2 import gevent 3 from gevent import socket, monkey 4 5 monkey.patch_all() 6 7 8 def server(port): 9 s = socket.socket() 10 s.bind(('0.0.0.0', port)) 11 s.listen(500) 12 while True: 13 cli, addr = s.accept() 14 gevent.spawn(handle_request, cli) 15 16 17 def handle_request(conn): 18 try: 19 while True: 20 data = conn.recv(1024) 21 print("recv:", data) 22 conn.send(data) 23 if not data: 24 conn.shutdown(socket.SHUT_WR) 25 26 except Exception as ex: 27 print(ex) 28 finally: 29 conn.close() 30 31 32 if __name__ == '__main__': 33 server(8001)
並發100個socket連接
1 import socket 2 import threading 3 4 5 def sock_conn(): 6 7 client = socket.socket() 8 9 client.connect(("localhost", 8001)) 10 count = 0 11 while True: 12 client.send(("hello %s" % count).encode("utf-8")) 13 14 data = client.recv(1024) 15 16 print("[%s]recv from server:" % threading.get_ident(), data.decode()) # 結果 17 count += 1 18 client.close() 19 20 21 for i in range(100): 22 t = threading.Thread(target=sock_conn) 23 t.start()
論事件驅動與異步IO
看圖說話講事件驅動模型
在UI編程中,常常要對鼠標點擊進行相應,首先如何獲得鼠標點擊呢?
方式一:創建一個線程,該線程一直循環檢測是否有鼠標點擊,那么這個方式有以下幾個缺點:
1. CPU資源浪費,可能鼠標點擊的頻率非常小,但是掃描線程還是會一直循環檢測,這會造成很多的CPU資源浪費;如果掃描鼠標點擊的接口是阻塞的呢?
2. 如果是堵塞的,又會出現下面這樣的問題,如果我們不但要掃描鼠標點擊,還要掃描鍵盤是否按下,由於掃描鼠標時被堵塞了,那么可能永遠不會去掃描鍵盤;
3. 如果一個循環需要掃描的設備非常多,這又會引來響應時間的問題;
所以,該方式是非常不好的。
方式二:就是事件驅動模型
目前大部分的UI編程都是事件驅動模型,如很多UI平台都會提供onClick()事件,這個事件就代表鼠標按下事件。事件驅動模型大體思路如下:
1. 有一個事件(消息)隊列;
2. 鼠標按下時,往這個隊列中增加一個點擊事件(消息);
3. 有個循環,不斷從隊列取出事件,根據不同的事件,調用不同的函數,如onClick()、onKeyDown()等;
4. 事件(消息)一般都各自保存各自的處理函數指針,這樣,每個消息都有獨立的處理函數;
事件驅動編程是一種編程范式,這里程序的執行流由外部事件來決定。它的特點是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程范式是(單線程)同步以及多線程編程。
讓我們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展示了隨着時間的推移,這三種模式下程序所做的工作。這個程序有3個任務需要完成,每個任務都在等待I/O操作時阻塞自身。阻塞在I/O操作上所花費的時間已經用灰色框標示出來了。
在單線程同步模型中,任務按照順序執行。如果某個任務因為I/O而阻塞,其他所有的任務都必須等待,直到它完成之后它們才能依次執行。這種明確的執行順序和串行化處理的行為是很容易推斷得出的。如果任務之間並沒有互相依賴的關系,但仍然需要互相等待的話這就使得程序不必要的降低了運行速度。
在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操作系統來管理,在多處理器系統上可以並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其他線程得以繼續執行。與完成類似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,因為這類程序不得不通過線程同步機制如鎖、可重入函數、線程局部存儲或者其他機制來處理線程安全問題,如果實現不當就會導致出現微妙且令人痛不欲生的bug。
在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其他昂貴的操作時,注冊一個回調到事件循環中,然后當I/O操作完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢所有的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序盡可能的得以執行而不需要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行為,因為程序員不需要關心線程安全問題。
當我們面對如下的環境時,事件驅動模型通常是一個好的選擇:
- 程序中有許多任務,而且…
- 任務之間高度獨立(因此它們不需要互相通信,或者等待彼此)而且…
- 在等待事件到來時,某些任務會阻塞。
當應用程序需要在任務間共享可變的數據時,這也是一個不錯的選擇,因為這里不需要采用同步處理。
網絡應用程序通常都有上述這些特點,這使得它們能夠很好的契合事件驅動編程模型。
Select\Poll\Epoll異步IO
參考alex老師 講解的 Select\Poll\Epoll 發展 和 Select詳解
http://www.cnblogs.com/alex3714/p/4372426.html
select 多並發socket 例子

import select import socket import sys import queue server = socket.socket() server.setblocking(0) server_addr = ('localhost',10000) print('starting up on %s port %s' % server_addr) server.bind(server_addr) server.listen(5) inputs = [server, ] #自己也要監測呀,因為server本身也是個fd outputs = [] message_queues = {} while True: print("waiting for next event...") readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果沒有任何fd就緒,那程序就會一直阻塞在這里 for s in readable: #每個s就是一個socket if s is server: #別忘記,上面我們server自己也當做一個fd放在了inputs列表里,傳給了select,如果這個s是server,代表server這個fd就緒了, #就是有活動了, 什么情況下它才有活動? 當然 是有新連接進來的時候 呀 #新連接進來了,接受這個連接 conn, client_addr = s.accept() print("new connection from",client_addr) conn.setblocking(0) inputs.append(conn) #為了不阻塞整個程序,我們不會立刻在這里開始接收客戶端發來的數據, 把它放到inputs里, 下一次loop時,這個新連接 #就會被交給select去監聽,如果這個連接的客戶端發來了數據 ,那這個連接的fd在server端就會變成就續的,select就會把這個連接返回,返回到 #readable 列表里,然后你就可以loop readable列表,取出這個連接,開始接收數據了, 下面就是這么干 的 message_queues[conn] = queue.Queue() #接收到客戶端的數據后,不立刻返回 ,暫存在隊列里,以后發送 else: #s不是server的話,那就只能是一個 與客戶端建立的連接的fd了 #客戶端的數據過來了,在這接收 data = s.recv(1024) if data: print("收到來自[%s]的數據:" % s.getpeername()[0], data) message_queues[s].put(data) #收到的數據先放到queue里,一會返回給客戶端 if s not in outputs: outputs.append(s) #為了不影響處理與其它客戶端的連接 , 這里不立刻返回數據給客戶端 else:#如果收不到data代表什么呢? 代表客戶端斷開了呀 print("客戶端斷開了",s) if s in outputs: outputs.remove(s) #清理已斷開的連接 inputs.remove(s) #清理已斷開的連接 del message_queues[s] ##清理已斷開的連接 for s in writeable: try : next_msg = message_queues[s].get_nowait() except queue.Empty: print("client [%s]" %s.getpeername()[0], "queue is empty..") outputs.remove(s) else: print("sending msg to [%s]"%s.getpeername()[0], next_msg) s.send(next_msg.upper()) for s in exeptional: print("handling exception for ",s.getpeername()) inputs.remove(s) if s in outputs: outputs.remove(s) s.close() del message_queues[s]

import socket import sys messages = [ b'This is the message. ', b'It will be sent ', b'in parts.', ] server_address = ('localhost', 10000) # Create a TCP/IP socket socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM), ] # Connect the socket to the port where the server is listening print('connecting to %s port %s' % server_address) for s in socks: s.connect(server_address) for message in messages: # Send messages on both sockets for s in socks: print('%s: sending "%s"' % (s.getsockname(), message) ) s.send(message) # Read responses on both sockets for s in socks: data = s.recv(1024) print( '%s: received "%s"' % (s.getsockname(), data) ) if not data: print(sys.stderr, 'closing socket', s.getsockname() )
selectors模塊多並發演示
使用selectors 模塊(協程)實現500並發上傳下載, 在本機win10上測試超過500就出現以下情況,主要是windows上selectors 使用的是select, 顯示使用的fd太多,有限制
ValueError: too many file descriptors in select()
selectors 服務端
1 # _*_coding:utf-8_*_ 2 # Author:Jaye He 3 4 import selectors 5 import socket 6 import os 7 8 9 class SelectorsServer(object): 10 11 def __init__(self, address): 12 self.address = address 13 self.server = socket.socket() 14 self.server.setblocking(False) 15 self.sel = selectors.DefaultSelector() 16 17 self.fd = {} # 儲存每個conn的對應文件句柄和文件大小 18 19 def server_start(self): 20 """開啟Server,然后交給self.accept監聽連接請求(接受數據注冊EVENT_READ事件)""" 21 self.server.bind(self.address) 22 self.server.listen(1000) 23 self.sel.register(self.server, selectors.EVENT_READ, self.accept) 24 25 def accept(self, server): 26 """建立連接,然后交給self.read接受從客戶端來的數據(接受數據注冊EVENT_READ事件)""" 27 conn, addr = server.accept() 28 conn.setblocking(False) 29 # print('accepted form', conn, addr) 30 self.sel.register(conn, selectors.EVENT_READ, self.read) 31 32 def read(self, conn): 33 """接受從客戶端來的數據,然后判斷需要的操作""" 34 data = conn.recv(1024) 35 if data: 36 if data == b'get': 37 # 接收到'get', 就打開文件'歌詞.txt',把文件句柄傳入self.fd 38 # 然后注冊EVENT_WRITE事件交給self.get_write發數據給客戶端 39 f = open('歌詞.txt', 'rb') 40 file_size = os.path.getsize('歌詞.txt') 41 conn.send(str(file_size).encode()) 42 self.fd.update({conn: {'fd': f, 'file_size': file_size}}) 43 self.sel.unregister(conn) 44 self.sel.register(conn, selectors.EVENT_WRITE, self.get_write) 45 elif data == b'put': 46 # 接收到'put', 就注冊EVENT_READ事件交給self.put_read處理 47 self.sel.unregister(conn) 48 self.sel.register(conn, selectors.EVENT_READ, self.put_read) 49 else: 50 # 接受到空數據就從事件列表中注銷conn的事件,同時關閉conn 51 print('\033[1;33mclosing %s\033[0m' % conn) 52 self.sel.unregister(conn) 53 conn.close() 54 55 def put_read(self, conn): 56 """接受並處理客戶端發來的數據""" 57 data = conn.recv(1024) 58 if data: # 有數據,打印數據大小和對應conn 59 print(len(data), conn) 60 else: # 沒有就注銷conn,並關閉conn 61 print('\033[1;33mclosing %s\033[0m' % conn) 62 self.sel.unregister(conn) 63 conn.close() 64 65 def get_write(self, conn): 66 """給客戶端發送數據""" 67 f = self.fd[conn]['fd'] 68 file_size = self.fd[conn]['file_size'] 69 conn.send(f.readline()) 70 progress = f.tell() 71 if progress == file_size: 72 # 文件發送完畢,然后繼續注冊EVENT_READ事件,交給self.read處理 73 self.sel.unregister(conn) 74 self.sel.register(conn, selectors.EVENT_READ, self.read) 75 76 def monitor(self): 77 """監聽EVENT事件列表,有活動的事件就交給相應方法處理""" 78 while True: 79 events_list = self.sel.select() 80 for key, mask in events_list: 81 callback = key.data 82 callback(key.fileobj) 83 84 85 def main(): 86 fs = SelectorsServer(('0.0.0.0', 9000)) 87 fs.server_start() 88 fs.monitor() 89 90 if __name__ == '__main__': 91 main()
selectors 500並發客戶端
1 # _*_coding:utf-8_*_ 2 # Author:Jaye He 3 4 import socket 5 import threading 6 7 8 def sock_conn(): 9 pid = threading.current_thread() 10 client = socket.socket() 11 client.connect(('localhost', 9000)) 12 13 # 從服務器下載數據 14 client.send(b'get') 15 file_size = int(client.recv(1024).decode()) 16 get_size = 0 17 while True: 18 get_data = client.recv(1024) 19 get_size += len(get_data) 20 print('get', '%sbytes' % len(get_data), pid) 21 if get_size == file_size: 22 break 23 print('\033[1;33mGet Completed %s\033[0m' % pid) 24 25 # 向服務器上傳數據 26 client.send(b'put') 27 f = open('歌詞.txt', 'rb') 28 while True: 29 data = f.readline() 30 if len(data) != 0: 31 client.send(data) 32 print('put', '%sbytes' % len(data), pid) 33 else: 34 print('\033[1;33mPut Completed %s\033[0m' % pid) 35 break 36 f.close() 37 client.close() 38 39 threading_list = [] 40 41 for i in range(500): 42 t = threading.Thread(target=sock_conn) 43 threading_list.append(t) 44 t.start() 45 46 for t in threading_list: 47 t.join()