概述
進程與線程
進程:進程是資源(CPU、內存等)分配的最小單位,進程有獨立的地址空間與系統資源,一個進程可以包含一個或多個線程
線程:線程是CPU調度的最小單位,是進程的一個執行流,線程依賴於進程而存在,線程共享所在進程的地址空間和系統資源,每個線程有自己的堆棧和局部變量
形象的解釋:
系統是一個工廠,進程就是工廠里面的車間;
車間的空間大小以及里面的生產工具就是系統分配給進程的資源(CPU、內存等);
車間要完成生產,就需要工人,工人就是線程;
工人可以使用車間的所有資源,就是線程共享進程資源;
工人使用車間內的一個工作間(全局變量,共享內存)工作的時候,為了防止其他工人打擾,會上一把鎖,工作完成才會取下,這是線程鎖;
有的工作間可以同時容納多個工人工作,於是就有多把鑰匙,每個工人就拿上一把,所有鑰匙被取完后,其他工人就只能等着,這是信號量(Semaphore);
有時候工人之間有合作,當一個工作間的工人工作到滿足某個條件時,會發出通知並同時退出工作間,將鑰匙交給另外符合條件正在等待的工人完成工作,這叫條件同步;
還有一種工作模式,當一個工人完成到某個指標時,會將工作傳遞給其它等待這個指標觸發的工人工作,這叫事件同步
並發與並行
並發:當系統只有一個CPU時,想執行多個線程,CPU就會輪流切換多個線程執行,當有一個線程被執行時,其他線程就會等待,但由於CPU調度很快,所以看起來像多個線程同時執行
並行:當系統有多個CPU時,執行多個線程,就可以分配到多個CPU上同時執行
同步與異步
同步:調用者調用一個功能時,必須要等到這個功能執行完返回結果后,才能再調用其他功能
異步:調用者調用一個功能時,不會立即得到結果,而是在調用發出后,被調用功能通過狀態、通知來通告調用者,或通過回調函數處理這個調用
多線程模塊
python中主要用threading模塊提供對線程的支持
threading模塊
threading模塊常用函數
- threading.current_thread(): 返回當前的線程對象。
- threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
- threading.active_count(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
Thread類
通過threading.Thread()創建線程對象
主要參數:
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- group 默認為 None,為了日后擴展 ThreadGroup 類實現而保留。
- target 是用於 run() 方法調用的可調用對象。默認是 None,表示不需要調用任何方法。
- name 是線程名稱。默認情況下,由 "Thread-N" 格式構成一個唯一的名稱,其中 N 是小的十進制數
- args 是用於調用目標函數的參數元組。默認是 ()
- kwargs 是用於調用目標函數的關鍵字參數字典。默認是 {}。
- daemon表示線程是不是守護線程。
Thread類常用方法與屬性
- run(): 用以表示線程活動的方法。
- start():啟動線程活動。
- join(timeout=None): 等待至線程中止。這阻塞調用線程直至線程的join() 方法被調用中止-正常退出或者拋出未處理的異常-或者是可選的超時發生。
- isAlive(): 返回線程是否活動的。
- getName(): 返回線程名。
- setName(): 設置線程名。
- name:線程對象名字
- setDaemon():設置是否為守護線程
創建多線程
1.通過函數方法創建
import threading import time def run(sec): print('%s 線程開始了!' % threading.current_thread().name) time.sleep(sec) print('%s 線程結束了!' % threading.current_thread().name) if __name__ == '__main__': print('主線程開始執行:', threading.current_thread().name) s_time = time.time() # 創建thread對象,target傳入線程執行的函數,args傳參數 t1 = threading.Thread(target=run, args=(1,)) t2 = threading.Thread(target=run, args=(2,)) t3 = threading.Thread(target=run, args=(3,)) # 使用start()開始執行 t1.start() t2.start() t3.start() # 使用join()來完成線程同步 t1.join() t2.join() t3.join() print('主線程執行結束', threading.current_thread().name) print('一共用時:', time.time()-s_time)
2.通過直接從 threading.Thread 繼承創建一個新的子類,並實例化后調用 start() 方法啟動新線程,即它調用了線程的 run() 方法
import threading import time class MyThread(threading.Thread): def __init__(self, sec): super(MyThread, self).__init__() self.sec = sec #重寫run()方法,使它包含線程需要做的工作 def run(self): print('%s 線程開始了!' % threading.current_thread().name) time.sleep(self.sec) print('%s 線程結束了!' % threading.current_thread().name) if __name__ == '__main__': print('主線程開始執行', threading.current_thread().name) s_time = time.time() # 實例化MyThread對象 t1 = MyThread(1) t2 = MyThread(2) t3 = MyThread(3) # 實例化后調用 start() 方法啟動新線程,即它調用了線程的 run() 方法 t1.start() t2.start() t3.start() # 使用join()來完成線程同步 t1.join() t2.join() t3.join() print('一共用時:', time.time()-s_time) print('主線程結束執行', threading.current_thread().name)
線程鎖
多個線程操作全局變量的時候,如果一個線程對全局變量操作分幾個步驟,當還沒有得到最后結果時,這個線程就被撤下CPU,使用其他線程繼續上CPU操作,最后就會搞亂全局變量數據
import time, threading balance = 0 def change_it(n): # 先存后取,結果應該為0: global balance balance = balance + n balance = balance - n def run_thread(n): for i in range(1000): change_it(n) #這里重復實驗100次 for i in range(100): t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance)
這里可以發現打印出數據會有非0情況
而線程鎖就是防止這種情況。每當一個線程a要訪問共享數據時,必須先獲得鎖定;如果已經有別的線程b獲得鎖定了,那么就讓線程a暫停,也就是同步阻塞;等到線程b訪問完畢,釋放鎖以后,再讓線程a繼續
鎖有兩種狀態:被鎖(locked)和沒有被鎖(unlocked)。擁有acquire()和release()兩種方法,並且遵循一下的規則:
- 如果一個鎖的狀態是unlocked,調用acquire()方法改變它的狀態為locked;
- 如果一個鎖的狀態是locked,acquire()方法將會阻塞,直到另一個線程調用release()方法釋放了鎖;
- 如果一個鎖的狀態是unlocked調用release()會拋出RuntimeError異常;
- 如果一個鎖的狀態是locked,調用release()方法改變它的狀態為unlocked。
import time, threading balance = 0 lock = threading.Lock() def change_it(n): # 先存后取,結果應該為0: global balance #獲取鎖,用於線程同步 lock.acquire() balance = balance + n balance = balance - n # 釋放鎖,開啟下一個線程 lock.release() def run_thread(n): for i in range(1000): change_it(n) #這里重復實驗100次 for i in range(100): t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance)
鎖的好處就是確保了某段關鍵代碼只能由一個線程從頭到尾完整地執行,壞處當然也很多,首先是阻止了多線程並發執行,包含鎖的某段代碼實際上只能以單線程模式執行,效率就大大地下降了。其次,由於可以存在多個鎖,不同的線程持有不同的鎖,並試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個線程全部掛起,既不能執行,也無法結束,只能靠操作系統強制終止。
條件同步
條件同步機制是指:一個線程等待特定條件,而另一個線程發出特定條件滿足的信號。
解釋條件同步機制的一個很好的例子就是生產者/消費者(producer/consumer)模型。生產者隨機的往列表中“生產”一個隨機整數,而消費者從列表中“消費”整數。
import threading import random import time class Producer(threading.Thread): """ 向列表中生產隨機整數 """ def __init__(self, integers, condition): """ 構造器 @param integers 整數列表 @param condition 條件同步對象 """ super(Producer, self).__init__() self.integers = integers self.condition = condition def run(self): """ 實現Thread的run方法。在隨機時間向列表中添加一個隨機整數 """ while True: integer = random.randint(0, 256) self.condition.acquire() # 獲取條件鎖 self.integers.append(integer) print ('%d appended to list by %s' % (integer, self.name)) self.condition.notify() # 喚醒消費者線程 self.condition.release() # 釋放條件鎖 time.sleep(1) # 暫停1秒鍾 class Consumer(threading.Thread): """ 從列表中消費整數 """ def __init__(self, integers, condition): """ 構造器 @param integers 整數列表 @param condition 條件同步對象 """ super(Consumer, self).__init__() self.integers = integers self.condition = condition def run(self): """ 實現Thread的run()方法,從列表中消費整數 """ while True: self.condition.acquire() # 獲取條件鎖 while True: if self.integers: # 判斷是否有整數 integer = self.integers.pop() print ('%d popped from list by %s' % (integer, self.name)) break self.condition.wait() # 等待商品,並且釋放資源 self.condition.release() # 最后釋放條件鎖 if __name__ == '__main__': integers = [] condition = threading.Condition() t1 = Producer(integers, condition) t2 = Consumer(integers, condition) t1.start() t2.start() t1.join() t2.join()
類似於線程鎖,通過給生產者和消費者傳入同個條件對象后,他們都會通過acquire()獲取條件鎖;如上,消費者獲取條件鎖,如果沒有鎖,就會去判斷integers中是否有整數,沒有就會觸發wait()方法,等待生產者線程notify()的喚醒;wait()方法會釋放底層鎖,並且阻塞,
直到在另外一個線程中調用同一個條件對象的notify() 或 notify_all() 喚醒它,或者超時發生。生產者則就是獲取鎖,向integers添加值,喚醒其他wait()的線程,釋放鎖的循環過程。
事件同步(Event)
基於事件的同步是指:一個線程發送/傳遞事件,另外的線程等待事件的觸發。與條件同步類似,只是少了我們自己添加鎖的步驟。
同樣用消費者與生產者為例
import threading import random import time class Producer(threading.Thread): """ 向列表中生產隨機整數 """ def __init__(self, integers, event): """ 構造器 @param integers 整數列表 @param event 事件同步對象 """ super(Producer, self).__init__() self.integers = integers self.event = event def run(self): """ 實現Thread的run方法。在隨機時間向列表中添加一個隨機整數 """ while True: integer = random.randint(0, 256) self.integers.append(integer) print('%d appended to list by %s' % (integer, self.name)) print('event set by %s' % self.name) self.event.set() # 設置事件 self.event.clear() # 發送事件 print('event cleared by %s' % self.name) time.sleep(1) class Consumer(threading.Thread): """ 從列表中消費整數 """ def __init__(self, integers, event): """ 構造器 @param integers 整數列表 @param event 事件同步對象 """ super(Consumer, self).__init__() self.integers = integers self.event = event def run(self): """ 實現Thread的run()方法,從列表中消費整數 """ while True: self.event.wait() # 等待事件被觸發 try: integer = self.integers.pop() print('%d popped from list by %s' % (integer, self.name)) except IndexError: # catch pop on empty list time.sleep(1) if __name__ == '__main__': integers = [] event=threading.Event() t1 = Producer(integers, event) t2 = Consumer(integers, event) t1.start() t2.start() t1.join() t2.join()
事件對象的set()方法會將內部標志設置為true,並且喚醒其他阻塞的線程;clear()方法則是將內部標志設置為False;wait()方法當內部標准為false時就阻塞,true則不阻塞且wait()內部什么都不操作
隊列(Queue)
隊列可以看作事件同步與條件同步的簡單版,線程A可以往隊列里面存數據,線程B則可以從隊列里面取,不用關心同步以及鎖的問題,主要方法有:
- put(): 向隊列中添加一個項
- get(): 從隊列中刪除並返回一個項
- task_done(): 當某一項任務完成時調用,表示前面排隊的任務已經被完成
- join(): 阻塞直到所有的項目都被處理完
import threading import queue import random import time class Producer(threading.Thread): def __init__(self, queue): super(Producer,self).__init__() self.queue = queue def run(self): while True: integer = random.randint(0, 1000) self.queue.put(integer) print('%d put to queue by %s' % (integer, self.name)) time.sleep(1) class Consumer(threading.Thread): def __init__(self, queue): super(Consumer, self).__init__() self.queue = queue def run(self): while True: integer = self.queue.get() print('%d popped from list by %s' % (integer, self.name)) self.queue.task_done() if __name__ == '__main__': queue = queue.Queue() t1 = Producer(queue) t2 = Consumer(queue) t1.start() t2.start() t1.join() t2.join()
參考:
http://yoyzhou.github.io/blog/2013/02/28/python-threads-synchronization-locks/
