Python多線程多進程那些事兒看這篇就夠了~~


自己以前也寫過多線程,發現都是零零碎碎,這篇寫寫詳細點,填一下GIL和Python多線程多進程的坑~

總結下GIL的坑和python多線程多進程分別應用場景(IO密集、計算密集)以及具體實現的代碼模塊。

 

 

目錄 

 0x01 進程 and 線程 and “GIL”

0x02 python多線程&&線程鎖&&threading類

0x03 python隊列代碼實現

0x04 python之線程池實現

0x05 python多進程並行實現

 

 

 

 

 

0x01 進程 and 線程 and “GIL”

進程和線程簡單舉例:

對於操作系統來說,一個任務就是一個進程(Process),比如打開一個瀏覽器就是啟動一個瀏覽器進程.

有些進程還不止同時干一件事,比如Word,它可以同時進行打字、拼寫檢查、打印等事情。在一個進程內部,要同時干多件事,就需要同時運行多個“子任務”,我們把進程內的這些“子任務”稱為線程(Thread)。

 

 

線程與進程的區別: 

 

簡而言之,一個程序至少有一個進程,一個進程至少有一個線程.

 

​ 進程就是一個應用程序在處理機上的一次執行過程,它是一個動態的概念,而線程是進程中的一部分,進程包含多個線程在運行。

 

​ 多線程可以共享全局變量,多進程不能。多線程中,所有子線程的進程號相同;多進程中,不同的子進程進程號不同。

 

 

並行和並發

並行處理:是計算機系統中能同時執行兩個或更多個處理的一種計算方法。並行處理可同時工作於同一程序的不同方面。並行處理的主要目的是節省大型和復雜問題的解決時間。

並發處理:指一個時間段中有幾個程序都處於已啟動運行到運行完畢之間,且這幾個程序都是在同一個處理機(CPU)上運行,但任一個時刻點上只有一個程序在處理機(CPU)上運行

同步與異步

同步:指一個進程在執行某個請求的時候,若該請求需要一段時間才能返回信息,那么這個進程將會一直等待下去,直到收到返回信息才繼續執行下去。

異步:指進程不需要一直等待下去,而是繼續執行下面的操作,不管其他進程的狀態,當有消息返回時系統會通知進程進行處理,這樣可以提高執行效率

 

 所以為了高效率執行就有了並發編程多線程多進程概念,這里就要提一下“GIL”了~

GIL:

首先需要明確的一點是 GIL 並不是Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。所以這里要先明確一點:GIL並不是Python的特性,Python完全可以不依賴於GIL

那么CPython實現中的GIL又是什么呢?GIL全稱 Global Interpreter Lock

使用Python多線程的人都知道,Python中由於GIL(全局解釋鎖:Global Interpreter Lock)的存在,在多線程時並沒有真正的進行多線程計算。

GIL說白了就是偽多線程,一個線程運行其他線程阻塞,使你的多線程代碼不是同時執行,而是交替執行。

 

下面用代碼來說明GIL的多線程是偽多線程。

 

單線程執行代碼:

from threading import Thread
import time
 
def my_counter():
    i = 0
    for _ in range(100000000):
        i = i + 1
    return True
 
def main():
    thread_array = {}
    start_time = time.time()
    for tid in range(2):
        t = Thread(target=my_counter)
        t.start()
        t.join()
    end_time = time.time()
    print("Total time: {}".format(end_time - start_time))
 
if __name__ == '__main__':
    main()

 

兩個線程並發執行代碼:

from threading import Thread
import time
 
def my_counter():
    i = 0
    for _ in range(100000000):
        i = i + 1
    return True
 
def main():
    thread_array = {}
    start_time = time.time()
    for tid in range(2):
        t = Thread(target=my_counter)
        t.start()
        thread_array[tid] = t
    for i in range(2):
        thread_array[i].join()
    end_time = time.time()
    print("Total time: {}".format(end_time - start_time))
 
if __name__ == '__main__':
    main()

 

結果:

 

可以看到多線程反而慢了十幾秒。。。。。

關於使用多線程反而慢的原因可以參考pcode數量的調度方式~~

我等菜鳥就關心 GIL的存在,是否多線程就廢了?當然不是,這里提一下IO密集型和計算密集型

 

計算密集型    

計算密集型,顧名思義就是應用需要非常多的CPU計算資源,在計算密集型任務的特點是要進行大量的計算,消耗CPU資源,比如計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。

IO密集型

    對於IO密集型的應用,涉及到網絡、磁盤IO的任務都是IO密集型任務,大多消耗都是硬盤讀寫和網絡傳輸的消耗。

 

那么GIL多線程的不足,其實是對於計算密集型的不足,這個解決可以利用多進程進行解決,而對於IO密集型的任務,我們還是可以使用多多線程進行提升效率。

 

 

 

 

0x02 python多線程&&線程鎖&&threading類

Python的標准庫提供了兩個模塊:threadthreadingthread是低級模塊,threading是高級模塊,對thread進行了封裝。絕大多數情況下,我們只需要使用threading這個高級模塊。

啟動一個線程就是把一個函數傳入並創建Thread實例,然后調用start()開始執行:

import time, threading

# 新線程執行的代碼:
def loop():
    print 'thread %s is running...' % threading.current_thread().name
    n = 0
    while n < 5:
        n = n + 1
        print 'thread %s >>> %s' % (threading.current_thread().name, n)
        time.sleep(1)
    print 'thread %s ended.' % threading.current_thread().name

print 'thread %s is running...' % threading.current_thread().name
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print 'thread %s ended.' % threading.current_thread().name

 

 

Threading模塊的對象

 

Threading模塊的Thread類

 

Thread類方法

 

使用Thread類,可以有多種方法創建線程:

  • 創建Thread類的實例,傳遞一個函數
  • 創建Thread類的實例,傳遞一個可調用的類實例
  • 派生Thread類的子類,並創建子類的實例

一般的,我們會采用第一種或者第三種方法。

第一種方法:創建Thread類,傳遞一個函數

下面的腳本中,我們先實例化Thread類,並傳遞一個函數(及其參數),當線程執行的時候,函數也會被執行:

import threading
from time import sleep,ctime
import time

loops=[1,2,3,4]

def loop(name,sleep_time):
    print('開始循環線程:'+str(name)+'at:'+str(ctime()))
    sleep(sleep_time)
    print('循環'+str(name)+'結束於:'+str(ctime()))
    
def main():
    print("程序開始於:"+str(ctime()))
    threads=[]
    nloops=range(len(loops))
    
    for i in nloops:
        t=threading.Thread(target=loop,args=(i,loops[i])) #循環 實例化4個Thread類,傳遞函數及其參數,並將線程對象放入一個列表中
        threads.append(t)
        
    for i in nloops:
        threads[i].start()  #循環 開始線程
        
    for i in nloops:
        threads[i].join()   #循環 join()方法可以讓主線程等待所有的線程都執行完畢。
        
    print('任務完成於:'+str(ctime()))
    
if __name__=='__main__':
    main()

 

 

 

thread模塊相比,不同點在於:實現同樣的效果,thread模塊需要鎖對象,而threading模塊的Thread類不需要。

  當所有的線程都分配完成之后,通過調用每個線程的start()方法再讓他們開始。相比於thread模塊的管理一組鎖(分配、獲取、釋放檢查鎖狀態)來說,threading模塊的Thread類只需要為每個線程調用join()方法即可。join(timeout=None)方法將等待線程結束,或者是達到指定的timeout時間時。這種鎖又稱為自旋鎖。

 

第二種方法:創建Thread類的實例,傳遞一個可調用的類實例

創建線程時,於傳入函數類似的方法是傳入一個可調用的類的實例,用於線程執行——這種方法更加接近面向對象的多線程編程。比起一個函數或者從一個函數組中選擇而言,這種可調用的類包含一個執行環境,有更好的靈活性。

 

import threading
from time import sleep,ctime

loops=[1,2,3,4]

class ThreadFunc(object):
    def __init__(self,func,args,name=''):
        self.name=name
        self.func = func
        self.args=args
        
    def __call__(self):
        self.func(*self.args)
        
def loop(nloop,nsec):
    print('開始循環',nloop,'在:'+str(ctime()))
    sleep(nsec)
    print('結束循環',nloop,'於:'+str(ctime()))
def main():
    print('程序開始於:'+str(ctime()))
    threads = []
    nloops = range(len(loops))
    
    for i in nloops:
        t = threading.Thread(target=ThreadFunc(loop,(i,loops[i]),loop.__name__)) #傳遞一個可調用類的實例
        threads.append(t)
        
    for i in nloops:
        threads[i].start()  #開始所有的線程
        
    for i in nloops:
        threads[i].join()   #等待所有的線程執行完畢
        
    print('任務完成於:'+str(ctime()))
    
if __name__=='__main__':
    main()
    

 

 上面主要添加了ThreadFunc類,並在實例化Thread對象時,通過傳參的形式同時實例化了可調用類ThreadFunc。這里同時完成了兩個實例化。

 

第三種方法:派生Thread的子類,並創建子類的實例

 

import threading
from time import sleep,ctime

loops=[1,2,3,4]

class MyThread(threading.Thread):
    def __init__(self,func,args,name=''):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args
        
    def run(self):
        self.func(*self.args)
        
def loop(nloop,nsec):
    print('開始循環',nloop,'在:',str(ctime()))
    sleep(nsec)
    print('結束循環',nloop,'於:',str(ctime()))
    
def main():
    print('程序開始於:',str(ctime()))
    threads = []
    nloops = range(len(loops))
    
    for i in nloops:
        t = MyThread(loop,(i,loops[i]),loop.__name__)
        threads.append(t)
        
    for i in nloops:
        threads[i].start()
        
    for i in nloops:
        threads[i].join()
        
    print('所有的任務完成於:',str(ctime()))
    
if __name__ =='__main__':
    main()

 

這里繼承Threading父類,重寫構造函數,重寫run函數即可。

 

 

threading中還有以下一些屬性,簡單介紹一下:

    Timer類,Timer(int,target=func)  和Thread類類似,只不過它在int秒過后才以target指定的函數開始線程運行

    currentThread()  獲得當前線程對象

    activeCount()  獲得當前活動的線程總個數

    enumerate()  獲得所有活動線程的列表

    settrace(func)  設置一跟蹤函數,在run執行前執行

    setprofile(func)  設置一跟蹤函數,在run執行完畢之后執行

 

 

這里提一下線程鎖:

多線程程序涉及到一個問題,那就是當不同線程要對同一個資源進行修改或利用時會出現混亂,所以有必要引入線程鎖。舉個例子:

import threading
from time import *

class MyThread(threading.Thread):
    def __init__(self,counter,name):
        threading.Thread.__init__(self)  
        self.counter = counter
        self.name = name

    def run(self):
        self.counter[0] += 1
        print self.counter[0]

if __name__ == '__main__':
    counter = [0]
    for i in range(1,11):
        t = MyThread(counter,i)
        t.start()

 

這里並發了10個線程,在沒有混亂的情況下,很明顯一個線程的name和經過它處理過后的counter中的數字應該相同。因為沒有鎖可能引發混亂,想象中,我們可能認為,當某個線程要打印counter中的數字時,別的線程對其作出了改變,從而導致打印出的counter中的數字不符合預期。實際上,這段代碼的運行結果很大概率是很整齊的1\n2\n3....10。如果要解釋一下,1. 雖然稱並發10個線程。但是實際上線程是不可能真的在同一個時間點開始,比如在這個例子中t1啟動后,要將循環進入下一輪,創建新的線程對象t2,然后再讓t2啟動。這段時間雖然很短很短,但是確實是存在的。而這段時間的長度,足夠讓t1的run中,進行自增並且打印的操作。最終,整個結果看上去似乎沒什么毛病。

 

  如果我們想要看到“混亂”的情況,顯然兩個方法。要么縮短for i in range以及創建線程對象的時間,使得線程在自增之后來不及打印時counter被第二個線程自增,這個比較困難;另一個方法就是延長自增后到打印前的這段時間。自然想到,最簡單的,用time.sleep(1)睡一秒即可。此時結果可能是10\n10\n...。主要看第一行的結果。不再是1而是10了。說明在自增操作結束,打印數字之前睡的這一秒里,到第10個線程都成功自增了counter,因此即使是第一個線程,打印到的也是經過第10個線程修改的counter了。

 

 

線程鎖也稱互斥鎖,可以彌補部分線程安全問題。(線程鎖和GIL鎖是不一樣的東西!)

 

當多個線程幾乎同時修改某一個共享數據的時候,需要進行同步控制

線程同步能夠保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。

互斥鎖為資源引入一個狀態:鎖定/非鎖定

某個線程要更改共享數據時,先將其鎖定,此時資源的狀態為“鎖定”,其他線程不能更改;直到該線程釋放資源,將資源的狀態變成“非鎖定”,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程進行寫入操作,從而保證了多線程情況下數據的正確性。

互斥鎖有三個常用步驟

lock = threading.Lock()  # 取得鎖
lock.acquire()  # 上鎖
lock.release()  # 解鎖

改一下上面的代碼:

 

import threading
from time import *

class MyThread(threading.Thread):
    def __init__(self,counter,name,lock):
        threading.Thread.__init__(self)  
        self.counter = counter
        self.name = name
        self.lock = lock

    def run(self):
        self.lock.acquire()
        self.counter[0] += 1
        sleep(1)
        print self.counter[0]
        self.lock.release()

if __name__ == '__main__':
    counter = [0]
    lock = threading.Lock()
    for i in range(1,100):
        t = MyThread(counter,i,lock)
        t.start()

 

鎖也可以使用with lock來加鎖

with lock:
    xxxxxxxxxx

 和Lock類類似的還有一個RLock類,與Lock類的區別在於RLock類鎖可以嵌套地acquire和release。也就是說在同一個線程中acquire之后再acquire也不會報錯,而是將鎖的層級加深一層。只有當每一層鎖從下到上依次都release開這個鎖才算是被解開。RLock鎖也稱遞歸鎖

 

 

這里還要提一下一個更強大的鎖 Condition

  上面提到的threading.Lock類提供了最為簡單的線程鎖的功能。除了Lock和RLock以外,其實threading還補充了其他一些很多的帶有鎖功能的類。Condition就是其中最為強大的類之一。

acquire(): 線程鎖
release(): 釋放鎖
wait(timeout): 線程掛起,直到收到一個notify通知或者超時(可選的,浮點數,單位是秒s)才會被喚醒繼續運行。wait()必須在已獲得Lock前提下才能調用,否則會觸發RuntimeError。
notify(n=1): 通知其他線程,那些掛起的線程接到這個通知之后會開始運行,默認是通知一個正等待該condition的線程,最多則喚醒n個等待的線程。notify()必須在已獲得Lock前提下才能調用,否則會觸發RuntimeError。notify()不會主動釋放Lock。
notifyAll(): 如果wait狀態線程比較多,notifyAll的作用就是通知所有線程

 改下上面的代碼:

import threading
from time import *

class MyThread(threading.Thread):
    def __init__(self,counter,name,con):
        threading.Thread.__init__(self)  
        self.counter = counter
        self.name = name
        self.con = con

    def run(self):
        self.con.acquire()
        self.counter[0] += 1
        sleep(1)
        print self.counter[0]
        con.notify()        
        con.wait()
        self.con.release()

if __name__ == '__main__':
    counter = [0]
    con = threading.Condition()
    for i in range(1,100):
        t = MyThread(counter,i,con)
        t.start()

 

 

 

注意釋放鎖relase是必要的,不然會出現死鎖的現象。

 

信號量(BoundedSemaphore類)

互斥鎖同時只允許一個線程更改數據,而Semaphore信號量是同時允許一定數量的線程更改數據 ,比如廁所有3個坑,那最多只允許3個人上廁所,后面的人只能等里面有人出來了才能再進去。

import threading
from time import *

class MyThread(threading.Thread):
    def __init__(self,counter,name):
        threading.Thread.__init__(self)  
        self.counter = counter
        self.name = name


    def run(self):
        semaphore.acquire()
        self.counter[0] += 1
        sleep(1)
        print self.counter[0]
        semaphore.release()

if __name__ == '__main__':
    counter = [0]
    semaphore = threading.BoundedSemaphore(5) 
    for i in range(1,100):
        t = MyThread(counter,i)
        t.start()

 

事件(Event類)

python線程的事件用於主線程控制其他線程的執行,事件是一個簡單的線程同步對象,其主要提供以下幾個方法:

方法 注釋
clear 將flag設置為“False”
set 將flag設置為“True”
is_set 判斷是否設置了flag
wait 會一直監聽flag,如果沒有檢測到flag就一直處於阻塞狀態

事件處理的機制:全局定義了一個“Flag”,當flag值為“False”,那么event.wait()就會阻塞,當flag值為“True”,那么event.wait()便不再阻塞。

#利用Event類模擬紅綠燈
import threading
import time

event = threading.Event()


def lighter():
    count = 0
    event.set()     #初始值為綠燈
    while True:
        if 5 < count <=10 :
            event.clear()  # 紅燈,清除標志位
            print("\33[41;1mred light is on...\033[0m")
        elif count > 10:
            event.set()  # 綠燈,設置標志位
            count = 0
        else:
            print("\33[42;1mgreen light is on...\033[0m")

        time.sleep(1)
        count += 1

def car(name):
    while True:
        if event.is_set():      #判斷是否設置了標志位
            print("[%s] running..."%name)
            time.sleep(1)
        else:
            print("[%s] sees red light,waiting..."%name)
            event.wait()
            print("[%s] green light is on,start going..."%name)

light = threading.Thread(target=lighter,)
light.start()

car = threading.Thread(target=car,args=("MINI",))
car.start()

 

定時器(Timer類)

定時器,指定n秒后執行某操作

from threading import Timer
 
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

 

0x03 python隊列代碼實現

Queue隊列

Queue用於建立和操作隊列,常和threading類一起用來建立一個簡單的線程隊列。

隊列有很多種,根據進出順序來分類,可以分成

    Queue.Queue(maxsize)  FIFO(先進先出隊列)

    Queue.LifoQueue(maxsize)  LIFO(先進后出隊列)

    Queue.PriorityQueue(maxsize)  為優先級越高的越先出來,對於一個隊列中的所有元素組成的entries,優先隊列優先返回的一個元素是sorted(list(entries))[0]。至於對於一般的數據,優先隊列取什么東西作為優先度要素進行判斷,官方文檔給出的建議是一個tuple如(priority, data),取priority作為優先度。

    如果設置的maxsize小於1,則表示隊列的長度無限長

 

FIFO是常用的隊列,其一些常用的方法有:

    

Queue.qsize()  返回隊列大小

Queue.empty()  判斷隊列是否為空

Queue.full()  判斷隊列是否滿了

Queue.get([block[,timeout]])  從隊列頭刪除並返回一個item,block默認為True,表示當隊列為空卻去get的時候會阻塞線程,等待直到有有item出現為止來get出這個item。如果是False的話表明當隊列為空你卻去get的時候,會引發異常。在block為True的情況下可以再設置timeout參數。表示當隊列為空,get阻塞timeout指定的秒數之后還沒有get到的話就引發Full異常。

Queue.put(...[,block[,timeout]])  向隊尾插入一個item,同樣若block=True的話隊列滿時就阻塞等待有空位出來再put,block=False時引發異常。同get的timeout,put的timeout是在block為True的時候進行超時設置的參數。

Queue.task_done()  從場景上來說,處理完一個get出來的item之后,調用task_done將向隊列發出一個信號,表示本任務已經完成

Queue.join()  監視所有item並阻塞主線程,直到所有item都調用了task_done之后主線程才繼續向下執行。這么做的好處在於,假如一個線程開始處理最后一個任務,它從任務隊列中拿走最后一個任務,此時任務隊列就空了但最后那個線程還沒處理完。當調用了join之后,主線程就不會因為隊列空了而擅自結束,而是等待最后那個線程處理完成了。

 

結合threading和Queue可以構建出一個簡單的生產者-消費者模型,注意,線程隊列的意義並不是進一步提高運行效率而是使線程的並發更加有組織。新線程想要加入隊列開始執行,必須等一個既存的線程完成之后才可以。舉個例子,比如

 

from threading import Thread
import queue, time

q = queue.Queue()


def consumer():
    while 1:
        res = q.get()
        time.sleep(2)
        print('消費者消費了\033[35m%s\033[0m' % res)
        q.task_done()


def producer_0():
    for i in range(5):
        q.put(i)
        print('生產者0生產了\033[35m%s\033[0m' % i)
    q.join()


def producer_1():
    for i in range(5):
        q.put(i)
        print('生產者1生產了\033[32m%s\033[0m' % i)
    q.join()


def producer_2():
    for i in range(5):
        q.put(i)
        print('生產者2生產了\033[33m%s\033[0m' % i)
    q.join()


if __name__ == '__main__':
    t0 = Thread(target=producer_0, )
    t1 = Thread(target=producer_1, )
    t2 = Thread(target=producer_2, )

    t0.start()
    t1.start()
    t2.start()
    consumer_t = Thread(target=consumer, )
    consumer_t.daemon = True
    consumer_t.start()
    t0.join()
    t1.join()
    t2.join()
    print('主線程~')

 

 

 

 

 

 

0x04 python之線程池實現

線城池

對於任務數量不斷增加的程序,每有一個任務就生成一個線程,最終會導致線程數量的失控。對於任務數量不端增加的程序,固定線程數量的線程池是必要的。

 

 

threadpool模塊

threadpool是一個比較老的模塊了,支持py2 和 py3 。

import threadpool
import time

def sayhello (a):
    print("hello: "+a)
    time.sleep(2)

def main():
    global result
    seed=["a","b","c"]
    start=time.time()
    task_pool=threadpool.ThreadPool(5)
    requests=threadpool.makeRequests(sayhello,seed)
    for req in requests:
        task_pool.putRequest(req)
    task_pool.wait()
    end=time.time()
    time_m = end-start
    print("time: "+str(time_m))
    start1=time.time()
    for each in seed:
        sayhello(each)
    end1=time.time()
    print("time1: "+str(end1-start1))

if __name__ == '__main__':
    main(

 

 

concurrent.futures模塊

from concurrent.futures import ThreadPoolExecutor
import time

import time
from concurrent.futures import ThreadPoolExecutor, wait, as_completed

ll = []
def sayhello(a):
    print("hello: "+a)
    ll.append(a)
    time.sleep(0.8)

def main():
    seed=["a","b","c","e","f","g","h"]
    start1=time.time()
    for each in seed:
        sayhello(each)
    end1=time.time()
    print("time1: "+str(end1-start1))
    start2=time.time()
    with ThreadPoolExecutor(2) as executor:
        for each in seed:
            executor.submit(sayhello,each)
    end2=time.time()
    print("time2: "+str(end2-start2))

def main2():
    seed = ["a", "b", "c", "e", "f", "g", "h"]
    executor = ThreadPoolExecutor(max_workers=10)
    f_list = []
    for each in seed:
        future = executor.submit(sayhello, each)
        f_list.append(future)
    wait(f_list)
    print(ll)
    print('主線程結束')


def main3():
    seed = ["a", "b", "c", "e", "f", "g", "h"]
    with ThreadPoolExecutor(max_workers=2) as executor:
        f_list = []
        for each in seed:
            future = executor.submit(sayhello, each)
            f_list.append(future)
        wait(f_list,return_when='ALL_COMPLETED')
        print(ll)
        print('主線程結束')

if __name__ == '__main__':
    main3()

 

 

 

vthread模塊

import vthread
pool_1 = vthread.pool(5,gqueue=1) # open a threadpool with 5 threads named 1
pool_2 = vthread.pool(2,gqueue=2) # open a threadpool with 2 threads named 2

@pool_1
def foolfunc1(num):
    time.sleep(1)
    print(f"foolstring1, test3 foolnumb1:{num}")

@pool_2
def foolfunc2(num):
    time.sleep(1)
    print(f"foolstring2, test3 foolnumb2:{num}")

@pool_2
def foolfunc3(num):
    time.sleep(1)
    print(f"foolstring3, test3 foolnumb3:{num}")

for i in range(10): foolfunc1(i)
for i in range(4): foolfunc2(i)
for i in range(2): foolfunc3(i)

 

 

 

 

 

0x05 python多進程並行實現

 

前面也說了python多線程的弊端和GIL的內容,適合IO密集型,而如果解決計算密集型時候的多線程呢?那就是多進程。

每個進程的GIL互不影響,多進程來並行編程。

 

 

multiprocessing模塊

python中多線程無法利用多核優勢,如果想要充分地使用多核cpu的資源(os.cpu_count()),在python中大部分情況需要使用多進程,python提供了multiprocessing。

multiprocessing並非是python的一個模塊,而是python中多進程管理的一個包

multiprocessing模塊用來開啟子進程,並在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。

multiprocessing模塊的功能眾多:支持子進程,通信和共享數據,執行不同形式的同步,提供了process、Queue、Lock等組件。

需要再次強調的一點是:與線程不同,進程沒有任何共享狀態,進程修改的數據,改動僅限與該進程內。

 

 

process類

創建進程的類:

Process([group [, target [, name [, args [, kwargs]]]]])

一些創建process類的參數

roup參數未使用,值始終為None
 
target表示調用對象,即子進程要執行的任務

args表示調用對象的位置參數元組,args=(1,2,'egon',)

kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}

name為子進程的名稱

 

簡單創建進程:

import multiprocessing

def worker(num):
    """thread worker function"""
    print('Worker:', num)
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

 

當前進程名:

multiprocessing.current_process().name

 

守護進程:

 mutilprocess.setDaemon(True)

 

一些常用的函數

p.start():啟動進程,並調用該子進程中的p.run() 
p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法  

p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖
p.is_alive():如果p仍然運行,返回True

p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程

 

一些屬性:

p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置

p.name:進程的名稱

p.pid:進程的pid

p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)

p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)

 

開啟子進程例子:

#開進程的方法一:
import time
import random
from multiprocessing import Process
def piao(name):
    print('%s piaoing' %name)
    time.sleep(random.randrange(1,5))
    print('%s piao end' %name)



p1=Process(target=piao,args=('egon',)) #必須加,號
p2=Process(target=piao,args=('alex',))
p3=Process(target=piao,args=('wupeqi',))
p4=Process(target=piao,args=('yuanhao',))

p1.start()
p2.start()
p3.start()
p4.start()
print('主線程')

 

join(),當某個進程fork一個子進程后,該進程必須要調用wait等待子進程結束發送的sigchld信號,對子進程進行資源回收等相關工作,否則,子進程會成為僵死進程,被init收養。所以,在multiprocessing.Process實例化一個對象之后,該對象有必要調用join方法,因為在join方法中完成了對底層wait的處理。

from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()
    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)


p=Piao('egon')
p.start()
p.join(0.0001) #等待p停止,等0.0001秒就不再等了
print('開始')

#join:主進程等,等待子進程結束

#join:主進程等,等待子進程結束

 

創建守護進程例子:

from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()
    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)


p=Piao('egon')
p.daemon=True #一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,並且父進程代碼執行結束,p即終止運行
p.start()
print('')

 

 

 

 

完畢。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM