python並發編程之多線程(實戰)


  • 一 threading模塊介紹
  • 二 開啟線程的兩種方式
  • 三 在一個進程下開啟多個線程與在一個進程下開啟多個子進程的區別
  • 四 練習
  • 五 線程相關的其他方法
  • 六 守護線程 
  • 七  Python GIL(Global Interpreter Lock)
  • 八 同步鎖
  • 九 死鎖現象與遞歸鎖
  • 十 信號量Semaphore
  • 十一 Event
  • 十二 條件Condition(了解)
  • 十三 定時器
  • 十四 線程queue
  • 十五  Python標准模塊--concurrent.futures

 

一 threading模塊

multiprocess模塊的完全模仿了threading模塊的接口,二者在使用層面有很大的相似性

 

二 開啟線程的兩種方式

1. 創建threading實例 出target和args

2. 繼承threading類,創建子類並覆蓋父類的run方法

 

三 進程與線程區別

from multiprocessing import Process
from threading import Thread
import time
import os


def work():
    print('hello')
    print(os.getpid())


if __name__ == '__main__':
    start = time.time()
    t = Thread(target=work)
    t.start()
    t.join()
    end = time.time()
    print('thread: %.30fs' % (end - start))

    start = time.time()
    p = Process(target=work)
    p.start()
    p.join()
    end = time.time()
    print('process: %.30fs' % (end - start))

'''
hello
6988
thread: 0.000986576080322265625000000000s
hello
9624
process: 0.109427213668823242187500000000s
'''
進程和線程誰的開啟速度快
from threading import Thread

def work():
    global n
    n += 1

if __name__ == '__main__':
    n = 100
    print('n:', n)
    t = Thread(target=work)
    t.start()
    t.join()
    print('n:', n)
'''
n: 100
n: 101
'''
同一進程內的多線程共享資源

 

四 練習

練習一:

利用多線程並行處理客戶端訪問

from socket import *
from threading import Thread

ss = socket(AF_INET, SOCK_STREAM)
ss.bind(('127.0.0.1', 8080))
ss.listen(5)


def work(conn):
    while True:
        try:
            rev = conn.recv(1024)
            msg = rev.upper()
            conn.send(msg)
        except ConnectionResetError as e:
            print(e)
            break

if __name__ == '__main__':
    while True:
        conn, addr = ss.accept()
        t = Thread(target=work, args=(conn, ))
        t.start()
socket客戶端
import socket

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8080))

while True:
    msg = input('>>: ').strip()
    if not msg:
        continue
    s.send(msg.encode('utf-8'))
    data=s.recv(1024)
    print(data)
socket用戶端

練習二:

多線程流程處理輸入

from threading import Thread

msg_l = []
format_l = []


def get():
    while True:
        data = input('>>>').strip()
        if not data:
            continue
        format_l.append(data)


def format_msg():
    while True:
        if format_l:
            res = format_l.pop()
            msg_l.append(res.upper())


def save_mdg():
    while True:
        if msg_l:
            with open('da.txt', 'a') as f:
                f.write(msg_l.pop() + '\n')


if __name__ == '__main__':
    t1 = Thread(target=get)
    t2 = Thread(target=format_msg)
    t3 = Thread(target=save_mdg)
    t1.start()
    t2.start()
    t3.start()
多線程處理輸入

 

五 線程的其他方法

 

Thread實例對象的方法
  # isAlive(): 返回線程是否活動的。
  # getName(): 返回線程名。
  # setName(): 設置線程名。

threading模塊提供的一些方法:
  # threading.currentThread(): 返回當前的線程變量。
  # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
  # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

 

 

import threading
from threading import Thread
import time


def work():
    print(threading.current_thread().getName())
    time.sleep(3)

if __name__ == '__main__':
    t = Thread(target=work, name='work')
    t.start()

    print(threading.current_thread().getName())
    print(threading.enumerate())
    print(threading.active_count())
    print(t.is_alive())

    time.sleep(4)
    print(t.is_alive())
threading其他方法示例
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    t.join()
    print('主線程')
    print(t.is_alive())
    '''
    egon say hello
    主線程
    False
    '''
主線程等待子線程結束

 

六 守護線程

無論是進程還是線程,都遵循:守護xxx會等待主xxx運行完畢后被銷毀

需要強調的是:運行完畢並非終止運行

#1.對主進程來說,運行完畢指的是主進程代碼運行完畢

#2.對主線程來說,運行完畢指的是主線程所在的進程內所有非守護線程統統運行完畢,主線程才算運行完畢

詳細解釋:

#1 主進程在其代碼結束后就已經算運行完畢了(守護進程在此時就被回收),然后主進程會一直等非守護的子進程都運行完畢后回收子進程的資源(否則會產生僵屍進程),才會結束,

#2 主線程在其他非守護線程運行完畢后才算運行完畢(守護線程在此時就被回收)。因為主線程的結束意味着進程的結束,進程整體的資源都將被回收,而進程必須保證非守護線程都運行完畢后才能結束。
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    t1=Thread(target=foo)
    t2=Thread(target=bar)

    t2.daemon=True
    t1.start()
    t2.start()
    time.sleep(2)
    print(t1.is_alive())
    print(t2.is_alive())
'''
t1是守護線程
123
456
end123
False
True
end456
'''

'''
t2是守護線程,主線程結束,守護線程被迫結束
123
456
end123
False
True
'''
守護線程因主線程被迫終止

 

七 GIL(Global Interpreter Lock)

鏈接:http://www.cnblogs.com/linhaifeng/articles/7449853.html

 

八 同步鎖

三個需要注意的點:
#1.線程搶的是GIL鎖,GIL鎖相當於執行權限,拿到執行權限后才能拿到互斥鎖Lock,其他線程也可以搶到GIL,但如果發現Lock仍然沒有被釋放則阻塞,即便是拿到執行權限GIL也要立刻交出來

#2.join是等待所有,即整體串行,而鎖只是鎖住修改共享數據的部分,即部分串行,要想保證數據安全的根本原理在於讓並發變成串行,join與互斥鎖都可以實現,毫無疑問,互斥鎖的部分串行效率要更高

#3. 一定要看本小節最后的GIL與互斥鎖的經典分析

GIL VS Lock

    機智的同學可能會問到這個問題,就是既然你之前說過了,Python已經有一個GIL來保證同一時間只能有一個線程來執行了,為什么這里還需要lock? 

 首先我們需要達成共識:鎖的目的是為了保護共享的數據,同一時間只能有一個線程來修改共享的數據

    然后,我們可以得出結論:保護不同的數據就應該加不同的鎖。

 最后,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),后者是保護用戶自己開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock

過程分析:所有線程搶的是GIL鎖,或者說所有線程搶的是執行權限

  線程1搶到GIL鎖,拿到執行權限,開始執行,然后加了一把Lock,還沒有執行完畢,即線程1還未釋放Lock,有可能線程2搶到GIL鎖,開始執行,執行過程中發現Lock還沒有被線程1釋放,於是線程2進入阻塞,被奪走執行權限,有可能線程1拿到GIL,然后正常執行到釋放Lock。。。這就導致了串行運行的效果

  既然是串行,那我們執行

  t1.start()

  t1.join

  t2.start()

  t2.join()

  這也是串行執行啊,為何還要加Lock呢,需知join是等待t1所有的代碼執行完,相當於鎖住了t1的所有代碼,而Lock只是鎖住一部分操作共享數據的代碼。

 

因為Python解釋器幫你自動定期進行內存回收,你可以理解為python解釋器里有一個獨立的線程,每過一段時間它起wake up做一次全局輪詢看看哪些內存數據是可以被清空的,此時你自己的程序 里的線程和 py解釋器自己的線程是並發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程中的clearing時刻,可能一個其它線程正好又重新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,為了解決類似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題,  這可以說是Python早期版本的遺留問題。
詳細
from threading import Thread
import os,time
def work():
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1
if __name__ == '__main__':
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結果可能為99
共享數據不加鎖
from threading import Thread, Lock
import os,time
def work(lock):
    lock.acquire()
    global n
    temp=n
    time.sleep(0.01)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    n=100
    l=[]
    lock = Lock()
    for i in range(100):
        p=Thread(target=work, args=(lock, ))
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結果為0
共享數據加鎖
#不加鎖:並發執行,速度快,數據不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''


#不加鎖:未加鎖部分並發執行,加鎖部分串行執行,速度慢,數據安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加鎖的代碼並發運行
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    #加鎖的代碼串行運行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''

#有的同學可能有疑問:既然加鎖會讓運行變成串行,那么我在start之后立即使用join,就不用加鎖了啊,也是串行的效果啊
#沒錯:在start之后立刻使用jion,肯定會將100個任務的執行變成串行,毫無疑問,最終n的結果也肯定是0,是安全的,但問題是
#start后立即join:任務內的所有代碼都是串行執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串行的
#單從保證數據安全方面,二者都可以實現,但很明顯是加鎖的效率更高.
from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗時是多么的恐怖
'''
最佳方案:涉及共享數據加鎖

 

九 死鎖現象與遞歸鎖

進程也有死鎖與遞歸鎖,在進程那里忘記說了,放到這里一切說了額

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程,如下就是死鎖

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

'''
Thread-1 拿到A鎖
Thread-1 拿到B鎖
Thread-1 拿到B鎖
Thread-2 拿到A鎖
然后就卡住,死鎖了
'''
多個線程獲取多個鎖可能導致思索現象

解決方法,遞歸鎖,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:

mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的情況,則counter繼續加1,這期間所有其他線程都只能等待,等待該線程釋放所有鎖,即counter遞減到0為止
from threading import Thread,RLock
import time
rlock = RLock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        rlock.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        rlock.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        rlock.release()
        rlock.release()

    def func2(self):
        rlock.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        rlock.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        rlock.release()
        rlock.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()
遞歸鎖解決死鎖問題

 

十 信號量Semaphore

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。

實例:(同時只有5個線程可以獲得semaphore,即可以限制最大連接數為5):

from threading import Thread, Semaphore, current_thread
import time, random

def worker(sem):
    sem.acquire()
    time.sleep(random.randrange(1,3))
    print(current_thread().getName(), 'done')
    sem.release()

if __name__ == '__main__':
    sem = Semaphore(3)
    for i in range(10):
        t = Thread(target=worker, args=(sem, ))
        t.start()
示例

 

十一 事件

同進程的一樣

線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標志,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標志被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標志為假,那么這個線程將會被一直阻塞直至該標志為真。一個線程如果將一個Event對象的信號標志設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那么它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值;

event.wait():如果 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;

event.clear():恢復event的狀態值為False。
from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('鏈接超時')
        print('<%s>第%s次嘗試鏈接' % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print('<%s>鏈接成功' %threading.current_thread().getName())


def check_mysql():
    print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == '__main__':
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()
模擬mysql數據庫檢查

 

十二 條件變量

 

Condition(條件變量)通常與一個鎖關聯。需要在多個Contidion中共享一個鎖時,可以傳遞一個Lock/RLock實例給構造方法,否則它將自己生成一個RLock實例。

 

可以認為,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處於狀態圖中的等待阻塞狀態,直到另一個線程調用notify()/notifyAll()通知;得到通知后線程進入鎖定池等待鎖定。

 

Condition():

 

  • acquire(): 線程鎖
  • release(): 釋放鎖
  • wait(timeout): 線程掛起,直到收到一個notify通知或者超時(可選的,浮點數,單位是秒s)才會被喚醒繼續運行。wait()必須在已獲得Lock前提下才能調用,否則會觸發RuntimeError。
  • notify(n=1): 通知其他線程,那些掛起的線程接到這個通知之后會開始運行,默認是通知一個正等待該condition的線程,最多則喚醒n個等待的線程。notify()必須在已獲得Lock前提下才能調用,否則會觸發RuntimeError。notify()不會主動釋放Lock。
  • notifyAll(): 如果wait狀態線程比較多,notifyAll的作用就是通知所有線程
    import threading
    import time
    
    con = threading.Condition()
    
    num = 0
    
    # 生產者
    class Producer(threading.Thread):
    
        def __init__(self):
            threading.Thread.__init__(self)
    
        def run(self):
            # 鎖定線程
            global num
            con.acquire()
            while True:
                print "開始添加!!!"
                num += 1
                print "火鍋里面魚丸個數:%s" % str(num)
                time.sleep(1)
                if num >= 5:
                    print "火鍋里面里面魚丸數量已經到達5個,無法添加了!"
                    # 喚醒等待的線程
                    con.notify()  # 喚醒小伙伴開吃啦
                    # 等待通知
                    con.wait()
            # 釋放鎖
            con.release()
    
    # 消費者
    class Consumers(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
    
        def run(self):
            con.acquire()
            global num
            while True:
                print "開始吃啦!!!"
                num -= 1
                print "火鍋里面剩余魚丸數量:%s" %str(num)
                time.sleep(2)
                if num <= 0:
                    print "鍋底沒貨了,趕緊加魚丸吧!"
                    con.notify()  # 喚醒其它線程
                    # 等待通知
                    con.wait()
            con.release()
    
    p = Producer()
    c = Consumers()
    p.start()
    c.start()

     

十三 定時器

n秒后 執行某操作

from threading import Timer
 
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed
復制代碼

復制代碼
from threading import Timer
import random,time

class Code:
    def __init__(self):
        self.make_cache()

    def make_cache(self,interval=5):
        self.cache=self.make_code()
        print(self.cache)
        self.t=Timer(interval,self.make_cache)
        self.t.start()

    def make_code(self,n=4):
        res=''
        for i in range(n):
            s1=str(random.randint(0,9))
            s2=chr(random.randint(65,90))
            res+=random.choice([s1,s2])
        return res

    def check(self):
        while True:
            inp=input('>>: ').strip()
            if inp.upper() ==  self.cache:
                print('驗證成功',end='\n')
                self.t.cancel()
                break


if __name__ == '__main__':
    obj=Code()
    obj.check()

 

十四 線程queue

class queue.Queue(maxsize=0) #先進先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''
View Code

class queue.LifoQueue(maxsize=0) #last in fisrt out 

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(后進先出):
third
second
first
'''
View Code

class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先級越高,優先級高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''
View Code

其他方法

Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).

exception queue.Empty
Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full
Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue.qsize()
Queue.empty() #return True if empty  
Queue.full() # return True if full 
Queue.put(item, block=True, timeout=None)
Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)
Equivalent to put(item, False).

Queue.get(block=True, timeout=None)
Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()
Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue.join() block直到queue被消費完畢
View Code

 

十五 concurrent.futures

#1 介紹
concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用
Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
#submit(fn, *args, **kwargs)
異步提交任務

#map(func, *iterables, timeout=None, chunksize=1) 
取代for循環submit的操作

#shutdown(wait=True) 
相當於進程池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源后才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait參數為何值,整個程序都會等到所有任務執行完畢
submit和map必須在shutdown之前

#result(timeout=None)
取得結果

#add_done_callback(fn)
回調函數
#介紹
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.


#用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())
ProcessPoolExecutor
#介紹
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

#用法
與ProcessPoolExecutor相同
ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    ress = executor.map(task,range(1,12)) #map取代了for+submit
    executor.shutdown(True)
    for res in ress:
        print(res)
map用法

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM