IPC 機制的 Python 實現


IPC機制

from multiprocessing import Queue, Process

"""
研究思路
    1.主進程跟子進程借助於隊列通信
    2.子進程跟子進程借助於隊列通信
"""
def producer(q):
    q.put('很高興為您服務')


def consumer(q):
    print(q.get())


if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer,args=(q,))
    p1 = Process(target=consumer,args=(q,))
    p.start()
    p1.start()

 

 


 

多進程IPC與Python支持

linux下進程間通信的幾種主要手段簡介:

  1. 管道(Pipe)及有名管道(named pipe):管道可用於具有親緣關系進程間的通信,有名管道克服了管道沒有名字的限制,因此,除具有管道所具有的功能外,它還允許無親緣關系進程間的通信;

  2. 信號(Signal):信號是比較復雜的通信方式,用於通知接受進程有某種事件發生,除了用於進程間通信外,進程還可以發送信號給進程本身;linux除了支持Unix早期信號語義函數sigal外,還支持語義符合Posix.1標准的信號函數sigaction(實際上,該函數是基於BSD的,BSD為了實現可靠信號機制,又能夠統一對外接口,用sigaction函數重新實現了signal函數);

  3. 報文(Message)隊列(消息隊列):消息隊列是消息的鏈接表,包括Posix消息隊列system V消息隊列。有足夠權限的進程可以向隊列中添加消息,被賦予讀權限的進程則可以讀走隊列中的消息。消息隊列克服了信號承載信息量少,管道只能承載無格式字節流以及緩沖區大小受限等缺點。

  4. 共享內存:使得多個進程可以訪問同一塊內存空間,是最快的可用IPC形式。是針對其他通信機制運行效率較低而設計的。往往與其它通信機制,如信號量結合使用,來達到進程間的同步及互斥。

  5. 信號量(semaphore):主要作為進程間以及同一進程不同線程之間的同步手段。

  6. 套接口(Socket):更為一般的進程間通信機制,可用於不同機器之間的進程間通信。起初是由Unix系統的BSD分支開發出來的,但現在一般可以移植到其它類Unix系統上:Linux和System V的變種都支持套接字。

python 原生支持的有:1, 2, 6. 信號這個比較簡單, 一種注冊監聽機制.本文不涉及

管道是可以通過 (mutiprocessing.Pipe) 獲得, 由c寫的

套接字這個通過 AF_UNIX協議 就可以完成啦, 和網絡編程類似的~

其實仔細想想還有第三種即, 利用文件, 生產者寫到文件中, 消費者從文件中讀.(簡單化成一個生產者, 一個消費者, 否者競爭關系有點復雜.), 當然我們知道文件寫入肯定很慢, 但是有多慢還是要測試一下的.

工具函數:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
#   Author  :   zhangxiaolin
#   E-mail  :   petelin1120@gmail.com
#   Date    :   17/8/17 12:08
#   Desc    :   ...
# through pipe 269667.7903995848 KB/s
 
data_size  =  8  *  1024   # KB
 
 
def  gen_data(size):
     onekb  =  "a"  *  1024
     return  (onekb  *  size).encode( 'ascii' )

  

管道:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
#   Author  :   zhangxiaolin
#   E-mail  :   petelin1120@gmail.com
#   Date    :   17/8/17 12:08
#   Desc    :   ...
import  multiprocessing
 
from  mutiprocesscomunicate  import  gen_data, data_size
 
 
def  send_data_task(pipe_out):
     for  in  range (data_size):
         pipe_out.send(gen_data( 1 ))
     # end EOF
     pipe_out.send("")
     print ( 'send done.' )
 
 
def  get_data_task(pipe_in):
     while  True :
         data  =  pipe_in.recv()
         if  not  data:
             break
     print ( "recv done." )
 
 
if  __name__  = =  '__main__' :
     pipe_in, pipe_out  =  multiprocessing.Pipe( False )
     =  multiprocessing.Process(target = send_data_task, args = (pipe_out,), kwargs = ())
     p1  =  multiprocessing.Process(target = get_data_task, args = (pipe_in,), kwargs = ())
 
     p.daemon  =  True
     p1.daemon  =  True
     import  time
 
     start_time  =  time.time()
     p1.start()
     p.start()
     p.join()
     p1.join()
     print ( 'through pipe' , data_size  /  (time.time()  -  start_time),  'KB/s' )

  

注意這個地方 Pipe(True)默認為雙工的, 然而標准的是單工的, 單工緩沖區大小在OSX上有64KB, 設置緩存區是為了協調流入流出速率, 否者寫的太快, 沒人取走也是浪費. 結果: through pipe 99354.71358973449 KB/s

file

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
import  os
from  mutiprocesscomunicate  import  gen_data, data_size
 
 
def  send_data_task(file_name):
     # 是否同步寫入磁盤, 如果同步寫進去, 慢的一 b, 牛逼的是, 不同步寫進去, 也可以讀.操作系統厲害了.
     # os.sync()
     with  open (file_name,  'wb+' ) as fd:
         for  in  range (data_size):
             fd.write(gen_data( 1 ))
             fd.write( '\n' .encode( 'ascii' ))
             # end EOF
         fd.write( 'EOF' .encode( 'ascii' ))
     print ( 'send done.' )
 
 
def  get_data_task(file_name):
     offset  =  0
     fd  =  open (file_name,  'r+' )
     =  0
     while  True :
         data  =  fd.read( 1024 )
         offset  + =  len (data)
         if  'EOF'  in  data:
             fd.truncate()
             break
         if  not  data:
             fd.close()
             fd  =  None
             fd  =  open (file_name,  'r+' )
             fd.seek(offset)
             continue
     print ( "recv done." )
 
 
if  __name__  = =  '__main__' :
     import  multiprocessing
 
     pipe_out  =  pipe_in  =  'throught_file'
     =  multiprocessing.Process(target = send_data_task, args = (pipe_out,), kwargs = ())
     p1  =  multiprocessing.Process(target = get_data_task, args = (pipe_in,), kwargs = ())
 
     p.daemon  =  True
     p1.daemon  =  True
     import  time
 
     start_time  =  time.time()
     p1.start()
     import  time
 
     time.sleep( 0.5 )
     p.start()
     p.join()
     p1.join()
     import  os
     print ( 'through file' , data_size  /  (time.time()  -  start_time),  'KB/s' )
     open (pipe_in,  'w+' ).truncate()

  

有兩個點, 一個是, 打開文件之后, 如果有人在寫入, 需要重新打開才能發現新內容, 另外需要設置offset,只讀取新內容.

!!!重點, 測試的時候這個速度有 through file 110403.02025891568 KB/s這么多, 甚至比管道還要高一點, 這是怎么回事呢?

quite often file data is first written into the page cache (which is in RAM) by the OS kernel.

並沒有被寫入文件, 而是被寫到內存中了, 隨后(不會通知你)被操作系統調度寫入文件.操作系統比較厲害的是, 即使沒有寫到文件中, 讀寫仍然像寫到文件中一樣.

如果設置了 os.sync(), 所有寫操作立即執行, 會發現慢的…類似於卡死.

本地socket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
#   Author  :   zhangxiaolin
#   E-mail  :   petelin1120@gmail.com
#   Date    :   17/8/17 12:08
#   Desc    :   ...
import  multiprocessing
import  os
import  socket
 
from  mutiprocesscomunicate  import  gen_data, data_size
 
 
minissdpdSocket  =  '/tmp/m.sock'   # The socket for talking to minissdpd
 
 
def  send_data_task():
     with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
         server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,  1 )
 
         try :
             os.remove(minissdpdSocket)
         except  OSError:
             pass
 
         server.bind(minissdpdSocket)
 
         server.listen( 1 )
 
         conn, _  =  server.accept()
         with conn:
             for  in  range (data_size):
                 conn.send(gen_data( 1 ))
             conn.shutdown(socket.SHUT_WR)
             print ( 'send done.' )
 
 
def  get_data_task():
     with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
         client.connect(minissdpdSocket)
         client.shutdown(socket.SHUT_WR)
         while  True :
             data  =  client.recv( 1024 )
             if  not  data:
                 break
         print ( "recv done." )
 
 
if  __name__  = =  '__main__' :
     =  multiprocessing.Process(target = send_data_task, args = (), kwargs = ())
     p1  =  multiprocessing.Process(target = get_data_task, args = (), kwargs = ())
 
     p.daemon  =  True
     p1.daemon  =  True
     import  time
 
     start_time  =  time.time()
     p.start()
 
     p1.start()
     p.join()
     p1.join()
     print ( 'through pipe' , data_size  /  (time.time()  -  start_time),  'KB/s' )

  

本地socket, 會走傳輸層也就是被tcp或者udp封裝一下,到網絡層,網絡層自己有路由表, 發現是本機, 則走本地回環接口, 不經過物理網卡, 發到接受隊列中去.

這個速度不穩定, 最快有through socket 261834.36615940317 KB/s

參考

  1. 深刻理解Linux進程間通信(IPC)
  2. 文件沒有直接寫入磁盤
  3. pipe緩存區大小

 

本文作者:陌雨翎
本文鏈接:https://www.cnblogs.com/rianley/p/9014017.html
關於博主:評論和私信會在第一時間回復。或者直接私信我。
版權聲明:本博客所有文章除特別聲明外,均采用 BY-NC-SA 許可協議。轉載請注明出處!
聲援博主:如果您覺得文章對您有幫助,可以點擊文章右下角推薦】一下。您的鼓勵是博主的最大動力!

在linux下的多個進程間的通信機制叫做IPC(Inter-Process Communication),它是多個進程之間相互溝通的一種方法。在linux下有多種進程間通信的方法:半雙工管道、命名管道、消息隊列、信號、信號量、共享內存、內存映射文件,套接字等等。使用這些機制可以為linux下的網絡服務器開發提供靈活而又堅固的框架。

1. 管道 (PIPE)
    管道實際是用於進程間通信的一段共享內存,創建管道的進程稱為管道服務器,連接到一個管道的進程為管道客戶機。一個進程在向管道寫入數據后,另一進程就可以從管道的另一端將其讀取出來。
管道的特點:
1、管道是半雙工的,數據只能向一個方向流動;需要雙方通信時,需要建立起兩個管道;
2、 只能用於父子進程或者兄弟進程之間( 具有親緣關系的進程)。 比如fork或exec創建的新進程, 在使用exec創建新進程時,需要將管道的文件描述符作為參數傳遞給exec創建的新進程。 當父進程與使用fork創建的子進程直接通信時,發送數據的進程關閉讀端,接受數據的進程關閉寫端。
3、單獨構成一種獨立的文件系統:管道對於管道兩端的進程而言,就是一個文件,但它不是普通的文件,它不屬於某種文件系統,而是自立門戶,單獨構成一種文件系統,並且只存在與內存中。
4、數據的讀出和寫入:一個進程向管道中寫的內容被管道另一端的進程讀出。寫入的內容每次都添加在管道緩沖區的末尾,並且每次都是從緩沖區的頭部讀出數據。
管道的實現機制:

    管道是由內核管理的一個緩沖區,相當於我們放入內存中的一個紙條。管道的一端連接一個進程的輸出。這個進程會向管道中放入信息。管道的另一端連接一個進程的輸入,這個進程取出被放入管道的信息。一個緩沖區不需要很大,它被設計成為環形的數據結構,以便管道可以被循環利用。當管道中沒有信息的話,從管道中讀取的進程會等待,直到另一端的進程放入信息。當管道被放滿信息的時候,嘗試放入信息的進程會等待,直到另一端的進程取出信息。當兩個進程都終結的時候,管道也自動消失。

管道只能在本地計算機中使用,而不可用於網絡間的通信。 

 

2. 命名管道(FIFO)
命名管道是一種特殊類型的文件,它在系統中以文件形式存在。這樣克服了管道的弊端,他可以 允許沒有親緣關系的進程間通信。 

 

管道和命名管道的區別:
對於命名管道FIFO來說,IO操作和普通管道IO操作基本一樣,但是兩者有一個主要的區別,在命名管道中,管道可以是事先已經創建好的,比如我們在命令行下執行
mkfifo myfifo
就是創建一個命名通道,我們必須用open函數來顯示地建立連接到管道的通道,而在管道中,管道已經在主進程里創建好了,然后在fork時直接復制相關數據或者是用exec創建的新進程時把管道的文件描述符當參數傳遞進去。
一般來說FIFO和PIPE一樣總是處於阻塞狀態。也就是說如果命名管道FIFO打開時設置了讀權限,則讀進程將一直阻塞,一直到其他進程打開該FIFO並向管道寫入數據。這個阻塞動作反過來也是成立的。如果不希望命名管道操作的時候發生阻塞,可以在open的時候使用O_NONBLOCK標志,以關閉默認的阻塞操作。

 

3. 信號 (signal)
信號機制是unix系統中最為古老的進程之間的通信機制,用於一個或幾個進程之間傳遞異步信號。信號可以有各種異步事件產生,比如鍵盤中斷等。shell也可以使用信號將作業控制命令傳遞給它的子進程。

 

4. 消息隊列(Message queues)
消息隊列是內核地址空間中的內部鏈表,通過linux內核在各個進程直接傳遞內容,消息順序地發送到消息隊列中,並以幾種不同的方式從隊列中獲得,每個消息隊列可以用 IPC標識符 唯一地進行識別。內核中的消息隊列是通過IPC的標識符來區別,不同的消息隊列直接是相互獨立的。每個消息隊列中的消息,又構成一個 獨立的鏈表。
消息隊列克服了信號承載信息量少,管道只能承載無格式字符流。 

 

5. 信號量(Semaphore)
信號量是一種計數器,用於控制對多個進程共享的資源進行的訪問。它們常常被用作一個鎖機制,在某個進程正在對特定的資源進行操作時,信號量可以防止另一個進程去訪問它。 
信號量是特殊的變量,它只取正整數值並且只允許對這個值進行兩種操作:等待(wait)和信號(signal)。(P、V操作,P用於等待,V用於信號) 
p(sv):如果sv的值大於0,就給它減1;如果它的值等於0,就掛起該進程的執行 
V(sv):如果有其他進程因等待sv而被掛起,就讓它恢復運行;如果沒有其他進程因等待sv而掛起,則給它加1 
簡單理解就是P相當於申請資源,V相當於釋放資源 

 

6. 共享內存(Share Memory)
共享內存是在多個進程之間共享內存區域的一種進程間的通信方式,由IPC為進程創建的一個特殊地址范圍,它將出現在該進程的地址空間(這里的地址空間具體是哪個地方?)中。其他進程可以將 同一段共享內存連接到自己的地址空間中。所有進程都可以訪問共享內存中的地址,就好像它們是malloc分配的一樣。如果一個進程向共享內存中寫入了數據,所做的改動將立刻被其他進程看到。 
共享內存是 IPC最快捷的方式,因為共享內存方式的通信沒有中間過程,而管道、消息隊列等方式則是需要將數據通過中間機制進行轉換。共享內存方式直接將某段內存段進行映射,多個進程間的共享內存是同一塊的物理空間,僅僅映射到各進程的地址不同而已,因此不需要進行復制,可以直接使用此段空間。
注意:共享內存本身並沒有同步機制,需要程序員自己控制。 

 

消息隊列、信號量以及共享內存的相似之處:
它們被統稱為XSI IPC,它們在內核中有相似的IPC結構(消息隊列的msgid_ds,信號量的semid_ds,共享內存的shmid_ds),而且都用一個非負整數的標識符加以引用(消息隊列的msg_id,信號量的sem_id,共享內存的shm_id,分別通過msgget、semget以及shmget獲得),標志符是IPC對象的內部名,每個IPC對象都有一個鍵(key_t key)相關聯,將這個鍵作為該對象的外部名。

XSI IPC和PIPE、FIFO的區別:
1、XSI IPC的IPC結構是在系統范圍內起作用,沒用使用引用計數。如果一個進程創建一個消息隊列,並在消息隊列中放入幾個消息,進程終止后,即使現在已經沒有程序使用該消息隊列,消息隊列及其內容依然保留。而PIPE在最后一個引用管道的進程終止時,管道就被完全刪除了。對於FIFO最后一個引用FIFO的進程終止時,雖然FIFO還在系統,但是其中的內容會被刪除。
2、和PIPE、FIFO不一樣,XSI IPC不使用文件描述符,所以不能用ls查看IPC對象,不能用rm命令刪除,不能用chmod命令刪除它們的訪問權限。只能使用ipcs和ipcrm來查看可以刪除它們。

 

7. 內存映射(Memory Map)
內存映射文件,是由一個文件到一塊內存的映射。內存映射文件與 虛擬內存有些類似,通過內存映射文件可以保留一個地址的區域,
同時將物理存儲器提交給此區域,內存文件映射的物理存儲器來自一個已經存在於磁盤上的文件,而且在對該文件進行操作之前必須首先對文件進行映射。使用內存映射文件處理存儲於磁盤上的文件時,將不必再對文件執行I/O操作。 每一個使用該機制的進程通過把同一個共享的文件映射到自己的進程地址空間來實現多個進程間的通信(這里類似於共享內存,只要有一個進程對這塊映射文件的內存進行操作,其他進程也能夠馬上看到)。
使用內存映射文件不僅可以實現多個進程間的通信,還可以用於 處理大文件提高效率。因為我們普通的做法是 把磁盤上的文件先拷貝到內核空間的一個緩沖區再拷貝到用戶空間(內存),用戶修改后再將這些數據拷貝到緩沖區再拷貝到磁盤文件,一共四次拷貝。如果文件數據量很大,拷貝的開銷是非常大的。那么問題來了,系統在在進行內存映射文件就不需要數據拷貝?mmap()確實沒有進行數據拷貝,真正的拷貝是在在缺頁中斷處理時進行的,由於mmap()將文件直接映射到用戶空間,所以中斷處理函數根據這個映射關系,直接將文件從硬盤拷貝到用戶空間,所以只進行一次數據拷貝。效率高於read/write。

 

共享內存和內存映射文件的區別:
內存映射文件是利用虛擬內存把文件映射到進程的地址空間中去,在此之后進程操作文件,就像操作進程空間里的地址一樣了,比如使用c語言的memcpy等內存操作的函數。這種方法能夠很好的應用在需要頻繁處理一個文件或者是一個大文件的場合,這種方式處理IO效率比普通IO效率要高
  共享內存是內存映射文件的一種特殊情況,內存映射的是一塊內存,而非磁盤上的文件。共享內存的主語是進程(Process),操作系統默認會給每一個進程分配一個內存空間,每一個進程只允許訪問操作系統分配給它的哪一段內存,而不能訪問其他進程的。而有時候需要在不同進程之間訪問同一段內存,怎么辦呢?操作系統給出了 創建訪問共享內存的API,需要共享內存的進程可以通過這一組定義好的API來訪問多個進程之間共有的內存,各個進程訪問這一段內存就像訪問一個硬盤上的文件一樣。

內存映射文件與虛擬內存的區別和聯系:
內存映射文件和虛擬內存都是操作系統內存管理的重要部分,兩者有相似點也有不同點。
聯系:虛擬內存和內存映射都是將一部分內容加載到內存,另一部放在磁盤上的一種機制。對於用戶而言都是透明的。
區別:虛擬內存是硬盤的一部分,是內存和硬盤的數據交換區,許多程序運行過程中把暫時不用的程序數據放入這塊虛擬內存,節約內存資源。內存映射是一個文件到一塊內存的映射,這樣程序通過內存指針就可以對文件進行訪問。
虛擬內存的硬件基礎是分頁機制。另外一個基礎就是局部性原理(時間局部性和空間局部性),這樣就可以將程序的一部分裝入內存,其余部分留在外存,當訪問信息不存在,再將所需數據調入內存。而內存映射文件並不是局部性,而是使虛擬地址空間的某個區域銀蛇磁盤的全部或部分內容,通過該區域對被映射的磁盤文件進行訪問,不必進行文件I/O也不需要對文件內容進行緩沖處理。

 

8. 套接字 
套接字機制不但可以單機的不同進程通信,而且使得跨網機器間進程可以通信。 
套接字的創建和使用與管道是有區別的,套接字 明確地將客戶端與服務器 區分開來,可以實現多個客戶端連到同一服務器。 
服務器套接字連接過程描述: 
首先,服務器應用程序用socket創建一個套接字,它是系統分配服務器進程的類似文件描述符的資源。 接着,服務器調用bind給套接字命名。這個名字是一個標示符,它允許linux將進入的針對特定端口的連接轉到正確的服務器進程。 然后,系統調用listen函數開始接聽,等待客戶端連接。listen創建一個隊列並將其用於存放來自客戶端的進入連接。 當客戶端調用connect請求連接時,服務器調用accept接受客戶端連接,accept此時會創建一個新套接字,用於與這個客戶端進行通信。 
客戶端套接字連接過程描述: 
客戶端首先調用socket創建一個未命名套接字,讓后將服務器的命名套接字作為地址來調用connect與服務器建立連接。 
只要雙方連接建立成功,我們就可以像操作底層文件一樣來操作socket套接字實現通信。 

原文:https://blog.csdn.net/a987073381/article/details/52006729

 


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM