python 協程, 異步IO Select 和 selectors 模塊 多並發演示


主要內容

  1. Gevent協程
  2. Select\Poll\Epoll異步IO與事件驅動
  3. selectors 模塊 多並發演示

協程

協程,又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協程是一種用戶態的輕量級線程

協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:

協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

 

協程的好處:

  • 無需線程上下文切換的開銷
  • 無需原子操作鎖定及同步的開銷
    •   "原子操作(atomic operation)是不需要synchronized",所謂原子操作是指不會被線程調度機制打斷的操作;這種操作一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另一個線程)。原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉只執行部分。視作整體是原子性的核心。
  • 方便切換控制流,簡化編程模型
  • 高並發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用於高並發處理。

缺點:

  • 無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
  • 進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序

協程一個標准定義,即符合什么條件就能稱之為協程:

  1. 必須在只有一個單線程里實現並發
  2. 修改共享數據不需加鎖
  3. 用戶程序里自己保存多個控制流的上下文棧
  4. 一個協程遇到IO操作自動切換到其它協程

Greenlet

greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可以使你在任意函數之間隨意切換,而不需把這個函數先聲明為generator

 

 1 from greenlet import greenlet
 2 def test1():
 3     print(12)
 4     gr2.switch()
 5     print(34)
 6     gr2.switch()
 7 
 8 def test2():
 9      print(56)
10      gr1.switch()
11      print(78)
12 
13 gr1 = greenlet(test1)
14 gr2 = greenlet(test2)
15 gr1.switch()

感覺確實用着比generator還簡單了呢,但好像還沒有解決一個問題,就是遇到IO操作,自動切換,對不對?

 

Gevent 

Gevent 是一個第三方庫,可以輕松通過gevent實現並發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。

 1 import gevent
 2 
 3 def func1():
 4     print('我在吃西瓜...')
 5     gevent.sleep(2)
 6     print('我回來繼續吃西瓜...')
 7     
 8 def func2():
 9     print('我去吃芒果...')
10     gevent.sleep(1)
11     print('我吃完西瓜,回去繼續吃芒果...')
12     
13 gevent.joinall([
14         gevent.spawn(func1),
15         gevent.spawn(func2),
16   ])      

輸出:

我在吃西瓜...

我去吃芒果...

我回來繼續吃西瓜...

我吃完西瓜,回去繼續吃芒果...

 

同步與異步的性能區別 

 1 # _*_coding:utf-8_*_
 2 # Author:Jaye He
 3 
 4 import gevent, time
 5 from urllib.request import urlopen
 6 from gevent import monkey
 7 
 8 monkey.patch_all()  # 把當前程序的所有的IO操作給我單獨的做標記觸發gevent的遇到IO自動切換線程
 9 
10 
11 def func(url):
12     print('[*] Get %s' % url)
13     res = urlopen(url)
14     data = res.read()
15     print('%d bytes received from %s' % (len(data), url))
16 
17 url = [
18     'https://www.python.org/',
19     'https://www.yahoo.com/',
20     'https://github.com/'
21 ]
22 
23 time_start = time.time()
24 
25 for i in url:
26     func(i)
27 print('同步cost', time.time() - time_start)
28 
29 async_time_start = time.time()
30 gevent.joinall(
31     [
32         gevent.spawn(func, 'https://www.python.org/'),
33         gevent.spawn(func, 'https://www.yahoo.com/'),
34         gevent.spawn(func, 'https://github.com/')
35     ]
36 )
37 print('異步cost', time.time() - async_time_start)

輸出:

[*] Get https://www.python.org/
48708 bytes received from https://www.python.org/
[*] Get https://www.yahoo.com/
478886 bytes received from https://www.yahoo.com/
[*] Get https://github.com/
55867 bytes received from https://github.com/
同步cost 3.859987258911133
[*] Get https://www.python.org/
[*] Get https://www.yahoo.com/
[*] Get https://github.com/
48708 bytes received from https://www.python.org/
55867 bytes received from https://github.com/
475663 bytes received from https://www.yahoo.com/
異步cost 1.8283183574676514

效果 很明顯

通過gevent實現單線程下的多socket並發

server side 

 1 import socket
 2 import gevent
 3 from gevent import socket, monkey
 4 
 5 monkey.patch_all()
 6 
 7 
 8 def server(port):
 9     s = socket.socket()
10     s.bind(('0.0.0.0', port))
11     s.listen(500)
12     while True:
13         cli, addr = s.accept()
14         gevent.spawn(handle_request, cli)
15 
16 
17 def handle_request(conn):
18     try:
19         while True:
20             data = conn.recv(1024)
21             print("recv:", data)
22             conn.send(data)
23             if not data:
24                 conn.shutdown(socket.SHUT_WR)
25 
26     except Exception as ex:
27         print(ex)
28     finally:
29         conn.close()
30 
31 
32 if __name__ == '__main__':
33     server(8001)

並發100個socket連接

 1 import socket
 2 import threading
 3 
 4 
 5 def sock_conn():
 6 
 7     client = socket.socket()
 8 
 9     client.connect(("localhost", 8001))
10     count = 0
11     while True:
12         client.send(("hello %s" % count).encode("utf-8"))
13 
14         data = client.recv(1024)
15 
16         print("[%s]recv from server:" % threading.get_ident(), data.decode())    # 結果
17         count += 1
18     client.close()
19 
20 
21 for i in range(100):
22     t = threading.Thread(target=sock_conn)
23     t.start()

論事件驅動與異步IO

通常,我們寫服務器處理模型的程序時,有以下幾種模型:
(1)每收到一個請求,創建一個新的進程,來處理該請求;
(2)每收到一個請求,創建一個新的線程,來處理該請求;
(3)每收到一個請求,放入一個事件列表,讓主進程通過非阻塞I/O方式來處理請求
上面的幾種方式,各有千秋,
第(1)中方法,由於創建新的進程的開銷比較大,所以,會導致服務器性能比較差,但實現比較簡單。
第(2)種方式,由於要涉及到線程的同步,有可能會面臨 死鎖等問題。
第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都復雜。
綜合考慮各方面因素,一般普遍認為第(3)種方式是大多數 網絡服務器采用的方式

看圖說話講事件驅動模型

在UI編程中,常常要對鼠標點擊進行相應,首先如何獲得鼠標點擊呢?
方式一:創建一個線程,該線程一直循環檢測是否有鼠標點擊,那么這個方式有以下幾個缺點
1. CPU資源浪費,可能鼠標點擊的頻率非常小,但是掃描線程還是會一直循環檢測,這會造成很多的CPU資源浪費;如果掃描鼠標點擊的接口是阻塞的呢?
2. 如果是堵塞的,又會出現下面這樣的問題,如果我們不但要掃描鼠標點擊,還要掃描鍵盤是否按下,由於掃描鼠標時被堵塞了,那么可能永遠不會去掃描鍵盤;
3. 如果一個循環需要掃描的設備非常多,這又會引來響應時間的問題;
所以,該方式是非常不好的。

方式二:就是事件驅動模型
目前大部分的UI編程都是事件驅動模型,如很多UI平台都會提供onClick()事件,這個事件就代表鼠標按下事件。事件驅動模型大體思路如下:
1. 有一個事件(消息)隊列;
2. 鼠標按下時,往這個隊列中增加一個點擊事件(消息);
3. 有個循環,不斷從隊列取出事件,根據不同的事件,調用不同的函數,如onClick()、onKeyDown()等;
4. 事件(消息)一般都各自保存各自的處理函數指針,這樣,每個消息都有獨立的處理函數;

事件驅動編程是一種編程范式,這里程序的執行流由外部事件來決定。它的特點是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程范式是(單線程)同步以及多線程編程。

讓我們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展示了隨着時間的推移,這三種模式下程序所做的工作。這個程序有3個任務需要完成,每個任務都在等待I/O操作時阻塞自身。阻塞在I/O操作上所花費的時間已經用灰色框標示出來了。

 

 

 

在單線程同步模型中,任務按照順序執行。如果某個任務因為I/O而阻塞,其他所有的任務都必須等待,直到它完成之后它們才能依次執行。這種明確的執行順序和串行化處理的行為是很容易推斷得出的。如果任務之間並沒有互相依賴的關系,但仍然需要互相等待的話這就使得程序不必要的降低了運行速度。

在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操作系統來管理,在多處理器系統上可以並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其他線程得以繼續執行。與完成類似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,因為這類程序不得不通過線程同步機制如鎖、可重入函數、線程局部存儲或者其他機制來處理線程安全問題,如果實現不當就會導致出現微妙且令人痛不欲生的bug。

在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其他昂貴的操作時,注冊一個回調到事件循環中,然后當I/O操作完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢所有的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序盡可能的得以執行而不需要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行為,因為程序員不需要關心線程安全問題。

當我們面對如下的環境時,事件驅動模型通常是一個好的選擇:

  1. 程序中有許多任務,而且…
  2. 任務之間高度獨立(因此它們不需要互相通信,或者等待彼此)而且…
  3. 在等待事件到來時,某些任務會阻塞。

當應用程序需要在任務間共享可變的數據時,這也是一個不錯的選擇,因為這里不需要采用同步處理。

網絡應用程序通常都有上述這些特點,這使得它們能夠很好的契合事件驅動編程模型。

Select\Poll\Epoll異步IO 

參考alex老師 講解的 Select\Poll\Epoll 發展 和 Select詳解

http://www.cnblogs.com/alex3714/p/4372426.html

select 多並發socket 例子

import select
import socket
import sys
import queue


server = socket.socket()
server.setblocking(0)

server_addr = ('localhost',10000)

print('starting up on %s port %s' % server_addr)
server.bind(server_addr)

server.listen(5)


inputs = [server, ] #自己也要監測呀,因為server本身也是個fd
outputs = []

message_queues = {}

while True:
    print("waiting for next event...")

    readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果沒有任何fd就緒,那程序就會一直阻塞在這里

    for s in readable: #每個s就是一個socket

        if s is server: #別忘記,上面我們server自己也當做一個fd放在了inputs列表里,傳給了select,如果這個s是server,代表server這個fd就緒了,
            #就是有活動了, 什么情況下它才有活動? 當然 是有新連接進來的時候 呀
            #新連接進來了,接受這個連接
            conn, client_addr = s.accept()
            print("new connection from",client_addr)
            conn.setblocking(0)
            inputs.append(conn) #為了不阻塞整個程序,我們不會立刻在這里開始接收客戶端發來的數據, 把它放到inputs里, 下一次loop時,這個新連接
            #就會被交給select去監聽,如果這個連接的客戶端發來了數據 ,那這個連接的fd在server端就會變成就續的,select就會把這個連接返回,返回到
            #readable 列表里,然后你就可以loop readable列表,取出這個連接,開始接收數據了, 下面就是這么干 的

            message_queues[conn] = queue.Queue() #接收到客戶端的數據后,不立刻返回 ,暫存在隊列里,以后發送

        else: #s不是server的話,那就只能是一個 與客戶端建立的連接的fd了
            #客戶端的數據過來了,在這接收
            data = s.recv(1024)
            if data:
                print("收到來自[%s]的數據:" % s.getpeername()[0], data)
                message_queues[s].put(data) #收到的數據先放到queue里,一會返回給客戶端
                if s not  in outputs:
                    outputs.append(s) #為了不影響處理與其它客戶端的連接 , 這里不立刻返回數據給客戶端


            else:#如果收不到data代表什么呢? 代表客戶端斷開了呀
                print("客戶端斷開了",s)

                if s in outputs:
                    outputs.remove(s) #清理已斷開的連接

                inputs.remove(s) #清理已斷開的連接

                del message_queues[s] ##清理已斷開的連接


    for s in writeable:
        try :
            next_msg = message_queues[s].get_nowait()

        except queue.Empty:
            print("client [%s]" %s.getpeername()[0], "queue is empty..")
            outputs.remove(s)

        else:
            print("sending msg to [%s]"%s.getpeername()[0], next_msg)
            s.send(next_msg.upper())


    for s in exeptional:
        print("handling exception for ",s.getpeername())
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        del message_queues[s]
select socket server
import socket
import sys

messages = [ b'This is the message. ',
             b'It will be sent ',
             b'in parts.',
             ]
server_address = ('localhost', 10000)

# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]

# Connect the socket to the port where the server is listening
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for message in messages:

    # Send messages on both sockets
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message) )
        s.send(message)

    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print( '%s: received "%s"' % (s.getsockname(), data) )
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname() )
select socket client

selectors模塊多並發演示

使用selectors 模塊(協程)實現500並發上傳下載, 在本機win10上測試超過500就出現以下情況,主要是windows上selectors 使用的是select, 顯示使用的fd太多,有限制

ValueError: too many file descriptors in select()

selectors 服務端

 1 # _*_coding:utf-8_*_
 2 # Author:Jaye He
 3 
 4 import selectors
 5 import socket
 6 import os
 7 
 8 
 9 class SelectorsServer(object):
10 
11     def __init__(self, address):
12         self.address = address
13         self.server = socket.socket()
14         self.server.setblocking(False)
15         self.sel = selectors.DefaultSelector()
16 
17         self.fd = {}   # 儲存每個conn的對應文件句柄和文件大小
18 
19     def server_start(self):
20         """開啟Server,然后交給self.accept監聽連接請求(接受數據注冊EVENT_READ事件)"""
21         self.server.bind(self.address)
22         self.server.listen(1000)
23         self.sel.register(self.server, selectors.EVENT_READ, self.accept)
24 
25     def accept(self, server):
26         """建立連接,然后交給self.read接受從客戶端來的數據(接受數據注冊EVENT_READ事件)"""
27         conn, addr = server.accept()
28         conn.setblocking(False)
29         # print('accepted form', conn, addr)
30         self.sel.register(conn, selectors.EVENT_READ, self.read)
31 
32     def read(self, conn):
33         """接受從客戶端來的數據,然后判斷需要的操作"""
34         data = conn.recv(1024)
35         if data:
36             if data == b'get':
37                 # 接收到'get', 就打開文件'歌詞.txt',把文件句柄傳入self.fd
38                 # 然后注冊EVENT_WRITE事件交給self.get_write發數據給客戶端
39                 f = open('歌詞.txt', 'rb')
40                 file_size = os.path.getsize('歌詞.txt')
41                 conn.send(str(file_size).encode())
42                 self.fd.update({conn: {'fd': f, 'file_size': file_size}})
43                 self.sel.unregister(conn)
44                 self.sel.register(conn, selectors.EVENT_WRITE, self.get_write)
45             elif data == b'put':
46                 # 接收到'put', 就注冊EVENT_READ事件交給self.put_read處理
47                 self.sel.unregister(conn)
48                 self.sel.register(conn, selectors.EVENT_READ, self.put_read)
49         else:
50             # 接受到空數據就從事件列表中注銷conn的事件,同時關閉conn
51             print('\033[1;33mclosing %s\033[0m' % conn)
52             self.sel.unregister(conn)
53             conn.close()
54 
55     def put_read(self, conn):
56         """接受並處理客戶端發來的數據"""
57         data = conn.recv(1024)
58         if data:     # 有數據,打印數據大小和對應conn
59             print(len(data), conn)
60         else:        # 沒有就注銷conn,並關閉conn
61             print('\033[1;33mclosing %s\033[0m' % conn)
62             self.sel.unregister(conn)
63             conn.close()
64 
65     def get_write(self, conn):
66         """給客戶端發送數據"""
67         f = self.fd[conn]['fd']
68         file_size = self.fd[conn]['file_size']
69         conn.send(f.readline())
70         progress = f.tell()
71         if progress == file_size:
72             # 文件發送完畢,然后繼續注冊EVENT_READ事件,交給self.read處理
73             self.sel.unregister(conn)
74             self.sel.register(conn, selectors.EVENT_READ, self.read)
75 
76     def monitor(self):
77         """監聽EVENT事件列表,有活動的事件就交給相應方法處理"""
78         while True:
79             events_list = self.sel.select()
80             for key, mask in events_list:
81                 callback = key.data
82                 callback(key.fileobj)
83 
84 
85 def main():
86     fs = SelectorsServer(('0.0.0.0', 9000))
87     fs.server_start()
88     fs.monitor()
89 
90 if __name__ == '__main__':
91     main()

selectors 500並發客戶端

 1 # _*_coding:utf-8_*_
 2 # Author:Jaye He
 3 
 4 import socket
 5 import threading
 6 
 7 
 8 def sock_conn():
 9     pid = threading.current_thread()
10     client = socket.socket()
11     client.connect(('localhost', 9000))
12 
13     # 從服務器下載數據
14     client.send(b'get')
15     file_size = int(client.recv(1024).decode())
16     get_size = 0
17     while True:
18         get_data = client.recv(1024)
19         get_size += len(get_data)
20         print('get', '%sbytes' % len(get_data), pid)
21         if get_size == file_size:
22             break
23     print('\033[1;33mGet Completed %s\033[0m' % pid)
24 
25     # 向服務器上傳數據
26     client.send(b'put')
27     f = open('歌詞.txt', 'rb')
28     while True:
29         data = f.readline()
30         if len(data) != 0:
31             client.send(data)
32             print('put', '%sbytes' % len(data), pid)
33         else:
34             print('\033[1;33mPut Completed %s\033[0m' % pid)
35             break
36     f.close()
37     client.close()
38 
39 threading_list = []
40 
41 for i in range(500):
42     t = threading.Thread(target=sock_conn)
43     threading_list.append(t)
44     t.start()
45 
46 for t in threading_list:
47     t.join()

 selectors 詳細請參考官方文檔


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM