python 並發並行,多線程,隊列


多任務系統

多任務系統可以同時運行多個任務。

單核cpu也可以執行多任務,由於cpu執行代碼都是順序執行的,那么cpu是怎么執行多任務的?

答案是操作系統輪流讓各個任務交替執行

任務1執行0.01s切換任務2,任務2執行0.01s切換任務3.

依次類推,表面上看,每個任務都是交替執行的,但是由於cpu執行速度實在太快,感覺上就是所有任務同時執行。

並發

並發 任務數多於cpu核數,通過操作系統的各種任務調度算法,實現多個任務一起執行,而實際上總有一些任務不在執行,因為切換速度夠快,看上去一起執行

並行

並行 任務數量小於cpu核數,任務是真正的一起執行的

並發 10個客人點餐,1個服務員應對

並行 10個客人點餐,10個服務員應對

串行

先執行a,再執行b

同步與異步

同步

同步協調,指線程在訪問某一資源類的時候,獲得了該資源的返回結果之后才會執行其他操作,先做某件事,在做某件事,一步一步來

異步

異步 提交注冊數據----校驗數據----注冊成功---登陸,校驗校驗數據之后即發送賬號激活---郵件激活,兩者同時發生,不用的等待注冊總流程跑完再進行激活

python中的線程模塊

threading

python的thread是較底層的模塊,threading是對thread的一些封裝。

通過target=指定開啟線程的函數創建線程對象

def func1():  for i in range(5):  print(F"--{threading.current_thread()}---正在執行func1")  time.sleep(1)  def func2():  for i in range(6):  print(F"--{threading.current_thread()}--------整在執行func2")  time.sleep(1)  def main():  t1 = threading.Thread(target=func1)  t2 = threading.Thread(target=func2)  s_time = time.time()   print(t1.name) # 可以直接根據name去獲取,getName setName 取名字,設置線程名字  t1.start() # 開始執行線程1  t2.start() # 開始執行線程2  # func1()  # func2()  print(threading.enumerate())  print(threading.active_count())  # 讓主線程等待子線程執行完畢之后再繼續往下執行  t1.join() # 主線程等待1秒之后再繼續執行 不寫,主線程等到t1完了之后再執行  t2.join()  end_time = time.time()  print("總耗時:", end_time - s_time)  if __name__ == '__main__':  main()

調用threading.Thread(target=開啟線程的目標函數) 返回為一個線程對象,

通過繼承Thread類重寫run方法來開啟線程

編寫一個類,之后繼承threading.Thread 並重寫run方法。

至於為什么重寫,來看一段thread的原碼

def run(self):  """Method representing the thread's activity.   You may override this method in a subclass. The standard run() method  invokes the callable object passed to the object's constructor as the  target argument, if any, with sequential and keyword arguments taken  from the args and kwargs arguments, respectively.   """  try:  if self._target:  self._target(*self._args, **self._kwargs)  finally:  # Avoid a refcycle if the thread is running a function with  # an argument that has a member that points to the thread.  del self._target, self._args, self._kwargs

這其中的self._target是是thread的類變量,在thread類被初始化的時候由target傳入,if判斷如果被傳入,即調用被傳入的目標函數。

而在寫自己的方法類時,對run方法進行重寫之后,便不在需要傳入參數target,

第二種方式創建的實例代碼。

# 第二種創建多線程的方法 # 繼承thread類然后重寫run方法 class RequestThread(threading.Thread):  '''發送request請求'''  def __init__(self,url): # 可以通過重寫init方法來做到參數化,  self.url = url  super().__init__() # 但是不能忘掉要執行父類的init方法   def run(self):  for i in range(10):  res = requests.get(self.url)  print(F'線程:{threading.current_thread()}返回的狀態嘛{res.status_code}')   s_time = time.time() for i in range(5):  t = RequestThread(url="") # 調用的時候需要傳入參數進去  t.start() t.join() end_time = time.time() print("耗時:",end_time-s_time)

補充:如果想要對線程操作類實現參數化進行復雜操作。

編寫init方法時聲明參數,同時不能忘記繼承並執行父類的init方法,父類thread在類初始化時做了相當多的配置達到多線程運行的目的,不寫的話會報錯

線程類常用方法

run() 用以表示線程活動的方法

start() 啟動線程活動

join([time]) 設置主線程會等待time秒后再往下執行,time默認為子線程結束

多個子線程之間的設置會疊加

isAlive() 返回線程是否活動的

getName() 返回線程名

setName() 設置線程名

threading.currentThread() 返回當前執行的線程

threading.enumerate() 返回正在運行的所有線程(list),正在運行 指的是線程啟動后,結束前,不包括啟動前和終止之后的線程

threading.activateCount() 返回正在運行的線程數量。。

多線程的bug---數據安全

多線程中如果使用全局變量,數據會變的不安全不穩定

原因: 子線程和總線程使用的是同一塊內存空間,在主線程中存在的變量為全局變量,各個線程都可以使用,

python只支持單核,通過多任務之間快速切換,任務量少的時候是完全沒問題的。

但是在任務量級特別大的時候,線程之間的切換就會發生問題,數據產生偏差

從10W量級時開始出現問題。

下面看代碼實例

# 全局變量 a = 100  # 10W的時候就已經開始出現問題 # 為什么? # python里面線程只支持單核 # 多任務之間快速切換,任務量少可以杜絕誤差 def func1():  global a  for i in range(1000000):  a += 1  print("線程1修改玩A:",a)  print(a) # 並發,但是python中只有一個核,是不可能完全並行的 # 所以線程之間必定存在切換,量級十分巨大時切換就會發生問題   def func2():  global a  for i in range(1000000):  a += 1  print("線程2修改玩A:",a) t1 = threading.Thread(target=func1) t2 = threading.Thread(target=func2) t1.start() t2.start()  t1.join() t2.join() print(a) # 執行結果 # 100 # 線程1修改玩A: 1203921 # 線程2修改玩A: 1287478 # 1287478

互斥鎖

為了解決這樣的線程互相競爭資源導致數據不穩定不安全的問題,引入鎖的概念。

線程同步能夠邦正多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。

互斥鎖為資源引入一個狀態,鎖定/非鎖定

某個線程要更改共享數據時,先將其鎖定,此時資源狀態為鎖定,其他線程不能更改知道該線程釋放資源。

將資源的狀態改變成 非鎖定 其他的線程才能再次鎖定該資源進行操作。,

互斥鎖保證了每次只有一個線程進行寫入操作,從而保證了多線程下數據的正確性。

threading 模塊中定義了Lock類,可以方便的處理鎖定

# 創建鎖 mutex = threading.Lock() # 鎖定方法 mutex.acquire() # 解鎖 mutex.release() # 上鎖之后不會切換到其他線程,只有被釋放之后才可以切換到其他線程

上鎖的過程

當一個線程調用鎖的acquire(0方法獲得鎖時,鎖進入locked狀態

每次只有一個線程可以獲得鎖,如果此時另一個線程視圖獲得這個鎖,該線程就會變成blocked被阻塞,知道擁有鎖的線程調用鎖的release()釋放鎖之后,鎖進入unlocked狀態。

線程調度程序從處於通同步阻塞狀態的線程中選一個來獲得鎖,並使得該線程進入 running狀態。

總結

鎖的好處

確保了某段關鍵代碼只能由一個線程從頭到尾完整執行

鎖的壞處

組織了多線程並發執行,包含鎖的某段代碼實際上只能以單線程模式執行,效率大大下降,猶豫可以存在多個鎖,不同線程持有不同的鎖,並試圖跟獲取對象持有的鎖時,可能會造成線程死鎖,導致程序直接阻塞不向下運行

正案例

來看一個線程鎖的應用實例。

# python里面線程只支持單核 # 多任務之間快速切換,任務量少可以杜絕誤差 def func1():  global a  for i in range(1000000):  meta.acquire() # 上鎖  a += 1  meta.release() # 釋放鎖  print("線程1修改玩A:", a) # 上鎖之后必須要release, # 互斥鎖,只有一個人能上 加上鎖之后,任務運行速度會下降 # ,因為必須要等一個線程release之后才可以 # 讓下一個線程進行使用 def func2():  global a  for i in range(1000000):  meta.acquire() # 上鎖  # 線程鎖,執行完成之后,才會進入另外一個線程 ,從而保證數據在執行的時候不會造成數據影響  a += 1  meta.release() # 釋放鎖   print("線程2修改玩A:", a)   # 創建鎖 meta = threading.Lock()   t1 = threading.Thread(target=func1) t2 = threading.Thread(target=func2) t1.start() t2.start()  t1.join() t2.join() print(a)

死鎖案例

def func1():  global a  for i in range(1000000):  metaA.acquire() # 上鎖A  metaB.acquire() # 上鎖B  print("-----------------1")  a += 1  metaB.release() # 解鎖B  metaA.release() # 解鎖A  print("線程1修改玩A:", a) # 兩邊互相等待對方線程釋放資源,就會無限等待,兩把鎖互相等待的情況下會出現死鎖 def func2():  global a  for i in range(1000000):  metaB.acquire() # 上鎖B  metaA.acquire() # 上鎖A  print("-----------------2")   a += 1  metaA.release() # 解鎖A  metaB.release() # 解鎖B  print("線程2修改玩A:", a)  # 線程死鎖案例 metaA = threading.Lock() metaB = threading.Lock()

該案例中,兩邊互相等待對方線程釋放資源,就會造成無限等待,行程死鎖。

全局解釋器鎖 GIL

官方文檔中,搜索關鍵字 怎樣修改全局變量是線程安全的?

不用自己操作跟定義, 控制線程運行的

描述 gil的概念,以及它對python多線程的影響

參考哦

1,python語言和GIL沒有關系,僅僅是猶豫歷史原因在Cpython虛擬機(解釋器)難以移除GIL

2,GIL 全局解釋器,每個線程在執行的過程都需要先獲取GIL,保證同一時刻只有一個線程可以執行代碼

3,線程釋放GIL鎖的情況,

---在io操作等可能會引起阻塞的 system call之前,可以暫時釋放GIL,但在執行完畢后

必須重新獲取 GIL

--python 3使用計時器,執行時間達到閾值之后 當前線程釋放GIL 或python 2.x 技tickets技術達到100

4 python使用多進程是可以利用多核的cpu資源的

多線程與單線程分別更適合什么

1.io密集型:涉及到網絡、磁盤io的任務都是io密集型任務,這類任務的特點是cpu消耗很少,任務的大部分的

時間都在等待io操作完成(因為io的速度遠遠低於cpu和內訓的速度)

結論:io密集型操作,多線程比單線程要快

2.cpu密集型:cpu密集型也稱為計算密集型,任務的特點是要進行大量的計算,消耗cpu資源,比如

計算圓周率、對視頻進行高清解碼等等,全靠cpu的運算能力

結論:cpu密集型操作,單線程比多線程要快

隊列

python的queue模塊中提供了同步的,線程安全的隊列類,

1,FIFO 先進先出 queue

2,LIFO 后進先出隊列 LifoQueue

3,PriorityQueue 按照優先級

能夠在多線程中直接使用,

可以用隊列來實現線程間的同步。

 

初始化queue() 對象時, q = queue()

如果參數中沒有指定最大可接受消息量,貨數量為負值

則代表可接受的消息沒有上限

import queue # 隊列中的方法 # def task_done() 表示在完成一項工作之后,使用Queue.task.done()方法可以向隊列發送一個信號 # 表示該任務執行完畢 # def join() 實際上意味着等到隊列中所有的任務(數據) 執行完畢之后,再往下,否則一直等待 #### join是判斷的依據,不單單指的是隊列中沒有數據,數據get出去之后,要使用task_done() #### 向隊列發送一個信號,表示該任務執行(數據使用)完畢  # def qsize() 返回隊列包含信息數量 # def empty() 判斷隊列是否為空, 為空返回true ,不為空返回fasle # def full() 判斷隊列是否已滿,滿為true 不滿為false # def put()  # param1 block=表示是否等待True/false 默認true timeout = 等待時間 默認None  # def get() 從隊列中獲取數據  # param1 block=表示是否等待True/false 默認true timeout = 等待時間 默認None # def put_nowait() 相當於put方法,設置block=false 不進行等待 # def get_nowait() 相當於get方法,設置blcok=false 不進行等待  # 三種隊列 # 1,先進先出 FIFO # 先進去的數據先出來 # 有點類似於竹筒塞球 q = queue.Queue(4) # 創建隊列的時候可以指定長度,如果不寫可以無限塞 # 往隊列中添加數據 q.put(1) q.put(11) q.put(33) # q.put(55,block=False) # 隊列滿了之后會等待吧數據拿出去之后再添加 # 隊列已滿的話會報錯, # queue.Full 如果設置不等待,滿了之后就會報錯,異常為queue.Full # print(q.get()) # print(q.get()) # print(q.get()) # # print(q.get(block=False)) # get方法默認等待,與添加的時候一樣 除非修改block的參數值 # print(q.get_nowait()) # 簡便寫法  q.put(12) print(q.qsize()) # 獲取隊列中的數據量 print(q.full()) # 判斷隊列是否已滿 print(q.empty()) # 判斷隊列是否為空 q.task_done() q.task_done() q.task_done() q.task_done() # join 判斷隊列中的任務是否執行完畢 q.join() # 隊列里的所有任務全部執行完畢才會接着向下執行,否則會阻塞 print("join之后的代碼") # 比如之前用了4個put,對應的要寫4個taskdone來通知隊列操作完成 # 阻塞的原因,python認為隊列中的任務沒有執行完  # 解決方法,使用taskdone方法,通知隊列任務執行完畢  # 2,后入先出 LIFO # 后塞進去的數據先出來 # 有點類似彈夾上彈 from queue import LifoQueue # 內置方法與上面9個相同 q2 =queue.LifoQueue() q2.put(2) q2.put(3) q2.put(4) q2.put(5) q2.put(6) print(q2.get()) # 后入先出  # 3,優先級 PriorityQueue # 不按照塞進去順序獲取,塞進去的時候指定優先級 # 優先級越小的先出 from queue import PriorityQueue q3 = queue.PriorityQueue() # 優先級隊列傳入應該是一個元祖,元祖內第一參數為優先級, q3.put((1,"addicated")) # 第二個參數為內容 q3.put((2,"測試2addicated")) q3.put((3,"測試3addicated")) q3.put((4,"測試4addicated")) print(q3.get())

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM