Python並發之多線程


並發系列是一個很龐大的知識體系,要想完全弄明白是挺困難的,因為最近打算閱讀Tornado源碼, 其介紹談到了內部使用了異步非阻塞的調用方式。之前也沒有深入了解,這次就借此好好整理一下。

線程(threading模塊)

    線程是應用程序運行的最小單元,在同一個進程中,可以並發開啟多個線程,每個線程擁有自己的棧(存放臨時變量),同時相互之間是共享資源的。

    Python中使用threading模塊來開啟多線程

import threading, time


def func(n):
    time.sleep(2)
print(time.time(),n)
if __name__ == '__main__': for i in range(10): t = threading.Thread(target=func, args=(1,)) t.start() print('主線程結束')
'結果'
主線程結束
1532921321.058243 1 1532921321.058243 1 1532921321.058243 1 1532921321.058243 1 ...

     或者通過自定義類繼承Thread,並且重寫run方法

import threading,time

class Mythread(threading.Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self):
        time.sleep(1)
        print('hello %s'%self.name)

if __name__ == '__main__':
    for i in range(5):

        m = Mythread('py')
        m.start()

    print('---主線程結束---')

 

    執行順序如下

                           

    主線程和子線程之間是相互獨立的,但是主線程運行完畢會等待子線程的運行,直到完畢,才回收資源。

 

    Thread對象可調用的方法

hread實例對象的方法
  # isAlive(): 返回線程是否活動的。
  # getName(): 返回線程名。
  # setName(): 設置線程名。
  #join():使主線程阻塞,直到該線程結束
threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

 

守護線程(setDaemon)

  如果一個線程設置為守護線程,那么它將會和主線程一起結束,而主線程會等待所有的非守護線程的子線程結束而退出。因此可以認為,守護線程是“不重要的線程”,主線程不等它。

import  threading, time
"""
設置兩個線程
"""

def func1():
    print('--非守護線程開始--')
    time.sleep(2)
    print('--非守護線程結束--')

def func2():
    print('--守護線程開始--')
    time.sleep(4)
    print('--守護線程結束--')

if __name__ == '__main__':
    t1 = threading.Thread(target=func1,args=())
    t2 = threading.Thread(target=func2,args=())
    t2.setDaemon(True)
    t1.start()
    t2.start()
 '''
 --非守護線程開始--
--守護線程開始--
--非守護線程結束--

  守護線程還沒運行完,主線程就結束了
 '''

 

而線程之間共享數據,必然會導致同時操作數據時的混亂,影響數據安全

import threading,time

def func():
    #開始處理數據
    global n
    a=n+1
    time.sleep(0.0001)
    n =a
    # 結束處理

if __name__ == '__main__':
    n=0
    li =[]
    for i in range(1000):
        t=threading.Thread(target=func,args=())
        li.append(t)
        t.start()
    for i in li:
        i.join()  #等待子線程全部執行完
    print(n)  #253

    '''
    我們希望能從0加到1000,但是由於有多個線程會拿到數據,
    如果處理速度慢,就會使數據混亂
    '''

 

 因此,對數據進行加鎖就很有必要了。

 

互斥鎖(Lock)

  通過獲取鎖對象,訪問共有數據,最后釋放鎖來完成一次操作,一旦某個線程獲取了鎖,當這個線程被切換時,下個個進程無法獲取該公有數據

import threading,time

def func():
    #開始處理數據
    global n
    lock.acquire() #獲取
    a=n+1
    time.sleep(0.00001)
    n =a
    lock.release() #釋放
    # 結束處理

if __name__ == '__main__':
    n=0
    lock=threading.Lock()
    li =[]
    for i in range(1000):
        t=threading.Thread(target=func,args=())
        li.append(t)
        t.start()
    for i in li:
        i.join()  #等待子線程全部執行完
    print(n)  #1000

 

   通過同步的互斥鎖來保證數據安全相比於線程串行運行而言,如每個線程start之前都使用.join()方法,無疑速度更快,因為它就只有在訪問數據的時候是串級的,其他的情況下是是並發的(雖然也不能是並行運行,因為GIL,之后會談到)。

 

 再看如下情況

  死鎖

import threading

if __name__ == '__main__':
    lock=threading.Lock()
    lock.acquire()
    lock.acquire() #程序卡住,只能獲取一次
    lock.release()
    lock.release()

 

 遞歸鎖(RLock)

  RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使

  用RLock代替Lock,則不會發生死鎖:

import threading

if __name__ == '__main__':
    lock=threading.RLock()
    lock.acquire()
    lock.acquire() #可以多次獲取,程序順利執行
    lock.release()
    lock.release()
   

 

信號量(Semaphore)

  能夠並發執行的線程數,超出的線程阻塞,直到有線程運行完成。

  Semaphore管理一個內置的計數器,
  每當調用acquire()時內置計數器-1;
  調用release() 時內置計數器+1;
  計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。

import threading,time
def sever_help(n):
    s.acquire()
    print('%s歡迎用戶%d'%(threading.current_thread().getName(),n))
    time.sleep(2)
    s.release()

if __name__ == '__main__':
    s = threading.Semaphore(5)
    li = []
    for i in range(32):
        t = threading.Thread(target=sever_help,args=(i,))
        li.append(t)
        t.start()

    for i in li:
        i.join()

    print("===結束==")

 

 通過對比互斥鎖可以看出,互斥鎖就是Semaphore(1)的情況,也完全可以使用后者,但是如果數據必須單獨使用,那么用互斥鎖效率更高。

 

事件(Event)

  如果某一個線程執行,需要判斷另一個線程的狀態,就可以使用Event,如:用Event類初始化一個event對象,線程a執行到某一步,設置event.wait(),即線程a阻塞,直到另一個線程設置event.set(),將event

狀態設置為True(默認是False)。

import threading
import time, random
def eating():
    event.wait()
    print('去吃飯的路上...')

def makeing():
    print('做飯中')
    time.sleep(random.randint(1,2))
    print('做好了,快來...')
    event.set()

if __name__ == '__main__':
    event=threading.Event()
    t1 = threading.Thread(target=eating)
    t2 = threading.Thread(target=makeing)
    t1.start()
    t2.start()    
    # 做飯中
    # 做好了,快來...
    # 去吃飯的路上...
飯做好了我才去吃

 基本方法:

event.isSet():返回event的狀態值;

event.wait():如果 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;

event.clear():恢復event的狀態值為False。

 

 

線程隊列(queue)

特點:先進先出,

作用:多個線程之間進行通信(作用不大,多進程的隊列用處大)

常用方法:

  •  .get() 獲取  無數據時會阻塞
  •  .set('item') 設置,先設置的數據,先取出
  •     .empty()    是否為空

基本使用:生成者消費者模型

import threading, queue

def eating(n):
    #消費
    i = q.get()
    print('消費者%d吃了第%d份食物' % (n, i))


def making():
    #生產
    for i in range(1, 11):
        print('正在制作第%d份食物' % i)
        time.sleep(1)
        q.put(i)


if __name__ == '__main__':
    q = queue.Queue()
    t2 = threading.Thread(target=makeing)
    t2.start()
    for i in range(1, 11):
        t = threading.Thread(target=eating, args=(i,))
        t.start()
生產者,消費者

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM