【python】-- 多進程的基本語法 、進程間數據交互與共享、進程鎖和進程池的使用


多進程

進程之間是相互獨立的,python是啟動進程的時候,是啟動的是原生進程。進程是沒有GIL鎖的,而且不存在鎖的概念,進程之間的數據式不能共享的,而線程是可以的。

1、進程的定義

用muliprocessing這個包中的Process來定義多進程,跟定義多線程類似

from multiprocessing import Process   # 導入進程模塊
import time


def run(name):
    time.sleep(2)
    print("hello", name)

if __name__ == "__main__":
    p_obj_list = list()    # 存放進程對象
    for i in range(10):    # 啟動10個進程
        p = Process(target=run, args=("QQ{0}".format(i),))  # 產生一個進程實例
        p.start()   # 啟動進程
        p_obj_list.append(p)
 
    for p in p_obj_list:
        p.join()   # 等待進程結果

2、進程中加入線程

from multiprocessing import Process
import time,threading


def thread_run(name):   # 定義線程執行的方法
    print("{0}:{1}".format(name, threading.get_ident()))  # thread.get_ident ()返回當前線程的標識符,標識符是一個非零整數


def run(name):
    time.sleep(2)
    print("hello", name)
    t = threading.Thread(target=thread_run, args=(name,))   # 嵌入線程
    t.start()   # 執行線程


if __name__ == "__main__":
    p_obj_list = list()
    for i in range(10):
        p = Process(target=run, args=("QQ{0}".format(i),))
        p.start()
        p_obj_list.append(p)

    for p in p_obj_list:
        p.join()

3、父子進程

每個子進程都是由一個父進程啟動的,每個程序也是有一個父進程

from multiprocessing import Process
import os


def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  # 獲得父進程ID
    print('process id:', os.getpid())  # 獲得子進程ID
    print("\n\n")


def f(name):
    info('\033[31;1m function f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1m main process line\033[0m')
    p = Process(target=f, args=('QQ',))
    p.start()
    p.join()

  

 

 

進程間數據交互與共享

知道不同進程之間內存是不共享的,要想實現兩個進程間的通信需要用到multiprocessing庫中的queue(隊列)模塊,這個multiprocessing庫中的queue模塊跟單純的queue庫是不一樣的。進程導入前者(這里的queue是專門為進程之間的通信設計的)不出錯,導入后者(這里的queue主要是線程間數據交互)出錯。

1、線程訪問queue

import queue,threading


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = queue.Queue()   # 把這個q傳給了子線程
    p = threading.Thread(target=f, args=(q,))   # 子線程訪問父線程的q
    p.start()
    print(q.get())
    p.join()

#執行結果
[66, None, 'hello word']

2、進程訪問queue

from multiprocessing import Process
import queue


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = queue.Queue()   # 把這個q傳給了子線程
    p = Process(target=f, args=(q,))   # 子線程訪問父線程的q
    p.start()
    print(q.get())
    p.join()

#執行結果
Traceback (most recent call last):
  File "C:/Users/dell/PycharmProjects/untitled/process/進程的定義.py", line 77, in <module>
    p.start()
  File "C:\Python36\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Python36\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Python36\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Python36\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Python36\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

3、進程訪問multiprocessing庫中的Queue模塊

from multiprocessing import Process,Queue


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = Queue()   # 把這個q傳給了子線程
    p = Process(target=f, args=(q,))   # 子線程訪問父線程的q
    p.start()
    print(q.get())
    p.join()

#執行結果
[66, None, 'hello word']

父進程相當於克隆一個Q,把自己的Q克隆了一份交給子進程,子進程這個時候往Q里面放了一份數據,然后父進程又能實際的獲取到。但是你克隆了一份是不是就和父進程沒有關系了,為什么還能聯系在一起呢?但是實際上:等於這兩個Q里面的數據又把它序列化了,序列化到一個中間的地方,類似於翻譯,然后反序列化給這個父進程這邊來了,其實這兩個Q就是通過pickle來序列化的,不是一個真正的Q。

小結:兩個線程之間可以修改一個數據,不加鎖,可能就會出錯。現在進程中的Queue,是實現了數據的傳遞,不是在修改同一份數據,只是實現一個進程的數據傳給了另外一個進程。

 

4、通過Pipe()實現進程間的數據交互,manger實現數據共享

上面的例子是通過進程中的Queue,來進行數據共享的,其實還有一種方式實現數據共享,那就是管道,pipe,以及數據共享manger。

4.1、Pipe()函數

管道函數會返回由管道雙方連接的一組連接對象,該管道默認是雙向的(雙向的)。

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([66, None, 'hello,word'])  # 發送消息給父進程
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 管道生成返回兩個實例,是雙向的,這邊把第1個作為父連接,第2個作為子連接。也可以,兩者角色調換一下
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # 接收子進程的消息
    p.join()

4.2、接受多次和發送多次

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([66, None, 'hello,word'])  # 發送消息給父進程
    conn.send("QQ")  # 發送消息給父進程
    print(conn.recv())   # 接收父進程的消息
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 管道生成返回兩個實例,是雙向的,這邊把第1個作為父連接,第2個作為子連接。也可以,兩者角色調換一下
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())  # 接收兩次
    parent_conn.send("微信")   # 發送給子進程
    p.join()

4.3、manger

manger可以完成數據間的共享。

from multiprocessing import Process, Manager
import os


def f(d, l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()   # 聲明一個字典,這個字典是用manger聲明的,不是用dict()聲明的
        # manger.dict()是用專門的語法生產一個可在多進程之間進行傳遞和共享的一個字典
        l = manager.list(range(5))  # 同樣聲明一個列表
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        print(d)
        print(l)

線程修改同一份數據的時候需要加鎖,進程修改數據呢:不用加鎖,因為這個manger已經幫你加鎖了,它就默認不允許兩個進程同時修改一份數據。兩個進程沒有辦法同時修改一份數據,進程之間是獨立的,它自己也要加鎖,因為它把自己的東西同時copy好幾份,跟剛剛的那個Queue一樣,copy10個字典最終合成一個字典

 

 

 

進程鎖和進程池的使用

1、進程鎖

通過multiprocessing中的Lock模塊來實現進程鎖

from multiprocessing import Process,Lock   # 導入進程鎖


def f(l, i):
    l.acquire()    # 加鎖
    try:
        print("hello word", i)
    finally:
        l.release()   # 釋放鎖

if __name__ == "__main__":
    lock = Lock()     # 定義鎖
    for num in range(10):
        Process(target=f, args=(lock, num,)).start()  # 把鎖傳入進程中

進程中不是相互獨立的嗎?為什么還要加鎖:雖然每個進程都是獨立運行的,但是問題來了,它們共享一塊屏幕。這個鎖存在的意義就是屏幕共享。如果進程1想着打印數據,而進程2想也想打印數據的情況,就有可能亂套了,然后通過這個鎖來控制,去打印的時候,這個屏幕只有我獨占,導致屏幕不會亂。

2、進程池apply和apply_saync

2.1、appley

同步執行,也就是串行執行的

from multiprocessing import Pool  # 導入進程池模塊pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印進程號


if __name__ == "__main__":
    pool = Pool(processes=5)   # 設置進程池個數為5,也可以寫成pool = Pool(5),允許進程池同時放入5個進程,並且這5個進程交給cpu去運行
    for i in range(10):
        pool.apply(func=foo, args=(i,))   # 同步執行掛起進程
    print('end')
    pool.close()
    pool.join()  # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。

2.2、apply_saync

異步執行,也就是並行執行。

from multiprocessing import Pool  # 導入進程池模塊pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印進程號


if __name__ == "__main__":
    pool = Pool(processes=5)   # 設置進程池個數為5,也可以寫成pool = Pool(5),允許進程池同時放入5個進程,並且這5個進程交給cpu去運行
    for i in range(10):
        pool.apply_async(func=foo, args=(i,))   # 采用異步方式執行foo函數
    print('end')
    pool.close()
    pool.join()  # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。

2.3、異步下回調函數

程序執行完畢之后,再回調過來執行這個Bar函數。

from multiprocessing import Process,Pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印子進程的進程號


def bar(arg):
    print('-->exec done:', arg, os.getpid())   # 打印進程號

if __name__ == "__main__":
    pool = Pool(processes=2)
    print("主進程", os.getpid())   # 主進程的進程號
    for i in range(3):
        pool.apply_async(func=foo, args=(i,), callback=bar)   # 執行回調函數callback=Bar
    print('end')
    pool.close()
    pool.join()  # 進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。

#執行結果
主進程 752
end
in process 2348
-->exec done: None 752
in process 8364
-->exec done: None 752
in process 2348
-->exec done: None 752

注:

  1. 回調函數說明fun=Foo干不完就不執行bar函數,等Foo執行完就去執行Bar
  2. 這個回調函數是主進程去調用的,而不是每個子進程去調用的。
  3. 回調函數的用處:

      比如說你從各個機器上備份完畢,在回調函數中自動寫一個腳本,說備份完畢

  4. 回調函數是主進程調用的原因?

      如果是子進程去調用這個回調函數,有多少個子進程就有多少個連接,如果是主進程的話,只需要一次長連接就可以了,這個效率就高了

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM