Python多進程與多線程編程及GIL詳解


介紹如何使用python的multiprocess和threading模塊進行多線程和多進程編程。

Python的多進程編程與multiprocess模塊

python的多進程編程主要依靠multiprocess模塊。我們先對比兩段代碼,看看多進程編程的優勢。我們模擬了一個非常耗時的任務,計算8的20次方,為了使這個任務顯得更耗時,我們還讓它sleep 2秒。第一段代碼是單進程計算(代碼如下所示),我們按順序執行代碼,重復計算2次,並打印出總共耗時。

import time
import os
def long_time_task():
    print('當前進程: {}'.format(os.getpid()))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))

if __name__ == "__main__":
    print('當前母進程: {}'.format(os.getpid()))
    start = time.time()
    for i in range(2):
        long_time_task()

    end = time.time()
    print("用時{}秒".format((end-start)))

輸出結果如下,總共耗時4秒,至始至終只有一個進程14236。看來電腦計算8的20次方基本不費時。

當前母進程: 14236
當前進程: 14236
結果: 1152921504606846976
當前進程: 14236
結果: 1152921504606846976
用時4.01080060005188秒

第2段代碼是多進程計算代碼。我們利用multiprocess模塊的Process方法創建了兩個新的進程p1和p2來進行並行計算。Process方法接收兩個參數, 第一個是target,一般指向函數名,第二個時args,需要向函數傳遞的參數。對於創建的新進程,調用start()方法即可讓其開始。我們可以使用os.getpid()打印出當前進程的名字。

from multiprocessing import Process
import os
import time
def long_time_task(i):
    print('子進程: {} - 任務{}'.format(os.getpid(), i))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))
if __name__=='__main__':
    print('當前母進程: {}'.format(os.getpid()))
    start = time.time()
    p1 = Process(target=long_time_task, args=(1,))
    p2 = Process(target=long_time_task, args=(2,))
    print('等待所有子進程完成。')
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    end = time.time()
    print("總共用時{}秒".format((end - start)))

輸出結果如下所示,耗時變為2秒,時間減了一半,可見並發執行的時間明顯比順序執行要快很多。你還可以看到盡管我們只創建了兩個進程,可實際運行中卻包含里1個母進程和2個子進程。之所以我們使用join()方法就是為了讓母進程阻塞,等待子進程都完成后才打印出總共耗時,否則輸出時間只是母進程執行的時間。

當前母進程: 6920
等待所有子進程完成。
子進程: 17020 - 任務1
子進程: 5904 - 任務2
結果: 1152921504606846976
結果: 1152921504606846976
總共用時2.131091356277466秒

知識點:

  • 新創建的進程與進程的切換都是要耗資源的,所以平時工作中進程數不能開太大。

  • 同時可以運行的進程數一般受制於CPU的核數。

  • 除了使用Process方法,我們還可以使用Pool類創建多進程。

 

利用multiprocess模塊的Pool類創建多進程

很多時候系統都需要創建多個進程以提高CPU的利用率,當數量較少時,可以手動生成一個個Process實例。當進程數量很多時,或許可以利用循環,但是這需要程序員手動管理系統中並發進程的數量,有時會很麻煩。這時進程池Pool就可以發揮其功效了。可以通過傳遞參數限制並發進程的數量,默認值為CPU的核數。 

Pool類可以提供指定數量的進程供用戶調用,當有新的請求提交到Pool中時,如果進程池還沒有滿,就會創建一個新的進程來執行請求。如果池滿,請求就會告知先等待,直到池中有進程結束,才會創建新的進程來執行這些請求。 

下面介紹一下multiprocessing 模塊下的Pool類的幾個方法:

1.apply_async

函數原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

其作用是向進程池提交需要執行的函數及參數, 各個進程采用非阻塞(異步)的調用方式,即每個子進程只管運行自己的,不管其它進程是否已經完成。

2.map()

函數原型:map(func, iterable[, chunksize=None])

Pool類中的map方法,與內置的map函數用法行為基本一致,它會使進程阻塞直到結果返回。 注意:雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒后,程序才會運行子進程。

3.map_async()

函數原型:map_async(func, iterable[, chunksize[, callback]])
與map用法一致,但是它是非阻塞的。其有關事項見apply_async。

4.close()

關閉進程池(pool),使其不在接受新的任務。

5. terminate()

結束工作進程,不在處理未處理的任務。

6.join()

主進程阻塞等待子進程的退出, join方法要在close或terminate之后使用。

 

下例是一個簡單的multiprocessing.Pool類的實例。因為小編我的CPU是4核的,一次最多可以同時運行4個進程,所以我開啟了一個容量為4的進程池。4個進程需要計算5次,你可以想象4個進程並行4次計算任務后,還剩一次計算任務(任務4)沒有完成,系統會等待4個進程完成后重新安排一個進程來計算。

from multiprocessing import Pool, cpu_count
import os
import time
def long_time_task(i):
    print('子進程: {} - 任務{}'.format(os.getpid(), i))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))
if __name__=='__main__':
    print("CPU內核數:{}".format(cpu_count()))
    print('當前母進程: {}'.format(os.getpid()))
    start = time.time()
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('等待所有子進程完成。')
    p.close()
    p.join()
    end = time.time()
    print("總共用時{}秒".format((end - start)))

知識點:  

  • 對Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close()或terminate()方法,讓其不再接受新的Process了。

 輸出結果如下所示,5個任務(每個任務大約耗時2秒)使用多進程並行計算只需4.37秒,, 耗時減少了60%。

CPU內核數:4
當前母進程: 2556
等待所有子進程完成。
子進程: 16480 - 任務0
子進程: 15216 - 任務1
子進程: 15764 - 任務2
子進程: 10176 - 任務3
結果: 1152921504606846976
結果: 1152921504606846976
子進程: 15216 - 任務4
結果: 1152921504606846976
結果: 1152921504606846976
結果: 1152921504606846976
總共用時4.377134561538696秒

 相信大家都知道python解釋器中存在GIL(全局解釋器鎖), 它的作用就是保證同一時刻只有一個線程可以執行代碼。由於GIL的存在,很多人認為python中的多線程其實並不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。然而這並意味着python多線程編程沒有意義哦,請繼續閱讀下文。

  多進程間的數據共享與通信

 通常,進程之間是相互獨立的,每個進程都有獨立的內存。通過共享內存(nmap模塊),進程之間可以共享對象,使多個進程可以訪問同一個變量(地址相同,變量名可能不同)。多進程共享資源必然會導致進程間相互競爭,所以應該盡最大可能防止使用共享狀態。還有一種方式就是使用隊列queue來實現不同進程間的通信或數據共享,這一點和多線程編程類似。

from multiprocessing import Process, Queue
import os, time, random
# 寫數據進程執行的代碼:
def write(q):
    print('Process to write: {}'.format(os.getpid()))
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())
# 讀數據進程執行的代碼:
def read(q):
    print('Process to read:{}'.format(os.getpid()))
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)
if __name__=='__main__':
    # 父進程創建Queue,並傳給各個子進程:
   q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啟動子進程pw,寫入:
    pw.start()
    # 啟動子進程pr,讀取:
    pr.start()
    # 等待pw結束:
    pw.join()
    # pr進程里是死循環,無法等待其結束,只能強行終止:
    pr.terminate()

下例這段代碼中中創建了2個獨立進程,一個負責寫(pw), 一個負責讀(pr), 實現了共享一個隊列queue。

輸出結果如下所示:

Process to write: 3036
Put A to queue...
Process to read:9408
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

 Python的多線程編程與threading模塊

 python 3中的多進程編程主要依靠threading模塊。創建新線程與創建新進程的方法非常類似。threading.Thread方法可以接收兩個參數, 第一個是target,一般指向函數名,第二個時args,需要向函數傳遞的參數。對於創建的新線程,調用start()方法即可讓其開始。我們還可以使用current_thread().name打印出當前線程的名字。 下例中我們使用多線程技術重構之前的計算代碼。

import threading
import time
def long_time_task(i):
    print('當前子線程: {} - 任務{}'.format(threading.current_thread().name, i))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))
if __name__=='__main__':
    start = time.time()
    print('這是主線程:{}'.format(threading.current_thread().name))
    t1 = threading.Thread(target=long_time_task, args=(1,))
    t2 = threading.Thread(target=long_time_task, args=(2,))
    t1.start()
    t2.start()
    end = time.time()
    print("總共用時{}秒".format((end - start)))

下面是輸出結果。為什么總耗時居然是0秒? 我們可以明顯看到主線程和子線程其實是獨立運行的,主線程根本沒有等子線程完成,而是自己結束后就打印了消耗時間。主線程結束后,子線程仍在獨立運行,這顯然不是我們想要的。

這是主線程:MainThread
當前子線程: Thread-1 - 任務1
當前子線程: Thread-2 - 任務2
總共用時0.0017192363739013672秒
結果: 1152921504606846976
結果: 1152921504606846976

如果要實現主線程和子線程的同步,我們必需使用join方法(代碼如下所示)。

import threading
import time
def long_time_task(i):
    print('當前子線程: {} 任務{}'.format(threading.current_thread().name, i))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))
if __name__=='__main__':
    start = time.time()
    print('這是主線程:{}'.format(threading.current_thread().name))
    thread_list = []
    for i in range(1, 3):
        t = threading.Thread(target=long_time_task, args=(i, ))
        thread_list.append(t)
    for t in thread_list:
        t.start()
    for t in thread_list:
        t.join()
    end = time.time()
    print("總共用時{}秒".format((end - start)))

修改代碼后的輸出如下所示。這時你可以看到主線程在等子線程完成后才答應出總消耗時間(2秒),比正常順序執行代碼(4秒)還是節省了不少時間。

這是主線程:MainThread
當前子線程: Thread - 1 任務1
當前子線程: Thread - 2 任務2
結果: 1152921504606846976
結果: 1152921504606846976
總共用時2.0166890621185303秒

當我們設置多線程時,主線程會創建多個子線程,在python中,默認情況下主線程和子線程獨立運行互不干涉。如果希望讓主線程等待子線程實現線程的同步,我們需要使用join()方法。如果我們希望一個主線程結束時不再執行子線程,我們應該怎么辦呢? 我們可以使用t.setDaemon(True),代碼如下所示。

import threading
import time
def long_time_task():
    print('當子線程: {}'.format(threading.current_thread().name))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))
if __name__=='__main__':
    start = time.time()
    print('這是主線程:{}'.format(threading.current_thread().name))
    for i in range(5):
        t = threading.Thread(target=long_time_task, args=())
        t.setDaemon(True)
        t.start()
    end = time.time()
    print("總共用時{}秒".format((end - start)))

通過繼承Thread類重寫run方法創建新線程

 除了使用Thread()方法創建新的線程外,我們還可以通過繼承Thread類重寫run方法創建新的線程,這種方法更靈活。下例中我們自定義的類為MyThread, 隨后我們通過該類的實例化創建了2個子線程。

#-*- encoding:utf-8 -*-
import threading
import time
def long_time_task(i):
    time.sleep(2)
    return 8**20
class MyThread(threading.Thread):
    def __init__(self, func, args , name='', ):
        threading.Thread.__init__(self)
        self.func = func
        self.args = args
        self.name = name
        self.result = None
    def run(self):
        print('開始子進程{}'.format(self.name))
        self.result = self.func(self.args[0],)
        print("結果: {}".format(self.result))
        print('結束子進程{}'.format(self.name))
if __name__=='__main__':
    start = time.time()
    threads = []
    for i in range(1, 3):
        t = MyThread(long_time_task, (i,), str(i))
        threads.append(t)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    end = time.time()
    print("總共用時{}秒".format((end - start)))

輸出結果如下所示:

開始子進程1
開始子進程2
結果: 1152921504606846976
結果: 1152921504606846976
結束子進程1
結束子進程2
總共用時2.005445718765259秒

 不同線程間的數據共享

一個進程所含的不同線程間共享內存,這就意味着任何一個變量都可以被任何一個線程修改,因此線程之間共享數據最大的危險在於多個線程同時改一個變量,把內容給改亂了。如果不同線程間有共享的變量,其中一個方法就是在修改前給其上一把鎖lock,確保一次只有一個線程能修改它。threading.lock()方法可以輕易實現對一個共享變量的鎖定,修改完后release供其它線程使用。比如下例中賬戶余額balance是一個共享變量,使用lock可以使其不被改亂。

# -*- coding: utf-8 -*
import threading
class Account:
    def __init__(self):
        self.balance = 0
    def add(self, lock):
        # 獲得鎖
        lock.acquire()
        for i in range(0, 100000):
            self.balance += 1
        # 釋放鎖
        lock.release()
    def delete(self, lock):
        # 獲得鎖
        lock.acquire()
        for i in range(0, 100000):
            self.balance -= 1
            # 釋放鎖
        lock.release()
if __name__ == "__main__":
    account = Account()
    lock = threading.Lock()
    # 創建線程
   thread_add = threading.Thread(target=account.add, args=(lock,), name='Add')
    thread_delete = threading.Thread(target=account.delete, args=(lock,), name='Delete')
    # 啟動線程
   thread_add.start()
    thread_delete.start()
    # 等待線程結束
   thread_add.join()
    thread_delete.join()
    print('The final balance is: {}'.format(account.balance))

 

另一種實現不同線程間數據共享的方法就是使用消息隊列queue。不像列表,queue是線程安全的,可以放心使用,見下文。

 使用queue隊列通信-經典的生產者和消費者模型

下例中創建了兩個線程,一個負責生成,一個負責消費,所生成的產品存放在queue里,實現了不同線程間溝通。

from queue import Queue
import random, threading, time
# 生產者類
class Producer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self, name=name)
        self.queue = queue
    def run(self):
        for i in range(1, 5):
            print("{} is producing {} to the queue!".format(self.getName(), i))
            self.queue.put(i)
            time.sleep(random.randrange(10) / 5)
        print("%s finished!" % self.getName())
# 消費者類
class Consumer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self, name=name)
        self.queue = queue

    def run(self):
        for i in range(1, 5):
            val = self.queue.get()
            print("{} is consuming {} in the queue.".format(self.getName(), val))
            time.sleep(random.randrange(10))
        print("%s finished!" % self.getName())
def main():
    queue = Queue()
    producer = Producer('Producer', queue)
    consumer = Consumer('Consumer', queue)
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    print('All threads finished!')
if __name__ == '__main__':
    main()

隊列queue的put方法可以將一個對象obj放入隊列中。如果隊列已滿,此方法將阻塞至隊列有空間可用為止。queue的get方法一次返回隊列中的一個成員。如果隊列為空,此方法將阻塞至隊列中有成員可用為止。queue同時還自帶emtpy(), full()等方法來判斷一個隊列是否為空或已滿,但是這些方法並不可靠,因為多線程和多進程,在返回結果和使用結果之間,隊列中可能添加/刪除了成員。

 Python多進程和多線程哪個快?

 由於GIL的存在,很多人認為Python多進程編程更快,針對多核CPU,理論上來說也是采用多進程更能有效利用資源。網上很多人已做過比較,我直接告訴你結論吧。

  • 對CPU密集型代碼(比如循環計算) - 多進程效率更高

  • 對IO密集型代碼(比如文件操作,網絡爬蟲) - 多線程效率更高。 

為什么是這樣呢?其實也不難理解。對於IO密集型操作,大部分消耗時間其實是等待時間,在等待時間中CPU是不需要工作的,那你在此期間提供雙CPU資源也是利用不上的,相反對於CPU密集型代碼,2個CPU干活肯定比一個CPU快很多。那么為什么多線程會對IO密集型代碼有用呢?這因是為python碰到等待會釋放GIL供新的線程使用,實現了線程間的切換。

 

GIL是什么

首先需要明確的一點是GIL並不是Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(語法)標准,但是可以用不同的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行。像其中的JPython就沒有GIL。然而因為CPython是大部分環境下默認的Python執行環境。所以在很多人的概念里CPython就是Python,也就想當然的把GIL歸結為Python語言的缺陷。所以這里要先明確一點:GIL並不是Python的特性,Python完全可以不依賴於GIL。

GIL: 一個防止多線程並發執行機器碼的一個Mutex,乍一看就是個BUG般存在的全局鎖嘛!別急,我們下面慢慢的分析。

為什么會有GIL

由於物理上得限制,各CPU廠商在核心頻率上的比賽已經被多核所取代。為了更有效的利用多核處理器的性能,就出現了多線程的編程方式,而隨之帶來的就是線程間數據一致性和狀態同步的困難。即使在CPU內部的Cache也不例外,為了有效解決多份緩存之間的數據同步時各廠商花費了不少心思,也不可避免的帶來了一定的性能損失。

Python當然也逃不開,為了利用多核,Python開始支持多線程。而解決多線程之間數據完整性和狀態同步的最簡單方法自然就是加鎖。 於是有了GIL這把超級大鎖,而當越來越多的代碼庫開發者接受了這種設定后,他們開始大量依賴這種特性(即默認python內部對象是thread-safe的,無需在實現時考慮額外的內存鎖和同步操作)。

慢慢的這種實現方式被發現是蛋疼且低效的。但當大家試圖去拆分和去除GIL的時候,發現大量庫代碼開發者已經重度依賴GIL而非常難以去除了。有多難?做個類比,像MySQL這樣的“小項目”為了把Buffer Pool Mutex這把大鎖拆分成各個小鎖也花了從5.5到5.6再到5.7多個大版為期近5年的時間,本且仍在繼續。MySQL這個背后有公司支持且有固定開發團隊的產品走的如此艱難,那又更何況Python這樣核心開發和代碼貢獻者高度社區化的團隊呢?

所以簡單的說GIL的存在更多的是歷史原因。如果推到重來,多線程的問題依然還是要面對,但是至少會比目前GIL這種方式會更優雅。

 GIL的影響

從上文的介紹和官方的定義來看,GIL無疑就是一把全局排他鎖。毫無疑問全局鎖的存在會對多線程的效率有不小影響。甚至就幾乎等於Python是個單線程的程序。
那么讀者就會說了,全局鎖只要釋放的勤快效率也不會差啊。只要在進行耗時的IO操作的時候,能釋放GIL,這樣也還是可以提升運行效率的嘛。或者說再差也不會比單線程的效率差吧。理論上是這樣,而實際上呢?Python比你想的更糟。

下面我們就對比下Python在多線程和單線程下得效率對比。測試方法很簡單,一個循環1億次的計數器函數。一個通過單線程執行兩次,一個多線程執行。最后比較執行總時間。測試環境為雙核的Mac pro。注:為了減少線程庫本身性能損耗對測試結果帶來的影響,這里單線程的代碼同樣使用了線程。只是順序的執行兩次,模擬單線程。

順序執行的單線程(single_thread.py)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM