python並發、異步—多進程


python並發-多進程

​ 多進程能實現真正意義上的並發(並行),能利用多核優勢,適用計算密集型的程序

1 Process類

  • 開啟子進程—函數
import time
from multiprocessing import Process


def p_func(name):
    time.sleep(3)
    print(name, ": ok")

if __name__ == '__main__':
    
    p = Process(target=p_func, args=("lynn",))
    p.start()
    print("主進程")

注意:

​ Process中的關鍵字參數,target的值是方法名字,args是元組

​ start()方法,開啟子進程

​ 子進程是異步的

  • 開啟子進程—類
import time
from multiprocessing import Process


class PClass(Process):

    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        time.sleep(2)
        print("name: ", self.name)


if __name__ == '__main__':
    p = PClass("lynn")
    p.start()
    print("主")

注意:

​ 繼承Process,重寫__init__方法,必須有super().__init__()

​ 必須有run方法,start()方法會自動調用run方法

​ 子進程是異步的

進程之間的內存是完全隔離的

  • Process對象的join方法
import time
from multiprocessing import Process

class PClass(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        time.sleep(1)
        print("name:", self.name)

if __name__ == '__main__':
    p = PClass("lynn")
    p.start()
    print("主開")
    p.join()
    print("主")

注意:

​ join()方法會阻塞,上邊開啟的所有的子進程全部運行完成,才繼續往下運行,並不是所有的進程是串行,只是在這會阻塞

  • Process對象的其他方法或屬性

    is_alive

p.is_alive() # 查看進程是否存活,True/False
p.pid # 進程的pid

2 特殊的進程

  • 守護進程

    ​ 守護進程會在主進程運行結束時,立馬停止運行

    ​ 守護進程內無法開啟子進程,會報錯

from multiprocessing import Process
import time

class PClass(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        time.sleep(1)
        print("name:", self.name)
if __name__ == '__main__':
    p = PClass("lynn")
    p1 = PClass("fancy")
    p.daemon = True # 設置p為守護進程
    p.start()
    p1.start()
    print("主") # 打印時,p進程停止運行

注意:

p.daemon = True必須在p.start()之前

  • 其他

僵屍進程

​ 子進程在運行完后,不會把所有的資源回收,會告訴父進程自己運行完畢,等待父進程回收,那么這個過程中沒有完全死的進程,就是僵屍進程

孤兒進程

​ 父進程死了,但是還有子進程活着,這些子進程稱為孤兒進程

3 進程的高級應用

  • 進程同步—鎖

進程之間數據雖然隔離,但是可能會操作同一個數據庫,同一個文件等外部資源,可能會造成數據的錯亂,所以多進程在處理外部資源時要加鎖處理

數據錯亂

from multiprocessing import Process
import time

class PClass(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        with open("name.txt", "wt", encoding="utf-8")as f:
            f.write(self.name)
        time.sleep(0.1)
        with open("name.txt", "rt", encoding="utf-8")as f:
            print("name:", self.name ,f.read())

if __name__ == '__main__':
    lock = Lock()
    lynn_p = PClass("lynn")
    fancy_p = PClass("fancy")
    lynn_p.start()
    fancy_p.start()
    print("完")

加鎖

from multiprocessing import Process
from multiprocessing import Lock
import time


class PClass(Process):
    def __init__(self, name, lock):
        super().__init__()
        self.name = name
        self.lock = lock

    def run(self):
        self.lock.acquire() # 加鎖 相當於with self.lock:
        # with self.lock:
        with open("name.txt", "wt", encoding="utf-8")as f:
            f.write(self.name)
        time.sleep(0.1)
        with open("name.txt", "rt", encoding="utf-8")as f:
            print("name:", self.name, f.read())
        self.lock.release() # 開鎖


if __name__ == '__main__':
    lock = Lock()
    lynn_p = PClass("lynn", lock)
    fancy_p = PClass("fancy", lock)
    lynn_p.start()
    fancy_p.start()
    print("完")

注意:

​ 把數據處理部分加鎖,把並行變成串行

​ 降低了效率,但是保證了數據安全

​ 加鎖一定記得開鎖,或者直接用with lock:

  • 隊列—Queue

進程之間數據是隔離的,想實現進程間的數據通信,multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

生產者—消費者模型

​ 生產者:只負責生產,生產者把生產的東西放到隊列中

​ 消費者:只負責消費,隊列中有一直消費,沒有就等着

import time
from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing import Lock


class PClass(Process):
    def __init__(self, name, lock, q):
        super().__init__()
        self.name = name
        self.lock = lock
        self.q = q

    def run(self):
        while True:
            res = self.q.get()
            print(res)
            print("name:", self.name)
            try:
                if res == "ok": # 接到完成信號,停止接受
                    break
            except Exception:
                print(1)


class ProClass(Process):
    def __init__(self, name, q):
        super().__init__()
        self.name = name
        self.q = q

    def run(self):
        for i in range(10):
            time.sleep(1)
            self.q.put(i)
        self.q.put("ok") # 生產者完成后,會發送完成信號

if __name__ == '__main__':
    lock = Lock()
    q = Queue(10)
    pc = PClass("lynn", lock, q)
    pro = ProClass("fancy", q)
    pc.start()
    pro.start()

# 生產者把生產的數字加到隊列中,消費者不停的從隊列中去數字

注意:

Queue(max)max數字,隊列限制的最大數據量

q.put(data)把數據data加到隊列中

q.get()取出隊列中的數據

q.full()隊列中數據個數等於max時為True

q.empty()隊列中為空時,等於True

  • 管道

  • 進程池

進程利用的是計算機的多核優勢,但是當進程過多時,效率反而受到影響,所以要對進程數量做限制。

進程池:進程池可以指定數量的進程供用戶調用,當有新的請求(進程)發起時,如果進程池滿了,就等進程池中有進程結束,進入進程池。

同步

import time
from multiprocessing import Pool


def p_func(name):
    time.sleep(1)
    print(name)


if __name__ == '__main__':
    p = Pool(3)
    for i in range(5):
        p.apply(p_func, args=("lynnadsadasda",))
    p.close()
    print('w')

注意:

apply()同步開啟子進程,只有一個進程完成后,才會起下一個進程,返回值會立馬返回。

異步

import time
from multiprocessing import Pool


def p_func(name):
    time.sleep(1)
    print(name)
    return 100


if __name__ == '__main__':
    p = Pool(3)
    res_list = []
    for i in range(5):
        res = p.apply_async(p_func, args=("lynnadsadasda",))
        res_list.append(res)
    p.close()
    p.join()
    for i in res_list:
        print(i.get()) 
    print('w')

注意:

apply_async異步開啟子進程

​ 注意join()異步調用時必須加該方法,主進程運行完畢后,進程池中開啟的子進程全部結束,所以要加該方法

close()方法必須在join()方法之前,意思是先關閉進程池,在等待

get()獲取返回值,只有異步時才有該方法,同步沒有


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM