python中多線程與多進程中的數據共享問題


之前在寫多線程與多進程的時候,因為一般情況下都是各自完成各自的任務,各個子線程或者各個子進程之前並沒有太多的聯系,如果需要通信的話我會使用隊列或者數據庫來完成,但是最近我在寫一些多線程與多進程的代碼時,發現如果它們需要用到共享變量的話,需要有一些注意的地方

多線程之間的共享數據

標准數據類型在線程間共享

看以下代碼

#coding:utf-8
import threading

def test(name,data):
    print("in thread {} name is {}".format(threading.current_thread(),name))
    print("data is {} id(data) is {}".format(data,id(data)))


if __name__ == '__main__':
    d = 5
    name = "楊彥星"
    for i in range(5):
        th = threading.Thread(target=test,args=(name,d))
        th.start()

這里我創建一個全局的int變量d,它的值是5,當我在5個線程中調用test函數時,將d作為參數傳進去,那么這5個線程所擁有的是同一個d嗎?我在test函數中通過id(data) 來打印一下它們的ID,得到了如下的結果

in thread <Thread(Thread-1, started 6624)> name is 楊彥星
data is 5 id(data) is 1763791776
in thread <Thread(Thread-2, started 8108)> name is 楊彥星
data is 5 id(data) is 1763791776
in thread <Thread(Thread-3, started 3356)> name is 楊彥星
data is 5 id(data) is 1763791776
in thread <Thread(Thread-4, started 13728)> name is 楊彥星
data is 5 id(data) is 1763791776
in thread <Thread(Thread-5, started 3712)> name is 楊彥星
data is 5 id(data) is 1763791776

從結果中可以看到,在5個子線程中,data的id都是1763791776,說明在主線程中創建了變量d,在子線程中是可以共享的,在子線程中對共享元素的改變是會影響到其它線程的,所以如果要對共享變量進行修改時,也就是線程不安全的,需要加鎖。

自定義類型對象在線程間共享

如果我們要自定義一個類呢,將一個對象作為變量在子線程中傳遞呢?會是什么效果呢?

#coding:utf-8
import threading

class Data:
    def __init__(self,data=None):
        self.data = data

    def get(self):
        return self.data

    def set(self,data):
        self.data = data

def test(name,data):
    print("in thread {} name is {}".format(threading.current_thread(),name))
    print("data is {} id(data) is {}".format(data.get(),id(data)))


if __name__ == '__main__':
    d = Data(10)
    name = "楊彥星"
    print("in main thread id(data) is {}".format(id(d)))
    for i in range(5):
        th = threading.Thread(target=test,args=(name,d))
        th.start()

這里我定義一個簡單的類,在主線程初始化了一個該類型的對象d,然后將它作為參數傳給子線程,主線程和子線程分別打印了這個對象的id,我們來看一下結果

in main thread id(data) is 2849240813864
in thread <Thread(Thread-1, started 11648)> name is 楊彥星
data is 10 id(data) is 2849240813864
in thread <Thread(Thread-2, started 11016)> name is 楊彥星
data is 10 id(data) is 2849240813864
in thread <Thread(Thread-3, started 10416)> name is 楊彥星
data is 10 id(data) is 2849240813864
in thread <Thread(Thread-4, started 8668)> name is 楊彥星
data is 10 id(data) is 2849240813864
in thread <Thread(Thread-5, started 4420)> name is 楊彥星
data is 10 id(data) is 2849240813864

我們看到,在主線程和子線程中,這個對象的id是一樣的,說明它們用的是同一個對象。

無論是標准數據類型還是復雜的自定義數據類型,它們在多線程之間是共享同一個的,但是在多進程中是這樣的嗎?

多進程之間的共享數據

標准數據類型在進程間共享

還是上面的代碼,我們先來看一下int類型的變量的子進程間的共享

#coding:utf-8
import threading
import multiprocessing

def test(name,data):
    print("in thread {} name is {}".format(threading.current_thread(),name))
    print("data is {} id(data) is {}".format(data,id(data)))


if __name__ == '__main__':
    d = 10
    name = "楊彥星"
    print("in main thread id(data) is {}".format(id(d)))
    for i in range(5):
        pro = multiprocessing.Process(target=test,args=(name,d))
        pro.start()

得到的結果是

in main thread id(data) is 1763791936
in thread <_MainThread(MainThread, started 9364)> name is 楊彥星
data is 10 id(data) is 1763791936
in thread <_MainThread(MainThread, started 9464)> name is 楊彥星
data is 10 id(data) is 1763791936
in thread <_MainThread(MainThread, started 3964)> name is 楊彥星
data is 10 id(data) is 1763791936
in thread <_MainThread(MainThread, started 10480)> name is 楊彥星
data is 10 id(data) is 1763791936
in thread <_MainThread(MainThread, started 13608)> name is 楊彥星
data is 10 id(data) is 1763791936

可以看到它們的id是一樣的,說明用的是同一個變量,但是當我嘗試把d由int變為了string時,發現它們又不一樣了……

if __name__ == '__main__':
    d = 'yangyanxing'
    name = "楊彥星"
    print("in main thread id(data) is {}".format(id(d)))
    for i in range(5):
        pro = multiprocessing.Process(target=test,args=(name,d))
        pro.start()

此時得到的結果是

in main thread id(data) is 2629633397040
in thread <_MainThread(MainThread, started 9848)> name is 楊彥星
data is yangyanxing id(data) is 1390942032880
in thread <_MainThread(MainThread, started 988)> name is 楊彥星
data is yangyanxing id(data) is 2198251377648
in thread <_MainThread(MainThread, started 3728)> name is 楊彥星
data is yangyanxing id(data) is 2708672287728
in thread <_MainThread(MainThread, started 5288)> name is 楊彥星
data is yangyanxing id(data) is 2376058999792
in thread <_MainThread(MainThread, started 12508)> name is 楊彥星
data is yangyanxing id(data) is 2261044040688

於是我又嘗試了list、Tuple、dict,結果它們都是不一樣的,我又回過頭來試着在多線程中使用列表元組和字典,結果在多線程中它們的id還是一樣的。

這里有一個有趣的問題,如果是int類型,當值小於等於256時,它們在多進程間的id是相同的,如果大於256,則它們的id就會不同了,這個我沒有查看原因。

自定義類型對象在進程間共享

#coding:utf-8
import threading
import multiprocessing

class Data:
    def __init__(self,data=None):
        self.data = data

    def get(self):
        return self.data

    def set(self,data):
        self.data = data

def test(name,data):
    print("in thread {} name is {}".format(threading.current_thread(),name))
    print("data is {} id(data) is {}".format(data.get(),id(data)))


if __name__ == '__main__':
    d = Data(10)
    name = "楊彥星"
    print("in main thread id(data) is {}".format(id(d)))
    for i in range(5):
        pro = multiprocessing.Process(target=test,args=(name,d))
        pro.start()

得到的結果是

in main thread id(data) is 1927286591728
in thread <_MainThread(MainThread, started 2408)> name is 楊彥星
data is 10 id(data) is 1561177927752
in thread <_MainThread(MainThread, started 5728)> name is 楊彥星
data is 10 id(data) is 2235260514376
in thread <_MainThread(MainThread, started 1476)> name is 楊彥星
data is 10 id(data) is 2350586073040
in thread <_MainThread(MainThread, started 996)> name is 楊彥星
data is 10 id(data) is 2125002248088
in thread <_MainThread(MainThread, started 10740)> name is 楊彥星
data is 10 id(data) is 1512231669656

可以看到它們的id是不同的,也就是不同的對象。

在多進程間如何共享數據

我們看到,數據在多進程間是不共享的(小於256的int類型除外),但是我們又想在主進程和子進程間共享一個數據對象時該如何操作呢?

在看這個問題之前,我們先將之前的多線程代碼做下修改

#coding:utf-8
import threading
import multiprocessing

class Data:
    def __init__(self,data=None):
        self.data = data

    def get(self):
        return self.data

    def set(self,data):
        self.data = data

def test(name,data,lock):
    lock.acquire()
    print("in thread {} name is {}".format(threading.current_thread(),name))
    print("data is {} id(data) is {}".format(data,id(data)))
    data.set(data.get()+1)
    lock.release()


if __name__ == '__main__':
    d = Data(0)
    thlist = []
    name = "yang"
    lock = threading.Lock()
    for i in range(5):
        th = threading.Thread(target=test,args=(name,d,lock))
        th.start()
        thlist.append(th)
    for i in thlist:
        i.join()
    print(d.get())

我們這個代碼的目的是這樣,使用自定義的Data類型對象,當經過5個子線程操作以后,每個子線程對其data值進行加1操作,最后在主線程打印對象的data值。
該輸出結果如下

in thread <Thread(Thread-1, started 3296)> name is yang
data is <__main__.Data object at 0x000001A451139198> id(data) is 1805246501272
in thread <Thread(Thread-2, started 9436)> name is yang
data is <__main__.Data object at 0x000001A451139198> id(data) is 1805246501272
in thread <Thread(Thread-3, started 760)> name is yang
data is <__main__.Data object at 0x000001A451139198> id(data) is 1805246501272
in thread <Thread(Thread-4, started 1952)> name is yang
data is <__main__.Data object at 0x000001A451139198> id(data) is 1805246501272
in thread <Thread(Thread-5, started 5988)> name is yang
data is <__main__.Data object at 0x000001A451139198> id(data) is 1805246501272
5

可以看到在主線程最后打印出來了5,符合我們的預期,但是如果放到多進程中呢?因為多進程下,每個子進程所持有的對象是不同的,所以每個子進程操作的是各自的Data對象,對於主進程的Data對象應該是沒有影響的,我們來看下它的結果

#coding:utf-8
import threading
import multiprocessing

class Data:
    def __init__(self,data=None):
        self.data = data

    def get(self):
        return self.data

    def set(self,data):
        self.data = data

def test(name,data,lock):
    lock.acquire()
    print("in thread {} name is {}".format(threading.current_thread(),name))
    print("data is {} id(data) is {}".format(data,id(data)))
    data.set(data.get()+1)
    lock.release()


if __name__ == '__main__':
    d = Data(0)
    thlist = []
    name = "yang"
    lock = multiprocessing.Lock()
    for i in range(5):
        th = multiprocessing.Process(target=test,args=(name,d,lock))
        th.start()
        thlist.append(th)
    for i in thlist:
        i.join()
    print(d.get())

它的輸出結果是:

in thread <_MainThread(MainThread, started 7604)> name is yang
data is <__mp_main__.Data object at 0x000001D110130EB8> id(data) is 1997429477048
in thread <_MainThread(MainThread, started 12108)> name is yang
data is <__mp_main__.Data object at 0x000002C4E88E0E80> id(data) is 3044738469504
in thread <_MainThread(MainThread, started 3848)> name is yang
data is <__mp_main__.Data object at 0x0000027827270EF0> id(data) is 2715076202224
in thread <_MainThread(MainThread, started 12368)> name is yang
data is <__mp_main__.Data object at 0x000002420EA80E80> id(data) is 2482736991872
in thread <_MainThread(MainThread, started 4152)> name is yang
data is <__mp_main__.Data object at 0x000001B1577F0E80> id(data) is 1861188783744
0

最后的輸出是0,說明了子進程對於主進程傳入的Data對象操作其實對於主進程的對象是不起作用的,我們需要怎樣的操作才能實現子進程可以操作主進程的對象呢?我們可以使用multiprocessing.managers 下的 BaseManager 來實現

#coding:utf-8
import threading
import multiprocessing
from multiprocessing.managers import BaseManager

class Data:
    def __init__(self,data=None):
        self.data = data

    def get(self):
        return self.data

    def set(self,data):
        self.data = data
        
BaseManager.register("mydata",Data)

def test(name,data,lock):
    lock.acquire()
    print("in thread {} name is {}".format(threading.current_thread(),name))
    print("data is {} id(data) is {}".format(data,id(data)))
    data.set(data.get()+1)
    lock.release()



def getManager():
    m = BaseManager()
    m.start()
    return m


if __name__ == '__main__':
    manager = getManager()
    d = manager.mydata(0)
    thlist = []
    name = "yang"
    lock = multiprocessing.Lock()
    for i in range(5):
        th = multiprocessing.Process(target=test,args=(name,d,lock))
        th.start()
        thlist.append(th)
    for i in thlist:
        i.join()
    print(d.get())

使用from multiprocessing.managers import BaseManager 引入 BaseManager以后,在定義完Data類型之后,使用BaseManager.register("mydata",Data) 將Data類型注冊到BaseManager中,並且給了它一個名字叫mydata,之后就可以使用BaseManager對象的這個名字來初始化對象,我們來看一下輸出

C:\Python35\python.exe F:/python/python3Test/multask.py
in thread <_MainThread(MainThread, started 12244)> name is yang
data is <__mp_main__.Data object at 0x000001FE1B7D9668> id(data) is 2222932504080
in thread <_MainThread(MainThread, started 2860)> name is yang
data is <__mp_main__.Data object at 0x000001FE1B7D9668> id(data) is 1897574510096
in thread <_MainThread(MainThread, started 2748)> name is yang
data is <__mp_main__.Data object at 0x000001FE1B7D9668> id(data) is 2053415775760
in thread <_MainThread(MainThread, started 7812)> name is yang
data is <__mp_main__.Data object at 0x000001FE1B7D9668> id(data) is 2766155820560
in thread <_MainThread(MainThread, started 2384)> name is yang
data is <__mp_main__.Data object at 0x000001FE1B7D9668> id(data) is 2501159890448
5

我們看到,雖然在每個子進程中使用的是不同的對象,但是它們的值卻是可以“共享”的。

標准的數據類型也可以通過multiprocessing庫中的Value對象,舉一個簡單的例子

#coding:utf-8
import threading
import multiprocessing
from multiprocessing.managers import BaseManager

class Data:
    def __init__(self,data=None):
        self.data = data

    def get(self):
        return self.data

    def set(self,data):
        self.data = data

BaseManager.register("mydata",Data)

def test(name,data,lock):
    lock.acquire()
    print("in thread {} name is {}".format(threading.current_thread(),name))
    print("data is {} id(data) is {}".format(data,id(data)))
    data.value +=1
    lock.release()


if __name__ == '__main__':
    d = multiprocessing.Value("l",10) #
    print(d)
    thlist = []
    name = "yang"
    lock = multiprocessing.Lock()
    for i in range(5):
        th = multiprocessing.Process(target=test,args=(name,d,lock))
        th.start()
        thlist.append(th)
    for i in thlist:
        i.join()
    print(d.value)

這里使用d = multiprocessing.Value("l",10) 初始化了一個數字類型的對象,這個類型是Synchronized wrapper for c_long ,multiprocessing.Value在初始化時,第一個參數是類型,第二個參數是值,具體支持的類型如下

還可以使用ctypes庫里和類初始化字符串

>>> from ctypes import c_char_p
>>> s = multiprocessing.Value(c_char_p, b'\xd1\xee\xd1\xe5\xd0\xc7')
>>> print(s.value.decode('gbk'))
楊彥星

還可以使用Manager對象初始list,dict等

#coding:utf-8
import multiprocessing


def func(mydict, mylist):
    # 子進程改變dict,主進程跟着改變
    mydict["index1"] = "aaaaaa" 
    # 子進程改變List,主進程跟着改變 
    mydict["index2"] = "bbbbbb"
    mylist.append(11)  
    mylist.append(22)
    mylist.append(33)


if __name__ == "__main__":
    # 主進程與子進程共享這個字典
    mydict = multiprocessing.Manager().dict()
    # 主進程與子進程共享這個List
    mylist = multiprocessing.Manager().list(range(5))  

    p = multiprocessing.Process(target=func, args=(mydict, mylist))
    p.start()
    p.join()

    print(mylist)
    print(mydict)

其實我們這里所說的共享只是數據值上的共享,因為在多進程中,各自持有的對象都不相同,所以如果想要同步狀態需要曲線救國。不過這種在自己寫的小項目中可以簡單的使用,如果做一些大一點的項目,還是建議不要使用這種共享數據的方式,這種大大的增加了程序間的耦合性,使用邏輯變得復雜難懂,所以建議還是使用隊列或者數據庫作為通信的渠道。

參考文章
Python 進程之間共享數據(全局變量)

Python多進程編程-進程間共享數據


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM