python3.4多線程實現同步的四種方式(鎖機制、條件變量、信號量和同步隊列)


threading用於提供線程相關的操作,線程是應用程序中工作的最小單元。python當前版本的多線程庫沒有實現優先級、線程組,線程也不能被停止、暫停、恢復、中斷。

threading模塊提供的類:  
  Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。

threading 模塊提供的常用方法: 
  threading.currentThread(): 返回當前的線程變量。 
  threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。 
  threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

threading 模塊提供的常量:

  threading.TIMEOUT_MAX 設置threading全局超時時間。

Thread類


 

Thread是線程類,有兩種使用方法,直接傳入要運行的方法或從Thread繼承並覆蓋run():

# coding:utf-8
import threading
import time
#方法一:將要執行的方法作為參數傳給Thread的構造方法
def action(arg):
    time.sleep(1)
    print 'the arg is:%s\r' %arg

for i in xrange(4):
    t =threading.Thread(target=action,args=(i,))
    t.start()

print 'main thread end!'

#方法二:從Thread繼承,並重寫run()
class MyThread(threading.Thread):
    def __init__(self,arg):
        super(MyThread, self).__init__()#注意:一定要顯式的調用父類的初始化函數。
        self.arg=arg
    def run(self):#定義每個線程要運行的函數
        time.sleep(1)
        print 'the arg is:%s\r' % self.arg

for i in xrange(4):
    t =MyThread(i)
    t.start()

print 'main thread end!'

 

 

構造方法: 
Thread(group=None, target=None, name=None, args=(), kwargs={}) 

  group: 線程組,目前還沒有實現,庫引用中提示必須是None; 
  target: 要執行的方法; 
  name: 線程名; 
  args/kwargs: 要傳入方法的參數。

實例方法: 
  isAlive(): 返回線程是否在運行。正在運行指啟動后、終止前。 
  get/setName(name): 獲取/設置線程名。 

  start():  線程准備就緒,等待CPU調度
  is/setDaemon(bool): 獲取/設置是后台線程(默認前台線程(False))。(在start之前設置)

    如果是后台線程,主線程執行過程中,后台線程也在進行,主線程執行完畢后,后台線程不論成功與否,主線程和后台線程均停止
         如果是前台線程,主線程執行過程中,前台線程也在進行,主線程執行完畢后,等待前台線程也執行完成后,程序停止
  start(): 啟動線程。 
  join([timeout]): 阻塞當前上下文環境的線程,直到調用此方法的線程終止或到達指定的timeout(可選參數)。

 

使用例子一(未設置setDeamon): 

# coding:utf-8
import threading
import time

def action(arg):
    time.sleep(1)
    print  'sub thread start!the thread name is:%s\r' % threading.currentThread().getName()
    print 'the arg is:%s\r' %arg
    time.sleep(1)

for i in xrange(4):
    t =threading.Thread(target=action,args=(i,))
    t.start()

print 'main_thread end!'

執行結果

main_thread end!
sub thread start!the thread name is:Thread-2
the arg is:1
the arg is:0
sub thread start!the thread name is:Thread-4
the arg is:2
the arg is:3
Process finished with exit code 0
可以看出,創建的4個“前台”線程,主線程執行過程中,前台線程也在進行,主線程執行完畢后,等待前台線程也執行完成后,程序停止

 

驗證了serDeamon(False)(默認)前台線程,主線程執行過程中,前台線程也在進行,主線程執行完畢后,等待前台線程也執行完成后,主線程停止。

 

使用例子二(setDeamon=True)

# coding:utf-8
import threading
import time

def action(arg):
    time.sleep(1)
    print  'sub thread start!the thread name is:%s\r' % threading.currentThread().getName()
    print 'the arg is:%s\r' %arg
    time.sleep(1)

for i in xrange(4):
    t =threading.Thread(target=action,args=(i,))
    t.setDaemon(True)#設置線程為后台線程
    t.start()

print 'main_thread end!'

執行結果

main_thread end!

Process finished with exit code 0

可以看出,主線程執行完畢后,后台線程不管是成功與否,主線程均停止

驗證了serDeamon(True)后台線程,主線程執行過程中,后台線程也在進行,主線程執行完畢后,后台線程不論成功與否,主線程均停止。

使用例子三(設置join)

#coding:utf-8
import threading
import time

def action(arg):
    time.sleep(1)
    print  'sub thread start!the thread name is:%s    ' % threading.currentThread().getName()
    print 'the arg is:%s   ' %arg
    time.sleep(1)

thread_list = []    #線程存放列表
for i in xrange(4):
    t =threading.Thread(target=action,args=(i,))
    t.setDaemon(True)
    thread_list.append(t)

for t in thread_list:
    t.start()

for t in thread_list:
    t.join()

 

執行結果

sub thread start!the thread name is:Thread-2    
the arg is:1   
sub thread start!the thread name is:Thread-3    
the arg is:2   
sub thread start!the thread name is:Thread-1    
the arg is:0   
sub thread start!the thread name is:Thread-4    
the arg is:3   
main_thread end!

Process finished with exit code 0

設置join之后,主線程等待子線程全部執行完成后或者子線程超時后,主線程才結束

 

驗證了 join()阻塞當前上下文環境的線程,直到調用此方法的線程終止或到達指定的timeout,即使設置了setDeamon(True)主線程依然要等待子線程結束。

使用例子四(join不妥當的用法,使多線程編程順序執行)

#coding:utf-8
import threading
import time

def action(arg):
    time.sleep(1)
    print  'sub thread start!the thread name is:%s    ' % threading.currentThread().getName()
    print 'the arg is:%s   ' %arg
    time.sleep(1)


for i in xrange(4):
    t =threading.Thread(target=action,args=(i,))
    t.setDaemon(True)
    t.start()
    t.join()

print 'main_thread end!'

 

執行結果

sub thread start!the thread name is:Thread-1    
the arg is:0   
sub thread start!the thread name is:Thread-2    
the arg is:1   
sub thread start!the thread name is:Thread-3    
the arg is:2   
sub thread start!the thread name is:Thread-4    
the arg is:3   
main_thread end!

Process finished with exit code 0
可以看出此時,程序只能順序執行,每個線程都被上一個線程的join阻塞,使得“多線程”失去了多線程意義。

Lock、Rlock類


 

  由於線程之間隨機調度:某線程可能在執行n條后,CPU接着執行其他線程。為了多個線程同時操作一個內存中的資源時不產生混亂,我們使用鎖。

Lock(指令鎖)是可用的最低級的同步指令。Lock處於鎖定狀態時,不被特定的線程擁有。Lock包含兩種狀態——鎖定和非鎖定,以及兩個基本的方法。

可以認為Lock有一個鎖定池,當線程請求鎖定時,將線程至於池中,直到獲得鎖定后出池。池中的線程處於狀態圖中的同步阻塞狀態。

RLock(可重入鎖)是一個可以被同一個線程請求多次的同步指令。RLock使用了“擁有的線程”和“遞歸等級”的概念,處於鎖定狀態時,RLock被某個線程擁有。擁有RLock的線程可以再次調用acquire(),釋放鎖時需要調用release()相同次數。

可以認為RLock包含一個鎖定池和一個初始值為0的計數器,每次成功調用 acquire()/release(),計數器將+1/-1,為0時鎖處於未鎖定狀態。

簡言之:Lock屬於全局,Rlock屬於線程。

構造方法: 
Lock(),Rlock(),推薦使用Rlock()

實例方法: 
  acquire([timeout]): 嘗試獲得鎖定。使線程進入同步阻塞狀態。 
  release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。

例子一(未使用鎖):

#coding:utf-8
import threading
import time

gl_num = 0

def show(arg):
    global gl_num
    time.sleep(1)
    gl_num +=1
    print gl_num

for i in range(10):
    t = threading.Thread(target=show, args=(i,))
    t.start()

print 'main thread stop'

 

執行結果

main thread stop
12

3
4
568
9

910


Process finished with exit code 0
多次運行可能產生混亂。這種場景就是適合使用鎖的場景。

例子二(使用鎖):

# coding:utf-8

import threading
import time

gl_num = 0

lock = threading.RLock()


# 調用acquire([timeout])時,線程將一直阻塞,
# 直到獲得鎖定或者直到timeout秒后(timeout參數可選)。
# 返回是否獲得鎖。
def Func():
    lock.acquire()
    global gl_num
    gl_num += 1
    time.sleep(1)
    print gl_num
    lock.release()


for i in range(10):
    t = threading.Thread(target=Func)
    t.start()

 

執行結果

1
2
3
4
5
6
7
8
9
10

Process finished with exit code 0
可以看出,全局變量在在每次被調用時都要獲得鎖,才能操作,因此保證了共享數據的安全性

Lock對比Rlock

#coding:utf-8
 
import threading
lock = threading.Lock() #Lock對象
lock.acquire()
lock.acquire()  #產生了死鎖。
lock.release()
lock.release()
print lock.acquire()
 
 
import threading
rLock = threading.RLock()  #RLock對象
rLock.acquire()
rLock.acquire() #在同一線程內,程序不會堵塞。
rLock.release()
rLock.release()

 Condition類


 

  Condition(條件變量)通常與一個鎖關聯。需要在多個Contidion中共享一個鎖時,可以傳遞一個Lock/RLock實例給構造方法,否則它將自己生成一個RLock實例。

  可以認為,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處於等待阻塞狀態,直到另一個線程調用notify()/notifyAll()通知;得到通知后線程進入鎖定池等待鎖定。

構造方法: 
Condition([lock/rlock])

實例方法: 
  acquire([timeout])/release(): 調用關聯的鎖的相應方法。 
  wait([timeout]): 調用這個方法將使線程進入Condition的等待池等待通知,並釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。 
  notify(): 調用這個方法將從等待池挑選一個線程並通知,收到通知的線程將自動調用acquire()嘗試獲得鎖定(進入鎖定池);其他線程仍然在等待池中。調用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。 
  notifyAll(): 調用這個方法將通知等待池中所有的線程,這些線程都將進入鎖定池嘗試獲得鎖定。調用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。

 

例子一:生產者消費者模型

# encoding: UTF-8
import threading
import time

# 商品
product = None
# 條件變量
con = threading.Condition()


# 生產者方法
def produce():
    global product

    if con.acquire():
        while True:
            if product is None:
                print 'produce...'
                product = 'anything'

                # 通知消費者,商品已經生產
                con.notify()

            # 等待通知
            con.wait()
            time.sleep(2)


# 消費者方法
def consume():
    global product

    if con.acquire():
        while True:
            if product is not None:
                print 'consume...'
                product = None

                # 通知生產者,商品已經沒了
                con.notify()

            # 等待通知
            con.wait()
            time.sleep(2)


t1 = threading.Thread(target=produce)
t2 = threading.Thread(target=consume)
t2.start()
t1.start()

 

執行結果

produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...

Process finished with exit code -1
程序不斷循環運行下去。重復生產消費過程。

 

例子二:生產者消費者模型

import threading
import time

condition = threading.Condition()
products = 0

class Producer(threading.Thread):
    def run(self):
        global products
        while True:
            if condition.acquire():
                if products < 10:
                    products += 1;
                    print "Producer(%s):deliver one, now products:%s" %(self.name, products)
                    condition.notify()#不釋放鎖定,因此需要下面一句
                    condition.release()
                else:
                    print "Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products)
                    condition.wait();#自動釋放鎖定
                time.sleep(2)

class Consumer(threading.Thread):
    def run(self):
        global products
        while True:
            if condition.acquire():
                if products > 1:
                    products -= 1
                    print "Consumer(%s):consume one, now products:%s" %(self.name, products)
                    condition.notify()
                    condition.release()
                else:
                    print "Consumer(%s):only 1, stop consume, products:%s" %(self.name, products)
                    condition.wait();
                time.sleep(2)

if __name__ == "__main__":
    for p in range(0, 2):
        p = Producer()
        p.start()

    for c in range(0, 3):
        c = Consumer()
        c.start()

 

例子三:

import threading
 
alist = None
condition = threading.Condition()
 
def doSet():
    if condition.acquire():
        while alist is None:
            condition.wait()
        for i in range(len(alist))[::-1]:
            alist[i] = 1
        condition.release()
 
def doPrint():
    if condition.acquire():
        while alist is None:
            condition.wait()
        for i in alist:
            print i,
        print
        condition.release()
 
def doCreate():
    global alist
    if condition.acquire():
        if alist is None:
            alist = [0 for i in range(10)]
            condition.notifyAll()
        condition.release()
 
tset = threading.Thread(target=doSet,name='tset')
tprint = threading.Thread(target=doPrint,name='tprint')
tcreate = threading.Thread(target=doCreate,name='tcreate')
tset.start()
tprint.start()
tcreate.start()

Event類


 

  Event(事件)是最簡單的線程通信機制之一:一個線程通知事件,其他線程等待事件。Event內置了一個初始為False的標志,當調用set()時設為True,調用clear()時重置為 False。wait()將阻塞線程至等待阻塞狀態。

  Event其實就是一個簡化版的 Condition。Event沒有鎖,無法使線程進入同步阻塞狀態。

構造方法: 
Event()

實例方法: 
  isSet(): 當內置標志為True時返回True。 
  set(): 將標志設為True,並通知所有處於等待阻塞狀態的線程恢復運行狀態。 
  clear(): 將標志設為False。 
  wait([timeout]): 如果標志為True將立即返回,否則阻塞線程至等待阻塞狀態,等待其他線程調用set()。

 

例子一

# encoding: UTF-8
import threading
import time

event = threading.Event()


def func():
    # 等待事件,進入等待阻塞狀態
    print '%s wait for event...' % threading.currentThread().getName()
    event.wait()

    # 收到事件后進入運行狀態
    print '%s recv event.' % threading.currentThread().getName()


t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t1.start()
t2.start()

time.sleep(2)

# 發送事件通知
print 'MainThread set event.'
event.set()

 

 測試結果

Thread-1 wait for event...
Thread-2 wait for event...

#2秒后。。。
MainThread set event.
Thread-1 recv event.
 Thread-2 recv event.

Process finished with exit code 0

timer類


 

  Timer(定時器)是Thread的派生類,用於在指定時間后調用一個方法。

構造方法: 
Timer(interval, function, args=[], kwargs={}) 
  interval: 指定的時間 
  function: 要執行的方法 
  args/kwargs: 方法的參數

實例方法: 
Timer從Thread派生,沒有增加實例方法。

例子一:

# encoding: UTF-8
import threading


def func():
    print 'hello timer!'


timer = threading.Timer(5, func)
timer.start()

 

線程延遲5秒后執行。

local類


 

local是一個小寫字母開頭的類,用於管理 thread-local(線程局部的)數據。對於同一個local,線程無法訪問其他線程設置的屬性;線程設置的屬性不會被其他線程設置的同名屬性替換。

可以把local看成是一個“線程-屬性字典”的字典,local封裝了從自身使用線程作為 key檢索對應的屬性字典、再使用屬性名作為key檢索屬性值的細節。

# encoding: UTF-8
import threading
 
local = threading.local()
local.tname = 'main'
 
def func():
    local.tname = 'notmain'
    print local.tname
 
t1 = threading.Thread(target=func)
t1.start()
t1.join()
 
print local.tname

信號量 

信號量也提供acquire方法和release方法,每當調用acquire方法的時候,如果內部計數器大於0,則將其減1,如果內部計數器等於0,則會阻塞該線程,知道有線程調用了release方法將內部計數器更新到大於1位置。

import threading
import time
class Num:
    def __init__(self):
        self.num = 0
        self.sem = threading.Semaphore(value = 3)
        #允許最多三個線程同時訪問資源
 
    def add(self):
        self.sem.acquire()#內部計數器減1
        self.num += 1
        num = self.num
        self.sem.release()#內部計數器加1
        return num
    
n = Num()
class jdThread(threading.Thread):
    def __init__(self,item):
        threading.Thread.__init__(self)
        self.item = item
    def run(self):
        time.sleep(2)
        value = n.add()
        print(self.item,value)
 
for item in range(100):
    t = jdThread(item)
    t.start()
    t.join()

 

條件變量


所謂條件變量,即這種機制是在滿足了特定的條件后,線程才可以訪問相關的數據。

 

它使用Condition類來完成,由於它也可以像鎖機制那樣用,所以它也有acquire方法和release方法,而且它還有wait,notify,notifyAll方法。

 

"""
一個簡單的生產消費者模型,通過條件變量的控制產品數量的增減,調用一次生產者產品就是+1,調用一次消費者產品就會-1.
"""
 
"""
使用 Condition 類來完成,由於它也可以像鎖機制那樣用,所以它也有 acquire 方法和 release 方法,而且它還有
wait, notify, notifyAll 方法。
"""
 
import threading
import queue,time,random
 
class Goods:#產品類
    def __init__(self):
        self.count = 0
    def add(self,num = 1):
        self.count += num
    def sub(self):
        if self.count>=0:
            self.count -= 1
    def empty(self):
        return self.count <= 0
 
class Producer(threading.Thread):#生產者類
    def __init__(self,condition,goods,sleeptime = 1):#sleeptime=1
        threading.Thread.__init__(self)
        self.cond = condition
        self.goods = goods
        self.sleeptime = sleeptime
    def run(self):
        cond = self.cond
        goods = self.goods
        while True:
            cond.acquire()#鎖住資源
            goods.add()
            print("產品數量:",goods.count,"生產者線程")
            cond.notifyAll()#喚醒所有等待的線程--》其實就是喚醒消費者進程
            cond.release()#解鎖資源
            time.sleep(self.sleeptime)
 
class Consumer(threading.Thread):#消費者類
    def __init__(self,condition,goods,sleeptime = 2):#sleeptime=2
        threading.Thread.__init__(self)
        self.cond = condition
        self.goods = goods
        self.sleeptime = sleeptime
    def run(self):
        cond = self.cond
        goods = self.goods
        while True:
            time.sleep(self.sleeptime)
            cond.acquire()#鎖住資源
            while goods.empty():#如無產品則讓線程等待
                cond.wait()
            goods.sub()
            print("產品數量:",goods.count,"消費者線程")
            cond.release()#解鎖資源
 
g = Goods()
c = threading.Condition()
 
pro = Producer(c,g)
pro.start()
 
con = Consumer(c,g)
con.start()

同步隊列 

put方法和task_done方法,queue有一個未完成任務數量num,put依次num+1,task依次num-1.任務都完成時任務結束。

import threading
import queue
import time
import random
 
'''
1.創建一個 Queue.Queue() 的實例,然后使用數據對它進行填充。
2.將經過填充數據的實例傳遞給線程類,后者是通過繼承 threading.Thread 的方式創建的。
3.每次從隊列中取出一個項目,並使用該線程中的數據和 run 方法以執行相應的工作。
4.在完成這項工作之后,使用 queue.task_done() 函數向任務已經完成的隊列發送一個信號。
5.對隊列執行 join 操作,實際上意味着等到隊列為空,再退出主程序。
'''
 
class jdThread(threading.Thread):
    def __init__(self,index,queue):
        threading.Thread.__init__(self)
        self.index = index
        self.queue = queue
 
    def run(self):
        while True:
            time.sleep(1)
            item = self.queue.get()
            if item is None:
                break
            print("序號:",self.index,"任務",item,"完成")
            self.queue.task_done()#task_done方法使得未完成的任務數量-1
 
q = queue.Queue(0)
'''
初始化函數接受一個數字來作為該隊列的容量,如果傳遞的是
一個小於等於0的數,那么默認會認為該隊列的容量是無限的.
'''
for i in range(2):
    jdThread(i,q).start()#兩個線程同時完成任務
 
for i in range(10):
    q.put(i)#put方法使得未完成的任務數量+1

 

參考地址

作者:djd已經存在   原文:https://blog.csdn.net/djd1234567/article/details/45316373 
python--threading多線程總結:  https://www.cnblogs.com/tkqasn/p/5700281.html

  http://www.cnblogs.com/huxi/archive/2010/06/26/1765808.html

  http://www.cnblogs.com/wupeiqi/articles/5040827.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM