1.Thread類
普通調用
t = Thread(target=test, args=(i,)) # test為目標函數名, 若函數需要參數將其以元組形 # 式賦給args, 若無參數可不寫
t.start() # 用start()函數開啟線程
例子
import time
from threading import Thread
# 目標函數
def test(i):
print("hello ", i)
time.sleep(1)
def main():
# 循環5次,開起五個線程
for i in range(5):
t = Thread(target=test, args=(i,))
t.start()
if __name__ == '__main__':
main()
繼承Thread類
定義一個自己的類繼承自Thread,重寫run()方法,即 將原本執行任務的函數內容移植到run()方法中.可通過類的屬性傳參.
例子
from threading import Thread
import time
class MyThread(Thread):
def __init__(self, i):
Thread.__init__(self) # 初始化父類"構造函數"
self.i = i # 初始化,目的將run函數參數作為類的屬性
def run(self):
time.sleep(1)
msg = "I'm " + self.name + " @ " + str(self.i)
print(msg)
def main():
for i in range(3): # 開啟三個線程
t = MyThread(i) # 實例化自己的類
t.start()
if __name__ == '__main__':
main()
線程的執行順序
上面的例子中線程的執行順序是隨機的
2.線程間共享全局變量
下面例子中test1()和test2()共享g_num全局變量.希望test1()執行的結果是1000000,test2()執行的結果是2000000.但是time.sleep()函數會影響結果.
from threading import Thread
import time
g_num = 0
def test1():
global g_num
for i in range(1000000):
g_num += 1
print('---test1 g_num is %d---' % g_num)
def test2():
global g_num
for i in range(1000000):
g_num += 1
print('---test2 g_num is %d---' % g_num)
t1 = Thread(target=test1)
t1.start()
# time.sleep(3) # 運行這句話與不運行這句話結果不一樣
t2 = Thread(target=test2)
t2.start()
print('-----g_num: %d-----' % g_num)
不執行sleep函數的結果(可能出現多種不同的運行結果)
---test1 g_num is 1312283---
-----g_num: 1312283-----
---test2 g_num is 1341534---
執行sleep()函數的結果
---test1 g_num is 1000000---
-----g_num: 1079982-----
---test2 g_num is 2000000---
其實這也不難理解,sleep之后test1中的任務肯定先完成,而不執行sleep兩個函數同能對g_num同時操作
通過輪詢的方式解決線程間共享全局變量的問題
from threading import Thread
g_num = 0
g_flag = 1 # 增加一個標識全局變量
def test1():
global g_num
global g_flag
if g_flag == 1:
for i in range(1000000):
g_num += 1
g_flag = 0
print('---test1 g_num is %d---' % g_num)
def test2():
global g_num
# 輪詢
while True:
if g_flag != 1: # 一旦test1()執行完,即g_flag = 0時,test2()開始執行累加g_num操作
for i in range(1000000):
g_num += 1
break
print('---test2 g_num is %d---' % g_num)
t1 = Thread(target=test1)
t1.start()
t2 = Thread(target=test2)
t2.start()
print('-----g_num: %d-----' % g_num)
運行結果
-----g_num: 303721-----
---test1 g_num is 1000000---
---test2 g_num is 2000000---
第二個線程一開始並沒有執行累加g_num的操作,而是先進行一個死循環,在這個循環中不斷的"詢問"g_flag的值是否不等於1.一但g_flag不等於1,即test1()結束后便開始干"正事".
通過互斥鎖解決線程間共享全局變量的問題
from threading import Thread, Lock # 導入互斥鎖
g_num = 0
def test1():
global g_num
for i in range(1000000):
mutex.acquire() # 上鎖,此時其他的鎖會等待 上鎖應該遵循最小原則
g_num += 1
mutex.release() # 開鎖,此時其他的鎖會搶着開鎖
print('---test1 g_num is %d---' % g_num)
def test2():
global g_num
for i in range(1000000):
mutex.acquire()
g_num += 1
mutex.release()
print('---test2 g_num is %d---' % g_num)
# 創建一把互斥鎖,默認不上鎖
mutex = Lock()
t1 = Thread(target=test1)
t1.start()
t2 = Thread(target=test2)
t2.start()
print('-----g_num: %d-----' % g_num)
運行結果
-----g_num: 45012-----
---test1 g_num is 1979942---
---test2 g_num is 2000000---
從結果可以看出test2()的結果是正確的,而test1()的結果很接近test2.這也不難理解.互斥鎖會把夾在中間的部分鎖定,也就是說,在極短時間內只能有一個線程在執行該代碼.一旦開鎖了(release),所有線程開始搶這把鎖,某個線程搶到之后會把自己的操作鎖住,其他線程只能等待,一直反復直至全部任務完成.
只有對上述代碼稍微修改便可以實現我們想要的結果
修改后的代碼
from threading import Thread, Lock # 導入互斥鎖
g_num = 0
def test1():
global g_num
mutex.acquire() # 上鎖,此時其他的鎖會等待 上鎖應該遵循最小原則
for i in range(1000000):
g_num += 1
mutex.release() # 開鎖,此時其他的鎖會搶着開鎖
print('---test1 g_num is %d---' % g_num)
def test2():
global g_num
mutex.acquire()
for i in range(1000000):
g_num += 1
mutex.release()
print('---test2 g_num is %d---' % g_num)
# 創建一把互斥鎖,默認不上鎖
mutex = Lock()
t1 = Thread(target=test1)
t1.start()
t2 = Thread(target=test2)
t2.start()
print('-----g_num: %d-----' % g_num)
結果
-----g_num: 220254-----
---test1 g_num is 1000000---
---test2 g_num is 2000000---
值得注意的是,互斥鎖上的范圍太大就失去了線程的意義,別的線程都把時間浪費在了等待上.輪詢同理.
3.線程間使用非全局變量
from threading import Thread
import threading
import time
def test1():
name = threading.current_thread().name # 獲取當前線程名字
print('----thread name is %s----' % name)
g_num = 100
if name == 'Thread-1':
g_num += 1
else:
time.sleep(2)
print('---thread is %s | g_num is %d---' % (name, g_num))
t1 = Thread(target=test1)
t1.start()
t2 = Thread(target=test1)
t2.start()
運行結果
----thread name is Thread-1----
---thread is Thread-1 | g_num is 101---
----thread name is Thread-2----
---thread is Thread-2 | g_num is 100---
非全局對於同一個函數來說.可以通過線程的名字來區分.
4.線程死鎖
import threading
import time
class MyThread1(threading.Thread):
def run(self):
if mutexA.acquire():
print(self.name + '---do1---up---')
time.sleep(1)
if mutexB.acquire():
print(self.name + '---do1---down---')
mutexB.release()
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
if mutexB.acquire():
print(self.name + '---do2---up---')
time.sleep(1)
if mutexA.acquire():
print(self.name + '---do2---down---')
mutexA.release()
mutexB.release()
if __name__ == '__main__':
mutexA = threading.Lock()
mutexB = threading.Lock()
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
運行結果(卡在了這兩句,未結束)
Thread-1---do1---up---
Thread-2---do2---up---
分析代碼,t1的代碼在等待mutexB解鎖的時候t2在等待mutexA解鎖.而t1必須先執行完mutexB鎖中的代碼執行完才能釋放mutexA,t2必須先執行完mutexA鎖中的代碼執行完才能釋放mutexB,這就導致兩個線程一直等待下去形成死鎖,會浪費CPU資源.
解決死鎖的辦法
設置超時時間 mutexA.acquire(2)
當然也可以從算法上避免死鎖
5.使用ThreadLocal
import threading
# 創建全局ThreadLocal對象
local_school = threading.local()
def process_student():
# 獲取當前線程相關聯的student
std = local_school.student
print('Hello, %s in %s' % (std, threading.current_thread().name))
def process_thread(name):
# 綁定ThreadLocal的student
local_school.student = name
process_student()
t1 = threading.Thread(target=process_thread, args=('kain',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('huck',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
運行結果
Hello, kain in Thread-A
Hello, huck in Thread-B
6.生產者與消費者問題
import threading
import time
# Python2
# from Queue import Queue
# Python3
from queue import Queue
class Producer(threading.Thread):
def run(self):
global queue
count = 0
while True:
if queue.qsize() < 1000:
for i in range(100):
count += 1
msg = '生成產品' + str(count)
queue.put(msg)
print(msg)
time.sleep(0.5)
class Consumer(threading.Thread):
def run(self):
global queue
while True:
if queue.qsize() > 100:
for i in range(3):
msg = self.name + '消費了' + queue.get()
print(msg)
time.sleep(1)
if __name__ == '__main__':
queue = Queue()
for i in range(500):
queue.put('初始產品'+str(i)) # 向隊列中塞內容
for i in range(2):
p = Producer()
p.start()
for i in range(5):
c = Consumer()
c.start()
運行結果過長不予展示