Python3多進程共享變量實現方法


今天同事反映一個問題讓幫忙看一下:多進程共用一個變量,在一個進程中修改后,在另外的進程中並沒有產生修改。

 

一、錯誤的實現方式

最初以為是沒添加global聲明導致修改未生效,但實際操作發現global方式在多進程中也只能讀不能寫。錯誤示例代碼如下:

import multiprocessing

# 聲明一個全局變量
share_var = ["start flag"]

def sub_process(process_name):
    # 企圖像單個進程那樣通過global聲明使用全局變量
    global share_var
    share_var.append(process_name)
    # 但是很可惜,在多進程中這樣引用只能讀,修改其他進程不會同步改變
    for item in share_var:
        print(f"{process_name}-{item}")
    pass

def main_process():
    process_list = []
    # 創建進程1
    process_name = "process 1"
    tmp_process = multiprocessing.Process(target=sub_process,args=(process_name,))
    process_list.append(tmp_process)
    # 創建進程2
    process_name = "process 2"
    tmp_process = multiprocessing.Process(target=sub_process, args=(process_name,))
    process_list.append(tmp_process)
    # 啟動所有進程
    for process in process_list:
        process.start()
    for process in process_list:
        process.join()

if __name__ == "__main__":
    main_process()

執行結果如下,可以看到進程1中的修改未表現在進程2中(不過要注意,和多線程一樣,如果運算量再大一點進程1並不一定比進程2先執行):

 

二、共享普通類型變量實現方法

參考:https://blog.csdn.net/houyanhua1/article/details/78244288

import multiprocessing

# 不能將共享變量和共享鎖定義成全局變量然后通過global引用那樣會報錯,只能傳過來
def sub_process(process_name,share_var,share_lock):
    # 獲取鎖
    share_lock.acquire()
    share_var.append(process_name)
    # 釋放鎖
    share_lock.release()
    for item in share_var:
        print(f"{process_name}-{item}")
    pass

def main_process():
    # 單個值聲明方式。typecode是進制類型,C寫法和Python寫法都可以,見下方表格;value是初始值。
    # 這種單值形式取值賦值需要通過get()/set()方法進行,不能直接如一般變量那樣取值賦值
    # share_var = multiprocessing.Manager().Value(typecode, value)
    # 數組聲明方式。typecode是數組變量中的變量類型,sequence是數組初始值
    # share_var = multiprocessing.Manager().Array(typecode, sequence)
    # 字典聲明方式
    # share_var = multiprocessing.Manager().dict()
    # 列表聲明方式
    share_var = multiprocessing.Manager().list()
    share_var.append("start flag")
    # 聲明一個進程級共享鎖
    # 不要給多進程傳threading.Lock()或者queue.Queue()等使用線程鎖的變量,得用其進程級相對應的類
    # 不然會報如“TypeError: can't pickle _thread.lock objects”之類的報錯
    share_lock = multiprocessing.Manager().Lock()
    process_list = []

    process_name = "process 1"
    tmp_process = multiprocessing.Process(target=sub_process,args=(process_name,share_var,share_lock))
    process_list.append(tmp_process)

    process_name = "process 2"
    tmp_process = multiprocessing.Process(target=sub_process, args=(process_name,share_var,share_lock))
    process_list.append(tmp_process)

    for process in process_list:
        process.start()
    for process in process_list:
        process.join()

if __name__ == "__main__":
    main_process()

執行結果如下,可以看到進程1中的修改已表現在進程2中(不過要注意,和多線程一樣,如果運算量再大一點進程1並不一定比進程2先執行):

typecode如果是數值或單個字符,可為以下類型(注意有引號):

Type Code C Type Python Type
'c' char character
'b' signed char int
'B' unsigned char int
'u' Py_UNICODE unicode character
'h' signed short int
'H' unsigned short int
'i' signed int int
'I' unsigned int int
'l' signed long int
'L' unsigned long int
'f' float float
'd' double float

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

如果是字符串類型,typecode可為以下第一列形式(注意無引號):

ctypes type C type Python type

c_bool

_Bool bool (1)
char  char 1-character string
c_wchar wchar_t 1-character unicode string
c_byte char int/long
c_ubyte unsigned char int/long
c_short short int/long
c_ushort unsigned short int/long
c_int int int/long
c_uint unsigned in int/long
c_long long int/long
c_ulong unsigned long int/long
c_longlong __int64 or long long int/long
c_ulonglong unsigned __int64 or unsigned long long int/long
c_float float float
c_double double float
c_longdouble long double float
c_char_p char * (NUL terminated) string or None

c_wchar_p

wchar_t * (NUL terminated) unicode or None
c_void_p void * int/long or None

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

三、共享實例化對象實現方法

同事還想共享一個文件對象,然后問上邊的方法是不是只能共享字典、列表,沒法共享對象。

回頭一看,Value和Array中typecode要求是c語言中存在的類型,其他只有dict()和list()方法沒有其他方法,所以似乎上邊的方法共享實例化對象是不行的。

 

3.1 共享不需要修改實例化對象實現方法----使用global

但我們前面說過global方式不可以修改,但讀還是沒問題的;所以對象引用還是可以使用global方式。

import multiprocessing
import threading

# 實例化一個全局文件對象
file_obj = open("1.txt","a")
share_lock = threading.Lock()

def sub_process(process_name):
    global file_obj,share_lock
    share_lock.acquire()
    file_obj.writelines(f"{process_name}")
    share_lock.release()
    pass

def main_process():
    process_list = []
    # 創建進程1
    process_name = "process 1"
    tmp_process = multiprocessing.Process(target=sub_process,args=(process_name,))
    process_list.append(tmp_process)
    # 創建進程2
    process_name = "process 2"
    tmp_process = multiprocessing.Process(target=sub_process, args=(process_name,))
    process_list.append(tmp_process)
    # 啟動所有進程
    for process in process_list:
        process.start()
    for process in process_list:
        process.join()

if __name__ == "__main__":
    main_process()

 

3.2 共享需要修改實例化對象實現方法----使用BaseManager

global方式不能修改變量(如要修改其成員變量),在大多時候也是可以了,但總讓人覺得不是一種完美的實現方法。有沒有可以修改的實現方法呢,答案是有的,可以使用BaseManager。示例代碼如下。

參考:https://blog.csdn.net/jacke121/article/details/82658471

import multiprocessing
from multiprocessing.managers import BaseManager
import threading

# 鎖可以通過global也可以在Process中傳無所謂
share_lock = threading.Lock()

# 定義一個要共享實例化對象的類
class Test():
    def __init__(self):
        self.test_list = ["start flag"]

    def test_function(self,arg):
        self.test_list.append(arg)

    def print_test_list(self):
        for item in self.test_list:
            print(f"{item}")

def sub_process(process_name,obj):
    global share_lock
    share_lock.acquire()
    obj.test_function(f"{process_name}")
    share_lock.release()
    obj.print_test_list()
    pass

def main_process():
    # 如果是想注冊open方法這樣操作
    # manager = BaseManager()
    # # 一定要在start前注冊,不然就注冊無效
    # manager.register('open', open)
    # manager.start()
    # obj = manager.open("1.txt","a")

    # 為了更加直接我們直接以一個Test類的實例化對象來演示
    manager = BaseManager()
    # 一定要在start前注冊,不然就注冊無效
    manager.register('Test', Test)
    manager.start()
    obj = manager.Test()

    process_list = []
    # 創建進程1
    process_name = "process 1"
    tmp_process = multiprocessing.Process(target=sub_process,args=(process_name,obj))
    process_list.append(tmp_process)
    # 創建進程2
    process_name = "process 2"
    tmp_process = multiprocessing.Process(target=sub_process, args=(process_name,obj))
    process_list.append(tmp_process)
    # 啟動所有進程
    for process in process_list:
        process.start()
    for process in process_list:
        process.join()


if __name__ == "__main__":
    main_process()

執行結果如下,可以看到進程1中的修改已表現在進程2中(不過要注意,和多線程一樣,如果運算量再大一點進程1並不一定比進程2先執行):

 

參考:

https://blog.51cto.com/11026142/1874807

https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM