本文希望達到的目標:
- 多線程的基本認識
- 多線程編程的模塊和類的使用
- Cpython的全局解釋器鎖GIL
一、多線程的基本認識
多線程編程的目的:並行處理子任務,大幅度地提升整個任務的效率。
線程就是運行在進程上下文的邏輯流。每個線程都有自己的線程上下文,包含唯一的線程ID(就當前所屬進程而言),棧,棧指針,程序計數器,通用寄存器和條件碼等。
同一個進程中的線程,共享相同的運行環境,共享同一片數據空間,所以線程間的通訊筆進程間的通信更簡單,但是這樣的共享是會有危險的,如果多線程共同訪問同一數據,因為訪問順序的不同,可能會導致結果不一致。
二、多線程編程的模塊和類的使用
為了更好說明多線程的優點以及多個標准庫使用的差異性,以模擬“一個程序完成2個獨立任務時間總和”為例子。
0、單進程單線程運行兩個獨立的任務:順序執行,完成第一個任務后,再完成第二個任務。總時間是各個循環 運行時間之和,實際兩個任務是完全獨立的,如果並行執行,是可以減少運行時間的。
import thread from time import sleep,ctime def loop0(): print 'start loop0','at:',ctime() sleep(4) print 'loop0','done at:',ctime() def loop1(): print 'start loop1','at:',ctime() sleep(3) print 'loop1','done at:',ctime() def main(): print 'starting at:',ctime() loop0() loop1() print 'all done at:',ctime() if __name__=='__main__': main()
1、thread模塊
python提供了兩個標准庫用於多線程編程,thread模塊提供了基本的線程和鎖的支持,而 threading 提供了更高級別,功能更強的線程管理的功能。一般都建議使用threading模塊,畢竟功能更強大,更好管理。
thread模塊和對象:(官網:https://docs.python.org/2/library/thread.html)
使用多線程編程,創建兩個線程同時執行兩個獨立的任務,需要考慮,主線程執行時間和子線程執行時間的關系,如果單純的創建線程去運行這2個任務,主線程執行完成時間必然比子線程快,子線程未運行完,主線程就已經退出了,在thread模塊使用鎖對象lock來管理,為每個線程創建一個鎖對象,在線程執行完成后釋放鎖,而主線程判斷所有的鎖都釋放后才能結束,進程間的通訊機制就這樣簡單的建立起來。
import thread from time import sleep,ctime loops =[4,3] def loop(nloop,nsec,lock): print 'start loop',nloop,'at:',ctime() sleep(nsec) print 'loop',nloop,'done at:',ctime() lock.release() def main(): print 'starting at:',ctime() locks = [] nloops = range(len(loops)) for i in nloops: lock = thread.allocate_lock() lock.acquire() locks.append(lock) for i in nloops: thread.start_new_thread(loop,(i,loops[i],locks[i])) for i in nloops: print 'check lock' while locks[i].locked(): pass print 'all done at:',ctime() if __name__=='__main__': main()
運行時間為4s,單進程耗時7s,運行時間有減少。為什么不在創建鎖的循環里創建線程呢?有以下幾個原因:(1) 我 們想到實現線程的同步,所以要讓“所有的馬同時沖出柵欄”。(2) 獲取鎖要花一些時間,如果線程退出得“太快”,可能會導致還沒有獲得鎖,線程就已經結束了的情況。
注意:
A:子線程開始:創建對象調用start_new_thread函數時,該函數不是在主線程里運行, 而是產生一個新的線程來運行這個函數。一旦調用該函數,子線程已經開始運行。
B:子線程退出:它不支持守護線程。當主線程退出時,所有的子線程不論它們是否還在工作,都會被強行退出。
2、threading模塊 :創建一個 Thread 的實例,傳給它一個函數
它不僅提供了 Thread 類,還提供了各 種非常好用的同步機制。
threading模塊和對象:(官網:https://docs.python.org/2/library/threading.html)
import threading from time import sleep,ctime loops =[4,3] def loop(nloop,nsec): print 'start loop',nloop,'at:',ctime() sleep(nsec) print 'loop',nloop,'done at:',ctime() def main(): print 'starting at:',ctime() threads = [] nloops = range(len(loops)) for i in nloops: t = threading.Thread(target=loop,args=(i,loops[i])) threads.append(t) for i in nloops: print 'thread',i,'start' threads[i].start() for i in nloops: print 'thread',i,'join' threads[i].join() print 'all done at:',ctime() if __name__=='__main__': main()
注意:
A、子線程開始:調用start函數。所有的線程都創建了之后,再一起調用 start()函數啟動,而不是創建一個啟動一個。而且, 不用再管理一堆鎖(分配鎖,獲得鎖,釋放鎖,檢查鎖的狀態等)
B、子線程結束:可以控制子線程和主線程結束的順序,調用join(timeout=None) 程序掛起,直到線程結束;
C、守護線程一般是一個等待客戶請求的服務器, 如果沒有客戶提出請求,它就在那等着。如果設定一個線程為守護線程,就表示這個線程 是不重要的,在進程退出的時候,不用等待這個線程退出。
3、thread類 :Thread 派生出一個子類,創建一個這個子類的實例
import threading from time import sleep,ctime loops =(4,3) class MyThread(threading.Thread): def __init__(self,func,args,name=''): threading.Thread.__init__(self) self.name = name self.func = func self.args = args def run(self): self.func(*self.args) def loop(nloop,nsec): print 'start loop',nloop,'at:',ctime() sleep(nsec) print 'loop',nloop,'done at:',ctime() def main(): print 'starting at:',ctime() threads = [] nloops = range(len(loops)) for i in nloops: t = MyThread(loop,(i,loops[i]),loop.__name__) threads.append(t) for i in nloops: print 'thread',i,'start' threads[i].start() for i in nloops: print 'thread',i,'join' threads[i].join() print 'all done at:',ctime() if __name__=='__main__': main()
4、threading模塊中的thread類部分源碼解析
thread模塊提供了一系列基礎函數,其實不是不能用,書本上寫着的是不建議使用,但是如果用於底層開發是可以的。threading模塊與之相比,最大的不同就是,threading模塊中的thread類的屬性特別多,
包含了對多線程的各自管理上的緯度屬性,所以特別方便使用,實際上threading模塊就是在thread模塊上開發的,做了進一步的集成化和封裝以便於用戶更輕便的管理。
A :threadding模塊有引用thread模塊:
try: import thread except ImportError: del _sys.modules[__name__] raise
B: thread類的初始化函數部分截圖如下:初始化的過程,其實就是多線程的屬性的初始化的過程。把其中需要的資源,入參,thread管理的各自對象都初始化。
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None): assert group is None, "group argument must be None for now" _Verbose.__init__(self, verbose) if kwargs is None: kwargs = {} self.__target = target self.__name = str(name or _newname()) self.__args = args self.__kwargs = kwargs self.__daemonic = self._set_daemon() self.__ident = None self.__started = Event() self.__stopped = False self.__block = Condition(Lock()) self.__initialized = True # sys.stderr is not stored in the class like # sys.exc_info since it can be changed between instances self.__stderr = _sys.stderr
C:thread類的start函數,看到調用底層的_start_new_thread函數,就明白了,為啥thread類是調用start函數來啟動線程,還調用了self.__started.wait(),__started對象實際是_Condition類的實例,這是一個對
線程鎖管理的實例,調用這個類的wait方法就是在獲取一把鎖。
def start(self): if not self.__initialized: raise RuntimeError("thread.__init__() not called") if self.__started.is_set(): raise RuntimeError("threads can only be started once") if __debug__: self._note("%s.start(): starting thread", self) with _active_limbo_lock: _limbo[self] = self try: _start_new_thread(self.__bootstrap, ()) except Exception: with _active_limbo_lock: del _limbo[self] raise self.__started.wait()
def wait(self, timeout=None): if not self._is_owned(): raise RuntimeError("cannot wait on un-acquired lock") waiter = _allocate_lock() waiter.acquire() self.__waiters.append(waiter) saved_state = self._release_save() try: # restore state no matter what (e.g., KeyboardInterrupt) if timeout is None: waiter.acquire() if __debug__: self._note("%s.wait(): got it", self) else: # Balancing act: We can't afford a pure busy loop, so we # have to sleep; but if we sleep the whole timeout time, # we'll be unresponsive. The scheme here sleeps very # little at first, longer as time goes on, but never longer # than 20 times per second (or the timeout time remaining). endtime = _time() + timeout delay = 0.0005 # 500 us -> initial delay of 1 ms while True: gotit = waiter.acquire(0) if gotit: break remaining = endtime - _time() if remaining <= 0: break delay = min(delay * 2, remaining, .05) _sleep(delay) if not gotit: if __debug__: self._note("%s.wait(%s): timed out", self, timeout) try: self.__waiters.remove(waiter) except ValueError: pass else: if __debug__: self._note("%s.wait(%s): got it", self, timeout) finally: self._acquire_restore(saved_state)
D:而調用join方法,實際也是調用_Condition類的實例,判斷當前鎖的狀態,在線程運行完畢后,釋放鎖。
def join(self, timeout=None): if not self.__initialized: raise RuntimeError("Thread.__init__() not called") if not self.__started.is_set(): raise RuntimeError("cannot join thread before it is started") if self is current_thread(): raise RuntimeError("cannot join current thread") if __debug__: if not self.__stopped: self._note("%s.join(): waiting until thread stops", self) self.__block.acquire() try: if timeout is None: while not self.__stopped: self.__block.wait() if __debug__: self._note("%s.join(): thread stopped", self) else: deadline = _time() + timeout while not self.__stopped: delay = deadline - _time() if delay <= 0: if __debug__: self._note("%s.join(): timed out", self) break self.__block.wait(delay) else: if __debug__: self._note("%s.join(): thread stopped", self) finally: self.__block.release()
三、Cpython的全局解釋器鎖GIL
推薦一篇更全面介紹的博客:https://www.cnblogs.com/frchen/p/5740606.html
GIL全稱 Global Interpreter Lock,
這個問題。像單 CPU 的系統中運行多個進程那樣,內存中可以存放多個程序,但任意時刻,只有一個程序在 CPU 中運行。 在CPython 解釋器中可以“運行” 多個線程,但在任意時刻,只有一個線程在解釋器中運行。而對 Python 虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。GIL
並不是Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。實際現在一般使用的解析器都是基於CPython的,如果是Jpython(基於java),可能就不存在
對所有面向 I/O 的(會調用內建的操作系統 C 代碼的)程序來說,GIL 會在這個 I/O 調用之前被釋放,以允許其它的線程在這個線程等待 I/O 的時候運行。如果某線程並未使用很多 I/O 操作, 它會在自己的時間片內一直占用處理器(和 GIL)。也就是說,I/O 密集型的 Python 程序比計算密集型的程序更能充分利用多線程環境的好處。
簡單的總結下就是:Python的多線程在多核CPU上,只對於IO密集型計算產生積極效果;而當有至少有一個CPU密集型線程存在,那么多線程效率會由於GIL而大幅下降。