Python進程間通信


原文鏈接:https://zhuanlan.zhihu.com/p/37370601

作者:老錢

其他參考:https://www.cnblogs.com/zgq0/p/8780893.html

 

進程間通信的幾種主要手段簡介:

  1. 管道(Pipe)及有名管道(named pipe):管道可用於具有親緣關系進程間的通信,有名管道克服了管道沒有名字的限制,因此,除具有管道所具有的功能外,它還允許無親緣關系進程間的通信;
  2. 信號(Signal):信號是比較復雜的通信方式,用於通知接受進程有某種事件發生,除了用於進程間通信外,進程還可以發送信號給進程本身;linux除了支持Unix早期信號語義函數sigal外,還支持語義符合Posix.1標准的信號函數sigaction(實際上,該函數是基於BSD的,BSD為了實現可靠信號機制,又能夠統一對外接口,用sigaction函數重新實現了signal函數);
  3. 報文(Message)隊列(消息隊列):消息隊列是消息的鏈接表,包括Posix消息隊列system V消息隊列。有足夠權限的進程可以向隊列中添加消息,被賦予讀權限的進程則可以讀走隊列中的消息。消息隊列克服了信號承載信息量少,管道只能承載無格式字節流以及緩沖區大小受限等缺點。
  4. 共享內存:使得多個進程可以訪問同一塊內存空間,是最快的可用IPC形式。是針對其他通信機制運行效率較低而設計的。往往與其它通信機制,如信號量結合使用,來達到進程間的同步及互斥。
  5. 信號量(semaphore):主要作為進程間以及同一進程不同線程之間的同步手段。
  6. 套接口(Socket):更為一般的進程間通信機制,可用於不同機器之間的進程間通信。起初是由Unix系統的BSD分支開發出來的,但現在一般可以移植到其它類Unix系統上:Linux和System V的變種都支持套接字。

文件

使用文件進行通信是最簡單的一種通信方式,子進程將結果輸出到臨時文件,父進程從文件中讀出來。文件名使用子進程的進程id來命名。進程隨時都可以通過os.getpid()來獲取自己的進程id。

# coding: utf-8

import os
import sys
import math


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    pids = []
    unit = n / 10
    for i in range(10):  # 分10個子進程
        mink = unit * i
        maxk = mink + unit
        pid = os.fork()
        if pid > 0:
            pids.append(pid)
        else:
            s = slice(mink, maxk)  # 子進程開始計算
            with open("%d" % os.getpid(), "w") as f:
                f.write(str(s))
            sys.exit(0)  # 子進程結束
    sums = []
    for pid in pids:
        os.waitpid(pid, 0)  # 等待子進程結束
        with open("%d" % pid, "r") as f:
            sums.append(float(f.read()))
        os.remove("%d" % pid)  # 刪除通信的文件
    return math.sqrt(sum(sums) * 8)


print pi(10000000)

輸出

3.14159262176

管道pipe

管道是Unix進程間通信最常用的方法之一,它通過在父子進程之間開通讀寫通道來進行雙工交流。我們通過os.read()和os.write()來對文件描述符進行讀寫操作,使用os.close()關閉描述符。

上圖為單進程的管道

 

上圖為父子進程分離后的管道

# coding: utf-8

import os
import sys
import math


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    childs = {}
    unit = n / 10
    for i in range(10):  # 分10個子進程
        mink = unit * i
        maxk = mink + unit
        r, w = os.pipe()
        pid = os.fork()
        if pid > 0:
            childs[pid] = r  # 將子進程的pid和讀描述符存起來
            os.close(w)  # 父進程關閉寫描述符,只讀
        else:
            os.close(r)  # 子進程關閉讀描述符,只寫
            s = slice(mink, maxk)  # 子進程開始計算
            os.write(w, str(s))
            os.close(w)  # 寫完了,關閉寫描述符
            sys.exit(0)  # 子進程結束
    sums = []
    for pid, r in childs.items():
        sums.append(float(os.read(r, 1024)))
        os.close(r)  # 讀完了,關閉讀描述符
        os.waitpid(pid, 0)  # 等待子進程結束
    return math.sqrt(sum(sums) * 8)


print pi(10000000)

輸出

3.14159262176

有名管道fifo

相對於管道只能用於父子進程之間通信,Unix還提供了有名管道可以讓任意進程進行通信。有名管道又稱fifo,它會將自己注冊到文件系統里一個文件,參數通信的進程通過讀寫這個文件進行通信。
fifo要求讀寫雙方必須同時打開才可以繼續進行讀寫操作,否則打開操作會堵塞直到對方也打開。

# coding: utf-8

import os
import sys
import math


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    childs = []
    unit = n / 10
    fifo_path = "/tmp/fifo_pi"
    os.mkfifo(fifo_path)  # 創建named pipe
    for i in range(10):  # 分10個子進程
        mink = unit * i
        maxk = mink + unit
        pid = os.fork()
        if pid > 0:
            childs.append(pid)
        else:
            s = slice(mink, maxk)  # 子進程開始計算
            with open(fifo_path, "w") as ff:
                ff.write(str(s) + "\n")
            sys.exit(0)  # 子進程結束
    sums = []
    while True:
        with open(fifo_path, "r") as ff:
            # 子進程關閉寫端,讀進程會收到eof
            # 所以必須循環打開,多次讀取
            # 讀夠數量了就可以結束循環了
            sums.extend([float(x) for x in ff.read(1024).strip().split("\n")])
            if len(sums) == len(childs):
                break
    for pid in childs:
        os.waitpid(pid, 0)  # 等待子進程結束
    os.unlink(fifo_path)  # 移除named pipe
    return math.sqrt(sum(sums) * 8)


print pi(10000000)

輸出

3.14159262176

以太網套接字

套接字無疑是通信使用最為廣泛的方式了,它不但能跨進程還能跨網絡。今天英特網能發達成這樣,全拜套接字所賜。不過作為同一個機器的多進程通信還是挺浪費的。暫不討論這個,還是先看看它如何使用吧。

# coding: utf-8

import os
import sys
import math
import socket


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    childs = []
    unit = n / 10
    servsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 注意這里的AF_INET表示普通套接字
    servsock.bind(("localhost", 0))  # 0表示隨機端口
    server_address = servsock.getsockname()  # 拿到隨機出來的地址,給后面的子進程使用
    servsock.listen(10)  # 監聽子進程連接請求
    for i in range(10):  # 分10個子進程
        mink = unit * i
        maxk = mink + unit
        pid = os.fork()
        if pid > 0:
            childs.append(pid)
        else:
            servsock.close()  # 子進程要關閉servsock引用
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect(server_address)  # 連接父進程套接字
            s = slice(mink, maxk)  # 子進程開始計算
            sock.sendall(str(s))
            sock.close()  # 關閉連接
            sys.exit(0)  # 子進程結束
    sums = []
    for pid in childs:
        conn, _ = servsock.accept()  # 接收子進程連接
        sums.append(float(conn.recv(1024)))
        conn.close()  # 關閉連接
    for pid in childs:
        os.waitpid(pid, 0)  # 等待子進程結束
    servsock.close()  # 關閉套接字
    return math.sqrt(sum(sums) * 8)


print pi(10000000)

輸出

3.14159262176

Unix域套接字

當同一個機器的多個進程使用普通套接字進行通信時,需要經過網絡協議棧,這非常浪費,因為同一個機器根本沒有必要走網絡。所以Unix提供了一個套接字的特殊版本,它使用和套接字一摸一樣的api,但是地址不再是網絡端口,而是文件。相當於我們通過某個特殊文件來進行套接字通信。

# coding: utf-8

import os
import sys
import math
import socket


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    server_address = "/tmp/pi_sock"  # 套接字對應的文件名
    childs = []
    unit = n / 10
    servsock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    servsock.bind(server_address)
    servsock.listen(10)  # 監聽子進程連接請求
    for i in range(10):  # 分10個子進程
        mink = unit * i
        maxk = mink + unit
        pid = os.fork()
        if pid > 0:
            childs.append(pid)
        else:
            servsock.close()  # 子進程要關閉servsock引用
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            sock.connect(server_address)  # 連接父進程套接字
            s = slice(mink, maxk)  # 子進程開始計算
            sock.sendall(str(s))
            sock.close()  # 關閉連接
            sys.exit(0)  # 子進程結束
    sums = []
    for pid in childs:
        conn, _ = servsock.accept()  # 接收子進程連接
        sums.append(float(conn.recv(1024)))
        conn.close()  # 關閉連接
    for pid in childs:
        os.waitpid(pid, 0)  # 等待子進程結束
    servsock.close()  # 關閉套接字
    os.unlink(server_address)  # 移除套接字文件
    return math.sqrt(sum(sums) * 8)


print pi(10000000)

輸出

3.14159262176

無名套接字socketpair

我們知道跨網絡通信免不了要通過套接字進行通信,但是本例的多進程是在同一個機器上,用不着跨網絡,使用普通套接字進行通信有點浪費。

上圖為單進程的socketpair

 

上圖為父子進程分離后的socketpair

為了解決這個問題,Unix系統提供了無名套接字socketpair,不需要端口也可以創建套接字,父子進程通過socketpair來進行全雙工通信。

socketpair返回兩個套接字對象,一個用於讀一個用於寫,它有點類似於pipe,只不過pipe返回的是兩個文件描述符,都是整數。所以寫起代碼形式上跟pipe幾乎沒有什么區別。

我們使用sock.send()和sock.recv()來對套接字進行讀寫,通過sock.close()來關閉套接字對象。

# coding: utf-8

import os
import sys
import math
import socket


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    childs = {}
    unit = n / 10
    for i in range(10):  # 分10個子進程
        mink = unit * i
        maxk = mink + unit
        rsock, wsock = socket.socketpair()
        pid = os.fork()
        if pid > 0:
            childs[pid] = rsock
            wsock.close()
        else:
            rsock.close()
            s = slice(mink, maxk)  # 子進程開始計算
            wsock.send(str(s))
            wsock.close()
            sys.exit(0)  # 子進程結束
    sums = []
    for pid, rsock in childs.items():
        sums.append(float(rsock.recv(1024)))
        rsock.close()
        os.waitpid(pid, 0)  # 等待子進程結束
    return math.sqrt(sum(sums) * 8)


print pi(10000000)

輸出

3.14159262176

OS消息隊列

操作系統也提供了跨進程的消息隊列對象可以讓我們直接使用,只不過python沒有默認提供包裝好的api來直接使用。我們必須使用第三方擴展來完成OS消息隊列通信。第三方擴展是通過使用Python包裝的C實現來完成的。

OS消息隊列有兩種形式,一種是posix消息隊列,另一種是systemv消息隊列,有些操作系統兩者都支持,有些只支持其中的一個,比如macos僅支持systemv消息隊列,我本地的python的docker鏡像是debian linux,它僅支持posix消息隊列。

posix消息隊列
我們先使用posix消息隊列來完成圓周率的計算,posix消息隊列需要提供一個唯一的名稱,它必須是/開頭。close()方法僅僅是減少內核消息隊列對象的引用,而不是徹底關閉它。unlink()方法才能徹底銷毀它。O_CREAT選項表示如果不存在就創建。向隊列里塞消息使用send方法,收取消息使用receive方法,receive方法返回一個tuple,tuple的第一個值是消息的內容,第二個值是消息的優先級。之所以有優先級,是因為posix消息隊列支持消息的排序,在send方法的第二個參數可以提供優先級整數值,默認為0,越大優先級越高。

# coding: utf-8

import os
import sys
import math
from posix_ipc import MessageQueue as Queue


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    pids = []
    unit = n / 10
    q = Queue("/pi", flags=os.O_CREAT)
    for i in range(10):  # 分10個子進程
        mink = unit * i
        maxk = mink + unit
        pid = os.fork()
        if pid > 0:
            pids.append(pid)
        else:
            s = slice(mink, maxk)  # 子進程開始計算
            q.send(str(s))
            q.close()
            sys.exit(0)  # 子進程結束
    sums = []
    for pid in pids:
        sums.append(float(q.receive()[0]))
        os.waitpid(pid, 0)  # 等待子進程結束
    q.close()
    q.unlink()  # 徹底銷毀隊列
    return math.sqrt(sum(sums) * 8)


print pi(10000000)

輸出

3.14159262176

systemv消息隊列
systemv消息隊列和posix消息隊列用起來有所不同。systemv的消息隊列是以整數key作為名稱,如果不指定,它就創建一個唯一的未占用的整數key。它還提供消息類型的整數參數,但是不支持消息優先級。

# coding: utf-8

import os
import sys
import math
import sysv_ipc
from sysv_ipc import MessageQueue as Queue


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    pids = []
    unit = n / 10
    q = Queue(key=None, flags=sysv_ipc.IPC_CREX)
    for i in range(10):  # 分10個子進程
        mink = unit * i
        maxk = mink + unit
        pid = os.fork()
        if pid > 0:
            pids.append(pid)
        else:
            s = slice(mink, maxk)  # 子進程開始計算
            q.send(str(s))
            sys.exit(0)  # 子進程結束
    sums = []
    for pid in pids:
        sums.append(float(q.receive()[0]))
        os.waitpid(pid, 0)  # 等待子進程結束
    q.remove()  # 銷毀消息隊列
    return math.sqrt(sum(sums) * 8)


print pi(10000000)

輸出

3.14159262176

共享內存

共享內存也是非常常見的多進程通信方式,操作系統負責將同一份物理地址的內存映射到多個進程的不同的虛擬地址空間中。進而每個進程都可以操作這份內存。考慮到物理內存的唯一性,它屬於臨界區資源,需要在進程訪問時搞好並發控制,比如使用信號量。我們通過一個信號量來控制所有子進程的順序讀寫共享內存。我們分配一個8字節double類型的共享內存用來存儲極限的和,每次從共享內存中讀出來時,要使用struct進行反序列化(unpack),將新的值寫進去之前也要使用struct進行序列化(pack)。每次讀寫操作都需要將讀寫指針移動到內存開頭位置(lseek)。

# coding: utf-8
import os
import sys
import math
import struct
import posix_ipc
from posix_ipc import Semaphore
from posix_ipc import SharedMemory as Memory

def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s

def pi(n):
    pids = []
    unit = n / 10
    sem_lock = Semaphore("/pi_sem_lock", flags=posix_ipc.O_CREX, initial_value=1)  # 使用一個信號量控制多個進程互斥訪問共享內存
    memory = Memory("/pi_rw", size=8, flags=posix_ipc.O_CREX)
    os.lseek(memory.fd, 0, os.SEEK_SET)  # 初始化和為0.0的double值
    os.write(memory.fd, struct.pack('d', 0.0))
    for i in range(10):  # 分10個子進程
        mink = unit * i
        maxk = mink + unit
        pid = os.fork()
        if pid > 0:
            pids.append(pid)
        else:
            s = slice(mink, maxk)  # 子進程開始計算
            sem_lock.acquire()
            try:
                os.lseek(memory.fd, 0, os.SEEK_SET)
                bs = os.read(memory.fd, 8)  # 從共享內存讀出來當前值
                cur_val, = struct.unpack('d', bs)  # 反序列化,逗號不能少
                cur_val += s  # 加上當前進程的計算結果
                bs = struct.pack('d', cur_val) # 序列化
                os.lseek(memory.fd, 0, os.SEEK_SET)
                os.write(memory.fd, bs)  # 寫進共享內存
                memory.close_fd()
            finally:
                sem_lock.release()
            sys.exit(0)  # 子進程結束
    sums = []
    for pid in pids:
        os.waitpid(pid, 0)  # 等待子進程結束
    os.lseek(memory.fd, 0, os.SEEK_SET)
    bs = os.read(memory.fd, 8)  # 讀出最終這結果
    sums, = struct.unpack('d', bs)  # 反序列化
    memory.close_fd()  # 關閉共享內存
    memory.unlink()  # 銷毀共享內存
    sem_lock.unlink()  #  銷毀信號量
    return math.sqrt(sums * 8)

print pi(10000000)

輸出

3.14159262176


總結:

1.管道:速度慢,容量有限,只有父子進程能通訊    

2.FIFO:任何進程間都能通訊,但速度慢    

3.消息隊列:容量受到系統限制,且要注意第一次讀的時候,要考慮上一次沒有讀完數據的問題    

4.信號量:不能傳遞復雜消息,只能用來同步    

5.共享內存區:能夠很容易控制容量,速度快,但要保持同步,比如一個進程在寫的時候,另一個進程要注意讀寫的問題,相當於線程中的線程安全,當然,共享內存區同樣可以用作線程間通訊,不過沒這個必要,線程間本來就已經共享了同一進程內的一塊內存

 

https://www.zhihu.com/people/codehole


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM