python並發編程之多線程


開啟線程的兩種方式:

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello'%name)
if__name__=='__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    print('主線程')
方式一
from threading import Thread
import time
class Sayhi(Thread):
    def__init__(self,name):
        supper().__init__()
        self.name=name
    def run(self):
        time.sleep(2):
        print('%s say hello'%self.name)
if __name__=='__main__':
    t=Say('egon')
    t.start()
    print('主線程')
方式二

在這里我要說明一下他們誰的開啟速度快

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
    #在主進程下開啟線程
    t=Thread(target=work)
    t.start()
    print('主線程/主進程')
    '''
    打印結果:
    hello
    主線程/主進程
    '''

    #在主進程下開啟子進程
    t=Process(target=work)
    t.start()
    print('主線程/主進程')
    '''
    打印結果:
    主線程/主進程
    hello

很明顯我們可以看到:在線程里面會先打印子線程在打印主線程,而在進程里面會先打印主進程然后打印子進程。(在這里我想簡單的說一下,就是說你開啟一個進程,你得去重新獲得資源,然而開啟線程的時候,資源已經存在了,不需要去開辟新的資源,所以它的開啟速度就會明顯快了好多)

補充:開啟一個線程,在他之上有一個進程,我們在開啟線程的時候回自動產生一個線程,這個線程就叫做主線程,開啟的線程叫做其他線程(為什么不叫做子線程呢,就是因為在這里線程他只是共享資源,他們之間沒有任何的依賴關系)

接下來講一下:同一進程內的線程共享該進程的數據(資源)

from  threading import Thread
from multiprocessing import Process
import os
def work():
    global n
    n=0

if __name__ == '__main__':
    # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('主',n) #毫無疑問子進程p已經將自己的全局的n改成了0,但改的僅僅是它自己的,查看父進程的n仍然為100


    n=1
    t=Thread(target=work)
    t.start()
    t.join()
    print('',n) #查看結果為0,因為同一進程內的線程之間共享進程內的數據

(在這里我要簡單的說明一下,為什么同一進程內的線程可以共享該進程的數據,從這個實例中我們可以清楚的看到,在進程中,子進程他只是將自己的n改成了0,而父進程的n始終都是100,而對於線程來說,n的結果是0,這就是因為,同一進程內的線程共享該進程的數據)

實例:三個任務,一個接收用戶輸入,一個將用戶輸入的內容格式化成大寫,一個將格式化后的結果存入文件(首先,在這里面三個任務是同時執行的,)

from threading import Thread
msg_l=[]
format_l=[]
def talk():
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        msg_l.append(msg)

def format_msg():
    while True:
        if msg_l:
            res=msg_l.pop()
            format_l.append(res.upper())

def save():
    while True:
        if format_l:
            with open('db.txt','a',encoding='utf-8') as f:
                res=format_l.pop()
                f.write('%s\n' %res)

if __name__ == '__main__':
    t1=Thread(target=talk)
    t2=Thread(target=format_msg)
    t3=Thread(target=save)
    t1.start()
    t2.start()
    t3.start()

線程相關的其他方法:

Thread實例對象的方法
  # isAlive(): 返回線程是否活動的。
  # getName(): 返回線程名。
  # setName(): 設置線程名。

threading模塊提供的一些方法:
  # threading.currentThread(): 返回當前的線程變量。
  # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
  # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
復制代碼
from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主進程下開啟線程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) #主線程
    print(threading.enumerate()) #連同主線程在內有兩個運行的線程
    print(threading.active_count())
    print('主線程/主進程')

    '''
    打印結果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    主線程/主進程
    Thread-1
    '''

主線程等待子線程結束

復制代碼
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    t.join()
    print('主線程')
    print(t.is_alive())
    '''
    egon say hello
    主線程
    False
    '''
復制代碼

守護線程:

無論是進程還是線程,都是:守護xxx會等待主xxx完畢后被銷毀,

主進程與主線程在什么情況下才算運行完畢

1.主進程在其代碼結束后就已經算運行完畢了,(守護進程就在此時被回收)。主進程會一直等非守護的子進程都運行玩不后回收子進程的資源,(否則會產生僵屍進程),才會結束

2.主線程在其他非守護線程運行完畢后才算運行完畢(守護線程在此時就被回收)。主線程的結束意味着進程的結束,進程整體的資源都被回收,因而主線程必須在其余非守護線程都運行完畢后才能結束

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #必須在t.start()之前設置
    t.start()

    print('主線程')
    print(t.is_alive())
    '''
    主線程
    True

八 同步鎖

三個需要注意的點:
#1.分析Lock的同時一定要說明:線程搶的是GIL鎖,拿到執行權限后才能拿到互斥鎖Lock

#2.使用join與加鎖的區別:join是等待所有,即整體串行,而鎖只是鎖住一部分,即部分串行

#3. 一定要看本小節最后的GIL與互斥鎖的經典分析

GIL VS Lock

    機智的同學可能會問到這個問題,就是既然你之前說過了,Python已經有一個GIL來保證同一時間只能有一個線程來執行了,為什么這里還需要lock? 

 首先我們需要達成共識:鎖的目的是為了保護共享的數據,同一時間只能有一個線程來修改共享的數據

    然后,我們可以得出結論:保護不同的數據就應該加不同的鎖。

 最后,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),后者是保護用戶自己開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock

過程分析:所有線程搶的是GIL鎖,或者說所有線程搶的是執行權限

  線程1搶到GIL鎖,拿到執行權限,開始執行,然后加了一把Lock,還沒有執行完畢,即線程1還未釋放Lock,有可能線程2搶到GIL鎖,開始執行,執行過程中發現Lock還沒有被線程1釋放,於是線程2進入阻塞,被奪走執行權限,有可能線程1拿到GIL,然后正常執行到釋放Lock。。。這就導致了串行運行的效果

  既然是串行,那我們執行

  t1.start()

  t1.join

  t2.start()

  t2.join()

  這也是串行執行啊,為何還要加Lock呢,需知join是等待t1所有的代碼執行完,相當於鎖住了t1的所有代碼,而Lock只是鎖住一部分操作共享數據的代碼。

過程分析:所有線程搶的是GIL鎖,或者說所有線程搶的是執行權限

  

過程分析:所有線程搶的是GIL鎖,或者說所有線程搶的是執行權限

 

因為Python解釋器幫你自動定期進行內存回收,你可以理解為python解釋器里有一個獨立的線程,每過一段時間它起wake up做一次全局輪詢看看哪些內存數據是可以被清空的,此時你自己的程序 里的線程和 py解釋器自己的線程是並發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程中的clearing時刻,可能一個其它線程正好又重新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,為了解決類似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題,  這可以說是Python早期版本的遺留問題。
from threading import Thread
import os,time
def work():
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1
if __name__ == '__main__':
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結果可能為99

鎖通常被用來實現對共享資源的同步訪問。為每一個共享資源創建一個Lock對象,當你需要訪問該資源時,調用acquire方法來獲取鎖對象(如果其它線程已經獲得了該鎖,則當前線程需等待其被釋放),待資源訪問完后,再調用release方法釋放鎖:

import threading

R=threading.Lock()

R.acquire()
'''
對公共數據的操作
'''
R.release()
from threading import Thread,Lock
import os,time
def work():
    global n
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結果肯定為0,由原來的並發執行變成串行,犧牲了執行效率保證了數據安全
分析:
  #1.100個線程去搶GIL鎖,即搶執行權限
     #2. 肯定有一個線程先搶到GIL(暫且稱為線程1),然后開始執行,一旦執行就會拿到lock.acquire()
     #3. 極有可能線程1還未運行完畢,就有另外一個線程2搶到GIL,然后開始運行,但線程2發現互斥鎖lock還未被線程1釋放,於是阻塞,被迫交出執行權限,即釋放GIL
    #4.直到線程1重新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,然后其他的線程再重復2 3 4的過程
GIL鎖與互斥鎖綜合分析(重點!!!)
不加鎖:並發執行,速度快,數據不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''


#不加鎖:未加鎖部分並發執行,加鎖部分串行執行,速度慢,數據安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加鎖的代碼並發運行
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    #加鎖的代碼串行運行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''

#有的同學可能有疑問:既然加鎖會讓運行變成串行,那么我在start之后立即使用join,就不用加鎖了啊,也是串行的效果啊
#沒錯:在start之后立刻使用jion,肯定會將100個任務的執行變成串行,毫無疑問,最終n的結果也肯定是0,是安全的,但問題是
#start后立即join:任務內的所有代碼都是串行執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串行的
#單從保證數據安全方面,二者都可以實現,但很明顯是加鎖的效率更高.
from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗時是多么的恐怖
'''
互斥鎖與join的區別(重點!!!)

九 死鎖現象與遞歸鎖

進程也有死鎖與遞歸鎖,在進程那里忘記說了,放到這里一切說了額

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程,如下就是死鎖

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

'''
Thread-1 拿到A鎖
Thread-1 拿到B鎖
Thread-1 拿到B鎖
Thread-2 拿到A鎖
然后就卡住,死鎖了
'''

解決方法,遞歸鎖,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:

mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的情況,則counter繼續加1,這期間所有其他線程都只能等待,等待該線程釋放所有鎖,即counter遞減到0為止

十 信號量Semaphore

同進程的一樣

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。

實例:(同時只有5個線程可以獲得semaphore,即可以限制最大連接數為5)

from threading import Thread,Semaphore,currentThread
import time,random
s=Semaphore(5)
def task():
    s.acquire()
    print('%s 上廁所'%currentThread().getName())
    time.sleep(random.randint(1,3))
    print('%s 走了'%currentThread().getName())
    s.release()
if __name__ == '__main__':
    for i in range(20):
        t=Thread(target=task)
        t.start()

與進程池是完全不同的概念,進程池Pool(4),最大只能產生4個進程,而且從頭到尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程

也可以這樣理解啊:信號量本身就相當於一種鎖,他只有在釋放的時候其他的線程才能進入,而進程池里面的進程是固定的,就是說它只有在運行完畢后,其他的進程才會進入啊

十一 Event

同進程的一樣

線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標志,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標志被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標志為假,那么這個線程將會被一直阻塞直至該標志為真。一個線程如果將一個Event對象的信號標志設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那么它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值;

event.wait():如果 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;

event.clear():恢復event的狀態值為False。
from threading import Thread,Event,currentThread
import time
e=Event()
def conn_mysql():
    count=1
    while not e.is_set():
        if count > 3:
            raise ConnectionError('嘗試鏈接的次數過多')
        print('\033[45m%s 第%s次嘗試' %(currentThread().getName(),count))
        e.wait(timeout=1)
        count+=1
    print('\033[45m%s 開始鏈接' %currentThread().getName())

def check_mysql():
    print('\033[45m%s 檢測mysql...' %currentThread().getName())
    time.sleep(2)
    e.set()
if __name__ == '__main__':
    for i in range(3):
        t=Thread(target=conn_mysql)
        t.start()
    t=Thread(target=check_mysql)
    t.start()

十三 定時器

簡單來說就是制定多少秒后執行某個操作

from threading import Timer
def hello(n):
    print('hello world')
# if __name__ == '__main__':
s=Timer(5,hello,args=(123,))
s.start()

十四 線程queue

queue隊列 :使用import queue,用法與進程Queue一樣

class queue.Queue(maxsize=0) #先進先出

import queue

q=queue.Queue(3)
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''

class queue.LifoQueue(maxsize=0) #last in fisrt out 

'先進后出'
# import queue
# q=queue.LifoQueue()
# q.put('first')
# q.put('second')
# q.put('third')
# print(q.get())
# print(q.get())
# print(q.get())

class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先級越高,優先級高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM