Python之路-python(Queue隊列、進程、Gevent協程、Select\Poll\Epoll異步IO與事件驅動)


一、進程:

  1、語法

  2、進程間通訊

  3、進程池

二、Gevent協程

三、Select\Poll\Epoll異步IO與事件驅動

 

一、進程:

  1、語法

1 簡單的啟動線程語法
2 def run(name):
3     time.sleep(2)
4     print("hello",name)
5 
6 if __name__ == '__main__':
7     for i in range(10):同時啟動10個進程
8         p = multiprocessing.Process(target=run,args=("bob",))
9         p.start()
View Code
 1 #進程里面再啟動一個線程
 2 #每個進程里面在啟動一個線程
 3 def thread_run():
 4     print('線程',threading.get_ident())
 5 
 6 def run(name):
 7     time.sleep(2)
 8     print("hello",name)
 9     t = threading.Thread(target=thread_run,)
10     t.start()
11 
12 
13 if __name__ == '__main__':
14     for i in range(10):
15         p = multiprocessing.Process(target=run,args=("bob",))
16         p.start()
 1 from multiprocessing import Process
 2 import os
 3 
 4 
 5 def info(title):
 6     print(title)
 7     print('module name:', __name__)
 8     print('parent process:', os.getppid())
 9     print('process id:', os.getpid())
10     print("\n\n")
11 
12 
13 def f(name):
14     info('\033[31;1mcalled from child process function f\033[0m')
15     print('hello', name)
16 
17 if __name__ == '__main__':
18     info('\033[32;1mmain process line\033[0m')
19     p = Process(target=f, args=('bob',))
20     p.start()
main process line
module name: __main__
parent process: 1556#這個進程是pycharm的進程
process id: 22324#父進程的id



called from child process function f
module name: __mp_main__
parent process: 22324#父進程的id
process id: 8516#子進程的id
總結:每個子進程都是由父進程啟動的

  2、進程間通訊

queue

 1 #線程之間的通信。(我們都知道線程之間是數據共享的)
 2 import threading,queue
 3 
 4 def run():
 5     q.put('測試')
 6 
 7 
 8 if __name__ == '__main__':
 9     q = queue.Queue()
10     p = threading.Thread(target=run,)
11     p.start()
12     print(q.get())

 進程之間是不共享數據的(包括父進程和子進程)

 1 from  multiprocessing import Process,Queue
 2 
 3 def run():
 4     q.put('測試')
 5 
 6 
 7 if __name__ == '__main__':
 8     q = Queue()
 9     # p = threading.Thread(target=run,)
10     p = Process(target= run,)
11     p.start()
12     print(q.get())
13 結果:
14 NameError: name 'q' is not defined
15 結果證明進程之間是不共享數據的

現在我們用線程queue

 1 from  multiprocessing import Process
 2 import queue
 3 def run(qq):
 4     qq.put('測試')
 5 
 6 
 7 if __name__ == '__main__':
 8     q = queue.Queue()
 9     # p = threading.Thread(target=run,)
10     p = Process(target= run,args=(q,))
11     p.start()
12     print(q.get())
13     p.join()
14 結果:
15 TypeError: can't pickle _thread.lock objects
16 結論:
17 進程直接是不可用線程queue互傳數據的。

進程queue

 1 from  multiprocessing import Process,Queue
 2 def run(qq):
 3     qq.put('測試')
 4 
 5 
 6 if __name__ == '__main__':
 7     q = Queue()
 8     # p = threading.Thread(target=run,)
 9     p = Process(target= run,args=(q,))
10     p.start()
11     print(q.get())
12     p.join()
13 結果:
14     測試
15 結論:
16 #總結:兩個進程直接內存空間完全獨立(包括父進程和子進程),所以不能互相訪問數據
17 #       但是子進程生成是就把父進程當做一個變量傳過來。其實就是拿到父進程的q,然后子進程copy一份
18 #       然后用pickle序列化,再反序列化給父進程,這樣子進程就可以訪問父進程的數據

 Pipes

 1 from multiprocessing import Process, Pipe
 2 
 3 
 4 def f(conn):
 5     conn.send([42, None, 'hello from child'])#發給父進程的消息
 6     conn.send([42, None, 'hello from child2'])#發給父進程的消息
 7     print("from parent:",conn.recv())#收來自父進程的消息
 8     conn.close()#關閉
 9 
10 if __name__ == '__main__':
11     parent_conn, child_conn = Pipe()#變量名前后沒有關系。
12     p = Process(target=f, args=(child_conn,))
13     p.start()
14     print(parent_conn.recv())  # prints "[42, None, 'hello']"
15     print(parent_conn.recv())  # prints "[42, None, 'hello']"#收子進程的消息,如果這里多收一次就會卡住(子發2,父進程收3次,就會卡住)
16     parent_conn.send("張三可好") # prints "[42, None, 'hello']",發給子進程的消息
17     p.join()
18 結果:
19 [42, None, 'hello from child']
20 [42, None, 'hello from child2']
21 from parent: 張三可好
22 結論
23 #相當於socket,實現互相通信。管道兩頭順序沒關系.發送的跟收到的必須一致,否則多發多收會造成阻塞狀態

上面的queue和pipe實現了進程之間的傳遞,還不是數據的共享。下面我們看看manager實現兩個進程之間的數據共享。

 

manager

  A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,

  其實就是實現了進程之間的共享,那些東西可以共享呢?列表、字典、命名空間、鎖、遞歸鎖、變量等等

 

 1 #pipe和queue只是實現了進程直接互相通信,但是實際意義上沒有實現之間數據共享。manager可以共享
 2 from multiprocessing import  Process,Manager
 3 import  os
 4 
 5 def f(d,l):
 6     d[os.getpid()] = os.getppid()
 7     l.append(os.getppid())
 8     print(l)
 9 
10 if __name__ == '__main__':
11     with Manager() as manager:
12         d  = manager.dict()#生成一個字典,可在多個線程間共享
13         l = manager.list(range(5))#生成一個列表,可在多個進程間共享,並先生成5個值
14 
15         p_list = []#為了等進程執行完成定義一個列表
16         for i in range(10):
17             p = Process(target=f,args=(d,l))
18             p.start()
19             p_list.append(p)#執行完一個進程往p_list里面添加一次
20         for res in p_list:#等待進程執行完畢后等待一下
21             res.join()
22         print(d)
23 結果:
24 [0, 1, 2, 3, 4, 19112]
25 [0, 1, 2, 3, 4, 19112, 19112]
26 [0, 1, 2, 3, 4, 19112, 19112, 19112]
27 [0, 1, 2, 3, 4, 19112, 19112, 19112, 19112]
28 [0, 1, 2, 3, 4, 19112, 19112, 19112, 19112, 19112]
29 [0, 1, 2, 3, 4, 19112, 19112, 19112, 19112, 19112, 19112]
30 [0, 1, 2, 3, 4, 19112, 19112, 19112, 19112, 19112, 19112, 19112]
31 [0, 1, 2, 3, 4, 19112, 19112, 19112, 19112, 19112, 19112, 19112, 19112]
32 [0, 1, 2, 3, 4, 19112, 19112, 19112, 19112, 19112, 19112, 19112, 19112, 19112]
33 [0, 1, 2, 3, 4, 19112, 19112, 19112, 19112, 19112, 19112, 19112, 19112, 19112, 19112]
34 {9632: 19112, 6800: 19112, 8956: 19112, 11444: 19112, 5220: 19112, 13052: 19112, 18040: 19112, 12452: 19112, 18256: 19112, 6540: 19112}
35 
36 結論:
37 #Queue  \ Pipe 只是實現進程間數據的傳遞
38 #Manager 實現了進程間數據的共享,即多個進程可以修改同一份數據

 進程同步:

 1 from multiprocessing import Process, Lock
 2 
 3 def f(l, i):
 4     l.acquire()
 5     print('hello world', i)
 6     l.release()
 7 
 8 if __name__ == '__main__':
 9     lock = Lock()#
10 
11     for num in range(10):
12         Process(target=f, args=(lock, num)).start()
13 結果:
14 hello world 0
15 hello world 2
16 hello world 1
17 hello world 3
18 hello world 4
19 hello world 9
20 hello world 5
21 hello world 7
22 hello world 8
23 hello world 6
24 結論:
25 #這里的鎖匙屏幕鎖,防止打印到屏幕的時候,一行沒打完就開一下一個。

 

  3、進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。

進程池中有兩個方法:

  1、apply#串行

  2、apply_async#並行

 1 #apply串行運行
 2 #apply_async異步,其實就是並行
 3 
 4 from  multiprocessing import  Process,Pool,freeze_support
 5 import time
 6 import os
 7 
 8 
 9 def Foo(i):
10     time.sleep(2)
11     print("Foo",os.getpid())
12     return i+ 100
13 
14 def Bar(arg):
15     print("--》exec done,Bar:",arg,os.getpid())
16 
17 if __name__ == '__main__':
18     pool = Pool(processes=3)#允許進程池同時放入3個進程
19     print("主進程",os.getpid())
20     for i in range(10):
21         pool.apply_async(func=Foo, args=(i,), callback=Bar) #callback=回調
22         # pool.apply(func=Foo,args=(i,))#串行
23         #pool.apply_async(func=Foo,args=(i,))#串行
24     print('完成')
25     pool.close()
26     pool.join()#牢記進程池中執行關閉后在關閉,如果注釋,那么程序就直接關閉
27 
28 結果:
29 主進程 6516
30 完成
31 Foo 3320
32 --》exec done,Bar: 100 6516
33 Foo 21480
34 --》exec done,Bar: 101 6516
35 Foo 17944
36 --》exec done,Bar: 102 6516
37 Foo 3320
38 --》exec done,Bar: 103 6516
39 Foo 21480
40 --》exec done,Bar: 104 6516
41 Foo 17944
42 --》exec done,Bar: 105 6516
43 Foo 3320
44 --》exec done,Bar: 106 6516
45 Foo 21480
46 --》exec done,Bar: 107 6516
47 Foo 17944
48 --》exec done,Bar: 108 6516
49 Foo 3320
50 --》exec done,Bar: 109 6516

 

二、Gevent協程

協程

協程,又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協程是一種用戶態的輕量級線程

協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:

協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

 

協程的好處:

  • 無需線程上下文切換的開銷
  • 無需原子操作鎖定及同步的開銷
  • 方便切換控制流,簡化編程模型
  • 高並發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用於高並發處理。

缺點:

  • 無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
  • 進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序

 

 1 #使用yeild模擬多協程,模擬多並發
 2 import time
 3 import queue
 4 def consumer(name):
 5     print("--->starting eating baozi...")
 6     while True:
 7         new_baozi = yield
 8         print("[%s] is eating baozi %s" % (name,new_baozi))
 9         #time.sleep(1)
10  
11 def producer():
12  
13     r = con.__next__()
14     r = con2.__next__()
15     n = 0
16     while n < 5:
17         n +=1
18         con.send(n)
19         con2.send(n)
20         print("\033[32;1m[producer]\033[0m is making baozi %s" %n )
21  
22  
23 if __name__ == '__main__':
24     con = consumer("c1")
25     con2 = consumer("c2")
26     p = producer()

Greenlet:(手動切換)

 1 from greenlet import greenlet
 2 
 3 def test1():
 4     print(12)#2
 5     gr2.switch()#3
 6     print(34)#6
 7     gr2.switch()#7
 8 
 9 def test2():
10     print(56)#4
11     gr1.switch()#5
12     print(78)#8
13 
14 
15 gr1 = greenlet(test1)#啟動協程
16 gr2 = greenlet(test2)
17 gr1.switch()#相當於yeild的__next__(),沒有它上面的都不會執行

 Gevent:(實現遇到IO自動切換)

Gevent 是一個第三方庫,可以輕松通過gevent實現並發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。

 1 #自動切換
 2 import gevent
 3 
 4 def foo():
 5     print("Running in foo,foo開始>>>1")
 6     gevent.sleep(2)
 7     print('Explicit context switch to foo again  foo完成>>>6')
 8 
 9 def bar():
10     print('Explicit精確的 context 內容 to bar   bar開始>>>2')
11     gevent.sleep(1)
12     print("Imlicit context sitch back to bar  bar結束>>>5")
13 
14 def func3():
15     print('running func3  func3開始>>>3')
16     gevent.sleep(0)
17     print('running func3 again  func3結束>>>4')
18 
19 gevent.joinall([
20     gevent.spawn(foo),
21     gevent.spawn(bar),
22     gevent.spawn(func3),
23 ])
1 結果:
2 Running in foo,foo開始>>>1
3 Explicit精確的 context 內容 to bar   bar開始>>>2
4 running func3  func3開始>>>3
5 running func3 again  func3結束>>>4
6 Imlicit context sitch back to bar  bar結束>>>5
7 Explicit context switch to foo again  foo完成>>>6
遇到sleep就自動切換,sleep后面的參數是秒。上面這個小程序最多2秒左右運行完

 

通過gevent實現單線程下的多socket並發

server side 

 1 import sys
 2 import socket
 3 import time
 4 import gevent
 5  
 6 from gevent import socket,monkey
 7 monkey.patch_all()
 8  
 9  
10 def server(port):
11     s = socket.socket()
12     s.bind(('0.0.0.0', port))
13     s.listen(500)
14     while True:
15         cli, addr = s.accept()
16         gevent.spawn(handle_request, cli)
17  
18  
19  
20 def handle_request(conn):
21     try:
22         while True:
23             data = conn.recv(1024)
24             print("recv:", data)
25             conn.send(data)
26             if not data:
27                 conn.shutdown(socket.SHUT_WR)
28  
29     except Exception as  ex:
30         print(ex)
31     finally:
32         conn.close()
33 if __name__ == '__main__':
34     server(8001)

client side   

 1 import socket
 2  
 3 HOST = 'localhost'    # The remote host
 4 PORT = 8001           # The same port as used by the server
 5 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 6 s.connect((HOST, PORT))
 7 while True:
 8     msg = bytes(input(">>:"),encoding="utf8")
 9     s.sendall(msg)
10     data = s.recv(1024)
11     #print(data)
12  
13     print('Received', repr(data))
14 s.close()

 

簡單的使用協程寫一個爬蟲:

串行

 1 from urllib import request
 2 import time
 3 
 4 
 5 def f(url):
 6     print('GET:%s'%url)
 7     resp = request.urlopen(url)
 8     data = resp.read()
 9     # file = open("data",'wb')#這里可以打開這兩步,寫入文件
10     # file.write(data)
11     print('%d bytes received from %s.'%(len(data),url))
12 
13 
14 
15 #串性模式
16 urls = [
17     'https://www.python.org/',
18     'https://www.yahoo.com/',
19     'https://github.com/']
20 
21 time_start = time.time()
22 for url in urls:
23     f(url)
24 print("同步cost",time.time()-time_start)

並行:

 1 #因為gevent檢測不到urllib是否進行了io操作,所以需要打補丁
 2 
 3 from urllib import request
 4 import gevent,time
 5 
 6 from gevent import monkey#打補丁(把下面有可能有IO操作的單獨做上標記)
 7 monkey.patch_all()#打補丁
 8 
 9 def f(url):
10     print('GET:%s'%url)
11     resp = request.urlopen(url)
12     data = resp.read()
13     # file = open("data",'wb')#這里可以打開這兩步,寫入文件
14     # file.write(data)
15     print('%d bytes received from %s.'%(len(data),url))
16 
17 
18 #異步模式
19 async_time_start = time.time()
20 gevent.joinall([
21     gevent.spawn(f,'https://www.python.org/'),
22     gevent.spawn(f, 'https://www.yahoo.com/'),
23     gevent.spawn(f,'https://github.com/')
24 ])
25 print("異步步cost",time.time()-async_time_start)

 

 

論事件驅動與異步IO

通常,我們寫服務器處理模型的程序時,有以下幾種模型:
(1)每收到一個請求,創建一個新的進程,來處理該請求;
(2)每收到一個請求,創建一個新的線程,來處理該請求;
(3)每收到一個請求,放入一個事件列表,讓主進程通過非阻塞I/O方式來處理請求
上面的幾種方式,各有千秋,
第(1)中方法,由於創建新的進程的開銷比較大,所以,會導致服務器性能比較差,但實現比較簡單。
第(2)種方式,由於要涉及到線程的同步,有可能會面臨 死鎖等問題。
第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都復雜。
綜合考慮各方面因素,一般普遍認為第(3)種方式是大多數 網絡服務器采用的方式
 

看圖說話講事件驅動模型

在UI編程中,常常要對鼠標點擊進行相應,首先如何獲得鼠標點擊呢?
方式一:創建一個線程,該線程一直循環檢測是否有鼠標點擊,那么這個方式有以下幾個缺點
1. CPU資源浪費,可能鼠標點擊的頻率非常小,但是掃描線程還是會一直循環檢測,這會造成很多的CPU資源浪費;如果掃描鼠標點擊的接口是阻塞的呢?
2. 如果是堵塞的,又會出現下面這樣的問題,如果我們不但要掃描鼠標點擊,還要掃描鍵盤是否按下,由於掃描鼠標時被堵塞了,那么可能永遠不會去掃描鍵盤;
3. 如果一個循環需要掃描的設備非常多,這又會引來響應時間的問題;
所以,該方式是非常不好的。

方式二:就是事件驅動模型
目前大部分的UI編程都是事件驅動模型,如很多UI平台都會提供onClick()事件,這個事件就代表鼠標按下事件。事件驅動模型大體思路如下:
1. 有一個事件(消息)隊列;
2. 鼠標按下時,往這個隊列中增加一個點擊事件(消息);
3. 有個循環,不斷從隊列取出事件,根據不同的事件,調用不同的函數,如onClick()、onKeyDown()等;
4. 事件(消息)一般都各自保存各自的處理函數指針,這樣,每個消息都有獨立的處理函數;

 

 

 

事件驅動編程是一種編程范式,這里程序的執行流由外部事件來決定。它的特點是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程范式是(單線程)同步以及多線程編程。

讓我們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展示了隨着時間的推移,這三種模式下程序所做的工作。這個程序有3個任務需要完成,每個任務都在等待I/O操作時阻塞自身。阻塞在I/O操作上所花費的時間已經用灰色框標示出來了。

 

在單線程同步模型中,任務按照順序執行。如果某個任務因為I/O而阻塞,其他所有的任務都必須等待,直到它完成之后它們才能依次執行。這種明確的執行順序和串行化處理的行為是很容易推斷得出的。如果任務之間並沒有互相依賴的關系,但仍然需要互相等待的話這就使得程序不必要的降低了運行速度。

在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操作系統來管理,在多處理器系統上可以並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其他線程得以繼續執行。與完成類似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,因為這類程序不得不通過線程同步機制如鎖、可重入函數、線程局部存儲或者其他機制來處理線程安全問題,如果實現不當就會導致出現微妙且令人痛不欲生的bug。

在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其他昂貴的操作時,注冊一個回調到事件循環中,然后當I/O操作完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢所有的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序盡可能的得以執行而不需要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行為,因為程序員不需要關心線程安全問題。

當我們面對如下的環境時,事件驅動模型通常是一個好的選擇:

  1. 程序中有許多任務,而且…
  2. 任務之間高度獨立(因此它們不需要互相通信,或者等待彼此)而且…
  3. 在等待事件到來時,某些任務會阻塞。

當應用程序需要在任務間共享可變的數據時,這也是一個不錯的選擇,因為這里不需要采用同步處理。

網絡應用程序通常都有上述這些特點,這使得它們能夠很好的契合事件驅動編程模型。

 

 

三、Select\Poll\Epoll異步IO與事件驅動

 

一 概念說明

在進行解釋之前,首先要說明幾個概念:
- 用戶空間和內核空間
- 進程切換
- 進程的阻塞
- 文件描述符
- 緩存 I/O

 

 

用戶空間與內核空間

現在操作系統都是采用虛擬存儲器,那么對32位操作系統而言,它的尋址空間(虛擬存儲空間)為4G(2的32次方)。操作系統的核心是內核,獨立於普通的應用程序,可以訪問受保護的內存空間,也有訪問底層硬件設備的所有權限。為了保證用戶進程不能直接操作內核(kernel),保證內核的安全,操心系統將虛擬空間划分為兩部分,一部分為內核空間,一部分為用戶空間。針對linux操作系統而言,將最高的1G字節(從虛擬地址0xC0000000到0xFFFFFFFF),供內核使用,稱為內核空間,而將較低的3G字節(從虛擬地址0x00000000到0xBFFFFFFF),供各個進程使用,稱為用戶空間。

 

進程切換

為了控制進程的執行,內核必須有能力掛起正在CPU上運行的進程,並恢復以前掛起的某個進程的執行。這種行為被稱為進程切換。因此可以說,任何進程都是在操作系統內核的支持下運行的,是與內核緊密相關的。

從一個進程的運行轉到另一個進程上運行,這個過程中經過下面這些變化:
1. 保存處理機上下文,包括程序計數器和其他寄存器。
2. 更新PCB信息。
3. 把進程的PCB移入相應的隊列,如就緒、在某事件阻塞等隊列。
4. 選擇另一個進程執行,並更新其PCB。
5. 更新內存管理的數據結構。
6. 恢復處理機上下文。

注:總而言之就是很耗資源,具體的可以參考這篇文章:進程切換

 

進程的阻塞

正在執行的進程,由於期待的某些事件未發生,如請求系統資源失敗、等待某種操作的完成、新數據尚未到達或無新工作做等,則由系統自動執行阻塞原語(Block),使自己由運行狀態變為阻塞狀態。可見,進程的阻塞是進程自身的一種主動行為,也因此只有處於運行態的進程(獲得CPU),才可能將其轉為阻塞狀態。當進程進入阻塞狀態,是不占用CPU資源的

文件描述符fd

文件描述符(File descriptor)是計算機科學中的一個術語,是一個用於表述指向文件的引用的抽象化概念。

文件描述符在形式上是一個非負整數。實際上,它是一個索引值,指向內核為每一個進程所維護的該進程打開文件的記錄表。當程序打開一個現有文件或者創建一個新文件時,內核向進程返回一個文件描述符。在程序設計中,一些涉及底層的程序編寫往往會圍繞着文件描述符展開。但是文件描述符這一概念往往只適用於UNIX、Linux這樣的操作系統。

 

緩存 I/O

緩存 I/O 又被稱作標准 I/O,大多數文件系統的默認 I/O 操作都是緩存 I/O。在 Linux 的緩存 I/O 機制中,操作系統會將 I/O 的數據緩存在文件系統的頁緩存( page cache )中,也就是說,數據會先被拷貝到操作系統內核的緩沖區中,然后才會從操作系統內核的緩沖區拷貝到應用程序的地址空間。

緩存 I/O 的缺點:
數據在傳輸過程中需要在應用程序地址空間和內核進行多次數據拷貝操作,這些數據拷貝操作所帶來的 CPU 以及內存開銷是非常大的。

 

二 IO模式

剛才說了,對於一次IO訪問(以read舉例),數據會先被拷貝到操作系統內核的緩沖區中,然后才會從操作系統內核的緩沖區拷貝到應用程序的地址空間。所以說,當一個read操作發生時,它會經歷兩個階段:
1. 等待數據准備 (Waiting for the data to be ready)
2. 將數據從內核拷貝到進程中 (Copying the data from the kernel to the process)

正式因為這兩個階段,linux系統產生了下面五種網絡模式的方案。
- 阻塞 I/O(blocking IO)
- 非阻塞 I/O(nonblocking IO)
- I/O 多路復用( IO multiplexing)
- 信號驅動 I/O( signal driven IO)
- 異步 I/O(asynchronous IO)

注:由於signal driven IO在實際中並不常用,所以我這只提及剩下的四種IO Model。

 

阻塞 I/O(blocking IO)

在linux中,默認情況下所有的socket都是blocking,一個典型的讀操作流程大概是這樣:

 

 

當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:准備數據(對於網絡IO來說,很多時候數據在一開始還沒有到達。比如,還沒有收到一個完整的UDP包。這個時候kernel就要等待足夠的數據到來)。這個過程需要等待,也就是說數據被拷貝到操作系統內核的緩沖區中是需要一個過程的。而在用戶進程這邊,整個進程會被阻塞(當然,是進程自己選擇的阻塞)。當kernel一直等到數據准備好了,它就會將數據從kernel中拷貝到用戶內存,然后kernel返回結果,用戶進程才解除block的狀態,重新運行起來。

所以,blocking IO的特點就是在IO執行的兩個階段都被block了。

非阻塞 I/O(nonblocking IO)

linux下,可以通過設置socket使其變為non-blocking。當對一個non-blocking socket執行讀操作時,流程是這個樣子:

 

當用戶進程發出read操作時,如果kernel中的數據還沒有准備好,那么它並不會block用戶進程,而是立刻返回一個error。從用戶進程角度講 ,它發起一個read操作后,並不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個error時,它就知道數據還沒有准備好,於是它可以再次發送read操作。一旦kernel中的數據准備好了,並且又再次收到了用戶進程的system call,那么它馬上就將數據拷貝到了用戶內存,然后返回。

所以,nonblocking IO的特點是用戶進程需要不斷的主動詢問kernel數據好了沒有。

I/O 多路復用( IO multiplexing)

IO multiplexing就是我們說的select,poll,epoll,有些地方也稱這種IO方式為event driven IO。select/epoll的好處就在於單個process就可以同時處理多個網絡連接的IO。它的基本原理就是select,poll,epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有數據到達了,就通知用戶進程。

 

當用戶進程調用了select,那么整個進程會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據准備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。

所以,I/O 多路復用的特點是通過一種機制一個進程能同時等待多個文件描述符,而這些文件描述符(套接字描述符)其中的任意一個進入讀就緒狀態,select()函數就可以返回。

這個圖和blocking IO的圖其實並沒有太大的不同,事實上,還更差一些。因為這里需要使用兩個system call (select 和 recvfrom),而blocking IO只調用了一個system call (recvfrom)。但是,用select的優勢在於它可以同時處理多個connection。

所以,如果處理的連接數不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優勢並不是對於單個連接能處理得更快,而是在於能處理更多的連接。)

在IO multiplexing Model中,實際中,對於每一個socket,一般都設置成為non-blocking,但是,如上圖所示,整個用戶的process其實是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。

 

異步 I/O(asynchronous IO)

inux下的asynchronous IO其實用得很少。先看一下它的流程:

用戶進程發起read操作之后,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之后,首先它會立刻返回,所以不會對用戶進程產生任何block。然后,kernel會等待數據准備完成,然后將數據拷貝到用戶內存,當這一切都完成之后,kernel會給用戶進程發送一個signal,告訴它read操作完成了。

 

總結

blocking和non-blocking的區別

調用blocking IO會一直block住對應的進程直到操作完成,而non-blocking IO在kernel還准備數據的情況下會立刻返回。

synchronous IO和asynchronous IO的區別

在說明synchronous IO和asynchronous IO的區別之前,需要先給出兩者的定義。POSIX的定義是這樣子的:
- A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
- An asynchronous I/O operation does not cause the requesting process to be blocked;

兩者的區別就在於synchronous IO做”IO operation”的時候會將process阻塞。按照這個定義,之前所述的blocking IO,non-blocking IO,IO multiplexing都屬於synchronous IO。

有人會說,non-blocking IO並沒有被block啊。這里有個非常“狡猾”的地方,定義中所指的”IO operation”是指真實的IO操作,就是例子中的recvfrom這個system call。non-blocking IO在執行recvfrom這個system call的時候,如果kernel的數據沒有准備好,這時候不會block進程。但是,當kernel中數據准備好的時候,recvfrom會將數據從kernel拷貝到用戶內存中,這個時候進程是被block了,在這段時間內,進程是被block的。

而asynchronous IO則不一樣,當進程發起IO 操作之后,就直接返回再也不理睬了,直到kernel發送一個信號,告訴進程說IO完成。在這整個過程中,進程完全沒有被block。

 

各個IO Model的比較如圖所示:

 

通過上面的圖片,可以發現non-blocking IO和asynchronous IO的區別還是很明顯的。在non-blocking IO中,雖然進程大部分時間都不會被block,但是它仍然要求進程去主動的check,並且當數據准備完成以后,也需要進程主動的再次調用recvfrom來將數據拷貝到用戶內存。而asynchronous IO則完全不同。它就像是用戶進程將整個IO操作交給了他人(kernel)完成,然后他人做完后發信號通知。在此期間,用戶進程不需要去檢查IO操作的狀態,也不需要主動的去拷貝數據。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM