進程Process之join、daemon(守護)、terminate(關閉)、multiprocessing之鎖、信號量和事件


一、Process
參數介紹:
1 group參數未使用,值始終為None
2 target表示調用對象,即子進程要執行的任務
3 args表示調用對象的位置參數元組,args=(1,2,'a',)
4 kwargs表示調用對象的字典,kwargs={'name':'a','age':18}
5 name為子進程的名稱


方法介紹:
1 p.start():啟動進程,並調用該子進程中的p.run() 
2 p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法  
3 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖
4 p.is_alive():如果p仍然運行,返回True
5 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程 



屬性介紹:
1 p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置
2 p.name:進程的名稱
3 p.pid:進程的pid
4 p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)
5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)





1、join:父進程等待子進程結束后才開始執行自己的代碼
# 發送一封郵件
import time
import random
from multiprocessing import Process

def func():
    time.sleep(random.randint(1,3))  # 模擬郵件發送的延遲
    print('郵件已發送')

if __name__ == '__main__':
    p = Process(target=func)
    p.start()
    p.join()  # 阻塞 直到進程p執行完畢后才結束阻塞
    print('郵件發送完畢')


    
# 發送十封郵件
import time
import random
from multiprocessing import Process

def func(index):
    time.sleep(random.randint(1,3))
    print('第%s封郵件發送完畢' %index)

if __name__ == '__main__':
    p_lst = []
    for i in range(10):
        p = Process(target=func,args=(i,))
        p.start()  # 先讓所有子進程都啟動
        p_lst.append(p)
    for p in p_lst:  # 再進行join阻塞
        p.join()
    print('10封郵件全部發送完畢')




2、用類的方式開啟進程
我們之前創建進程的時候,其實也是在創建一個Process類的對象,再調用對象的start方法開啟進程,
那么我們也可以自己定義一個類來實現進程的創建:

import os
from multiprocessing import Process

class MyProcess(Process):  # 定義一個類,繼承Process類
    def run(self):  # 必須實現的方法,是啟動進程的方法
        print('子進程:',os.getpid(),os.getppid())

if __name__ == '__main__':
    p = MyProcess()  # 實例化
    p.start()  # 自動調用run方法
    print('父進程:',os.getpid())


給自定義類傳參:
首先看看Process源碼
import time import os from multiprocessing import Process class MyProcess(Process): def __init__(self,i): super().__init__() # 實現父類的初始化方法 self.index = i # 定義自己的屬性(參數) def run(self): time.sleep(1) print('子進程:',self.index,os.getpid(),os.getppid()) if __name__ == '__main__': p_lst = [] for i in range(10): p = MyProcess(i) p.start() p_lst.append(p) for p in p_lst: p.join() print('主進程:',os.getpid())

import os
from multiprocessing import Process


def func():
    print("i am target func")


class MyProcess(Process):  # 定義一個類,繼承Process類
    def __init__(self, target):
        super(MyProcess, self).__init__()  # 實現父類的初始化方法
        self.target = target

    def run(self):  # 必須實現的方法,是啟動進程的方法
        self.target()
        print('子進程:', os.getpid(), os.getppid())


if __name__ == '__main__':
    p = MyProcess(target=func)  # 實例化
    p.start()  # 自動調用run方法
    print('父進程:', os.getpid())


# 結果
父進程: 3532
i am target func
子進程: 9628 3532
 
         
 
3、守護進程
主進程創建守護進程 
1:守護進程會在主進程代碼執行結束后就終止 
2:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止


1,守護進程會在主進程代碼執行結束后就終止 
import time
from multiprocessing import Process

def func():
    print('子進程 start')
    time.sleep(3)  # 睡3秒的時候主進程的代碼已經執行完畢了,所以子進程也會跟着結束
    print('子進程end')

if __name__ == '__main__':
    p = Process(target=func)
    p.daemon = True  # daemon是Process的屬性
    p.start()
    time.sleep(2)  # 睡2秒的時候,執行了子進程
    print('主進程')
結果:
子進程 start
主進程


2,守護進程會在主進程代碼執行結束后就終止 ,但是父進程會等待子進程結束才正式結束。
注意:代碼結束是指代碼運行到了最后一行,並不代表進程已經結束了。
import time
from multiprocessing import Process
def func():
    count = 1
    while True:
        print('*' * count)
        time.sleep(1)
        count += 1

def func2():
    print('普通進程開始')
    time.sleep(5)
    print('普通進程結束')

if __name__ == '__main__':
    p1 = Process(target=func)
    p1.daemon = True
    p1.start()

    Process(target=func2).start()
    time.sleep(3)
    print('主進程')

結果:
*
普通進程開始
**
***
主進程
普通進程結束


3,守護進程的作用
守護進程可以報活,就是向某個服務報告自己還活着

場景:
例如你寫好了一個網頁,你的服務端是不應該停的,因為你服務端一旦停止了,別人就無法訪問你的網頁了,所以我們應該確保服務端沒有‘死’,
這個時候就可以使用守護進程,在你的服務端起一個守護進程,讓這個進程只做一件事,就是每隔1個小時(時間按照自己的合理安排設定)向某一台機器匯報自己還活着,
一旦這個機器沒有收到你守護進程傳來的消息,那么就可以認為你的服務端已經掛了。

例如:
import time
from multiprocessing import Process
def Guard():
    while True:
        time.sleep(3600)
        print('我還活着') # 向某個機器匯報我還活着,具體該怎么寫匯報的邏輯就怎么寫,這里只是示范

if __name__ == '__main__':
    p = Process(target=Guard)
    p.daemon = True
    p.start()
    # 主進程的邏輯(主進程應該是一直運行的,不應該有代碼結束的時候)
    print('主進程')




4、terminate:關閉進程
import time
from multiprocessing import Process
def fun():
    print('子進程')

if __name__ == '__main__':
    p = Process(target=fun)
    p.start()
    p.terminate() # 關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活
    print(p.is_alive())  # True
    time.sleep(0.1)
    print(p.is_alive()) # False




二、鎖 Lock 
1、異步的問題
我們都知道異步進程的好處就是可以一起執行,效率高,但是當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題。

搶票系統:
#文件ticket的內容為:{"count":3}
#注意一定要用雙引號,不然json無法識別
#並發運行,效率高,但競爭寫同一文件,數據寫入錯亂


import time
import json
from multiprocessing import Process
def search(person):
    with open('ticket') as f:
        ticketinfo = json.load(f)
    print('%s查詢余票:' %person,ticketinfo['count'])

def get_ticket(person):
    with open('ticket') as f:
        ticketinfo = json.load(f)
    time.sleep(0.2)  #模擬讀數據的網絡延遲
    if ticketinfo['count'] > 0:
        print('%s買到票了'%person)
        ticketinfo['count'] -= 1
        time.sleep(0.2)
        with open('ticket','w') as f:
            json.dump(ticketinfo,f)
    else:
        print('%s沒買到票'%person)

def ticket(person):
    search(person)
    get_ticket(person)

if __name__ == '__main__':
    for i in range(5):
        p = Process(target=ticket,args=('person%s'%i,))
        p.start()
結果:
person0查詢余票: 3
person4查詢余票: 3
person1查詢余票: 3
person2查詢余票: 3
person3查詢余票: 3
person0買到票了
person4買到票了
person1買到票了
person2買到票了
person3買到票了

分析:票只有三張,但是5個人都顯示買到了,這是因為5個進程異步進行,大家都同一時間在對一個文件進行修改,導致的混亂。




2、用鎖解決:
# 加鎖降低了程序的效率,讓原來能夠同時執行的代碼變成順序執行了,異步變同步的過程
# 保證了數據的安全

import time
import json
from multiprocessing import Process
from multiprocessing import Lock  # 導入Lock類
def search(person):
    with open('ticket') as f:
        ticketinfo = json.load(f)
    print('%s查詢余票:' %person,ticketinfo['count'])

def get_ticket(person):
    with open('ticket') as f:
        ticketinfo = json.load(f)
    time.sleep(0.2)  #模擬讀數據的網絡延遲
    if ticketinfo['count'] > 0:
        print('%s買到票了'%person)
        ticketinfo['count'] -= 1
        time.sleep(0.2)
        with open('ticket','w') as f:
            json.dump(ticketinfo,f)
    else:
        print('%s沒買到票'%person)

def ticket(person,lock):
    search(person)
    lock.acquire() # 誰獲得鑰匙 誰才能進入
    get_ticket(person)
    lock.release() # 用完了,把鑰匙給下一個人
if __name__ == '__main__':
    lock = Lock()  # 創建一個鎖對象
    for i in range(5):
        p = Process(target=ticket,args=('person%s'%i,lock))
        p.start()

結果:
person1查詢余票: 3
person3查詢余票: 3
person0查詢余票: 3
person2查詢余票: 3
person4查詢余票: 3
person1買到票了
person3買到票了
person0買到票了
person2沒買到票
person4沒買到票





三、信號量 Semaphore
1、信號量的實現機制:計數器 + 鎖實現的
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。
互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據(Samphore相當於有幾把鑰匙,lock只能有一把鑰匙)

import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore

def changba(person,sem):  # 在唱吧 唱歌
    sem.acquire()  # 第一次可以同時進來兩個人
    print('%s走進唱吧' %person)
    time.sleep(random.randint(3,6))  # 每個人唱歌的時間
    print('%s走出唱吧' % person)  # 唱完走人
    sem.release()  # 把鑰匙給下一個人


if __name__ == '__main__':
    sem = Semaphore(2) # 2把鑰匙
    for i in range(5):
        p = Process(target=changba,args=('person%s' %i,sem))
        p.start()





四、事件 Event
python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。

阻塞事件 :wait()方法
    wait是否阻塞是看event對象內部的Flag

控制Flag的值:
    set() 將Flag的值改成True
    clear()  將Flag的值改成False
    is_set() 判斷當前的Flag的值
    
    
紅綠燈:
import time
import random
from multiprocessing import Process
from multiprocessing import Event

def traffic_ligth(e):  # 紅綠燈
    print('\033[31m紅燈亮\033[0m')  # Flag 默認是False
    while True:
        if e.is_set(): # 如果是綠燈
            time.sleep(2) # 2秒后
            print('\033[31m紅燈亮\033[0m')  # 轉為紅燈
            e.clear() # 設置為False

        else: # 如果是紅燈
            time.sleep(2) # 2秒后
            print('\033[32m綠燈亮\033[0m') # 轉為綠燈
            e.set() # 設置為True


def car(e,i): #
    if not e.is_set():
        print('car %s在等待' %i)
        e.wait()
    print('car %s 通過了'%i)


if __name__ == '__main__':
    e = Event()
    p = Process(target=traffic_ligth,args=(e,))  # 紅綠燈進程
    p.daemon = True
    p.start()
    p_lst = []
    for i in range(10): # 10輛車的進程
        time.sleep(random.randrange(0,3,2))
        p = Process(target=car,args=(e,i))
        p.start()
        p_lst.append(p)
    for p in p_lst:p.join()


五、總結
進程之間雖然內存不共享,但是是可以通信的
  Lock Semaphore Event 都在進行進城之間的通信
  只不過這些通信的內容我們不能改變
后續還有隊列和管道能讓進程之間進行通信

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM