python 實現線程之間的通信


  前言:因為GIL的限制,python的線程是無法真正意義上並行的。相對於異步編程,其性能可以說不是一個等量級的。為什么我們還要學習多線程編程呢,雖然說異步編程好處多,但編程也較為復雜,邏輯不容易理解,學習成本和維護成本都比較高。畢竟我們大部分人還是適應同步編碼的,除非一些需要高性能處理的地方采用異步。

 

首先普及下進程和線程的概念:

進程:進程是操作系統資源分配的基本單位。

線程:線程是任務調度和執行的基本單位。

一個應用程序至少一個進程,一個進程至少一個線程。

兩者區別:同一進程內的線程共享本進程的資源如內存、I/O、cpu等,但是進程之間的資源是獨立的。

 

一、多線程

 python 可以通過 thread 或 threading 模塊實現多線程,threading 相比 thread 提供了更高階、更全面的線程管理。我們下文主要以 threading 模塊介紹多線程的基本用法。

import threading
import time

class thread(threading.Thread):
    def __init__(self, threadname):
        threading.Thread.__init__(self, name='線程' + threadname)

    def run(self):
        print('%s:Now timestamp is %s'%(self.name,time.time()))

threads = []
for a in range(int(5)):  # 線程個數
    threads.append(thread(str(a)))
for t in threads:  # 開啟線程
    t.start()
for t in threads:  # 阻塞線程
    t.join()
print('END')


輸出:
#線程3:Now timestamp is 1557386184.7574518
#線程2:Now timestamp is 1557386184.7574518
#線程0:Now timestamp is 1557386184.7574518
#線程1:Now timestamp is 1557386184.7574518
#線程4:Now timestamp is 1557386184.7582724
#END

 start() 方法開啟子線程。運行多次 start() 方法代表開啟多個子線程。

 join() 方法用來阻塞主線程,等待子線程執行完成。舉個例子,主線程A創建了子線程B,並使用了 join() 方法,主線程A在 join() 處就被阻塞了,等待子線程B完成后,主線程A才能執行 print('END')。如果沒有使用 join() 方法,主線程A創建子線程B后,不會等待子線程B,直接執行 print('END'),如下:

 

import threading
import time

class thread(threading.Thread):
    def __init__(self, threadname):
        threading.Thread.__init__(self, name='線程' + threadname)

    def run(self):
        time.sleep(1)
        print('%s:Now timestamp is %s'%(self.name,time.time()))

threads = []
for a in range(int(5)):  # 線程個數
    threads.append(thread(str(a)))
for t in threads:  # 開啟線程
    t.start()
# for t in threads:  # 阻塞線程
#     t.join()
print('END')


輸出:
#END
#線程0:Now timestamp is 1557386321.376941
#線程3:Now timestamp is 1557386321.377937
#線程1:Now timestamp is 1557386321.377937
#線程2:Now timestamp is 1557386321.377937
#線程4:Now timestamp is 1557386321.377937
View Code

 

 

二、線程之間的通信

1.threading.Lock()

 如果多個線程對某一資源同時進行修改,可能會存在不可預知的情況。為了修改數據的正確性,需要把這個資源鎖住,只允許線程依次排隊進去獲取這個資源。當線程A操作完后,釋放鎖,線程B才能進入。如下腳本是開啟多個線程修改變量的值,但輸出結果每次都不一樣。

import threading

money = 0
def Order(n):
    global money
    money = money + n
    money = money - n

class thread(threading.Thread):
    def __init__(self, threadname):
        threading.Thread.__init__(self, name='線程' + threadname)
        self.threadname = int(threadname)

    def run(self):
        for i in range(1000000):
            Order(self.threadname)

t1 = thread('1')
t2 = thread('5')
t1.start()
t2.start()
t1.join()
t2.join()
print(money)

 接下來我們用 threading.Lock() 鎖住這個變量,等操作完再釋放這個鎖。lock.acquire() 給資源加一把鎖,對資源處理完成之后,lock.release() 再釋放鎖。以下腳本執行結果都是一樣的,但速度會變慢,因為線程只能一個個的通過。

import threading

money = 0
def Order(n):
    global money
    money = money + n
    money = money - n

class thread(threading.Thread):
    def __init__(self, threadname):
        threading.Thread.__init__(self, name='線程' + threadname)
        self.threadname = int(threadname)

    def run(self):
        for i in range(1000000):
            lock.acquire()
            Order(self.threadname)
            lock.release()
#        print('%s:Now timestamp is %s'%(self.name,time.time()))

lock = threading.Lock()
t1 = thread('1')
t2 = thread('5')
t1.start()
t2.start()
t1.join()
t2.join()
print(money)

 

2.threading.Rlock()

 用法和 threading Lock() 一致,區別是 threading.Rlock() 允許多次鎖資源,acquire() 和 release() 必須成對出現,也就是說加了幾把鎖就得釋放幾把鎖。

lock = threading.Lock()
# 死鎖
lock.acquire()
lock.acquire()
print('...')
lock.release()
lock.release()

rlock = threading.RLock()
# 同一線程內不會阻塞線程
rlock.acquire()
rlock.acquire()
print('...')
rlock.release()
rlock.release()

 

3.threading.Condition()

threading.Condition() 可以理解為更加高級的鎖,比 Lock 和 Rlock 的用法更高級,能處理一些復雜的線程同步問題。threading.Condition() 創建一把資源鎖(默認是Rlock),提供 acquire() 和 release() 方法,用法和 Rlock 一致。此外 Condition 還提供 wait()、Notify() 和 NotifyAll() 方法。

wait():線程掛起,直到收到一個 Notify() 通知或者超時(可選參數),wait() 必須在線程得到 Rlock 后才能使用。

Notify() :在線程掛起的時候,發送一個通知,讓 wait() 等待線程繼續運行,Notify() 也必須在線程得到 Rlock 后才能使用。 Notify(n=1),最多喚醒 n 個線程。

NotifyAll() :在線程掛起的時候,發送通知,讓所有 wait() 阻塞的線程都繼續運行。

舉例說明下 Condition() 使用

import threading,time

def TestA():
    cond.acquire()
    print('李白:看見一個敵人,請求支援')
    cond.wait()
    print('李白:好的')
    cond.notify()
    cond.release()

def TestB():
    time.sleep(2)
    cond.acquire()
    print('亞瑟:等我...')
    cond.notify()
    cond.wait()
    print('亞瑟:我到了,發起沖鋒...')

if __name__=='__main__':
    cond = threading.Condition()
    testA = threading.Thread(target=TestA)
    testB = threading.Thread(target=TestB)
    testA.start()
    testB.start()
    testA.join()
    testB.join()


輸出
#李白:看見一個敵人,請求支援
#亞瑟:等我...
#李白:好的
#亞瑟:我到了,發起沖鋒...

 

4.threading.Event()

 threading.Event() 原理是在線程中立了一個 Flag ,默認值是 False ,當一個或多個線程遇到 event.wait() 方法時阻塞,直到 Flag 值 變為 True 。threading.Event() 通常用來實現線程之間的通信,使一個線程等待其他線程的通知 ,把 Event 傳遞到線程對象中。

event.wait() :阻塞線程,直到 Flag 值變為 True

event.set() :設置 Flag 值為 True

event.clear() :修改 Flag 值為 False

event.isSet() :  僅當 Flag 值為 True 時返回

下面這個例子,主線程啟動子線程后 sleap 2秒,子線程因為 event.wait() 被阻塞。當主線程醒來后執行 event.set() ,子線程才繼續運行,兩者輸出時間差 2s。

import threading
import datetime,time

class thread(threading.Thread):
    def __init__(self, threadname):
        threading.Thread.__init__(self, name='線程' + threadname)
        self.threadname = int(threadname)

    def run(self):
        event.wait()
        print('子線程運行時間:%s'%datetime.datetime.now())

if __name__ == '__main__':
    event = threading.Event()
    t1 = thread('0')
    #啟動子線程
    t1.start()
    print('主線程運行時間:%s'%datetime.datetime.now())
    time.sleep(2)
    # Flag設置成True
    event.set()
    t1.join()


輸出
#主線程運行時間:2019-05-30 15:51:49.690872
#子線程運行時間:2019-05-30 15:51:51.691523

 

5.其他方法

threading.active_count():返回當前存活的線程對象的數量

threading.current_thread():返回當前線程對象

threading.enumerate():返回當前所有線程對象的列表

threading.get_ident():返回線程pid

threading.main_thread():返回主線程對象

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM