1.IO編程
IO(input/output)。凡是用到數據交換的地方,都會涉及io編程,例如磁盤,網絡的數據傳輸。在IO編程中,stream(流)是一種重要的概念,分為輸入流(input stream)和輸出流(output stream)。可以把流季節為一個水管,數據相當於水管中的水,但是只能單向流動,所以數據傳輸過程中需要假設兩個水管,一個負責輸入,一個負責輸出,這樣讀寫就可以實現同步。
2.GIL:global interpreter lock全局解釋器鎖
在Python中,GIL導致同一時刻只有一個線程進入解釋器。
計算機有些操作不依賴於CPU,I/O操作幾乎不利用,IO密集型不占用CPU,多線程可完成
計算操作都要用到CPU,適合多進程
- IO密集型任務或函數,可以用多線程
- 計算密集型任務函數,sorry,改C
- 計算密集型程序適合C語言多線程,I/O密集型適合腳本語言開發的多線程
線程和進程的區別:
- 線程共享創建它的進程的地址空間; 進程有自己的地址空間。
- 線程可以直接訪問其進程的數據段; 進程有自己的父進程的數據段副本。
- 線程可以直接與其進程的其他線程進行通信; 進程必須使用進程間通信來與兄弟進程進行通信。
- 新線程很容易創建; 新進程需要父進程的重復。
- 線程可以對相同進程的線程進行相當的控制;
- 流程只能控制子進程。
- 對主線程的更改(取消,優先級更改等)可能會影響進程的其他線程的行為; 對父進程的更改不會影響子進程。
進程優點:同時利用多個CPU,同時進行多個操作;缺點:耗費資源(需要開辟多個內存空間)
線程優點:共享多個資源,IO操作,創造並發操作;缺點:搶占資源
3.threading模塊
threading模塊對象
Thread 表示一個線程的執行的對象
Lock 鎖原語對象(跟thread模塊里的鎖對象相同)
RLock 可重入鎖對象。使單線程可以再次獲得已經獲得了的鎖(遞歸鎖定)
Condition條件變量對象能讓一個線程停下來,等待其他線程滿足了某個“條件”。如狀態的改變或值的改變
Event 通用的條件變量。多個線程可以等待某個時間的發生,在事件發生后,所有的線程都被激活
Semaphore 為等待鎖的線程提供一個類似“等候室”的結構
BoundedSemaphore 與Semaphore類似,只是它不允許超過初始值
Timer 與thread類似,只是它要等待一段時間后才開始運行
Barrier 創建一個障礙,必須達到指定數量的線程后才可以繼續
3.1線程的兩種調用方式
#直接調用
import time
import threading begin=time.time() def foo(n): print('start-foo%s'%n) time.sleep(1) print('end foo') def bar(n): print('start-bar%s'%n) time.sleep(2) print('end bar') # foo(2) # bar(2) t1=threading.Thread(target=foo,args=(1,)) t2=threading.Thread(target=bar,args=(1,)) t1.start() t2.start() end=time.time() print('_________main_________') print(t1.getName()) print(t2.getName()) print(end-begin)
#繼承式調用
import threading
import time class MyThread(threading.Thread): def __init__(self, num): threading.Thread.__init__(self) self.num = num def run(self): #定義每個線程要運行的函數 print("running on number:%s" % self.num) time.sleep(4) if __name__ == '__main__': t1 = MyThread(111) t2 = MyThread(222) t1.start() t2.start()
threading的Thread類主要的運行對象
函數
start() 開始線程的執行
run() 定義線程的功能的函數(一般會被子類重寫)
join(timeout=None) 程序掛起,直到線程結束;如果給了timeout,則最多阻塞timeout秒 getName() 返回線程的名字 setName(name) 設置線程的名字 isAlive() 布爾標志,表示這個線程是否還在運行中 isDaemon() 返回線程的daemon標志 setDaemon(daemonic) 把線程的daemon標志設為daemonic(一定要在調用start()函數前調用)
threading.currentThread(): 返回當前的線程變量。
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
3.2 join and setDaemon
import threading
from time import ctime,sleep def music(func): for i in range(2): print ("Begin listening to %s. %s" %(func,ctime())) sleep(2) print("end listening %s"%ctime()) def move(func): for i in range(2): print ("Begin watching at the %s! %s" %(func,ctime())) sleep(5) print('end watching %s'%ctime()) threads = [] t1 = threading.Thread(target=music,args=('七里香',)) threads.append(t1) t2 = threading.Thread(target=move,args=('阿甘正傳',)) threads.append(t2) # join在子線程完成運行之前,這個子線程的父線程將一直被阻塞。 if __name__ == '__main__': t2.setDaemon(True) for t in threads: t.setDaemon(True)#守護線程,必須在start() 方法調用之前設置, 如果不設置為守護線程程序會被無限掛起。 t.start() # t.join()#跟線程沒有關系了,像正常一樣運行,沒意義 t1.join() # t2.join()########考慮這三種join位置下的結果? print(threading.current_thread()) print(threading.active_count()) print ("all over %s" %ctime())
join()會等到線程結束,或者在給了timeout參數的時候,等到超時為止。使用join()比使用一個等待鎖釋放的無限循環清楚一些(也稱“自旋鎖”)。
join()的另一個比較重要的方法是它可以完全不用調用。一旦線程啟動后,就會一直運行,直到線程的函數結束,退出為止。
如果你的主線程除了等線程結束外,還有其他的事情要做(如處理或等待其他的客戶請求),那就不用調用join(),只有在你要等待線程結束的時候才要調用join()。
setDaemon(True):將線程聲明為守護線程,必須在start() 方法調用之前設置, 如果不設置為守護線程程序會被無限掛起。這個方法基本和join是相反的。當我們 在程序運行中,執行一個主線程,如果主線程又創建一個子線程,主線程和子線程 就分兵兩路,分別運行,那么當主線程完成想退出時,會檢驗子線程是否完成。如 果子線程未完成,則主線程會等待子線程完成后再退出。但是有時候我們需要的是 只要主線程完成了,不管子線程是否完成,都要和主線程一起退出,這時就可以 用setDaemon方法
3.3 同步鎖Lock
由於線程之間是進行隨機調度,並且每個線程可能只執行n條執行之后,當多個線程同時修改同一條數據時可能會出現臟數據,所以,出現了線程鎖 - 同一時刻允許一個線程執行操作。
import time
import threading def addNum(): global num #在每個線程中都獲取這個全局變量 num-=1 temp=num print('--get num:',num ) print('看下結果是0') print('再看下結果') time.sleep(0.001) num =temp-1 #對此公共變量進行-1操作 num = 100 #設定一個共享變量 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有線程執行完畢 t.join() print('final num:', num )
注意:
1: why num-=1沒問題呢?這是因為動作太快(完成這個動作在切換的時間內)
2: if sleep(1),現象會更明顯,100個線程每一個一定都沒有執行完就進行了切換,我們說過sleep就等效於IO阻塞,1s之內不會再切換回來,所以最后的結果一定是99.
多個線程都在同時操作同一個共享資源,所以造成了資源破壞,怎么辦呢?
有同學會想用join唄,但join會把整個線程給停住,造成了串行,失去了多線程的意義,而我們只需要把計算(涉及到操作公共數據)的時候串行執行。
我們可以通過同步鎖來解決這種問題
import time
import threading begin=time.time() def addNum(): global num #在每個線程中都獲取這個全局變量 # num-=1 lock.acquire() temp=num # print('--get num:',num ) time.sleep(0.0001) num =temp-1 #對此公共變量進行-1操作 lock.release() num = 100 #設定一個共享變量 thread_list = [] lock=threading.Lock() for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有線程執行完畢 t.join() end=time.time() print('final num:', num ) print(end-begin)
同步鎖與GIL的關系?
Python的線程在GIL的控制之下,線程之間,對整個python解釋器,對python提供的C API的訪問都是互斥的,這可以看作是Python內核級的互斥機制。但是這種互斥是我們不能控制的,我們還需要另外一種可控的互斥機制———用戶級互斥。內核級通過互斥保護了內核的共享資源,同樣,用戶級互斥保護了用戶程序中的共享資源。
但是如果你有個操作比如 x += 1,這個操作需要多個bytecodes操作,在執行這個操作的多條bytecodes期間的時候可能中途就換thread了,這樣就出現了data races的情況了。
Lock(指令鎖)是可用的最低級的同步指令。Lock處於鎖定狀態時,不被特定的線程擁有。Lock包含兩種狀態——鎖定和非鎖定,
以及兩個基本的方法。可以認為Lock有一個鎖定池,當線程請求鎖定時,將線程至於池中,直到獲得鎖定后出池。
池中的線程處於狀態圖中的同步阻塞狀態。
構造方法:Lock()
實例方法:
acquire([timeout]): 使線程進入同步阻塞狀態,嘗試獲得鎖定。
release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
3.4 線程死鎖和遞歸鎖
在線程間共享多個資源的時候,如果兩個線程分別占有一部分資源並且同時等待對方的資源,就會造成死鎖,因為系統判斷這部分資源都正在使用,所有這兩個線程在無外力作用下將一直等待下去。
RLock(可重入鎖)是一個可以被同一個線程請求多次的同步指令。RLock使用了“擁有的線程”和“遞歸等級”的概念,
處於鎖定狀態時,RLock被某個線程擁有。擁有RLock的線程可以再次調用acquire(),釋放鎖時需要調用release()相同次數。
可以認為RLock包含一個鎖定池和一個初始值為0的計數器,每次成功調用 acquire()/release(),計數器將+1/-1,為0時鎖處於未鎖定狀態。
構造方法:RLock()
實例方法:acquire([timeout])/release(): 跟Lock差不多。

#__author: greg #date: 2017/9/17 21:38 # 線程死鎖和遞歸鎖 # Thread deadlocks and recursive locks import threading,time class myThread(threading.Thread): def doA(self): lockA.acquire() print(self.name,"gotlockA",time.ctime()) time.sleep(3) lockB.acquire() print(self.name,"gotlockB",time.ctime()) lockB.release() lockA.release() def doB(self): lockB.acquire() print(self.name,"gotlockB",time.ctime()) time.sleep(2) lockA.acquire() print(self.name,"gotlockA",time.ctime()) lockA.release() lockB.release() def run(self): self.doA() self.doB() if __name__=="__main__": lockA=threading.Lock() lockB=threading.Lock() threads=[] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join()#等待線程結束
解決辦法:使用遞歸鎖,將 lockA=threading.Lock() lockB=threading.Lock()
改為Rlock
為了支持在同一線程中多次請求同一資源,python提供了“可重入鎖”:threading.RLock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次acquire。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。

#__author: greg #date: 2017/9/17 21:49 import threading,time class myThread(threading.Thread): def doA(self): lock.acquire() print(self.name,"gotlockA",time.ctime()) # time.sleep(2) lock.acquire() print(self.name,"gotlockB",time.ctime()) lock.release() lock.release() def doB(self): lock.acquire() print(self.name,"gotlockC",time.ctime()) # time.sleep(3) lock.acquire() print(self.name,"gotlockD",time.ctime()) lock.release() lock.release() def run(self): self.doA() self.doB() if __name__=="__main__": # lockA=threading.Lock() # lockB=threading.Lock() lock = threading.RLock() threads=[] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join()#等待線程結束
3.5 單線程和多線程執行對比

import threading from time import ctime, sleep class MyThread(threading.Thread): def __init__(self, func, args, name='', verb=False): threading.Thread.__init__(self) self.name = name self.func = func self.args = args self.verb = verb def getResult(self): return self.res def run(self): if self.verb: print('starting', self.name, 'at:', ctime()) self.res = self.func(*self.args) if self.verb: print(self.name, 'finished at:', ctime()) def fib(x): sleep(0.005) if x < 2: return 1 return (fib(x-2) + fib(x-1)) def fac(x): sleep(0.1) if x < 2: return 1 return (x * fac(x-1)) def sum(x): sleep(0.1) if x < 2: return 1 return (x + sum(x-1)) funcs = (fib, fac, sum) n = 12 def main(): nfuncs = range(len(funcs)) print('*** SINGLE THREAD') for i in nfuncs: print('starting', funcs[i].__name__,'at:', ctime()) print(funcs[i](n)) print(funcs[i].__name__, 'finished at:',ctime()) print('\n*** MULTIPLE THREADS') threads = [] for i in nfuncs: t = MyThread(funcs[i], (n,), funcs[i].__name__) threads.append(t) for i in nfuncs: threads[i].start() for i in nfuncs: threads[i].join() print(threads[i].getResult()) print('all DONE') if __name__ == '__main__': main()
3.6 條件變量同步(Condition)
有一類線程需要滿足條件之后才能夠繼續執行,Python提供了threading.Condition 對象用於條件變量線程的支持,它除了能提供RLock()或Lock()的方法外,還提供了 wait()、notify()、notifyAll()方法。
lock_con=threading.Condition([Lock/Rlock]): 鎖是可選選項,不傳人鎖,對象自動創建一個RLock()。Condition(條件變量)通常與一個鎖關聯。需要在多個Contidion中共享一個鎖時,可以傳遞一個Lock/RLock實例給構造方法,否則它將自己生成一個RLock實例。可以認為,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處於狀態圖中的等待阻塞狀態,直到另一個線程調用notify()/notifyAll()通知;得到通知后線程進入鎖定池等待鎖定。
構造方法: Condition([lock/rlock])
實例方法:
acquire([timeout])/release(): 調用關聯的鎖的相應方法。
wait([timeout]): 調用這個方法將使線程進入Condition的等待池等待通知,並釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
notify(): 調用這個方法將從等待池挑選一個線程並通知,收到通知的線程將自動調用acquire()嘗試獲得鎖定(進入鎖定池);
其他線程仍然在等待池中。調用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
notifyAll(): 調用這個方法將通知等待池中所有的線程,這些線程都將進入鎖定池嘗試獲得鎖定。
調用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
wait():條件不滿足時調用,線程會釋放鎖並進入等待阻塞;
notify():條件創造后調用,通知等待池激活一個線程;
notifyAll():條件創造后調用,通知等待池激活所有線程。

#__author: greg #date: 2017/9/18 11:21 import threading import time product = None# 商品 con = threading.Condition()# 條件變量 def produce():# 生產者方法 global product if con.acquire(): while True: if product is None: print('produce...') product = 'anything' con.notify() # 通知消費者,商品已經生產 con.wait()# 等待通知 time.sleep(2) def consume():# 消費者方法 global product if con.acquire(): while True: if product is not None: print('consume...') product = None con.notify() # 通知生產者,商品已經沒了 con.wait() # 等待通知 time.sleep(2) t1 = threading.Thread(target=produce) t2 = threading.Thread(target=consume) t2.start() t1.start()
3.7 事件event
python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。
- clear:將“Flag”設置為False
- set:將“Flag”設置為True
event.isSet():返回event的狀態值;
event.wait():如果 event.isSet()==False將阻塞線程;
event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;
event.clear():恢復event的狀態值為False。
#__author: greg
#date: 2017/9/18 19:37
import threading
import time event = threading.Event() def func(): # 等待事件,進入等待阻塞狀態 print('%s wait for event...' % threading.currentThread().getName()) event.wait() # 收到事件后進入運行狀態 print('%s recv event.' % threading.currentThread().getName()) t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t1.start() t2.start() time.sleep(2) print('MainThread set event.')# 發送事件通知 event.set()
threading基於Java的線程模型設計。鎖(Lock)和條件變量(Condition)在Java中是對象的基本行為(每一個對象都自帶了鎖和條件變量),
而在Python中則是獨立的對象。Python Thread提供了Java Thread的行為的子集;沒有優先級、線程組,線程也不能被停止、暫停、恢復、中斷。
Java Thread中的部分被Python實現了的靜態方法在threading中以模塊方法的形式提供。
threading 模塊提供的常用方法:
threading.currentThread(): 返回當前的線程變量。
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
Event(事件)是最簡單的線程通信機制之一:一個線程通知事件,其他線程等待事件。
Event內置了一個初始為False的標志,當調用set()時設為True,調用clear()時重置為 False
wait()將阻塞線程至等待阻塞狀態。Event其實就是一個簡化版的 Condition
Event沒有鎖,無法使線程進入同步阻塞狀態。
構造方法:
Event()實例方法
isSet(): 當內置標志為True時返回True。
set(): 將標志設為True,並通知所有處於等待阻塞狀態的線程恢復運行狀態。
clear(): 將標志設為False。
wait([timeout]): 如果標志為True將立即返回,否則阻塞線程至等待阻塞狀態,等待其他線程調用set()。
3.8信號量(Semaphore)
Semaphore/BoundedSemaphore
Semaphore(信號量)是計算機科學史上最古老的同步指令之一。Semaphore管理一個內置的計數器,
每當調用acquire()時-1,調用release() 時+1。計數器不能小於0;當計數器為0時,
acquire()將阻塞線程至同步鎖定狀態,直到其他線程調用release()。
基於這個特點,Semaphore經常用來同步一些有“訪客上限”的對象,比如連接池。
BoundedSemaphore 與Semaphore的唯一區別在於前者將在調用release()時檢查計數器的值是否超過了計數器的初始值,
如果超過了將拋出一個異常。
構造方法: Semaphore(value=1): value是計數器的初始值。
實例方法:
acquire([timeout]): 請求Semaphore。如果計數器為0,將阻塞線程至同步阻塞狀態;否則將計數器-1並立即返回。
release(): 釋放Semaphore,將計數器+1,如果使用BoundedSemaphore,還將進行釋放次數檢查。
release()方法不檢查線程是否已獲得 Semaphore。
#__author: greg
#date: 2017/9/18 10:02
import threading,time
class myThread(threading.Thread): def run(self): if semaphore.acquire(): print(self.name) time.sleep(3) semaphore.release() if __name__=="__main__": semaphore=threading.Semaphore(10) thrs=[] for i in range(100): thrs.append(myThread()) for t in thrs: t.start()

#__author: greg #date: 2017/9/18 11:28 import threading import time semaphore = threading.Semaphore(2)# 計數器初值為2,比較4 # semaphore = threading.BoundedSemaphore(2) def func(): # 請求Semaphore,成功后計數器-1;計數器為0時阻塞 print('%s acquire semaphore...' % threading.currentThread().getName()) if semaphore.acquire(): print('%s get semaphore' % threading.currentThread().getName()) time.sleep(4) print('%s release semaphore' % threading.currentThread().getName())# 釋放Semaphore,計數器+1 semaphore.release() t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t3 = threading.Thread(target=func) t4 = threading.Thread(target=func) t1.start() t2.start() t3.start() t4.start() time.sleep(2) # 沒有獲得semaphore的主線程也可以調用release # 若使用BoundedSemaphore,t4釋放semaphore時將拋出異常 print('MainThread release semaphore without acquire') semaphore.release()