threading --- 基于线程的并行
官方文档:threading --- 基于线程的并发 — Python 3.9.9 文档
CPython implementation detail: 在 CPython 中,由于存在全局解释器锁, 同一时刻只有一个线程可以执行 Python 代码(虽然某些性能导向的库可能会去除此限制)。 如果你想让你的应用更好地利用多核心计算机的计算资源,推荐你使用multiprocessing或concurrent.futures.ProcessPoolExecutor但是,如果你想要同时运行多个 I/O 密集型任务,则多线程仍然是一个合适的模型。
再来引入一个概念:
并行(parallelism): 是同一时刻,每个线程都在执行。
并发(concurrency):是同一时刻,只有一个线程执行,然后交替执行(再加上电脑运行速度快)。所以从一个宏观的角度来看,似乎每个线程都在执行了。
可以知道python线程是并发的。
关于线程Threading的方法(获取线程的某种属性)。
active_count():它会获得,执行这个方法时,还存活的Thread()的对象数量。
enumerate():返回当前所有存活的Thread对象的列表。
current_thread():返回当前调用者 控制Thread()线程的对象。如果调用者控制的线程对象不是由threading创建,则会返回一个功能受限的虚拟线程对象。
get_ident():返回当前线程的“线程标识符”。它是一个非零整数。
get_native_id():返回内核分配给当前线程的原生集成线程ID。这是一个非负整数。
main_thread():返回主线程(thread)对象,一般是python解释器开始时创建的线程。
一、简介
线程对象:
官方解释:
The Thread
class represents an activity that is run in a separate thread of control. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run()
method in a subclass. No other methods (except for the constructor) should be overridden in a subclass. In other words, only override the __init__()
and run()
methods of this class.
Thread类表示在单独的控制线程中运行的活动。指定活动有两种方法:将可调用对象传递给构造函数,或重写子类中的run()方法。子类中不应重写任何其他方法(构造函数除外)。换句话说,只重写这个类的_init__;()和run()方法。
一旦线程活动开始,该线程会被认为是 '存活的' 。当它的run() 方法终结了(不管是正常的还是抛出未被处理的异常),就不是'存活的'。
先看看该类的参数有哪些:
class threading.
Thread
(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
group:官方的解释是,为了日后扩展ThreadGroup类实现而保留。(唉,我也不太清楚的)
target:是要于多线程的函数
name:是线程的名字
args :函数的参数,类型是元组()
kwargs:函数的参数,类型是字典{}
为了便于理解,先举一个小例子,为了方便理解,先简单了解一下该类的一个方法(函数在类中被称为方法):
start():开始线程活动
import threading import time def threading_test(name): print('进程名字---->:',name) print( name,'执行的第一步') print( name,'执行的第二步') print(name, '执行的第三步') # 实例化对象 obj1 = threading.Thread(target=threading_test ,args=("obj1",)) obj2 = threading.Thread(target=threading_test ,args=("obj2",)) # 开始线程 obj1.start() obj2.start()
运行结果:
会不会疑惑,嗯?就这,不理解啊,和调用普通的函数有什么区别啊?
但是加上一个一个延时函数就会看的区别的
例如,把上面的函数修改成以下的再看看
def threading_test(name): print('进程名字---->:',name) time.sleep(0.1) print( name,'执行的第一步') time.sleep(0.5) print( name,'执行的第二步') time.sleep(0.2) print(name, '执行的第三步')
运行结果就会变成:
看到区别了吧,其实在这里两个对象是并发运行的,而且多运行几次你会发现每次运行的结果不一样。是因为两个是并发的,谁先执行就要看谁抢得执行权。
二、Threading.Thread其中常用到的方法
既然看懂了多线程并发的效果,那么接下来就更加深入的了解其它的方法吧。
start():开始线程活动。这里注意,每个对象只能执行一次,不信你可以试试,看他会不会抛出RuntimeError这个异常。
run() :表示线程的方法,在线程被cpu调度后,就会自动执行这个方法。
但是如果你在自定义的类中想方法run和父类不一样,可以重写。
join() :等待,直到线程结束。
setName():给线程设置名字
getName():获取线程名字
is_alive():返回线程是否存活(True或者False)
setDaemon():设置守护线程(True或者False),必须在start()之前设置,不然会报错。
isDaemon() :是否是线程守护,默认是False。
介绍完之后亲自尝试一下看看效果:
先看看join()
为了清晰的认识这个方法的作用,我把上面的代码略作改变
import threading import time def threading_test(name): print('进程名字---->:',name) time.sleep(0.1) print( name,'执行的第一步') time.sleep(0.5) print( name,'执行的第二步') time.sleep(0.2) print(name, '执行的第三步') # 实例化对象 obj1 = threading.Thread(target=threading_test ,args=("obj1",)) obj2 = threading.Thread(target=threading_test ,args=("obj2",)) # 开始线程 obj1.start() obj2.start() print('<-----主线程结束:--------->')
运行结果:
注意主线程结束出现的位置,后面还会考到。
稍改一下代码:
# 开始线程 obj1.start() # 加入join()之后会等调用该方法的线程对象执行完之后才会往下执行代码 obj1.join() obj2.start() print('<-----主线程结束:--------->')
运行结果:
看到了没,在obj1执行完之后,才会往下执行,但是主线程与obj2好像也是并发的。多运行几次你会发现,obj2与主线程的执行会有不同的结果。
为了更加清晰的了解,再稍改一下。
# 开始线程 obj1.start() obj2.start() # 把join()放在obj2之后 # 加入join()之后会等调用该方法的线程对象执行完之后才会往下执行代码 obj1.join() print('<-----主线程结束:--------->')
运行结果:
可以看出,obj1与obj2是并发的允许,执行完obj1.join()后才往下执行
obj1.join()
为灵魂级别的理解这次改到有点大不过放心,代码简单。
import threading import time def threading_test(name,n): print('进程名字---->:',name) time.sleep(1) print(name,'进程正在执行') time.sleep(n) print(name,'这个进程结束') # 实例化对象 obj1 = threading.Thread(target=threading_test ,args=("obj1",1)) obj2 = threading.Thread(target=threading_test ,args=("obj2",3)) # 开始线程 obj1.start() obj2.start() # 把join()放在obj2之后 # 加入join()之后会等调用该方法的线程对象执行完之后才会往下执行代码 obj1.join() print('<-----主线程结束:--------->')
运行结果:

这个时候完全看懂了吧,join()在obj2之后,所以obj1与obj2时并发的,但是由于obj2有一个延时大于obj1,所以在obj1执行完之后,obj2还在执行。
主程序结束这条语句只是等待join()对象的obj1结束才会执行,并不等obj2。你可以把他们两个延时的参数调换一下,看看结果。
接下来看看关于setName和getName的用法吧
# 实例化对象,为了方便就不加参数和函数了,就看看效果。 obj1 = threading.Thread() obj2 = threading.Thread() obj3 = threading.Thread() # 开始线程 obj1.start() obj2.start() obj2.setName('你的名字叫做喂') obj3.start() time.sleep(5) print('获取线程名字',obj1.getName()) print('获取线程名字',obj2.getName()) print('获取线程名字',obj3.getName()) print('<-----主线程结束:--------->')
运行结果:
可以看到,它是有默认的名字的,而且有规律,Thread-N ,如果对Thread类还有印象的话就会知道里面有一个餐宿name,如果你没有赋值就会从1到N不断地给它命名
为Thread-N ,而setName只是改变调用这个方法的线程名(这里就是覆盖掉了默认的名字),并不会改变其它线程的N值。但是如果你在实例化对象时就给name赋值,会对其后面的N值影响。当然初始化顺序也会影响N值。
# 实例化对象,为了方便就不加参数和函数了,就看看效果。 obj1 = threading.Thread() obj2 = threading.Thread(name='obj2') #注意这里,Thread-2会跳过去而不是被覆盖 # 初始化顺序影响N的值 obj4 = threading.Thread() obj3 = threading.Thread() # 开始线程 obj1.start() obj2.start() obj2.setName('你的名字叫做喂') obj3.start() time.sleep(5) print('获取线程名字',obj1.getName()) print('获取线程名字',obj2.getName()) print('获取线程名字',obj3.getName()) print('获取线程名字',obj4.getName()) print('<-----主线程结束:--------->')
运行结果:
为了省事,我把下面的一起分析了。
active_count(),current_thread(),get_ident(),enumerate(),main_thread()
这里要注意有一个隐藏线程就是执行整个代码的主线程。
# 实例化对象 obj1 = threading.Thread(target=test,args=()) obj2 = threading.Thread(target=test,args=()) obj3 = threading.Thread(target=test,args=()) print('在start方法之前有几个线程活动',threading.active_count(),' 具体是:') print(threading.enumerate()) obj1.start() obj2.start() obj3.start() print('在全部start方法之后有几个存活',threading.active_count(), ' 具体是:') print(threading.enumerate()) print('返回当前线程的“线程标识符”:',threading.get_ident()) print('返回当前调用者 控制Thread线程的对象:',threading.current_thread()) print('返回主线程(thread)对象:',threading.main_thread())
运行结果:
可以看到,主线程在运行代码时就会开启,而自己利用threading创建的线程只有start()之后才会‘存活’。而且在主线程里面执行其中的一些方法都会指向主线程。
再改一下代码,
def test(name,n): time.sleep(n) print('<-------------------------------------------------------------------------->') print('对象',name,'在延时',n,'秒之后的一些信息是:') print('现在还存活的有', threading.active_count(), '个 具体是:') print(threading.enumerate()) print('返回当前线程的“线程标识符”:', threading.get_ident()) print('返回当前调用者 控制Thread线程的对象:', threading.current_thread()) print('返回主线程(thread)对象:', threading.main_thread()) # 实例化对象 obj1 = threading.Thread(target=test,args=('obj1',1)) obj2 = threading.Thread(target=test,args=('obj2',8)) obj1.start() obj2.start() time.sleep(4)
运行结果:
可以看到了get_ident与current_thread,就是获取当前的情况,main_thread()仍旧不变。这里需要注意,在执行obj2时由于延时8秒,所以obj1已经执行结束了。
setDaemon()守护线程:
def test(name,n): time.sleep(n) print('<-------------------------------------------------------------------------->') print('对象',name,'在延时',n,'秒之后是否还能继续:') # 实例化对象 obj1 = threading.Thread(target=test,args=('obj1',1)) obj2 = threading.Thread(target=test,args=('obj2',8)) obj1.setDaemon(False) obj2.setDaemon(False) obj1.start() obj2.start()
time.sleep(4)
print('主代码运行结束')
都设置为False的运行结果:
都设置为True的运行结果:
从这两个结果应该能看出:当他们都设置为True时,会随着主线程的结束而退出程序,所以会把还没执行完的其它线程也结束。
但是有没有想过其中有一个为True,一个为False会是怎么样的一个结果。有这样的一个结论:当没有存活的非守护线程时,整个Python程序才会退出。
这里把obj1为True,obj2为False
把两个秒数对换一下(或者Ture与False对换也行)
看出来了,如果主线程结束,还有非守护线程在执行,程序就不会退出,就能继续执行。
三、关于threading其它类的知识。
引入:回想一下上面,各个线程是并发。而且每一次运行的结果是是不可预知的,是随机的。这在某些情况是很不好的,有些线程会有先后的关系。
例如:同时copy多个文件,虽然读文件是可以同时读所需要copy的文件,写也是一样的;但是对于一个文件而言,只能先读取,才能写。
所以有了同步,互斥等关系。
为了下面的知识方便讲解:
讲一下线程的知识:(链接:(23条消息) 线程的5种状态详解_老猫的博客-CSDN博客_线程状态 )
1.新建状态(New)
通过:threading.Thread()
2.就绪状态(Runnable)
当线程对象调用start()方法即启动了线程,start()方法创建线程运行的系统资源,并调度线程运行run()方法。当start()方法返回后,线程就处于就绪状态。
处于就绪状态的线程并不一定立即运行run()方法,线程还必须同其他线程竞争CPU时间,只有获得CPU时间才可以运行线程。
3.运行状态(Running)
当线程获得CPU时间后,它才进入运行状态,真正开始执行run()方法。
4. 阻塞状态(Blocked)
线程运行过程中,可能由于各种原因进入阻塞状态:
1>线程通过调用sleep方法进入睡眠状态;
2>线程调用一个在I/O上被阻塞的操作,即该操作在输入输出操作完成之前不会返回到它的调用者;
3>线程试图得到一个锁,而该锁正被其他线程持有;
4>线程在等待某个触发条件;
......
所谓阻塞状态是正在运行的线程没有运行结束,暂时让出CPU,这时其他处于就绪状态的线程就可以获得CPU时间,进入运行状态。
注意:与阻塞对应的是唤醒,由什么事件阻塞,就应由什么事件唤醒,例如由于sleep阻塞,唤醒则是因为sleep结束。
4. 死亡状态(Dead)
有两个原因会导致线程死亡:
1)run方法正常退出而自然死亡。
2)一个未捕获的异常终止了run方法而使线程猝死。
其实第二部分一直在讲Threading.Thread这里类
1、threading.Lock 这个类用于锁对象。
我先抛出一个官方的结论(很重要记住啊)。实现原始锁对象的类。一旦一个线程获得一个锁,会阻塞随后尝试获得锁的线程,直到它被释放;任何线程都可以释放它。
原始锁是一个在锁定时不属于特定线程的同步基元组件。原始锁处于 "锁定" 或者 "非锁定" 两种状态之一。它被创建时为非锁定状态。他有两个基本方法。
acquire()和release()
acquire(blocking=True, timeout=- 1):
可以阻塞或非阻塞地获得锁(这里可以理解为我为了获得锁需不需要暂停的等)。
blocking:默认为True (为阻塞),False(为非阻塞)。
tiemout:只要无法获得锁,将最多阻塞 timeout 设定的秒数。只在要锁定的情况才有用。默认为-1
如果已经获得锁则返回True,如果超时则返回False
release():释放锁(解锁),由锁定转化为非锁定。注意不能对未锁定的对象进行解锁否则会报错。
looked():返回是否处于锁定状态。
import threading import time # 实例化锁对象 look=threading.Lock() # 锁定 flag=look.acquire() print("flag=",flag) print("look.locked()=",look.locked())
flag=look.acquire(timeout=2) #这里可以解释为,我最多只等你两秒,否则就返回False print("flag=",flag) print("look.locked()1=",look.locked()) # 对锁定的对象解锁可以 look.release() print("look.locked()2=",look.locked()) print('看看下面是否出错') look.release()
运行结果:
再看一下阻塞和非阻塞这个问题
threading.Thread() # 实例化锁对象,初始化为非锁定 look=threading.Lock() # 非阻塞获取锁,由于处于非锁定,所以可以锁定。 flag0=look.acquire(blocking=False) print("flag0=",flag0) # 不阻塞,立即取申请锁,且不成功则返回False(不阻塞的会 由于timeout这个参数会出错) flag=look.acquire(blocking=False) print("flag=",flag) # 阻塞,默认timeout=-1 它可以无限地等待 # 最多等3秒,不成功则返回False flag1=look.acquire(blocking=True,timeout=3) print("flag1=",flag1) look.release()
下面介绍多线程的另一种的创建方法:继承父类,可以重写run方法。
import threading import time # 继承threading.Thread这个类 class BankAccount(threading.Thread): def __init__(self, user,sleep_time): # 重构run函数必须要写 super(BankAccount, self).__init__() # 用户 self.user=user# 睡眠时间 self.sleep_time=sleep_time def run(self): time.sleep(1) print(self.user,'开始') # 实例化对象 obj1 = BankAccount('obj1',100,6) obj2 = BankAccount('obj2',100,5) obj3 = BankAccount('obj3',100,4) obj4 = BankAccount('obj4',100,3) obj5 = BankAccount('obj5',100,2) obj6 = BankAccount('obj6',100,1) obj_list=[obj1,obj2,obj3,obj4,obj5,obj6] # 开始 for i in range(6): obj_list[i].start()
未加锁定的情况:
加锁之后:
def run(self): look.acquire() time.sleep(1) print(self.user,'开始') look.release()
运行结果:
注意由于BankAccount继承于threading.Thread这个类重写了run()这个方法,它在start()之后就会自动run()。obj1先启动obj1所以先锁定。等obj1运行到解锁,才会让obj2对look这个对象上锁 ......。
再来一个有趣一点的代码吧,也就是改一下的啦。
import threading import time # 继承threading.Thread这个类 class BankAccount(threading.Thread): def __init__(self, user, sleep_time): # 重构run函数必须要写 super(BankAccount, self).__init__() # 用户 self.user=user # 睡眠时间 self.sleep_time=sleep_time def run(self): start=time.time() # 锁定 look.acquire() time.sleep(1) print(self.user,'开始') # 释放锁 look.release() end=time.time() print(self.user, '锁运行时间为:', end - start) time.sleep(self.sleep_time) print(self.user,'锁结束之后') # 实例化对象 obj1 = BankAccount('obj1',100,6) obj2 = BankAccount('obj2',100,5) obj3 = BankAccount('obj3',100,4) obj4 = BankAccount('obj4',100,3) obj5 = BankAccount('obj5',100,2) obj6 = BankAccount('obj6',100,1) obj_list=[obj1,obj2,obj3,obj4,obj5,obj6] # 实例化锁对象 look=threading.Lock() # 开始 for i in range(6): obj_list[i].start()
运行结果:
分析一下:每个给锁的运行时间不一样,通过time.sleep(self.sleep_time)使得 print(self.user,'锁结束之后') 这条语句它们再同一起跑线,由于这6个对象属于多线程并发。所以由obj1,obj2,obj3的这种情况。
2、再来介绍一个相似的:递归锁对象threading.RLoock这个类。
都是有两个方法:acquire(),release(),用法差不多。但是没有looked()
来讲一下区别:
递归锁对象:对于同一个线程同一个锁对象可以上锁再上锁(相当于多把锁),每个线程解锁只能有开启的线程关闭。
一个线程对象获得了几次锁,必须解几次锁,才会让其他线程能锁定。
锁对象 : 同一个线程只能有一把锁,每个线程可以由其它线程解锁。
来看看效果
首先时锁对象:
import threading import time def test(name): if name=='obj1': # obj1线程锁定 look.acquire() if name=='obj2': # # obj2线程解锁 look.release() time.sleep(1) print(name,'运行中') # 实例化递归锁对象 Rlook=threading.RLock() # 实例化锁对象 look=threading.Lock() # 实例化线程对象 obj1=threading.Thread(target=test,args=("obj1",)) obj2=threading.Thread(target=test,args=("obj2",)) # 列表 obj_list=[obj1,obj2] for obj in obj_list: obj.start()
运行结果:
递归锁对象(略改一下):
def test(name): if name=='obj1': # obj1线程锁定 Rlook.acquire() if name=='obj2': # obj2线程解锁 Rlook.release() time.sleep(1) print(name,'运行中')
报错:
在递归锁对象中obj2没上锁,不能解锁
再略改下:
def test(name): if name=='obj1': Rlook.acquire() Rlook.acquire(timeout=2) Rlook.release() if name=='obj2': Rlook.acquire() # 对于同一个锁对象,obj1两个锁只解了一个,所以obj2不能上锁。会在这里给你卡死 print('我是obj2') time.sleep(1) print(name,'运行中')
运行结果:
再改一下:
def test(name): if name=='obj1': Rlook.acquire() Rlook.acquire(timeout=2) Rlook.release() Rlook.release() if name=='obj2': Rlook.acquire() print('我是obj2') Rlook.release() time.sleep(1) print(name,'运行中')
运行结果:
3、条件对象threading.Condition这个类:
方法有:
acquire(*arg):
请求底层锁。此方法调用底层锁的相应方法,返回值是底层锁相应方法的返回值。
release():
释放底层锁。此方法调用底层锁的相应方法。没有返回值。
wait(timeout=None) :
等待直到被通知或发生超时。如果线程在调用此方法时没有获得锁,将会引发RuntimeError异常。
这个方法释放底层锁,然后阻塞,直到在另外一个线程中调用同一个条件变量的 notify() 或 notify_all() 唤醒它,或者直到可选的超时发生。
一旦被唤醒或者超时,它重新获得锁并返回。
当底层锁是个 RLock ,不会使用它的 release() 方法释放锁,因为当它被递归多次获取时,实际上可能无法解锁。相反,使用了RLock 类的内部接口,
即使多次递归获取它也能解锁它。 然后,在重新获取锁时,使用另一个内部接口来恢复递归级别。
返回 True
,除非提供的 timeout 过期,这种情况下返回 False
。
wait_for(predicate, timeout=None) :
等待,直到条件计算为真。 predicate 应该是一个可调用对象而且它的返回值可被解释为一个布尔值。可以提供 timeout 参数给出最大等待时间。
这个实用方法会重复地调用 wait() 直到满足判断式或者发生超时。返回值是判断式最后一个返回值,而且如果方法发生超时会返回 False
。
忽略超时功能,调用此方法大致相当于编写:
while not predicate(): cv.wait()
因此,规则同样适用于wait() :锁必须在被调用时保持获取,并在返回时重新获取。 随着锁定执行判断式。
notify() :
默认唤醒一个等待这个条件的线程。如果调用线程在没有获得锁的情况下调用这个方法,会引发 RuntimeError异常。
这个方法唤醒最多 n 个正在等待这个条件变量的线程;如果没有线程在等待,这是一个空操作。
当前实现中,如果至少有 n 个线程正在等待,准确唤醒 n 个线程。但是依赖这个行为并不安全。未来,优化的实现有时会唤醒超过 n 个线程。
注意:被唤醒的线程并没有真正恢复到它调用的 wait() ,直到它可以重新获得锁。 因为 notify()不释放锁,其调用者才应该这样做。
notify_all() :
唤醒所有正在等待这个条件的线程。这个方法行为与 notify() 相似,但并不只唤醒单一线程,而是唤醒所有等待线程。
如果调用线程在调用这个方法时没有获得锁,会引发RuntimeError 异常。
import threading import time def test_a(name,condition): # 申请锁,如果成功 if cov.acquire(): print(name) # 条件对象锁 cov = threading.Condition() obj1 = threading.Thread(target=test_a,args=('obj1',12)) obj1.start() obj2 = threading.Thread(target=test_a,args=('obj2',10)) obj2.start()
运行结果:
稍改一下:
def test_a(name,condition): # 申请锁,如果成功 if cov.acquire(): print(name) cov.release()
运行结果:
可以看出来在一个线程获得锁,就会阻止其它线程获得。
再修改一下:
def test_a(name,condition): # 申请锁,如果成功 if cov.acquire(): # if condition>11: # 进入阻塞状态,并释放锁让其它线程可以活得锁 cov.wait() print(name) cov.release() # 条件对象锁 cov = threading.Condition() obj1 = threading.Thread(target=test_a,args=('obj1',12)) obj1.start() obj2 = threading.Thread(target=test_a,args=('obj2',10)) obj2.start()
运行结果:
可以看出:obj1的条件满足就进入阻塞状态,并把锁释放。obj2可以获得锁所以可以继续往下运行,但是obj1一直每唤醒,会卡死。
再稍改一下:
def test_a(name,condition): # 申请锁,如果成功 if cov.acquire(): if condition>11: # 进入阻塞状态,并释放锁让其它线程可以活得锁 cov.wait() # 唤醒进入阻塞的状态 cov.notify() print(name) cov.release()
运行结果:
可以看出obj2的notify 把进入到obj1阻塞状态的线程唤醒,并运行结束
def test_a(name,condition): # 申请锁,如果成功 if cov.acquire(): # if condition>11: # 进入阻塞状态,并释放锁让其它线程可以活得锁 cov.wait() # 唤醒进入阻塞的状态,只有obj2才唤醒,由于多了一个对象也满足条件,所以只释放了一个,还有一个没释放,也会卡死。 if name=="obj2": cov.notify() print(name) cov.release() # 条件对象锁 cov = threading.Condition() obj1 = threading.Thread(target=test_a,args=('obj1',12)) obj1.start() # 加入一个新对象 obj3 = threading.Thread(target=test_a,args=('obj3',13)) obj3.start() obj2 = threading.Thread(target=test_a,args=('obj2',10)) obj2.start()
运行结果:
把上面的notify(), 改成notify(2)可以唤醒两个线程。所以刚好可以全部唤醒。运行结果:
换成cov.notify_all()唤醒全部。运行结果如上。
注意:唤醒之后,它会把进入阻塞状态的线程先随机挑出一个线程锁定,如果不是放锁。另一个线程就会无法锁定,卡死。多运行几次你会发现obj1与obj3的先后不确定。
再变一下:
由
if name=="obj2":
cov.notify()
变成:
# 唤醒进入阻塞的状态,谁都可以唤醒一个,obj2不会进入阻塞,可以释放一个线程,释放之后。唤醒的继续往下执行,也会释放另一个。也会上面的结果。
cov.notify()
下面再介绍他的最后一个方法:
wait_for(predicate, timeout=None)
predicate为一个可调用对象,返回值可以解释为布尔值。
python可调用对象有七种:
1、自定义函数
2、内置函数
3、内置方法
4、方法
5、类
6、类的实例
7、使用yield关键字的函数或方法
具体详细的可以参考这个:Python 可调用对象 - _Joshua - 博客园 (cnblogs.com)
def test_a(name,condition): # 申请锁,如果成功 if cov.acquire(): # func为可调用对象的自定义函数, 注意这里是func返回的解释为False才会wait() # 这里需要住哟tmeout=-1并不是无限时间,None cov.wait_for(func,timeout=None) print(name) cov.release() # 自定义函数,测试可调式对象 def func():
# 0可以解释为False value=0 return value # 条件对象锁 cov = threading.Condition() obj1 = threading.Thread(target=test_a,args=('obj1',12)) obj1.start() 运行结果:
卡死
把上面的
cov.wait_for(func,timeout=NOne)
改成
cov.wait_for(func,timeout=4)
大概四秒之后就会有输出。

改一下:
class A(threading.Thread): def __init__(self,name, condition): super(A,self).__init__() self.name=name self.condition=condition def run(slef): # 申请锁,如果成功 if cov.acquire(): cov.wait_for(slef.func,timeout=None) # 唤醒 cov.notify() print(slef.name) cov.release() # 自定义函数,测试可调式对象,只有obj2才返回1可解释为True def func(slef): # python三目运算符 # 条件为真时的结果 if 判段的条件 else 条件为假时的结果 value = 1 if(slef.condition<11) else 0 return value # 条件对象锁 cov = threading.Condition() obj1 = A('obj1',12) obj1.start() obj2 = A('obj2',10) obj2.start()
运行结果:

obj1会进入阻塞状态,但是没被唤醒,卡死,只输出obj2。虽然有唤醒,
但由于wait_for(predicate, timeout=None)
while not predicate(): cv.wait()
会一直wait,除非predicate()返回值变了
4、信号量对象:
class threading.
Semaphore
(value=1)
可选参数 value 赋予内部计数器初始值,默认值为 1
。如果 value 被赋予小于0的值,将会引发ValueError 异常。
方法:
acquire
(blocking=True, timeout=None)
获取一个信号量。
在不带参数的情况下调用时:
-
如果在进入时内部计数器的值大于零,则将其减一并立即返回
True
。 -
如果在进入时内部计数器的值为零,则将会阻塞直到被对release() 的调用唤醒。 一旦被唤醒(并且计数器的值大于 0),则将计数器减 1 并返回
True
。 每次对release() 的调用将只唤醒一个线程。 线程被唤醒的次序是不可确定的。
当发起调用时将 blocking 设为假值,则不进行阻塞。 如果一个无参数调用将要阻塞,则立即返回 False
;在其他情况下,执行与无参数调用时一样的操作,然后返回 True
。
当发起调用时如果 timeout 不为 None
,则它将阻塞最多 timeout 秒。 请求在此时段时未能成功完成获取则将返回 False
。 在其他情况下返回 True
。
release
(n=1)
释放一个信号量,将内部计数器的值增加 n。 当进入时值为零且有其他线程正在等待它再次变为大于零时,则唤醒那 n 个线程。默认唤醒一个线程。
import threading import time def test(name):
# 获得一个信号 if sem.acquire(): print(name,'开始') # 信号量对象对象,value为计数初值。 sem=threading.Semaphore(value=3) # 线程对象 obj1=threading.Thread(target=test,args=('obj1',)) obj1.start() obj2=threading.Thread(target=test,args=('obj2',)) obj2.start() obj3=threading.Thread(target=test,args=('obj3',)) obj3.start()
运行结果:
value为3,而线程对象也是三个
把value改为2:
obj2的时候计数器减为0,就会阻塞(blocking默认为Ture,为阻塞),要遇到release()才会把它唤醒。
把函数改成:
def test(name):
if sem.acquire():
print(name,'开始')
if name=='obj1':
time.sleep(1)
sem.release(1)
运行结果:

release(1),释放一个信号量,并将计数器值加1,使得obj3能输出。
class threading.
BoundedSemaphore
(value=1)
该类实现有界信号量。有界信号量通过检查以确保它当前的值不会超过初始值。如果超过了初始值,将会引发 ValueError
异常。在大多情况下,信号量用于保护数量有限的资源。如果信号量被释放的次数过多,则表明出现了错误。没有指定时, value 的值默认为1。
信号量通常用于保护数量有限的资源,例如数据库服务器。在资源数量固定的任何情况下,都应该使用有界信号量。在生成任何工作线程前,应该在主线程中初始化信号量。
maxconnections = 5
# ...
pool_sema = BoundedSemaphore(value=maxconnections)
工作线程生成后,当需要连接服务器时,这些线程将调用信号量的 acquire 和 release 方法:
with pool_sema:
conn = connectdb()
try:
# ... use connection ...
finally:
conn.close()
使用有界信号量能减少这种编程错误:信号量的释放次数多于其请求次数。
# 事件对象 ev = threading.Event() print('is_set()再set()之前的值:',ev.is_set()) ev.set() print('is_set()再set()之后的值:',ev.is_set()) ev.clear() print('is_set()再clear()之后的值:',ev.is_set())
运行结果:

import threading def test(name): ev.wait() print(name,'开始') # ev.clear() # 事件对象,默认内部标志为False ev = threading.Event() ev.set() # 线程对象 obj1=threading.Thread(target=test,args=('obj1',)) obj1.start() obj2=threading.Thread(target=test,args=('obj2',)) obj2.start()
运行结果:
改一下函数:
def test(name):
ev.wait()
print(name,'开始')
# 把内部标志变成False
ev.clear()
运行结果:

导致obj2会一直阻塞,卡死
定时器对象
此类表示一个操作应该在等待一定的时间之后运行 --- 相当于一个定时器。与线程一样,通过调用 start()
方法启动定时器。而 cancel() 方法可以停止计时器(在计时结束前), 定时器在执行其操作之前等待的时间间隔可能与用户指定的时间间隔不完全相同。
import threading import time def test(name): print(name,'开始') # 时间对象,3秒之后才会执行函数test() obj1 = threading.Timer(interval=3,function=test,args=('obj1',)) obj1.start()
如果在三秒之前加入,例如:
time.sleep(2)
obj1.cancel()
就不会执行函数test()
栅栏对象
栅栏类提供一个简单的同步原语,用于应对固定数量的线程需要彼此相互等待的情况。线程调用wait()方法后将阻塞,直到所有线程都调用了wait() 方法。此时所有线程将被同时释放。
栅栏对象可以被多次使用,但进程的数量不能改变。
class threading.
Barrier
(parties, action=None, timeout=None)
创建一个需要 parties 个线程的栅栏对象。如果提供了可调用的 action 参数,它会在所有线程被释放时在其中一个线程中自动调用。 timeout 是默认的超时时间,如果没有在 wait() 方法中指定超时时间的话。
-
wait
(timeout=None) -
冲出栅栏。当栅栏中所有线程都已经调用了这个函数,它们将同时被释放。如果提供了 timeout 参数,这里的 timeout 参数优先于创建栅栏对象时提供的 timeout 参数。
函数返回值是一个整数,取值范围在0到 parties -- 1,在每个线程中的返回值不相同。可用于从所有线程中选择唯一的一个线程执行一些特别的工作。例如:
i = barrier.wait() if i == 0: # Only one thread needs to print this print("passed the barrier")
如果创建栅栏对象时在构造函数中提供了 action 参数,它将在其中一个线程释放前被调用。如果此调用引发了异常,栅栏对象将进入损坏态。
如果发生了超时,栅栏对象将进入破损态。
如果栅栏对象进入破损态,或重置栅栏时仍有线程等待释放,将会引发 BrokeBarrierError 异常。
-
reset
() -
重置栅栏为默认的初始态。如果栅栏中仍有线程等待释放,这些线程将会收到BrokeBarrierError异常。
请注意使用此函数时,如果存在状态未知的其他线程,则可能需要执行外部同步。 如果栅栏已损坏则最好将其废弃并新建一个。
-
abort
() -
使栅栏处于损坏状态。 这将导致任何现有和未来对wait() 的调用失败并引发 BrokeBarrierError。 例如可以在需要中止某个线程时使用此方法,以避免应用程序的死锁。
更好的方式是:创建栅栏时提供一个合理的超时时间,来自动避免某个线程出错。
-
parties
-
冲出栅栏所需要的线程数量。
-
n_waiting
-
当前时刻正在栅栏中阻塞的线程数量。
-
broken
-
一个布尔值,值为
True
表明栅栏为破损态。
异常类,是 RuntimeError 异常的子类,在 Barrier 对象重置时仍有线程阻塞时和对象进入破损态时被引发。
代码:
# 栅栏函数 def Ba_test(): print('栅栏对象','开始运行') # 栅栏对象 bar = threading.Barrier(parties=4, action=Ba_test(), timeout=None) # 一个布尔值,值为 True 表明栅栏为破损态。 print('初始化的栅栏破损态: ',bar.broken) # 冲出栅栏所需要的线程数量。 print('初始化冲出栅栏所需要的线程数量: ',bar.parties) # 使栅栏处于损坏状态。这将导致任何现有和未来对 wait() 的调用失败 bar.abort() print('abort后的的栅栏破损态: ',bar.broken) print('abort后的冲出栅栏所需要的线程数量: ',bar.parties) # 重置栅栏为默认的初始态. bar.reset() print('reset后的栅栏破损态: ',bar.broken) print('reset后的冲出栅栏所需要的线程数量: ',bar.parties)
运行结果:

# 栅栏函数 def Ba_test(): print('栅栏对象','开始运行') # 线程函数 def th_test(name): bar.wait() print(name,'开始') # 栅栏对象,4个 bar = threading.Barrier(parties=4, action=Ba_test(), timeout=None) # 四个线程对象都调用wait(),阻塞结束,少一个的话,会阻塞不运行 obj1 = threading.Thread(target=th_test,args=('obj1',)) obj2 = threading.Thread(target=th_test,args=('obj2',)) obj3 = threading.Thread(target=th_test,args=('obj3',)) obj4 = threading.Thread(target=th_test,args=('obj4',)) obj1.start() obj2.start() obj3.start() obj4.start()
运行结果: