多進程IPC與Python支持


多進程IPC與Python支持

linux下進程間通信的幾種主要手段簡介:

  1. 管道(Pipe)及有名管道(named pipe):管道可用於具有親緣關系進程間的通信,有名管道克服了管道沒有名字的限制,因此,除具有管道所具有的功能外,它還允許無親緣關系進程間的通信;

  2. 信號(Signal):信號是比較復雜的通信方式,用於通知接受進程有某種事件發生,除了用於進程間通信外,進程還可以發送信號給進程本身;linux除了支持Unix早期信號語義函數sigal外,還支持語義符合Posix.1標准的信號函數sigaction(實際上,該函數是基於BSD的,BSD為了實現可靠信號機制,又能夠統一對外接口,用sigaction函數重新實現了signal函數);

  3. 報文(Message)隊列(消息隊列):消息隊列是消息的鏈接表,包括Posix消息隊列system V消息隊列。有足夠權限的進程可以向隊列中添加消息,被賦予讀權限的進程則可以讀走隊列中的消息。消息隊列克服了信號承載信息量少,管道只能承載無格式字節流以及緩沖區大小受限等缺點。

  4. 共享內存:使得多個進程可以訪問同一塊內存空間,是最快的可用IPC形式。是針對其他通信機制運行效率較低而設計的。往往與其它通信機制,如信號量結合使用,來達到進程間的同步及互斥。

  5. 信號量(semaphore):主要作為進程間以及同一進程不同線程之間的同步手段。

  6. 套接口(Socket):更為一般的進程間通信機制,可用於不同機器之間的進程間通信。起初是由Unix系統的BSD分支開發出來的,但現在一般可以移植到其它類Unix系統上:Linux和System V的變種都支持套接字。

python 原生支持的有:1, 2, 6. 信號這個比較簡單, 一種注冊監聽機制.本文不涉及

管道是可以通過 (mutiprocessing.Pipe) 獲得, 由c寫的

套接字這個通過 AF_UNIX協議 就可以完成啦, 和網絡編程類似的~

其實仔細想想還有第三種即, 利用文件, 生產者寫到文件中, 消費者從文件中讀.(簡單化成一個生產者, 一個消費者, 否者競爭關系有點復雜.), 當然我們知道文件寫入肯定很慢, 但是有多慢還是要測試一下的.

工具函數:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
#   Author  :   zhangxiaolin
#   E-mail  :   petelin1120@gmail.com
#   Date    :   17/8/17 12:08
#   Desc    :   ...
# through pipe 269667.7903995848 KB/s

data_size = 8 * 1024  # KB


def gen_data(size):
    onekb = "a" * 1024
    return (onekb * size).encode('ascii')

  

管道:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
#   Author  :   zhangxiaolin
#   E-mail  :   petelin1120@gmail.com
#   Date    :   17/8/17 12:08
#   Desc    :   ...
import multiprocessing

from mutiprocesscomunicate import gen_data, data_size


def send_data_task(pipe_out):
    for i in range(data_size):
        pipe_out.send(gen_data(1))
    # end EOF
    pipe_out.send("")
    print('send done.')


def get_data_task(pipe_in):
    while True:
        data = pipe_in.recv()
        if not data:
            break
    print("recv done.")


if __name__ == '__main__':
    pipe_in, pipe_out = multiprocessing.Pipe(False)
    p = multiprocessing.Process(target=send_data_task, args=(pipe_out,), kwargs=())
    p1 = multiprocessing.Process(target=get_data_task, args=(pipe_in,), kwargs=())

    p.daemon = True
    p1.daemon = True
    import time

    start_time = time.time()
    p1.start()
    p.start()
    p.join()
    p1.join()
    print('through pipe', data_size / (time.time() - start_time), 'KB/s')

  

注意這個地方 Pipe(True)默認為雙工的, 然而標准的是單工的, 單工緩沖區大小在OSX上有64KB, 設置緩存區是為了協調流入流出速率, 否者寫的太快, 沒人取走也是浪費. 結果: through pipe 99354.71358973449 KB/s

file

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
import os
from mutiprocesscomunicate import gen_data, data_size


def send_data_task(file_name):
    # 是否同步寫入磁盤, 如果同步寫進去, 慢的一 b, 牛逼的是, 不同步寫進去, 也可以讀.操作系統厲害了.
    # os.sync()
    with open(file_name, 'wb+') as fd:
        for i in range(data_size):
            fd.write(gen_data(1))
            fd.write('\n'.encode('ascii'))
            # end EOF
        fd.write('EOF'.encode('ascii'))
    print('send done.')


def get_data_task(file_name):
    offset = 0
    fd = open(file_name, 'r+')
    i = 0
    while True:
        data = fd.read(1024)
        offset += len(data)
        if 'EOF' in data:
            fd.truncate()
            break
        if not data:
            fd.close()
            fd = None
            fd = open(file_name, 'r+')
            fd.seek(offset)
            continue
    print("recv done.")


if __name__ == '__main__':
    import multiprocessing

    pipe_out = pipe_in = 'throught_file'
    p = multiprocessing.Process(target=send_data_task, args=(pipe_out,), kwargs=())
    p1 = multiprocessing.Process(target=get_data_task, args=(pipe_in,), kwargs=())

    p.daemon = True
    p1.daemon = True
    import time

    start_time = time.time()
    p1.start()
    import time

    time.sleep(0.5)
    p.start()
    p.join()
    p1.join()
    import os
    print('through file', data_size / (time.time() - start_time), 'KB/s')
    open(pipe_in, 'w+').truncate()

  

有兩個點, 一個是, 打開文件之后, 如果有人在寫入, 需要重新打開才能發現新內容, 另外需要設置offset,只讀取新內容.

!!!重點, 測試的時候這個速度有 through file 110403.02025891568 KB/s這么多, 甚至比管道還要高一點, 這是怎么回事呢?

quite often file data is first written into the page cache (which is in RAM) by the OS kernel.

並沒有被寫入文件, 而是被寫到內存中了, 隨后(不會通知你)被操作系統調度寫入文件.操作系統比較厲害的是, 即使沒有寫到文件中, 讀寫仍然像寫到文件中一樣.

如果設置了 os.sync(), 所有寫操作立即執行, 會發現慢的…類似於卡死.

本地socket

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
#   Author  :   zhangxiaolin
#   E-mail  :   petelin1120@gmail.com
#   Date    :   17/8/17 12:08
#   Desc    :   ...
import multiprocessing
import os
import socket

from mutiprocesscomunicate import gen_data, data_size


minissdpdSocket = '/tmp/m.sock'  # The socket for talking to minissdpd


def send_data_task():
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        try:
            os.remove(minissdpdSocket)
        except OSError:
            pass

        server.bind(minissdpdSocket)

        server.listen(1)

        conn, _ = server.accept()
        with conn:
            for i in range(data_size):
                conn.send(gen_data(1))
            conn.shutdown(socket.SHUT_WR)
            print('send done.')


def get_data_task():
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
        client.connect(minissdpdSocket)
        client.shutdown(socket.SHUT_WR)
        while True:
            data = client.recv(1024)
            if not data:
                break
        print("recv done.")


if __name__ == '__main__':
    p = multiprocessing.Process(target=send_data_task, args=(), kwargs=())
    p1 = multiprocessing.Process(target=get_data_task, args=(), kwargs=())

    p.daemon = True
    p1.daemon = True
    import time

    start_time = time.time()
    p.start()

    p1.start()
    p.join()
    p1.join()
    print('through pipe', data_size / (time.time() - start_time), 'KB/s')

  

本地socket, 會走傳輸層也就是被tcp或者udp封裝一下,到網絡層,網絡層自己有路由表, 發現是本機, 則走本地回環接口, 不經過物理網卡, 發到接受隊列中去.

這個速度不穩定, 最快有through socket 261834.36615940317 KB/s

參考

  1. 深刻理解Linux進程間通信(IPC)
  2. 文件沒有直接寫入磁盤
  3. pipe緩存區大小


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM