Python多線程,線程死鎖及解決,生產者與消費者問題


1.Thread類

普通調用

t = Thread(target=test, args=(i,))	# test為目標函數名, 若函數需要參數將其以元組形									                # 式賦給args, 若無參數可不寫	
t.start()	# 用start()函數開啟線程

例子

import time
from threading import Thread

# 目標函數
def test(i):
    print("hello ", i)
    time.sleep(1)

def main():
    # 循環5次,開起五個線程
    for i in range(5):
        t = Thread(target=test, args=(i,))	
        t.start()

if __name__ == '__main__':
    main()

繼承Thread類

定義一個自己的類繼承自Thread,重寫run()方法,即 將原本執行任務的函數內容移植到run()方法中.可通過類的屬性傳參.

例子

from threading import Thread
import time

class MyThread(Thread):
    def __init__(self, i):  
        Thread.__init__(self)   # 初始化父類"構造函數"
        self.i = i  # 初始化,目的將run函數參數作為類的屬性

    def run(self):
        time.sleep(1)
        msg = "I'm " + self.name + " @ " + str(self.i)
        print(msg)

def main():
    for i in range(3):	# 開啟三個線程
        t = MyThread(i)	# 實例化自己的類
        t.start()

if __name__ == '__main__':
    main()

線程的執行順序

上面的例子中線程的執行順序是隨機的

2.線程間共享全局變量

下面例子中test1()和test2()共享g_num全局變量.希望test1()執行的結果是1000000,test2()執行的結果是2000000.但是time.sleep()函數會影響結果.

from threading import Thread
import time

g_num = 0

def test1():
    global g_num
    for i in range(1000000):
        g_num += 1

    print('---test1 g_num is %d---' % g_num)

def test2():
    global g_num
    for i in range(1000000):
        g_num += 1

    print('---test2 g_num is %d---' % g_num)

t1 = Thread(target=test1)
t1.start()

# time.sleep(3) # 運行這句話與不運行這句話結果不一樣

t2 = Thread(target=test2)
t2.start()

print('-----g_num: %d-----' % g_num)

不執行sleep函數的結果(可能出現多種不同的運行結果)

---test1 g_num is 1312283---
-----g_num: 1312283-----
---test2 g_num is 1341534---

執行sleep()函數的結果

---test1 g_num is 1000000---
-----g_num: 1079982-----
---test2 g_num is 2000000---

其實這也不難理解,sleep之后test1中的任務肯定先完成,而不執行sleep兩個函數同能對g_num同時操作

通過輪詢的方式解決線程間共享全局變量的問題

from threading import Thread

g_num = 0
g_flag = 1  # 增加一個標識全局變量

def test1():
    global g_num
    global g_flag
    if g_flag == 1:
        for i in range(1000000):
            g_num += 1
    g_flag = 0
    print('---test1 g_num is %d---' % g_num)

def test2():
    global g_num

    # 輪詢
    while True:
        if g_flag != 1: # 一旦test1()執行完,即g_flag = 0時,test2()開始執行累加g_num操作
            for i in range(1000000):
                g_num += 1
            break

    print('---test2 g_num is %d---' % g_num)

t1 = Thread(target=test1)
t1.start()

t2 = Thread(target=test2)
t2.start()

print('-----g_num: %d-----' % g_num)

運行結果

-----g_num: 303721-----
---test1 g_num is 1000000---
---test2 g_num is 2000000---

第二個線程一開始並沒有執行累加g_num的操作,而是先進行一個死循環,在這個循環中不斷的"詢問"g_flag的值是否不等於1.一但g_flag不等於1,即test1()結束后便開始干"正事".

通過互斥鎖解決線程間共享全局變量的問題

from threading import Thread, Lock  # 導入互斥鎖

g_num = 0

def test1():
    global g_num

    for i in range(1000000):
        mutex.acquire()  # 上鎖,此時其他的鎖會等待  上鎖應該遵循最小原則
        g_num += 1
        mutex.release() # 開鎖,此時其他的鎖會搶着開鎖

    print('---test1 g_num is %d---' % g_num)

def test2():
    global g_num

    for i in range(1000000):
        mutex.acquire()
        g_num += 1
        mutex.release()

    print('---test2 g_num is %d---' % g_num)

# 創建一把互斥鎖,默認不上鎖
mutex = Lock()

t1 = Thread(target=test1)
t1.start()

t2 = Thread(target=test2)
t2.start()

print('-----g_num: %d-----' % g_num)

運行結果

-----g_num: 45012-----
---test1 g_num is 1979942---
---test2 g_num is 2000000---

從結果可以看出test2()的結果是正確的,而test1()的結果很接近test2.這也不難理解.互斥鎖會把夾在中間的部分鎖定,也就是說,在極短時間內只能有一個線程在執行該代碼.一旦開鎖了(release),所有線程開始搶這把鎖,某個線程搶到之后會把自己的操作鎖住,其他線程只能等待,一直反復直至全部任務完成.

只有對上述代碼稍微修改便可以實現我們想要的結果

修改后的代碼

from threading import Thread, Lock  # 導入互斥鎖

g_num = 0

def test1():
    global g_num

    mutex.acquire()  # 上鎖,此時其他的鎖會等待  上鎖應該遵循最小原則
    for i in range(1000000):
        g_num += 1
    mutex.release() # 開鎖,此時其他的鎖會搶着開鎖

    print('---test1 g_num is %d---' % g_num)

def test2():
    global g_num

    mutex.acquire()
    for i in range(1000000):
        g_num += 1
    mutex.release()

    print('---test2 g_num is %d---' % g_num)

# 創建一把互斥鎖,默認不上鎖
mutex = Lock()

t1 = Thread(target=test1)
t1.start()

t2 = Thread(target=test2)
t2.start()

print('-----g_num: %d-----' % g_num)

結果

-----g_num: 220254-----
---test1 g_num is 1000000---
---test2 g_num is 2000000---

值得注意的是,互斥鎖上的范圍太大就失去了線程的意義,別的線程都把時間浪費在了等待上.輪詢同理.

3.線程間使用非全局變量

from threading import Thread
import threading
import time

def test1():
    name = threading.current_thread().name  # 獲取當前線程名字
    print('----thread name is %s----' % name)
    g_num = 100
    if name == 'Thread-1':
        g_num += 1
    else:
        time.sleep(2)
    print('---thread is %s | g_num is %d---' % (name, g_num))

t1 = Thread(target=test1)
t1.start()

t2 = Thread(target=test1)
t2.start()

運行結果

----thread name is Thread-1----
---thread is Thread-1 | g_num is 101---
----thread name is Thread-2----
---thread is Thread-2 | g_num is 100---

非全局對於同一個函數來說.可以通過線程的名字來區分.

4.線程死鎖

import threading
import time

class MyThread1(threading.Thread):
    def run(self):
        if mutexA.acquire():
            print(self.name + '---do1---up---')
            time.sleep(1)

            if mutexB.acquire():
                print(self.name + '---do1---down---')
                mutexB.release()
            mutexA.release()

class MyThread2(threading.Thread):
    def run(self):
        if mutexB.acquire():
            print(self.name + '---do2---up---')
            time.sleep(1)

            if mutexA.acquire():
                print(self.name + '---do2---down---')
                mutexA.release()
            mutexB.release()

if __name__ == '__main__':
    mutexA = threading.Lock()
    mutexB = threading.Lock()
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()

運行結果(卡在了這兩句,未結束)

Thread-1---do1---up---
Thread-2---do2---up---

分析代碼,t1的代碼在等待mutexB解鎖的時候t2在等待mutexA解鎖.而t1必須先執行完mutexB鎖中的代碼執行完才能釋放mutexA,t2必須先執行完mutexA鎖中的代碼執行完才能釋放mutexB,這就導致兩個線程一直等待下去形成死鎖,會浪費CPU資源.

解決死鎖的辦法

設置超時時間 mutexA.acquire(2)
當然也可以從算法上避免死鎖

5.使用ThreadLocal

import threading

# 創建全局ThreadLocal對象
local_school = threading.local()

def process_student():
    # 獲取當前線程相關聯的student
    std = local_school.student
    print('Hello, %s in %s' % (std, threading.current_thread().name))

def process_thread(name):
    # 綁定ThreadLocal的student
    local_school.student = name
    process_student()

t1 = threading.Thread(target=process_thread, args=('kain',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('huck',), name='Thread-B')

t1.start()
t2.start()
t1.join()
t2.join()

運行結果

Hello, kain in Thread-A
Hello, huck in Thread-B

6.生產者與消費者問題

import threading
import time

# Python2
# from Queue import Queue

# Python3
from queue import Queue

class Producer(threading.Thread):
    def run(self):
        global queue
        count = 0
        while True:
            if queue.qsize() < 1000:
                for i in range(100):
                    count += 1
                    msg = '生成產品' + str(count)
                    queue.put(msg)
                    print(msg)
            time.sleep(0.5)

class Consumer(threading.Thread):
    def run(self):
        global queue
        while True:
            if queue.qsize() > 100:
                for i in range(3):
                    msg = self.name + '消費了' + queue.get()
                    print(msg)
            time.sleep(1)

if __name__ == '__main__':
    queue = Queue()

    for i in range(500):
        queue.put('初始產品'+str(i))    # 向隊列中塞內容

    for i in range(2):
        p = Producer()
        p.start()

    for i in range(5):
        c = Consumer()
        c.start()

運行結果過長不予展示


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM