python中線程


線程

線程的實質:進程只是用來把資源集中到一起(進程只是一個資源單位,或者說資源集合),而線程才是cpu上的執行單位。

線程的特性:

  1. 同一個進程內的多個線程共享該進程內的地址資源,但也任然有自己獨立的存儲空間
  2. 創建線程的開銷要遠小於創建進程的開銷(創建一個進程,就是創建一個車間,涉及到申請空間,而且在該空間內建至少一條流水線,但創建線程,就只是在一個車間內造一條流水線,無需申請空間,所以創建開銷小)

注意:

加入你的電腦是四核的,我開四個進程的話,每一個進程占用一個cpu來運行,如果開4個線程的話 這四個線程都隸屬於一個進程里面,所有4個線程只是占用一個cpu運行(偽並發,GIL鎖)

因為解釋性語言很難知道當前這一句執行完了下一句是什么,解釋器的鍋,不是python語言的鍋

線程和進程的使用場景:

如果兩個任務,需要共享內存,又想實現異步,使用多線程

如果兩個任務,需要數據隔離,使用多進程

線程小故事


 全局解釋器鎖(GIL鎖)解釋為什么Cpython沒法使用多核優勢

GIL本質就是一把互斥鎖,既然是互斥鎖,所有互斥鎖的本質都一樣,都是將並發運行變成串行,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。

首先確定一點:運行python文件實際上是運行python解釋器的進程,每次執行python程序,都會產生一個獨立的進程。例如python test.py,python aaa.py,python bbb.py會產生3個不同的python進程

解釋器的代碼是所有線程共享的,所以垃圾回收線程也可能訪問到解釋器的代碼而去執行,這就導致了一個問題:對於同一個數據100,可能線程1執行x=100的同時,而垃圾回收執行的是回收100的操作,解決這種問題沒有什么高明的方法,就是加鎖處理,如下圖的GIL,保證python解釋器同一時間只能執行一個任務的代碼

注:上圖表示多個線程同時執行python代碼,然后和垃圾回收線程一起訪問解釋器代碼

GIL與Lock

機智的同學可能會問到這個問題:Python已經有一個GIL來保證同一時間只能有一個線程來執行了,為什么這里還需要lock?

首先,我們需要達成共識:鎖的目的是為了保護共享的數據,同一時間只能有一個線程來修改共享的數據

然后,我們可以得出結論:保護不同的數據就應該加不同的鎖。

最后,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數據不一樣前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),后者是保護用戶自己開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock,如下圖

有了GIL的存在,同一時刻同一進程中只有一個線程被執行

聽到這里,有的同學立馬質問:進程可以利用多核,但是開銷大,而python的多線程開銷小,但卻無法利用多核優勢,也就是說python沒用了,php才是最牛逼的語言?

這里就回到了最開始的地方:線程和進程的使用場景:

如果兩個任務,需要共享內存,又想實現異步,使用多線程

如果兩個任務,需要數據隔離,使用多進程

python線程模塊的選擇

  Python提供了幾個用於多線程編程的模塊,包括thread、threading和Queue等。thread和threading模塊允許程序員創建和管理線程。thread模塊提供了基本的線程和鎖的支持,threading提供了更高級別、功能更強的線程管理的功能。Queue模塊允許用戶創建一個可以用於多個線程之間共享數據的隊列數據結構。
  避免使用thread模塊,因為更高級別的threading模塊更為先進,對線程的支持更為完善,而且使用thread模塊里的屬性有可能會與threading出現沖突;其次低級別的thread模塊的同步原語很少(實際上只有一個),而threading模塊則有很多;再者,thread模塊中當主線程結束時,所有的線程都會被強制結束掉,沒有警告也不會有正常的清除工作,至少threading模塊能確保重要的子線程退出后進程才退出。

  thread模塊不支持守護線程,當主線程退出時,所有的子線程不論它們是否還在工作,都會被強行退出。而threading模塊支持守護線程,守護線程一般是一個等待客戶請求的服務器,如果沒有客戶提出請求它就在那等着,如果設定一個線程為守護線程,就表示這個線程是不重要的,在進程退出的時候,不用等待這個線程退出。

threading模塊

multiprocess模塊的完全模仿了threading模塊的接口,二者在使用層面,有很大的相似性,因而不再詳細介紹

線程的創建Threading.Thread類

創建線程的方式1

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    print('主線程')

創建線程的方式2

from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('egon')
    t.start()
    print('主線程')

注意:在主進程下開啟多個線程,每個線程都跟主進程的pid一樣,而開多個進程,每個進程都有不同的pid

t.start()和t.run(),start會自動調用run,但是調用start會自動執行run,但是不一定會立即執行,是等待調度,何時真正的被調度取決於cpu,而調用t.run則是直接執行線程對象的run方法

線程相關的基本方法和使用

Thread實例對象的方法
isAlive(): 返回線程是否活動的。
getName(): 返回線程名。
setName(): 設置線程名。


threading模塊提供的一些方法:

threading.currentThread(): 返回當前的線程變量。
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。


 使用實例

from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主進程下開啟線程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) #主線程
    print(threading.enumerate()) #連同主線程在內有兩個運行的線程
    print(threading.active_count())
    print('主線程/主進程')

    '''
    打印結果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    主線程/主進程
    Thread-1
    '''

多線程實現socket實例

服務端:

import multiprocessing
import threading

import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)

def action(conn):
    while True:
        data=conn.recv(1024)
        print(data)
        conn.send(data.upper())

if __name__ == '__main__':

    while True:
        conn,addr=s.accept()


        p=threading.Thread(target=action,args=(conn,))
        p.start()

客戶端:

import socket

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue

    s.send(msg.encode('utf-8'))
    data=s.recv(1024)
    print(data)

守護線程

和進程一樣,主線程依舊會等待子線程的結束才結束,如果不想這樣,把子線程設置成守護線程

無論是進程還是線程,都遵循:守護xxx會等待主xxx運行完畢后被銷毀

需要強調的是:運行完畢並非終止運行

#1.對主進程來說,運行完畢指的是主進程代碼運行完畢

#2.對主線程來說,運行完畢指的是主線程所在的進程內所有非守護線程統統運行完畢,主線程才算運行完畢

詳細解釋:

#1 主進程在其代碼結束后就已經算運行完畢了(守護進程在此時就被回收),然后主進程會一直等非守護的子進程都運行完畢后回收子進程的資源(否則會產生僵屍進程),才會結束,

#2 主線程在其他非守護線程運行完畢后才算運行完畢(守護線程在此時就被回收)。因為主線程的結束意味着進程的結束,進程整體的資源都將被回收,而進程必須保證非守護線程都運行完畢后才能結束。

實例

from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


t1=Thread(target=foo)
t2=Thread(target=bar, daemon=True)

# t2.daemon=True
t1.start()
t2.start()
print("main-------")

# 123
# 456
# main-------
# end123

死鎖和遞歸鎖

一 死鎖現象

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程,如下就是死鎖(A和B都拿着鑰匙,但A要B的鑰匙才能還鎖,B要A的鑰匙才能還鎖,這就是死鎖)

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

執行效果

Thread-1 拿到A鎖
Thread-1 拿到B鎖
Thread-1 拿到B鎖
Thread-2 拿到A鎖 #出現死鎖,整個程序阻塞住

二 遞歸鎖

解決方法,遞歸鎖,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖,二者的區別是:遞歸鎖可以連續acquire多次,而互斥鎖只能acquire一次

from threading import Thread,RLock
import time

mutexA=mutexB=RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的情況,則counter繼續加1,這期間所有其他線程都只能等待,等待該線程釋放所有鎖,即counter遞減到0為止

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()
Python大法就是好,提供了with自動加鎖和解鎖!有了它,加鎖時,我們就可以解放雙手~(≧▽≦)/~啦啦啦!
官方大法:Lock 對象和 with 語句塊一起使用可以保證互斥執行,就是每次只有一個線程可以執行 with 語句包含的代碼塊。with 語句會在這個代碼塊執行前自動獲取鎖,在執行結束后自動釋放鎖。
import threading
class SharedCounter:

    def __init__(self, init_value=0):
        self.init_value = init_value
        self._value_lock = threading.Lock()

    def incr(self, delta=1):
        with self._value_lock:
            self.init_value += delta

    def decr(self, delta=1):
        with self._value_lock:
            self.init_value -= delta

使用遞歸鎖,盡可能避免死鎖的發生!強烈推薦使用遞歸鎖加with大法!

import threading

class SharedCounter:
    _lock = threading.RLock()

    def __init__(self, initial_value=0):
        self._value = initial_value

    def incr(self, delta=1):
        with SharedCounter._lock:
            self._value += delta

    def decr(self, delta=1):
        with SharedCounter._lock:
            self.incr(-delta)

信號量

同進程的一樣

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。

實例:(同時只有5個線程可以獲得semaphore,即可以限制最大連接數為5):

from threading import Thread,Semaphore
import threading
import time
# def func():
#     if sm.acquire():
#         print (threading.currentThread().getName() + ' get semaphore')
#         time.sleep(2)
#         sm.release()
def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()
if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()

Event事件

同進程的一樣,線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。

對象包含一個可由線程設置的信號標志,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標志被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標志為假,那么這個線程將會被一直阻塞直至該標志為真。

一個線程如果將一個Event對象的信號標志設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那么它將忽略這個事件, 繼續執行


event.isSet():返回event的狀態值;

event.wait():如果 event.isSet()==False將阻塞線程,如果給wait傳值,那么就代表只阻塞多長時間;

event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;

event.clear():恢復event的狀態值為False。


 

例如,有多個工作線程嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務正常才讓那些工作線程去連接MySQL服務器,如果連接不成功,都會去嘗試重新連接。那么我們就可以采用threading.Event機制來協調各個工作線程的連接操作

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('鏈接超時')
        print('<%s>第%s次嘗試鏈接' % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print('<%s>鏈接成功' %threading.current_thread().getName())


def check_mysql():
    print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == '__main__':
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()

條件

使得線程等待,只有滿足某條件時,才釋放n個線程

import threading
 
def run(n):
    con.acquire()
    con.wait()  # 等着
    print("run the thread: %s" %n)
    con.release()
 
if __name__ == '__main__':
    con = threading.Condition()  # 條件=鎖+wait的功能
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
 
    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))  # 傳遞信號,可以放行幾個
        con.release()

定時器

定時器,指定n秒后執行某操作

from threading import Timer
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

驗證碼定時器

from threading import Timer
import random,time

class Code:
    def __init__(self):
        self.make_cache()

    def make_cache(self,interval=5):
        self.cache=self.make_code()
        print(self.cache)
        self.t=Timer(interval,self.make_cache)
        self.t.start()

    def make_code(self,n=4):
        res=''
        for i in range(n):
            s1=str(random.randint(0,9))
            s2=chr(random.randint(65,90))
            res+=random.choice([s1,s2])
        return res

    def check(self):
        while True:
            inp=input('>>: ').strip()
            if inp.upper() ==  self.cache:
                print('驗證成功',end='\n')
                self.t.cancel()
                break


if __name__ == '__main__':
    obj=Code()
    obj.check()

線程queue

queue隊列 :使用import queue,用法與進程Queue一樣

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先進先出
import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''
class queue.LifoQueue(maxsize=0) #last in fisrt out 
import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(后進先出):
third
second
first
'''
class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列,同數字FIFO
import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先級越高,優先級高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''

Python標准模塊--concurrent.futures實現線程池和進程池

Python標准庫為我們提供了threading和multiprocessing模塊編寫相應的多線程/多進程代碼,但是當項目達到一定的規模,頻繁創建/銷毀進程或者線程是非常消耗資源的,這個時候我們就要編寫自己的線程池/進程池,以空間換時間。但從Python3.2開始,標准庫為我們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的進一步抽象,對編寫線程池/進程池提供了直接的支持。

1.Executor和Future:

  concurrent.futures模塊的基礎是Exectuor,Executor是一個抽象類,它不能被直接使用。但是它提供的兩個子類ThreadPoolExecutor和ProcessPoolExecutor卻是非常有用,顧名思義兩者分別被用來創建線程池和進程池的代碼。我們可以將相應的tasks直接放入線程池/進程池,不需要維護Queue來操心死鎖的問題,線程池/進程池會自動幫我們調度。

  Future這個概念相信有java和nodejs下編程經驗的朋友肯定不陌生了,你可以把它理解為一個在未來完成的操作,這是異步編程的基礎,傳統編程模式下比如我們操作queue.get的時候,在等待返回結果之前會產生阻塞,cpu不能讓出來做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他的操作。

  p.s: 如果你依然在堅守Python2.x,請先安裝futures模塊。

https://docs.python.org/dev/library/concurrent.futures.html

值得一提的是Executor實現了__enter__和__exit__使得其對象可以使用with操作符

池子的意義:

在剛開始學多進程或多線程時,我們迫不及待地基於多進程或多線程實現並發的套接字通信,然而這種實現方式的致命缺陷是:服務的開啟的進程數或線程數都會隨着並發的客戶端數目地增多而增多,這會對服務端主機帶來巨大的壓力,甚至於不堪重負而癱瘓,於是我們必須對服務端開啟的進程數或線程數加以控制,讓機器在一個自己可以承受的范圍內運行,這就是進程池或線程池的用途,例如進程池,就是用來存放進程的池子,本質還是基於多進程,只不過是對開啟進程的數目加上了限制

注意:如果機器能夠支持100個線程沒有必要開10個線程的那種線程池,直接開100線程更快

開進程池和線程池都有兩種方式:

第一種:

進程池——multiprocessing.Pool

線程池——multiprocessing.dummy.Pool

第二種:

進程池——from concurrent.futures import ProcessPoolExecutor

線程池——from concurrent.futures import ThreadPoolExecutor

注:第二種是對第一種的高度封裝

官網:https://docs.python.org/dev/library/concurrent.futures.html

concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用
Both implement the same interface, which is defined by the abstract Executor class.

基本方法

1、submit(fn, *args, **kwargs)
異步提交任務,不阻塞,不用等任務提交完就能繼續往下執行,*args和**kwargs是參數

2、map(func, *iterables, timeout=None, chunksize=1) 
取代for循環submit的操作

3、shutdown(wait=True) 
相當於進程池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源后才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait參數為何值,整個程序都會等到所有任務執行完畢
submit和map必須在shutdown之前(shutdown之前之后不能再提交任務了)

4、result(timeout=None)
取得結果,該方法會使異步池變成同步,因為得拿到結果才能進行下一步,優化是把線程對象先放在列表里,結束后循環拿

5、add_done_callback(fn)
回調函數,這里參數接收的是函數,函數接收的不是返回值,而是future對象  pool.submit(task).add_done_callback(other_task)

6、pool.submit().done()
可以判定提交的任務是否完成,完成了返回True,否則返回done

進程池進階

介紹

The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.

用法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)  # 提交任務,task是任務名,i是參數
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())

線程池使用

介紹

ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

用法

把ProcessPoolExecutor換成ThreadPoolExecutor,其余用法全部相同

from concurrent.futures import ThreadPoolExecutor
from urllib2 import urlopen
import time
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'http://qq.com/']

def load_url(url):
    res = urlopen(url, timeout=60)
    print('%r page is %d bytes' % (url, len(res.read())))

if __name__ == '__main__':
    start = time.time()
    executor = ThreadPoolExecutor(max_workers=3)
    #使用submit方式
    for url in URLS:
        future = executor.submit(load_url,url)
        #print(future.done())
        print (future.result()) #加了.result()會阻塞主線程
    #使用map方式
    #executor.map(load_url, URLS)
    end = time.time()
    #print('主線程')
    print (end-start)
    ####

concurrent.futures.wait:

  wait方法接會返回一個tuple(元組),tuple中包含兩個set(集合),一個是completed(已完成的)另外一個是uncompleted(未完成的)。使用wait方法的一個優勢就是獲得更大的自由度,它接收三個參數FIRST_COMPLETED, FIRST_EXCEPTION 和ALL_COMPLETE,默認設置為ALL_COMPLETED。

  如果采用默認的ALL_COMPLETED,程序會阻塞直到線程池里面的所有任務都完成,再執行主線程:

from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)

f_list = []
for url in URLS:
    future = executor.submit(load_url,url)
    f_list.append(future)
print(wait(f_list,return_when='ALL_COMPLETE'))

print('主線程')

# 運行結果:
'http://www.163.com' page is 662047 bytes
'https://www.baidu.com/' page is 227 bytes
'https://github.com/' page is 54629 bytes
DoneAndNotDoneFutures(done={<Future at 0x2d0f898 state=finished returned NoneType>, <Future at 0x2bd0630 state=finished returned NoneType>, <Future at 0x2d27470 state=finished returned NoneType>}, not_done=set())
主線程

future

future是concurrent.futures模塊和asyncio模塊的重要組件
從python3.4開始標准庫中有兩個名為Future的類:concurrent.futures.Future和asyncio.Future
這兩個類的作用相同:兩個Future類的實例都表示可能完成或者尚未完成的延遲計算。與Twisted中的Deferred類、Tornado框架中的Future類的功能類似

注意:通常情況下自己不應該創建future,而是由並發框架(concurrent.futures或asyncio)實例化

原因:future表示終將發生的事情,而確定某件事情會發生的唯一方式是執行的時間已經安排好,因此只有把某件事情交給concurrent.futures.Executor子類處理時,才會創建concurrent.futures.Future實例。
如:Executor.submit()方法的參數是一個可調用的對象,調用這個方法后會為傳入的可調用對象排定時間,並返回一個future

客戶端代碼不能應該改變future的狀態,並發框架在future表示的延遲計算結束后會改變期物的狀態,我們無法控制計算何時結束。

這兩種future都有.done()方法,這個方法不阻塞,返回值是布爾值,指明future鏈接的可調用對象是否已經執行。客戶端代碼通常不會詢問future是否運行結束,而是會等待通知。因此兩個Future類都有.add_done_callback()方法,這個方法只有一個參數,類型是可調用的對象,future運行結束后會調用指定的可調用對象。

.result()方法是在兩個Future類中的作用相同:返回可調用對象的結果,或者重新拋出執行可調用的對象時拋出的異常。但是如果future沒有運行結束,result方法在兩個Futrue類中的行為差別非常大。
對concurrent.futures.Future實例來說,調用.result()方法會阻塞調用方所在的線程,直到有結果可返回,此時,result方法可以接收可選的timeout參數,如果在指定的時間內future沒有運行完畢,會拋出TimeoutError異常。
而asyncio.Future.result方法不支持設定超時時間,在獲取future結果最好使用yield from結構,但是concurrent.futures.Future不能這樣做

不管是asyncio還是concurrent.futures.Future都會有幾個函數是返回future,其他函數則是使用future,在最開始的例子中我們使用的Executor.map就是在使用future,返回值是一個迭代器,迭代器的__next__方法調用各個future的result方法,因此我們得到的是各個futrue的結果,而不是future本身


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM