Python-----多線程threading用法


threading模塊是Python里面常用的線程模塊,多線程處理任務對於提升效率非常重要,先說一下線程和進程的各種區別,如圖

 

 概括起來就是

IO密集型(不用CPU)
多線程
計算密集型(用CPU)
多進程
使用線程和進程的目的都是為了提升效率
(1)單進程單線程,主進程、主線程
(2)自定義線程:
主進程
主線程
子線程

  

 

2、threading模塊可以創建多個線程,不過由於GIL鎖的存在,Python在多線程里面其實是快速切換,下面代碼是創建線程的簡單體驗

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import  time
import  threading
 
def  f0():
     pass
 
def  f1(a1,a2):
     time.sleep( 5 )
     f0()
 
'''下面代碼是直接運行下去的,不會等待函數里面設定的sleep'''
t =  threading.Thread(target = f1,args = ( 111 , 112 )) #創建線程
t.setDaemon( True ) #設置為后台線程,這里默認是False,設置為True之后則主線程不用等待子線程
t.start() #開啟線程
 
=  threading.Thread(target = f1, args = ( 111 112 ))
t.start()
 
=  threading.Thread(target = f1, args = ( 111 112 ))
t.start()
#默認情況下程序會等線程全部執行完畢才停止的,不過可以設置更改為后台線程,使主線程不等待子線程,主線程結束則全部結束

  在線程里面setDaemon()和join()方法都是常用的,他們的區別如下

(1)join ()方法:主線程A中,創建了子線程B,並且在主線程A中調用了B.join(),那么,主線程A會在調用的地方等待,直到子線程B完成操作后,

     才可以接着往下執行,那么在調用這個線程時可以使用被調用線程的join方法。join([timeout]) 里面的參數時可選的,代表線程運行的最大時

     間,即如果超過這個時間,不管這個此線程有沒有執行完畢都會被回收,然后主線程或函數都會接着執行的,如果線程執行時間小於參數表示的

     時間,則接着執行,不用一定要等待到參數表示的時間。

 (2)setDaemon()方法。主線程A中,創建了子線程B,並且在主線程A中調用了B.setDaemon(),這個的意思是,把主線程A設置為守護線程,這

        時候,要是主線程A執行結束了,就不管子線程B是否完成,一並和主線程A退出.這就是setDaemon方法的含義,這基本和join是相反的。此外,還有

       個要特別注意的:必須在start() 方法調用之前設置,如果不設置為守護線程,程序會被無限掛起,只有等待了所有線程結束它才結束。

3、Python多線程里面的鎖的

        在多線程處理任務的時候,在同時操作一個數據的時候可能會造成臟數據,這時候就出現了鎖的概念,也就是有一個線程在操作該數據的時候,就把

        該數據鎖上,防止別的線程操作,操作完了再釋放鎖。

 4、Python多線程里面的event方法

        該方法的具體用法是給線程設置紅綠燈,紅燈表示停,綠燈表示運行,如代碼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import  threading
import  time
def  do(event):
     print ( 'start' )
     event.wait() #紅燈,所有線程執行都這里都在等待
     print ( 'end' )
 
event_obj  =  threading.Event() #創建一個事件
for  in  range ( 10 ): #創建10個線程
     t =  threading.Thread(target = do,args = (event_obj,))
     t.start()
 
 
time.sleep( 5 )
 
 
event_obj.clear() #讓燈變紅,默認也是紅的,阻塞所有線程運行
data =  input ( '請輸入要:' )
if  data  = = 'True' :
     event_obj. set () #變綠燈

Python3 線程中常用的兩個模塊為:

  • _thread
  • threading(推薦使用)

thread 模塊已被廢棄。用戶可以使用 threading 模塊代替。所以,在 Python3 中不能再使用"thread" 模塊。為了兼容性,Python3 將 thread 重命名為 "_thread"。

什么是線程

    線程是CPU分配資源的基本單位。但一個程序開始運行,這個程序就變成了一個進程,而一個進程相當於一個或者多個線程。當沒有多線程編程時,一個進程也是一個主線程,但有多線程編程時,一個進程包含多個線程,包括主線程。使用線程可以實現程序的並發。

下面將介紹threading模塊常用方法: 

1. threading.Lock()

    如果多個線程共同對某個數據修改,則可能出現不可預料的結果,為了保證數據的正確性,需要對多個線程進行同步。

    使用 Thread 對象的 Lock 和 Rlock 可以實現簡單的線程同步,這兩個對象都有 acquire 方法和 release 方法,對於那些需要每次只允許一個線程操作的數據,可以將其操作放到 acquire 和 release 方法之間。

    來看看多個線程同時操作一個變量怎么把內容給改亂了(在windows下不會出現內容混亂情況,可能python在Windows下自動加上鎖了;不過在Linux 下可以測試出內容會被改亂):

復制代碼
#!/usr/bin/env python3

import time, threading

# 假定這是你的銀行存款:
balance = 0

def change_it(n):
    # 先存后取,結果應該為0:
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(100000):
        change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
執行結果:
[root@localhost ~]# python3 thread_lock.py 
5
[root@localhost ~]# python3 thread_lock.py 
5
[root@localhost ~]# python3 thread_lock.py 
0
[root@localhost ~]# python3 thread_lock.py 
8
[root@localhost ~]# python3 thread_lock.py 
-8
[root@localhost ~]# python3 thread_lock.py 
5
[root@localhost ~]# python3 thread_lock.py 
-8
[root@localhost ~]# python3 thread_lock.py 
3
[root@localhost ~]# python3 thread_lock.py 
5
復制代碼

    我們定義了一個共享變量balance,初始值為0,並且啟動兩個線程,先存后取,理論上結果應該為0,但是,由於線程的調度是由操作系統決定的,當t1、t2交替執行時,只要循環次數足夠多,balance的結果就不一定是0了。

    如果我們要確保balance計算正確,就要給change_it()上一把鎖,當某個線程開始執行change_it()時,我們說,該線程因為獲得了鎖,因此其他線程不能同時執行change_it(),只能等待,直到鎖被釋放后,獲得該鎖以后才能改。由於鎖只有一個,無論多少線程,同一時刻最多只有一個線程持有該鎖,所以,不會造成修改的沖突。創建一個鎖就是通過threading.Lock()來實現:

復制代碼
#!/usr/bin/env python3

import time, threading

balance = 0
lock = threading.Lock()

def change_it(n):
    # 先存后取,結果應該為0:
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(100000):
        # 先要獲取鎖:
        lock.acquire()
        try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了一定要釋放鎖:
            lock.release()
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
執行結果:
[root@localhost ~]# python3 thread_lock.py 
0
[root@localhost ~]# python3 thread_lock.py 
0
[root@localhost ~]# python3 thread_lock.py 
0
[root@localhost ~]# python3 thread_lock.py 
0
[root@localhost ~]# python3 thread_lock.py 
0
[root@localhost ~]# python3 thread_lock.py 
0
復制代碼

    

    當多個線程同時執行lock.acquire()時,只有一個線程能成功地獲取鎖,然后繼續執行代碼,其他線程就繼續等待直到獲得鎖為止。

    獲得鎖的線程用完后一定要釋放鎖,否則那些苦苦等待鎖的線程將永遠等待下去,成為死線程。所以我們用try...finally來確保鎖一定會被釋放。

    鎖的好處就是確保了某段關鍵代碼只能由一個線程從頭到尾完整地執行,壞處當然也很多,首先是阻止了多線程並發執行,包含鎖的某段代碼實際上只能以單線程模式執行,效率就大大地下降了。其次,由於可以存在多個鎖,不同的線程持有不同的鎖,並試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個線程全部掛起,既不能執行,也無法結束,只能靠操作系統強制終止。

 2. threading.Rlock()

    RLock允許在同一線程中被多次acquire。而Lock卻不允許這種情況。注意:如果使用RLock,那么acquire和release必須成對出現,即調用了n次acquire,必須調用n次的release才能真正釋放所占用的瑣。

復制代碼
import threading
lock = threading.Lock() 
#Lock對象
lock.acquire()
lock.acquire() 
#產生了死瑣。
lock.release()
lock.release()
  
import threading
rLock = threading.RLock() 
#RLock對象
rLock.acquire()
rLock.acquire() 
#在同一線程內,程序不會堵塞。
rLock.release()
rLock.release()
復制代碼

3. threading.Condition()

    可以把Condiftion理解為一把高級的瑣,它提供了比Lock, RLock更高級的功能,允許我們能夠控制復雜的線程同步問題。threadiong.Condition在內部維護一個瑣對象(默認是RLock),可以在創建Condigtion對象的時候把瑣對象作為參數傳入。Condition也提供了acquire, release方法,其含義與瑣的acquire, release方法一致,其實它只是簡單的調用內部瑣對象的對應的方法而已。Condition還提供wait方法、notify方法、notifyAll方法(特別要注意:這些方法只有在占用瑣(acquire)之后才能調用,否則將會報RuntimeError異常。):

    acquire()/release():獲得/釋放 Lock

    wait([timeout]):線程掛起,直到收到一個notify通知或者超時(可選的,浮點數,單位是秒s)才會被喚醒繼續運行。wait()必須在已獲得Lock前提下才能調用,否則會觸發RuntimeError。調用wait()會釋放Lock,直至該線程被Notify()、NotifyAll()或者超時線程又重新獲得Lock.

    notify(n=1):通知其他線程,那些掛起的線程接到這個通知之后會開始運行,默認是通知一個正等待該condition的線程,最多則喚醒n個等待的線程。notify()必須在已獲得Lock前提下才能調用,否則會觸發RuntimeError。notify()不會主動釋放Lock。

    notifyAll(): 如果wait狀態線程比較多,notifyAll的作用就是通知所有線程(這個一般用得少)

    現在寫個捉迷藏的游戲來具體介紹threading.Condition的基本使用。假設這個游戲由兩個人來玩,一個藏(Hider),一個找(Seeker)。游戲的規則如下:1. 游戲開始之后,Seeker先把自己眼睛蒙上,蒙上眼睛后,就通知Hider;2. Hider接收通知后開始找地方將自己藏起來,藏好之后,再通知Seeker可以找了; 3. Seeker接收到通知之后,就開始找Hider。Hider和Seeker都是獨立的個體,在程序中用兩個獨立的線程來表示,在游戲過程中,兩者之間的行為有一定的時序關系,我們通過Condition來控制這種時序關系。

復制代碼
#!/usr/bin/python3.4
# -*- coding: utf-8 -*-

import threading, time

def Seeker(cond, name):
    time.sleep(2)
    cond.acquire()
    print('%s :我已經把眼睛蒙上了!'% name)
    cond.notify()
    cond.wait()
    for i in range(3):
        print('%s is finding!!!'% name)
        time.sleep(2)
    cond.notify()
    cond.release()
    print('%s :我贏了!'% name)

def Hider(cond, name):
    cond.acquire()
    cond.wait()
    for i in range(2):
        print('%s is hiding!!!'% name)
        time.sleep(3)
    print('%s :我已經藏好了,你快來找我吧!'% name)
    cond.notify()
    cond.wait()
    cond.release()
    print('%s :被你找到了,唉~^~!'% name)


if __name__ == '__main__':
    cond = threading.Condition()
    seeker = threading.Thread(target=Seeker, args=(cond, 'seeker'))
    hider = threading.Thread(target=Hider, args=(cond, 'hider'))
    seeker.start()
    hider.start()
執行結果:
seeker :我已經把眼睛蒙上了!
hider is hiding!!!
hider is hiding!!!
hider :我已經藏好了,你快來找我吧!
seeker is finding!!!
seeker is finding!!!
seeker is finding!!!
seeker :我贏了!
hider :被你找到了,唉~^~!
復制代碼

    上面不同顏色的notify和wait一一對應關系,通知--->等待;等待--->通知

4. threading.Semaphore和BoundedSemaphore

    Semaphore:Semaphore 在內部管理着一個計數器。調用 acquire() 會使這個計數器 -1,release() 則是+1(可以多次release(),所以計數器的值理論上可以無限).計數器的值永遠不會小於 0,當計數器到 0 時,再調用 acquire() 就會阻塞,直到其他線程來調用release()

復制代碼
import threading, time


def run(n):
    # 獲得信號量,信號量減一
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s" % n)

    # 釋放信號量,信號量加一
    semaphore.release()
    #semaphore.release()    # 可以多次釋放信號量,每次釋放計數器+1
    #semaphore.release()    # 可以多次釋放信號量,每次釋放計數器+1


if __name__ == '__main__':

    num = 0
    semaphore = threading.Semaphore(2)  # 最多允許2個線程同時運行(即計數器值);在多次釋放信號量后,計數器值增加后每次可以運行的線程數也會增加
    for i in range(20):
        t = threading.Thread(target=run, args=(i,))
        t.start()

while threading.active_count() != 1:
    pass  # print threading.active_count()
else:
    print('----all threads done---')
    print(num)
復制代碼

 

    BoundedSemaphore:類似於Semaphore;不同在於BoundedSemaphore 會檢查內部計數器的值,並保證它不會大於初始值,如果超了,就引發一個 ValueError。多數情況下,semaphore 用於守護限制訪問(但不限於 1)的資源,如果 semaphore 被 release() 過多次,這意味着存在 bug

復制代碼
import threading, time


def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s" % n)
    semaphore.release()
    # 如果再次釋放信號量,信號量加一,這是超過限定的信號量數目,這時會報錯ValueError: Semaphore released too many times
    #semaphore.release()


if __name__ == '__main__':

    num = 0
    semaphore = threading.BoundedSemaphore(2)  # 最多允許2個線程同時運行
    for i in range(20):
        t = threading.Thread(target=run, args=(i,))
        t.start()

while threading.active_count() != 1:
    pass  # print threading.active_count()
else:
    print('----all threads done---')
    print(num)
復制代碼

 5. threading.Event

    事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞;如果“Flag”值為True,那么執行event.wait 方法時便不再阻塞。

    clear:將“Flag”設置為False
    set:將“Flag”設置為True
    用 threading.Event 實現線程間通信,使用threading.Event可以使一個線程等待其他線程的通知,我們把這個Event傳遞到線程對象中,

    Event默認內置了一個標志,初始值為False。一旦該線程通過wait()方法進入等待狀態,直到另一個線程調用該Event的set()方法將內置標志設置為True時,該Event會通知所有等待狀態的線程恢復運行。

    通過Event來實現兩個或多個線程間的交互,下面是一個紅綠燈的例子,即起動一個線程做交通指揮燈,生成幾個線程做車輛,車輛行駛按紅燈停,綠燈行的規則。

復制代碼
import threading, time
import random

def light():
    if not event.isSet():    #初始化evet的flag為真
        event.set()    #wait就不阻塞 #綠燈狀態
    count = 0
    while True:
        if count < 10:
            print('\033[42;1m---green light on---\033[0m')
        elif count < 13:
            print('\033[43;1m---yellow light on---\033[0m')
        elif count < 20:
            if event.isSet():
                event.clear()
            print('\033[41;1m---red light on---\033[0m')
        else:
            count = 0
            event.set()    #打開綠燈
        time.sleep(1)
        count += 1

def car(n):
    while 1:
        time.sleep(random.randrange(3, 10))
        #print(event.isSet())
        if event.isSet():
            print("car [%s] is running..." % n)
        else:
            print('car [%s] is waiting for the red light...' % n)
            event.wait()    #紅燈狀態下調用wait方法阻塞,汽車等待狀態

if __name__ == '__main__':
    car_list = ['BMW', 'AUDI', 'SANTANA']
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in car_list:
        t = threading.Thread(target=car, args=(i,))
        t.start()
復制代碼

6. threading.active_count()

    返回當前存活的線程對象的數量;通過計算len(threading.enumerate())長度而來

    The returned count is equal to the length of the list returned by enumerate().

復制代碼
import threading, time


def run():
    thread = threading.current_thread()
    print('%s is running...'% thread.getName())    #返回線程名稱
    time.sleep(10)    #休眠10S方便統計存活線程數量

if __name__ == '__main__':
    #print('The current number of threads is: %s' % threading.active_count())
    for i in range(10):
        print('The current number of threads is: %s' % threading.active_count())    #返回當前存活線程數量
        thread_alive = threading.Thread(target=run, name='Thread-***%s***' % i)
        thread_alive.start()
    thread_alive.join()
    print('\n%s thread is done...'% threading.current_thread().getName())
復制代碼

7. threading.current_thread()

    Return the current Thread object, corresponding to the caller's thread of control.

    返回當前線程對象

復制代碼
>>> threading.current_thread
<function current_thread at 0x00000000029F6C80>
>>> threading.current_thread()
<_MainThread(MainThread, started 4912)>
>>> type(threading.current_thread())
<class 'threading._MainThread'>
復制代碼

    繼承線程threading方法;通過help(threading.current_thread())查看。

復制代碼
import threading, time


def run(n):
    thread = threading.current_thread()
    thread.setName('Thread-***%s***' % n)    #自定義線程名稱
    print('-'*30)
    print("Pid is :%s" % thread.ident)  # 返回線程pid
    #print('ThreadName is :%s' % thread.name)  # 返回線程名稱
    print('ThreadName is :%s'% thread.getName())    #返回線程名稱
    time.sleep(2)

if __name__ == '__main__':
    #print('The current number of threads is: %s' % threading.active_count())
    for i in range(3):
        #print('The current number of threads is: %s' % threading.active_count())    #返回當前存活線程數量
        thread_alive = threading.Thread(target=run, args=(i,))
        thread_alive.start()
    thread_alive.join()
    print('\n%s thread is done...'% threading.current_thread().getName())
執行結果: Pid is :11792 ThreadName is :Thread-***0*** ------------------------------ Pid is :12124 ThreadName is :Thread-***1*** ------------------------------ Pid is :11060 ThreadName is :Thread-***2*** MainThread thread is done...
復制代碼

8. threading.enumerate()

    Return a list of all Thread objects currently alive

    返回當前存在的所有線程對象的列表

復制代碼
import threading, time


def run(n):
    thread = threading.current_thread()
    thread.setName('Thread-***%s***' % n)
    print('-'*30)
    print("Pid is :%s" % thread.ident)  # 返回線程pid
    #print('ThreadName is :%s' % thread.name)  # 返回線程名稱
    print('ThreadName is :%s'% threading.enumerate())    #返回所有線程對象列表
    time.sleep(2)

if __name__ == '__main__':
    #print('The current number of threads is: %s' % threading.active_count())
    threading.main_thread().setName('Chengd---python')
    for i in range(3):
        #print('The current number of threads is: %s' % threading.active_count())    #返回當前存活線程數量
        thread_alive = threading.Thread(target=run, args=(i,))
        thread_alive.start()
    thread_alive.join()
    print('\n%s thread is done...'% threading.current_thread().getName())
執行結果:
Pid is :12096
ThreadName is :[<_MainThread(Chengd---python, started 12228)>, <Thread(Thread-***0***, started 12096)>, <Thread(Thread-2, initial)>]
------------------------------
Pid is :10328
ThreadName is :[<_MainThread(Chengd---python, started 12228)>, <Thread(Thread-***0***, started 12096)>, <Thread(Thread-***1***, started 10328)>, <Thread(Thread-3, initial)>]
------------------------------
Pid is :6032
ThreadName is :[<_MainThread(Chengd---python, started 12228)>, <Thread(Thread-***0***, started 12096)>, <Thread(Thread-***1***, started 10328)>, <Thread(Thread-***2***, started 6032)>]

Chengd---python thread is done...
復制代碼

9.threading.get_ident()

    返回線程pid

復制代碼
import threading, time


def run(n):
    print('-'*30)
    print("Pid is :%s" % threading.get_ident())  # 返回線程pid

if __name__ == '__main__':
    threading.main_thread().setName('Chengd---python')    #自定義線程名
    for i in range(3):
        thread_alive = threading.Thread(target=run, args=(i,))
        thread_alive.start()
    thread_alive.join()
    print('\n%s thread is done...'% threading.current_thread().getName())    #獲取線程名
復制代碼

10. threading.main_thread()

    返回主線程對象,類似 threading.current_thread();只不過一個是返回當前線程對象,一個是返回主線程對象

復制代碼
import threading, time


def run(n):
    print('-'*30)
    print("Now Pid is :%s" % threading.current_thread().ident)  # 返回當前線程pid
    print("Main Pid is :%s" % threading.main_thread().ident)  # 返回主線程pid
    print('Now thread is %s...' % threading.current_thread().getName())  # 獲取當前線程名
    print('Main thread is %s...' % threading.main_thread().getName())  # 獲取主線程線程名

if __name__ == '__main__':
    threading.main_thread().setName('Chengd---python')    #自定義線程名
    for i in range(3):
        thread_alive = threading.Thread(target=run, args=(i,))
        thread_alive.start()
        time.sleep(2)
    thread_alive.join()
執行結果: ------------------------------ Now Pid is :8984 Main Pid is :3992 Now thread is Thread-1... Main thread is Chengd---python... ------------------------------ Now Pid is :4828 Main Pid is :3992 Now thread is Thread-2... Main thread is Chengd---python... ------------------------------ Now Pid is :12080 Main Pid is :3992 Now thread is Thread-3... Main thread is Chengd---python...
復制代碼

廖大線程講解

線程常用模塊方法

多線程之Condition()

python—threading.Semaphore和BoundedSemaphore

python 多線程之信號機Semaphore示例

alex線程講解


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM