Python作為一種解釋型語言,由於使用了全局解釋鎖(GIL)的原因,其代碼不能同時在多核CPU上並發的運行。這也導致在Python中使用多線程編程並不能實現並發,我們得使用其他的方法在Python中實現並發編程。
一、全局解釋鎖(GIL)
Python中不能通過使用多線程實現並發編程主要是因為全局解釋鎖的機制,所以首先解釋一下全局解釋鎖的概念。
首先,我們知道C++和Java是編譯型語言,而Python則是一種解釋型語言。對於Python程序來說,它是直接被輸入到解釋器中直接運行的。解釋器在程序執行之前對其並不了解;它所知道的只是Python的規則,以及在執行過程中怎樣去動態的應用這些規則。它也有一些優化,但是這基本上只是另一個級別的優化。由於解釋器沒法很好的對程序進行推導,Python的大部分優化其實是解釋器自身的優化。更快的解釋器自然意味着程序的運行也能“免費”的更快。也就是說,解釋器優化后,Python程序不用做修改就可以享受優化后的好處。
為了利用多核系統,Python必須支持多線程運行。但作為解釋型語言,Python的解釋器需要做到既安全又高效。解釋器要注意避免在不同的線程操作內部共享的數據,同時還要保證在管理用戶線程時保證總是有最大化的計算資源。為了保證不同線程同時訪問數據時的安全性,Python使用了全局解釋器鎖(GIL)的機制。從名字上我們很容易明白,它是一個加在解釋器上的全局(從解釋器的角度看)鎖(從互斥或者類似角度看)。這種方式當然很安全,但它也意味着:對於任何Python程序,不管有多少的處理器,任何時候都總是只有一個線程在執行。即:只有獲得了全局解釋器鎖的線程才能操作Python對象或者調用Python/C API函數。
所以,在Python中”不要使用多線程,請使用多進程”。具體來說,如果你的代碼是IO密集型的,使用多線程或者多進程都是可以的,多進程比線程更易用,但是會消耗更多的內存;如果你的代碼是CPU密集型的,多進程(multiprocessing模塊)就明顯是更好的選擇——特別是所使用的機器是多核或多CPU的時候。
另外,Python的官方實現CPython帶有GIL,但並不是所有的Python實現版本都是這樣的。IronPython,Jython,還有使用.NET框架實現的Python就沒有GIL。所以如果你不能忍受GIL,也可以嘗試用一下其他實現版本的Python。

如果是一個計算型的任務,GIL就會讓多線程變慢。我們舉個計算斐波那契數列的例子:
import time import threading def text(name): def profile(func): def wrapper(*args,**kwargs): start = time.time() res = func(*args,**kwargs) end = time.time() print('{} cost:{}'.format(name,end-start)) return res return wrapper return profile def fib(n): if n <= 2: return 1 return fib(n-1) + fib(n-2) @text('nothread') def nothread(): fib(35) fib(35) @text('hasthread') def hasthread(): for i in range(2): t = threading.Thread(target=fib,args=(35,)) t.start() main_thread = threading.current_thread() for t in threading.enumerate(): if t is main_thread: continue t.join() nothread() hasthread() ##輸出結果### nothread cost:6.141353607177734 hasthread cost:6.15336275100708
這種情況還不如不用多線程!
GIL是必須的,這是Python設計的問題:Python解釋器是非線程安全的。這意味着當從線程內嘗試安全的訪問Python對象的時候將有一個全局的強制鎖。 在任何時候,僅僅一個單一的線程能夠獲取Python對象或者C API。每100個字節的Python指令解釋器將重新獲取鎖,這(潛在的)阻塞了I/O操作。因為鎖,CPU密集型的代碼使用線程庫時,不會獲得性能的提高。
那是不是由於GIL的存在,多線程庫就是個「雞肋」呢?當然不是。事實上我們平時會接觸非常多的和網絡通信或者數據輸入/輸出相關的程序,比如網絡爬蟲、文本處理等等。這時候由於網絡情況和I/O的性能的限制,Python解釋器會等待讀寫數據的函數調用返回,這個時候就可以利用多線程庫提高並發效率了。
2.同步機制
A. Semaphore(信號量)
在多線程編程中,為了防止不同的線程同時對一個公用的資源(比如全部變量)進行修改,需要進行同時訪問的數量(通常是1)的限制。信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。
import time from random import random from threading import Thread,Semaphore,current_thread,enumerate sema = Semaphore(3) def foo(tid): with sema: print('{} acquire sema'.format(tid)) wt = random() * 2 time.sleep(wt) print('{} release sema'.format(tid)) for i in range(5): t = Thread(target=foo,args=(i,)) t.start() main_thread = current_thread() for t in enumerate(): if t is main_thread: continue t.join() ####輸出結果##### 0 acquire sema 1 acquire sema 2 acquire sema 0 release sema 3 acquire sema 1 release sema 4 acquire sema 2 release sema 3 release sema 4 release sema
B. Lock(互斥鎖)
Lock也可以叫做互斥鎖,其實相當於信號量為1。我們先看一個不加鎖的例子:
import time import threading value = 0 def getlock(): global value new = value + 1 time.sleep(0.001) # 讓線程有機會切換 value = new for i in range(100): t = threading.Thread(target=getlock) t.start() main_thread = threading.current_thread() for t in threading.enumerate(): if t == main_thread: continue t.join() print(value) ####輸出結果##### 不確定(刷新值會發生改變)
現在,我們來看看加鎖之后的情況:
import time import threading value = 0 lock = threading.Lock() def getlock(): global value with lock: new = value + 1 time.sleep(0.001) # 讓線程有機會切換 value = new for i in range(100): t = threading.Thread(target=getlock) t.start() main_thread = threading.current_thread() for t in threading.enumerate(): if t == main_thread: continue t.join() print(value) ####輸出結果為############# 100
我們對value的自增加了鎖,就可以保證了結果了。
3. RLock(遞歸鎖)
先來說說死鎖,所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程。
import threading import time mutexA = threading.Lock() mutexB = threading.Lock() class MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): self.fun1() self.fun2() def fun1(self): mutexA.acquire() # 如果鎖被占用,則阻塞在這里,等待鎖的釋放 print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) mutexB.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) mutexB.release() mutexA.release() def fun2(self): mutexB.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) time.sleep(0.2) mutexA.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) mutexA.release() mutexB.release() if __name__ == "__main__": print("start---------------------------%s"%time.time()) for i in range(0, 10): my_thread = MyThread() my_thread.start()
解決方案:
import threading import time mutex = threading.RLock() class MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): self.fun1() self.fun2() def fun1(self): mutex.acquire() # 如果鎖被占用,則阻塞在這里,等待鎖的釋放 print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) mutex.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) mutex.release() mutex.release() def fun2(self): mutex.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) time.sleep(0.2) mutex.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) mutex.release() mutex.release() if __name__ == "__main__": print("start---------------------------%s"%time.time()) for i in range(0, 10): my_thread = MyThread() my_thread.start()
遞歸鎖內部維護了一個計數器,當有線程拿到了Lock以后,這個計數器會自動加1,只要這計數器的值大於0,那么其他線程就不能搶到改鎖,這就保證了,在同一時刻,僅有一個線程使用該鎖,從而避免了死鎖的方法。關於遞歸鎖內部實現,有興趣的可以看看源碼。
4. Condition(條件)
一個線程等待特定條件,而另一個線程發出特定條件滿足的信號。最好說明的例子就是「生產者/消費者」模型:
import time import threading def consumer(cond): t = threading.current_thread() with cond: cond.wait() # 創建了一個鎖,等待producer解鎖 print('{}: Resource is available to consumer'.format(t.name)) def producer(cond): t = threading.current_thread() with cond: print('{}:Making resource available'.format(t.name)) cond.notifyAll() # 釋放鎖,喚醒消費者 condition = threading.Condition() c1 = threading.Thread(name='c1',target=consumer,args=(condition,)) p = threading.Thread(name='p',target=producer,args=(condition,)) c2 = threading.Thread(name='c2',target=consumer,args=(condition,)) c1.start() time.sleep(1) c2.start() time.sleep(1) p.start()
5. Event
一個線程發送/傳遞事件,另外的線程等待事件的觸發。我們同樣的用「生產者/消費者」模型的例子:
import time import threading from random import randint TIMEOUT = 2 def consumer(event, l): t = threading.currentThread() while 1: event_is_set = event.wait(TIMEOUT) if event_is_set: try: integer = l.pop() print('{} popped from list by {}'.format(integer,t.name)) event.clear() # 重置狀態 except IndexError: pass def producer(event, l): t = threading.currentThread() while 1: integer = randint(10,100) l.append(integer) print('{} append to list by {}'.format(integer, t.name)) event.set() time.sleep(1) event = threading.Event() l = [] threads = [] p = threading.Thread(name='producer1', target=producer, args=(event, l)) p.start() threads.append(p) for name in ('consumer1','consumer2'): t = threading.Thread(target=consumer, name=name, args=(event, l)) t.start() threads.append(t) for t in threads: t.join() print('ending')
可以看到事件被2個消費者比較平均的接收並處理了。如果使用了wait方法,線程就會等待我們設置事件,這也有助於保證任務的完成。
6. Queue
隊列在並發開發中最常用的。我們借助「生產者/消費者」模式來理解:生產者把生產的「消息」放入隊列,消費者從這個隊列中對去對應的消息執行。
大家主要關心如下4個方法就好了:
-
put: 向隊列中添加一個消息。
-
get: 從隊列中刪除並返回一個消息。
-
task_done: 當某一項任務完成時調用。
-
join: 阻塞直到所有的項目都被處理完。
import time import threading import random import queue q = queue.Queue() def double(n): return n*2 def producer(): while 1: wt = random.randint(1,10) time.sleep(random.random()) q.put((double, wt)) def consumer(): while 1: task, arg = q.get() print(arg, task(arg)) q.task_done() for target in (producer, consumer): t = threading.Thread(target=target) t.start()
Queue模塊還自帶了PriorityQueue(帶有優先級)和LifoQueue(先進先出)2種特殊隊列。我們這里展示下線程安全的優先級隊列的用法,
PriorityQueue要求我們put的數據的格式是(priority_number, data),我們看看下面的例子:
import time import threading from random import randint import queue q = queue.PriorityQueue() def double(n): return n * 2 def producer(): count = 0 while 1: if count > 5: break prit = randint(0,100) print("put :{}".format(prit)) q.put((prit, double, prit)) # (優先級,函數,參數) count += 1 def consumer(): while 1: if q.empty(): break pri,task,arg = q.get() print('[PRI:{}] {} * 2 = {}'.format(pri,arg,task(arg))) q.task_done() time.sleep(0.1) t = threading.Thread(target=producer) t.start() time.sleep(1) t = threading.Thread(target=consumer) t.start()
7.線程池
面向對象開發中,大家知道創建和銷毀對象是很費時間的,因為創建一個對象要獲取內存資源或者其它更多資源。無節制的創建和銷毀線程是一種極大的浪費。那我們可不可以把執行完任務的線程不銷毀而重復利用呢?仿佛就是把這些線程放進一個池子,一方面我們可以控制同時工作的線程數量,一方面也避免了創建和銷毀產生的開銷。
import time import threading from random import random import queue def double(n): return n * 2 class Worker(threading.Thread): def __init__(self, queue): super(Worker, self).__init__() self._q = queue self.daemon = True self.start() def run(self): while 1: f, args, kwargs = self._q.get() try: print('USE:{}'.format(self.name)) print(f(*args, **kwargs)) except Exception as e: print(e) self._q.task_done() class ThreadPool(object): def __init__(self, max_num=5): self._q = queue.Queue(max_num) for _ in range(max_num): Worker(self._q) # create worker thread def add_task(self, f, *args, **kwargs): self._q.put((f, args, kwargs)) def wait_compelete(self): self._q.join() pool = ThreadPool() for _ in range(8): wt = random() pool.add_task(double, wt) time.sleep(wt) pool.wait_compelete()
