不是並行,不是真正意義上的並發,可以單核實現並發。進程是資源單位(相當於車間),線程是運行單位(相當於生產線)
io多的項目,多線程更優於多進程
1 threading
- 開啟線程—函數
from threading import Thread
import time
def t_func(name, n):
time.sleep(n)
print("name:", name)
if __name__ == '__main__':
t = Thread(target=t_func, args=("lynn", 4))
t1 = Thread(target=t_func, args=("fancy", 1))
t.start()
t1.start()
t.join() # 線程t完全運行完,才繼續往下運行
print("主")
注意:
target
是函數名字,不加()
args
是元組,必須按位置,只有一個參數時要加,
join
方法,不加join方法,是異步的,加join是把異步變成同步,就是只有該線程完全運行完,才繼續往下運行,不影響其他線程。
- 開啟線程—類
from threading import Thread
import time
class TClass(Thread):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
time.sleep(1)
print("name:", self.name)
return self.name
if __name__ == '__main__':
t = TClass("lynn")
t1 = TClass("fancy")
t.start()
t1.start()
t.join()
print("主")
其他方法
getName()
線程的名字
setName()
設置線程的名字
isAlive()
返回線程是否活動的
- 守護線程
主線程所在的進程內,所有的線程運行完畢,停止運行。其實也是線程運行完畢,停止運行
from threading import Thread
import time
class TClass(Thread):
def __init__(self, n, name):
super().__init__()
self.name = name
self.n = n
def run(self):
time.sleep(self.n)
print("name:", self.name)
return self.name
if __name__ == '__main__':
t = TClass(1, name="lynn")
t1 = TClass(5, name="fancy")
t.daemon = True
t.start()
t1.start()
print("主")
注意:
是主線程所在的進程內所有的線程運行完畢,停止運行
daemon
必須在start
方法之前
2 線程數據安全和通信
線程鎖
- 互斥鎖
用來實現對共享資源的同步訪問,也稱為同步鎖
同一時間只有一個進程對加鎖的數據進行操作。把該部分變成串行,切不運行完,不釋放鎖,會一直阻塞。
from threading import Thread
from threading import Lock
import time
class TClass(Thread):
def __init__(self, n, name, lock):
super().__init__()
self.name = name
self.n = n
self.lock = lock
def run(self):
with self.lock:
with open("t_text.txt", "wt", encoding="utf-8")as f:
f.write(self.name)
time.sleep(self.n)
with open("t_text.txt", "rt", encoding="utf-8")as f:
print("name:", f.read())
print(self.name)
if __name__ == '__main__':
lock = Lock()
t = TClass(1, name="lynn", lock=lock)
t1 = TClass(5, name="fancy", lock=lock)
t.daemon = True
t.start()
t1.start()
print("主")
注意:
GIL鎖也是互斥鎖,是解釋器級別的互斥鎖
盡量只在修改數據的部分加鎖,因為會把並發轉為串行,會影響效率
- 死鎖
兩個或兩個以上的線程(進程),在運行過程中兩個線程(進程)互相等待,兩個鎖互相拿着沒釋放,沒有外部原因,會一直阻塞,稱為死鎖現象。
死鎖現象
import time
from threading import Thread
from threading import Lock
a_lock = Lock()
b_lock = Lock()
class DClass(Thread):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
if self.name == "lynn":
self.a_func()
if self.name == "fancy":
self.b_func()
def a_func(self):
a_lock.acquire()
time.sleep(1)
print("拿到a鎖")
b_lock.acquire()
print("拿到b鎖")
a_lock.release()
print("釋放a鎖")
b_lock.release()
print("釋放b鎖")
def b_func(self):
b_lock.acquire()
time.sleep(1)
print("拿到b鎖")
a_lock.acquire()
print("拿到a鎖")
b_lock.release()
print("釋放b鎖")
a_lock.release()
print("釋放a鎖")
dc1 = DClass("lynn")
dc2 = DClass("fancy")
dc1.start()
dc2.start()
注意:
線程A(進程) 拿着 鎖a,要拿鎖b釋放鎖a
進程B(進程) 拿着 鎖b,要拿鎖a釋放鎖b
鎖a和鎖b形成死鎖現象
- 遞歸鎖RLock
解決死鎖現象,針對多個鎖的情況,遞歸鎖可以被單個線程(進程)拿多次,每拿一次做一次標記,釋放一次減去一個標記,標記為0時,才能被其他線程(進程)拿
import time
from threading import Thread
from threading import RLock
r_lock = RLock()
class DClass(Thread):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
if self.name == "lynn":
self.a_func()
if self.name == "fancy":
self.b_func()
def a_func(self):
r_lock.acquire()
print("拿到a鎖")
r_lock.acquire()
print("拿到b鎖")
r_lock.release()
print("釋放a鎖")
time.sleep(1)
r_lock.release()
print("釋放b鎖")
def b_func(self):
r_lock.acquire()
time.sleep(1)
print("拿到c鎖")
r_lock.acquire()
print("拿到c鎖")
r_lock.release()
print("釋放c鎖")
r_lock.release()
print("釋放c鎖")
dc1 = DClass("lynn")
dc2 = DClass("fancy")
dc1.start()
dc2.start()
注意:
只有被該線程(進程)全部釋放才能被別的線程拿
線程間的通信
線程時相互獨立的,數據是隔離的
- Queue
管道:生產者消費者模型
from queue import Queue
from threading import Thread
import time
class SClass(Thread):
def __init__(self, Q, name):
super().__init__()
self.Q = Q
self.name = name
def run(self):
for i in range(100):
self.Q.put("{}的{}包子".format(self.name, i))
class XClass(Thread):
def __init__(self, Q):
super().__init__()
self.Q = Q
def run(self):
while True:
time.sleep(0.1)
res = self.Q.get()
print(res)
if not res:
break
if __name__ == '__main__':
Q = Queue(10)
st = SClass(Q, "lynn")
xc = XClass(Q)
st.start()
xc.start()
st.join()
Q.put(None)
3 ThreadPoolExecutor
支持線程池和進程池,python3.2之后版本
from concurrent.futures import ThreadPoolExecutor
import time
def thread_func(a):
time.sleep(2)
print("a")
if __name__ == '__main__':
with ThreadPoolExecutor(max_workers=5)as t:
res = t.submit(thread_func, 1)
# print(res.result()) # 會阻塞
r1 = t.submit(thread_func)
print("end")
# time.sleep(2)
print(res.done())
print(res.result())
print(res.done())
注意:
submit
方法,不阻塞,是異步的,第一個參數是方法名,后邊按位置參數傳方法需要的參數
with
會等所有的線程運行完畢,才繼續往下運行
done()
查看線程的運行狀態,True為運行完畢
result()
線程的返回值,會阻塞
- wait
開啟多個線程
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
class TClass:
@staticmethod
def run():
time.sleep(10)
print("ok")
if __name__ == '__main__':
with ThreadPoolExecutor(max_workers=5)as t:
t_list = (t.submit(TClass.run), )
wait(t_list, timeout=0.1, return_when=ALL_COMPLETED)
print('end')
注意:
第一個參數必須是可迭代對象,最好是元組,里邊的元素是submit()
方法提交的數據
timeout
超時時間,超過這個時間,該方法的阻塞時間,默認線程運行完才會繼續往下運行
- as_completed
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
class TClass:
@staticmethod
def run(a, b):
time.sleep(3)
print("ok", a, b)
return {"name": "lynn"}
if __name__ == '__main__':
with ThreadPoolExecutor(max_workers=5)as t:
t_list = [t.submit(TClass.run, 1, 2) for i in range(5)]
res_list = as_completed(t_list) # 阻塞
for i in res_list:
print(i.result()) # 返回值
注意:
as_completed
參數是可迭代對象:列表、元組等,元素是submit
方法處理的線程
for循環能取到每個線程的結果
- map
import time
from concurrent.futures import ThreadPoolExecutor
class TClass:
@staticmethod
def run(a, b):
time.sleep(1)
print('ok', a, b)
return a, b
if __name__ == '__main__':
with ThreadPoolExecutor(max_workers=5)as t:
genera_res = t.map(TClass.run, (1,3), (2,3)) # 結果是生成器
for i in genera_res:
print(i) # 線程的返回值
注意:
map
參數直接是方法,不用submit
參數以元組方式傳遞,多個參數多個元組,一個元組中多個參數表示調用多次