協程和異步io


一. 並發、並行、同步、異步、阻塞、非阻塞

  1.並發:是指一個時間段中有幾個程序都處於已啟動運行到運行完畢之間,且這幾個程序都是在同一個處理機(CPU)上運行,但任一個時刻點上只有一個程序在處理機上運行。

  2.並行:是指任何時間點,有多個程序運行在多個CPU上(最多和CPU數量一致)。

  3.並發和並行的區別:

    並發和並行是即相似又有區別的兩個概念,並行是指兩個或者多個事件在同一時刻發生;而並發是指兩個或多個事件在同一時間間隔內發生。在多道程序環境下,並發性是指在一段時間內宏觀上有多個程序在同時運行,但在單處理機系統中,每一時刻卻僅能有一道程序執行,故微觀上這些程序只能是分時地交替執行。倘若在計算機系統中有多個處理機,則這些可以並發執行的程序便可被分配到多個處理機上,實現並行執行,即利用每個處理機來處理一個可並發執行的程序,這樣,多個程序便可以同時執行。

  4.同步:是指代碼調用IO操作時,必須等待IO操作完成才能返回的調用方式。

  5.異步:是指代碼調用IO操作時,不必等待IO操作完成就能返回的調用方式。

  6.阻塞:是指調用函數的時候當前線程被掛起。

  7.非阻塞:是指調用函數的時候當前線程不會被掛起,而是立即返回。

二. C10K問題和io多路復用(select、poll、epoll)

  1.C10K問題:

    謂c10k問題,指的是服務器同時支持成千上萬個客戶端的問題,也就是concurrent 10 000 connection(這也是c10k這個名字的由來)。由於硬件成本的大幅度降低和硬件技術的進步,如果一台服務器同時能夠服務更多的客戶端,那么也就意味着服務每一個客戶端的成本大幅度降低,從這個角度來看,問題顯得非常有意義。

  2.五種I/O模型(詳情:https://www.cnblogs.com/findumars/p/6361627.html):

    

 

    5.1阻塞I式/O:系統調用不會立即返回結果,當前線程會阻塞,等到獲得結果或報錯時在返回(問題:如在調用send()的同時,線程將被阻塞,在此期間,線程將無法執行任何運算或響應任何的網絡請求。)

 

    5.2非阻塞式I/O:調用后立即返回結果(問題:不一定三次握手成功,recv() 會被循環調用,循環調用recv()將大幅度推高CPU 占用率),做計算任務或者再次發起其他連接就較有優勢

 

 

    5.3I/O復用:它的基本原理就是select/epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有數據到達了,就通知用戶進程。(阻塞式的方法,可以監聽多個socket狀態)(問題:將數據從內核復制到用戶空間的時間不能省)

 

    5.4信號驅動式I/O:運用較少

 

    5.5異步I/O:它就像是用戶進程將整個IO操作交給了他人(kernel)完成,然后他人做完后發信號通知。在此期間,用戶進程不需要去檢查IO操作的狀態,也不需要主動的去拷貝數據。

  3.解決方法(參照:https://blog.csdn.net/wangtaomtk/article/details/51811011):

    3.1每個線程/進程處理一個連接:

      但是由於申請進程/線程會占用相當可觀的系統資源,同時對於多進程/線程的管理會對系統造成壓力,因此這種方案不具備良好的可擴展性。因此,這一思路在服務器資源還沒有富裕到足夠程度的時候,是不可行的;即便資源足夠富裕,效率也不夠高。

      問題:資源占用過多,可擴展性差。

    3.2每個進程/線程同時處理多個連接(IO多路復用):

      3.2.1傳統思路

        最簡單的方法是循環挨個處理各個連接,每個連接對應一個 socket當所有 socket 都有數據的時候,這種方法是可行的。但是當應用讀取某個 socket 的文件數據不 ready 的時候,整個應用會阻塞在這里等待該文件句柄,即使別的文件句柄 ready,也無法往下處理。    

        思路:直接循環處理多個連接。問題:任一文件句柄的不成功會阻塞住整個應用。

      3.2.2select:

 

       

        思路:有連接請求抵達了再檢查處理。

        問題:句柄上限+重復初始化+逐個排查所有文件句柄狀態效率不高。

      3.2.3poll

 

        思路:設計新的數據結構提供使用效率。

        問題:逐個排查所有文件句柄狀態效率不高。

      3.2.4epoll(nginx使用的是epoll)

 

        思路:只返回狀態變化的文件句柄。

        問題:依賴特定平台(Linux)。

    注:epoll不一定比select好(在高並發的情況下,連接活躍度不是很高,epoll比select好;在並發性不高,同時連接很活躍select比epoll好(游戲))

三. epoll+回調+事件循環方式url

  1. 通過非阻塞I/O實現http請求:

 1 import socket
 2 from urllib.parse import urlparse
 3 
 4 def get_url(url):
 5     #通過socket請求html
 6     url=urlparse(url)
 7     host=url.netloc
 8     path=url.path
 9     if path=="":
10         path="/"
11     #建立socket連接
12     client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
13     #設置成非阻塞(拋異常:BlockingIOError: [WinError 10035] 無法立即完成一個非阻止性套接字操作。)
14     client.setblocking(False)
15     try:
16         client.connect((host,80))
17     except BlockingIOError as e:
18         pass
19     #向服務器發送數據(還未連接會拋異常)
20     while True:
21         try:
22             client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
23             break
24         except OSError as e:
25             pass
26     #將數據讀取完
27     data=b""
28     while True:
29         try:
30             d=client.recv(1024)
31         except BlockingIOError as e:
32             continue
33         if d:
34             data+=d
35         else:
36             break
37     #會將header信息作為返回字符串
38     data=data.decode('utf8')
39     print(data.split('\r\n\r\n')[1])
40     client.close()
41 
42 if __name__=='__main__':
43     get_url('http://www.baidu.com')
View Code

  2.使用select完成http請求(循環回調):

    優點:並發性高(驅動整個程序主要是回調循環loop(),不會等待,請求操作系統有什么准備好了,准備好了就執行【沒有線程切換等,只有一個線程,當一個url連接建立完成后就會注冊,然后回調執行】,省去了線程切換和內存)

 1 #自動根據環境選擇poll和epoll
 2 from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE
 3 selector=DefaultSelector()
 4 urls=[]
 5 #全局變量
 6 stop=False
 7 class Fetcher:
 8     def connected(self, key):
 9         #取消注冊
10         selector.unregister(key.fd)
11         self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
12         selector.register(self.client.fileno(),EVENT_READ,self.readable)
13 
14     def readable(self,key):
15         d = self.client.recv(1024)
16         if d:
17             self.data += d
18         else:
19             selector.unregister(key.fd)
20             # 會將header信息作為返回字符串
21             data = self.data.decode('utf8')
22             print(data.split('\r\n\r\n')[1])
23             self.client.close()
24             urls.remove(self.spider_url)
25             if not urls:
26                 global stop
27                 stop=True
28 
29     def get_url(self,url):
30         self.spider_url = url
31         url = urlparse(url)
32         self.host = url.netloc
33         self.path = url.path
34         self.data = b""
35         if self.path == "":
36             self.path = "/"
37         # 建立socket連接
38         self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
39         self.client.setblocking(False)
40         try:
41             self.client.connect((self.host, 80))
42         except BlockingIOError as e:
43             pass
44 
45         #注冊寫事件,及回調函數
46         selector.register(self.client.fileno(),EVENT_WRITE,self.connected)
47 
48 def loop():
49     #回調+事件循環+select(poll/epoll)
50     #事件循環,不停的調用socket的狀態並調用對應的回調函數
51     #判斷哪個可讀可寫,select本身不支持register模式
52     #socket狀態變化后的回調使用程序員完成的
53     if not stop:
54         while True:
55             ready=selector.select()
56             for key,mask in ready:
57                 call_back=key.data
58                 call_back(key)
59 
60 
61 if __name__=='__main__':
62     fetcher=Fetcher()
63     fetcher.get_url('http://www.baidu.com')
64     loop()
View Code

四. 回調之痛

  1.可讀性差;2.共享狀態管理困難;3.異常處理困難

  

協程能解決            

五. C10M問題和協程

  1.C10M問題:

    如何利用8核心CPU,64G內存,在10gps的網絡上保持1000萬的並發連接。

  2.協程:

    2.1問題:回調模式編碼復雜度高;同步編程的並發性不高;多線程需要線程間同步,look會降低性能

    2.2解決:

      采用同步的方式去編寫異步的代碼;

      采用單線程去解決任務:線程是由操作系統切換,單線程切換意味着需要我們自己去調度任務;不在需要鎖,並發性高,如果單線程內切換函數,性能遠高於線程切換,並發性更高。

    2.3協程:

      傳統函數調用 過程 A->B->C;

      我們需要一個可以暫停的函數,並且可以在適當的時候恢復該函數的繼續執行;

     出現了協程 -> 有多個入口的函數, 可以暫停的函數, 可以暫停的函數(可以向暫停的地方傳入值);
 1 def get_url(url):
 2     #do someting 1
 3     html = get_html(url) #此處暫停,切換到另一個函數去執行
 4     # #parse html
 5     urls = parse_url(html)
 6 
 7 def get_url(url):
 8     #do someting 1
 9     html = get_html(url) #此處暫停,切換到另一個函數去執行
10     # #parse html
11     urls = parse_url(html)
View Code

六. 生成器的send和yield from

  1.生成器send和next方法:

    啟動生成器方式有兩種:1.next();2.send();

    生成器可以產出值;也可以接收值(調用方傳遞進來的值);

    send方法可以傳遞值進入生成器內部,同時還可以重啟生成器執行到下一個yield的位置(注:在調用send()發送非none之前,我們必須啟動一次生成器,否則會拋錯,方式有兩種gen.send(None)或者next(gen))

  2.close()方法:(關閉生成器)

    自己處理的話會拋異常,gen.close(),RuntimeError: generator ignored GeneratorExit,如果是except Exception就不會拋異常,GeneratorExit是繼承至BaseException的,Exception也是繼承於BaseException的

 1 def gen_func():
 2     #自己處理的話會拋異常,gen.close(),RuntimeError: generator ignored GeneratorExit
 3     try:
 4         yield 'https://www.baidu.com'
 5     #如果是except Exception就不會拋異常,GeneratorExit是繼承至BaseException的,Exception也是繼承於BaseException的
 6     except GeneratorExit as e:
 7         pass
 8     yield 1
 9     yield 2
10     return 'LYQ'
11 
12 if __name__=='__main__':
13     #拋異常StopIteration:
14     gen=gen_func()
15     print(next(gen))
16     gen.close()
17     print(next(gen))
View Code

  3.throw()方法:向生成器中扔異常,需要自己處理,否則會拋錯

 1 def gen_func():
 2     try:
 3         yield 'https://www.baidu.com'
 4     except Exception:
 5         pass
 6     yield 1
 7     yield 2
 8     return 'LYQ'
 9 
10 if __name__=='__main__':
11     #拋異常StopIteration:
12     gen=gen_func()
13     print(next(gen))
14     #扔一個異常,是第一句的異常
15     gen.throw(Exception,'download error')
16     print(next(gen))
17     #扔一個異常,是第二句的異常
18     gen.throw(Exception,'download error')
19     print(next(gen))
View Code

   4.yield from:(Python 3.3新加的語法)

    4.1簡介:

 1 from itertools import chain
 2 my_list=[1,2,3]
 3 my_dict={'name1':'LYQ1',
 4          'name2':'LYQ2'}
 5 #將所有值遍歷輸出
 6 # for value in chain(my_list,my_dict,range(5,10)):
 7 #     print(value)
 8 
 9 def g1(iterable):
10     yield range(10)
11 #yield from iterable
12 def my_chain(*args,**kwargs):
13     for my_iterable in args:
14         #功能非常多
15         yield from my_iterable
16         # for value in my_iterable:
17         #     yield value
18 for value in my_chain(my_list,my_dict,range(5,10)):
19     print(value)
View Code

 

     4.2main調用方 g1:委托生成器 gen:子生成器:

 1 def g1(gen):
 2     yield from gen
 3 gen=range(10)
 4 def main():
 5     g=g1(gen)
 6     #直接發送給子生成器
 7     print(g.send(None))
 8 #main:調用方 g1:委托生成器 gen:子生成器
 9 #yield from會在調用方與子生成器之間建立一個雙向通道
10 main()
View Code

 

     4.3例子:

 1 final_result = {}
 2 
 3 
 4 def sales_sum(pro_name):
 5     total = 0
 6     nums = []
 7     while True:
 8         x = yield
 9         print(pro_name + "銷量: ", x)
10         if not x:
11             break
12         total += x
13         nums.append(x)
14     #直接返回到yield from sales_sum(key)
15     return total, nums
16 
17 
18 def middle(key):
19     while True:
20         final_result[key] = yield from sales_sum(key)
21         print(key + "銷量統計完成!!.")
22 
23 
24 def main():
25     data_sets = {
26         "面膜": [1200, 1500, 3000],
27         "手機": [28, 55, 98, 108],
28         "大衣": [280, 560, 778, 70],
29     }
30     for key, data_set in data_sets.items():
31         print("start key:", key)
32         m = middle(key)
33         #直接send到子生成器里面(x = yield)
34         m.send(None)  # 預激middle協程
35         for value in data_set:
36             m.send(value)  # 給協程傳遞每一組的值
37         m.send(None)
38     print("final_result:", final_result)
39 
40 
41 if __name__ == '__main__':
42     main()
View Code

 

      無yield from:

 1 def sales_sum(pro_name):
 2     total = 0
 3     nums = []
 4     while True:
 5         x = yield
 6         print(pro_name + "銷量: ", x)
 7         if not x:
 8             break
 9         total += x
10         nums.append(x)
11     #直接返回到yield from sales_sum(key)
12     return total, nums
13 
14 if __name__ == "__main__":
15     #直接與子生成器通信(沒用yield from就需要捕獲異常)
16     my_gen = sales_sum("手機")
17     my_gen.send(None)
18     my_gen.send(1200)
19     my_gen.send(1500)
20     my_gen.send(3000)
21     try:
22         my_gen.send(None)
23     #獲取返回值
24     except StopIteration as e:
25         result = e.value
26         print(result)
View Code

 

  4.4介紹yield from詳情:

 1 #pep380
 2 
 3 #1. RESULT = yield from EXPR可以簡化成下面這樣
 4 #一些說明
 5 """
 6 _i:子生成器,同時也是一個迭代器
 7 _y:子生成器生產的值
 8 _r:yield from 表達式最終的值
 9 _s:調用方通過send()發送的值
10 _e:異常對象
11 
12 """
13 
14 _i = iter(EXPR)      # EXPR是一個可迭代對象,_i其實是子生成器;
15 try:
16     _y = next(_i)   # 預激子生成器,把產出的第一個值存在_y中;
17 except StopIteration as _e:
18     _r = _e.value   # 如果拋出了`StopIteration`異常,那么就將異常對象的`value`屬性保存到_r,這是最簡單的情況的返回值;
19 else:
20     while 1:    # 嘗試執行這個循環,委托生成器會阻塞;
21         _s = yield _y   # 生產子生成器的值,等待調用方`send()`值,發送過來的值將保存在_s中;
22         try:
23             _y = _i.send(_s)    # 轉發_s,並且嘗試向下執行;
24         except StopIteration as _e:
25             _r = _e.value       # 如果子生成器拋出異常,那么就獲取異常對象的`value`屬性存到_r,退出循環,恢復委托生成器的運行;
26             break
27 RESULT = _r     # _r就是整個yield from表達式返回的值。
28 
29 """
30 1. 子生成器可能只是一個迭代器,並不是一個作為協程的生成器,所以它不支持.throw()和.close()方法;
31 2. 如果子生成器支持.throw()和.close()方法,但是在子生成器內部,這兩個方法都會拋出異常;
32 3. 調用方讓子生成器自己拋出異常
33 4. 當調用方使用next()或者.send(None)時,都要在子生成器上調用next()函數,當調用方使用.send()發送非 None 值時,才調用子生成器的.send()方法;
34 """
35 _i = iter(EXPR)
36 try:
37     _y = next(_i)
38 except StopIteration as _e:
39     _r = _e.value
40 else:
41     while 1:
42         try:
43             _s = yield _y
44         except GeneratorExit as _e:
45             try:
46                 _m = _i.close
47             except AttributeError:
48                 pass
49             else:
50                 _m()
51             raise _e
52         except BaseException as _e:
53             _x = sys.exc_info()
54             try:
55                 _m = _i.throw
56             except AttributeError:
57                 raise _e
58             else:
59                 try:
60                     _y = _m(*_x)
61                 except StopIteration as _e:
62                     _r = _e.value
63                     break
64         else:
65             try:
66                 if _s is None:
67                     _y = next(_i)
68                 else:
69                     _y = _i.send(_s)
70             except StopIteration as _e:
71                 _r = _e.value
72                 break
73 RESULT = _r
View Code

 

    看完代碼,我們總結一下關鍵點:

    1. 子生成器生產的值,都是直接傳給調用方的;調用方通過.send()發送的值都是直接傳遞給子生成器的;如果發送的是 None,會調用子生成器的__next__()方法,如果不是 None,會調用子生成器的.send()方法;
    2. 子生成器退出的時候,最后的return EXPR,會觸發一個StopIteration(EXPR)異常;
    3. yield from表達式的值,是子生成器終止時,傳遞給StopIteration異常的第一個參數;
    4. 如果調用的時候出現StopIteration異常,委托生成器會恢復運行,同時其他的異常會向上 "冒泡";
    5. 傳入委托生成器的異常里,除了GeneratorExit之外,其他的所有異常全部傳遞給子生成器的.throw()方法;如果調用.throw()的時候出現了StopIteration異常,那么就恢復委托生成器的運行,其他的異常全部向上 "冒泡";
    6. 如果在委托生成器上調用.close()或傳入GeneratorExit異常,會調用子生成器的.close()方法,沒有的話就不調用。如果在調用.close()的時候拋出了異常,那么就向上 "冒泡",否則的話委托生成器會拋出GeneratorExit異常。

七. 生成器如何變成協程?

  1.生成器可以暫停並獲取狀態:

 1 #生成器是可以暫停的函數
 2 import inspect
 3 def gen():
 4     yield 1
 5     return True
 6 
 7 if __name__=='__main__':
 8     g1=gen()
 9     #獲取生成器狀態 GEN_CREATED(創建)
10     print(inspect.getgeneratorstate(g1))
11     next(g1)
12     #GEN_SUSPENDED暫停
13     print(inspect.getgeneratorstate(g1))
14     try:
15         next(g1)
16     except StopIteration:
17         pass
18     #GEN_CLOSED關閉
19     print(inspect.getgeneratorstate(g1))
View Code

 

  2.協程的調度依然是 事件循環+協程模式 ,協程是單線程模式:

 1 #生成器是可以暫停的函數
 2 import inspect
 3 # def gen_func():
 4 #     value=yield from
 5 #     #第一返回值給調用方, 第二調用方通過send方式返回值給gen
 6 #     return "bobby"
 7 #1. 用同步的方式編寫異步的代碼, 在適當的時候暫停函數並在適當的時候啟動函數
 8 import socket
 9 def get_socket_data():
10     yield 1
11 
12 def downloader(url):
13     client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
14     client.setblocking(False)
15 
16     try:
17         client.connect((host, 80))  # 阻塞不會消耗cpu
18     except BlockingIOError as e:
19         pass
20 
21     selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
22     #如果get_socket_data()中出現異常,會直接拋給downloader(向上拋)
23     source = yield from get_socket_data()
24     data = source.decode("utf8")
25     html_data = data.split("\r\n\r\n")[1]
26     print(html_data)
27 
28 def download_html(html):
29     html = yield from downloader()
30 
31 if __name__ == "__main__":
32     #協程的調度依然是 事件循環+協程模式 ,協程是單線程模式
33     pass
View Code

八. async和await原生協程

  1.python為了將語義變得更加明確,就引入了async和await關鍵字定義原生的協程:

    生成器實現的協程又可以當生成器,又可以當協程,且代碼凌亂,不利於后期維護。原生的協程中不可以yield,否則會拋錯(讓協程更加明確)

可異步調用:實際實現了__await__魔法函數

    await:將控制權交出去並等待結果返回,await只能接收awaitable對象,可以理解成yield from

 1 # from collections import Awaitable
 2 #如果是函數,就要使用coroutine裝飾器,實際將__await_指向___iter__
 3 # import types
 4 # @types.coroutine
 5 # def downloader(url):
 6 #     return "haha"
 7 
 8 async def downloader(url):
 9     return "haha"
10 async def download_url(url):
11     #將控制權交出去並等待結果返回,await只能接收awaitable對象,可以理解成yield from
12     html=await downloader(url)
13     return html
14 
15 if __name__=='__main__':
16     coro=download_url('www.baidu.com')
17     #原生協程不能調用next
18     coro.send(None)
View Code

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM