並發編程之多線程


一 threading模塊介紹

multiprocess模塊的完全模仿了threading模塊的接口,二者在使用層面,有很大的相似性,因而不再詳細介紹

官網鏈接:https://docs.python.org/3/library/threading.html?highlight=threading#

二 開啟線程的兩種方式

#方式一
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('太白',))
    t.start()
    print('主線程')
方式一
#方式二
from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('egon')
    t.start()
    print('主線程')
方式二

三 在一個進程下開啟多個線程與在一個進程下開啟多個子進程的區別

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
    #在主進程下開啟線程
    t=Thread(target=work)
    t.start()
    print('主線程/主進程')
    '''
    打印結果:
    hello
    主線程/主進程
    '''

    #在主進程下開啟子進程
    t=Process(target=work)
    t.start()
    print('主線程/主進程')
    '''
    打印結果:
    主線程/主進程
    hello
    '''
誰的開啟速度快
開啟速度對比
from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    #part1:在主進程下開啟多個線程,每個線程都跟主進程的pid一樣
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主線程/主進程pid',os.getpid())

    #part2:開多個進程,每個進程都有不同的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主線程/主進程pid',os.getpid())
瞅一瞅pid
對比pid
from  threading import Thread
from multiprocessing import Process
import os
def work():
    global n
    n=0

if __name__ == '__main__':
    # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('主',n) #毫無疑問子進程p已經將自己的全局的n改成了0,但改的僅僅是它自己的,查看父進程的n仍然為100


    n=1
    t=Thread(target=work)
    t.start()
    t.join()
    print('',n) #查看結果為0,因為同一進程內的線程之間共享進程內的數據
同一進程內的線程共享該進程的數據?
同一個進程內的線程共享該進程的數據

小練習

  socket相關練習

#_*_coding:utf-8_*_
#!/usr/bin/env python
import multiprocessing
import threading

import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)

def action(conn):
    while True:
        data=conn.recv(1024)
        print(data)
        conn.send(data.upper())

if __name__ == '__main__':

    while True:
        conn,addr=s.accept()


        p=threading.Thread(target=action,args=(conn,))
        p.start()

多線程並發的socket服務端
服務端
#_*_coding:utf-8_*_
#!/usr/bin/env python


import socket

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue

    s.send(msg.encode('utf-8'))
    data=s.recv(1024)
    print(data)

客戶端
客戶端

  一個接收用戶輸入,一個將用戶輸入的內容格式化成大寫,一個將格式化后的結果存入文件

from threading import Thread
msg_l=[]
format_l=[]
def talk():
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        msg_l.append(msg)

def format_msg():
    while True:
        if msg_l:
            res=msg_l.pop()
            format_l.append(res.upper())

def save():
    while True:
        if format_l:
            with open('db.txt','a',encoding='utf-8') as f:
                res=format_l.pop()
                f.write('%s\n' %res)

if __name__ == '__main__':
    t1=Thread(target=talk)
    t2=Thread(target=format_msg)
    t3=Thread(target=save)
    t1.start()
    t2.start()
    t3.start()
View Code

四  線程相關的其他方法

Thread實例對象的方法
  # isAlive(): 返回線程是否活動的。
  # getName(): 返回線程名。
  # setName(): 設置線程名。

threading模塊提供的一些方法:
  # threading.currentThread(): 返回當前的線程變量。
  # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
  # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主進程下開啟線程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) #主線程
    print(threading.enumerate()) #連同主線程在內有兩個運行的線程
    print(threading.active_count())
    print('主線程/主進程')

    '''
    打印結果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    主線程/主進程
    Thread-1
    '''
View Code

主線程等待子線程結束

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    t.join()
    print('主線程')
    print(t.is_alive())
    '''
    egon say hello
    主線程
    False
    '''
View Code

五 守護線程

無論是進程還是線程,都遵循:守護xxx會等待主xxx運行完畢后被銷毀

需要強調的是:運行完畢並非終止運行

#1.對主進程來說,運行完畢指的是主進程代碼運行完畢

#2.對主線程來說,運行完畢指的是主線程所在的進程內所有非守護線程統統運行完畢,主線程才算運行完畢

詳細解釋:

#1 主進程在其代碼結束后就已經算運行完畢了(守護進程在此時就被回收),然后主進程會一直等非守護的子進程都運行完畢后回收子進程的資源(否則會產生僵屍進程),才會結束,

#2 主線程在其他非守護線程運行完畢后才算運行完畢(守護線程在此時就被回收)。因為主線程的結束意味着進程的結束,進程整體的資源都將被回收,而進程必須保證非守護線程都運行完畢后才能結束。
復制代碼
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #必須在t.start()之前設置
    t.start()

    print('主線程')
    print(t.is_alive())
    '''
    主線程
    True
    '''
復制代碼
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


t1=Thread(target=foo)
t2=Thread(target=bar)

t1.daemon=True
t1.start()
t2.start()
print("main-------")

迷惑人的例子
View Code

六 互斥鎖(同步鎖)

多線程的同步鎖與多進程的同步鎖是一個道理,就是多個線程搶占同一個數據(資源)時,我們要保證數據的安全,合理的順序。

from threading import Thread
import time

x = 100
def task():
    global x
    temp = x
    time.sleep(0.1)
    temp -= 1
    x = temp



if __name__ == '__main__':
    t_l1 = []
    for i in range(100):
        t = Thread(target=task)
        t_l1.append(t)
        t.start()

    for i in t_l1:
        i.join()
    print(f'主{x}')
不加鎖搶占同一個資源的問題
from threading import Thread
from threading import Lock
import time

x = 100
lock = Lock()

def task():
    global x
    lock.acquire()
    temp = x
    time.sleep(0.1)
    temp -= 1
    x = temp
    lock.release()


if __name__ == '__main__':
    t_l1 = []
    for i in range(100):
        t = Thread(target=task)
        t_l1.append(t)
        t.start()

    for i in t_l1:
        i.join()
    print(f'主{x}')
同步鎖保證數據安全

七 死鎖現象與遞歸鎖

進程也有死鎖與遞歸鎖,進程的死鎖和遞歸鎖與線程的死鎖遞歸鎖同理。

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程,如下就是死鎖

from threading import Thread
from threading import Lock
import time

lock_A = Lock()
lock_B = Lock()


class MyThread(Thread):

    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        lock_A.acquire()
        print(f'{self.name}拿到A鎖')

        lock_B.acquire()
        print(f'{self.name}拿到B鎖')
        lock_B.release()

        lock_A.release()

    def f2(self):
        lock_B.acquire()
        print(f'{self.name}拿到B鎖')
        time.sleep(0.1)

        lock_A.acquire()
        print(f'{self.name}拿到A鎖')
        lock_A.release()

        lock_B.release()

if __name__ == '__main__':
    for i in range(3):
        t = MyThread()
        t.start()

    print('主....')
View Code

解決方法,遞歸鎖,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:

from threading import Thread
from threading import RLock
import time

lock_A = lock_B = RLock()


class MyThread(Thread):
    
    def run(self):
        self.f1()
        self.f2()
    
    def f1(self):
        lock_A.acquire()
        print(f'{self.name}拿到A鎖')
        
        lock_B.acquire()
        print(f'{self.name}拿到B鎖')
        lock_B.release()
        
        lock_A.release()
    
    def f2(self):
        lock_B.acquire()
        print(f'{self.name}拿到B鎖')
        time.sleep(0.1)
        
        lock_A.acquire()
        print(f'{self.name}拿到A鎖')
        lock_A.release()
        
        lock_B.release()


if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()
    
    print('主....')
    
View Code

八 信號量Semaphore

同進程的一樣

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。

實例:(同時只有5個線程可以獲得semaphore,即可以限制最大連接數為5):

from threading import Thread
from threading import Semaphore
from threading import current_thread
import time
import random

sem = Semaphore(5)

def go_public_wc():
    sem.acquire()
    print(f'{current_thread().getName()} 上廁所ing')
    time.sleep(random.randint(1,3))
    sem.release()


if __name__ == '__main__':
    for i in range(20):
        t = Thread(target=go_public_wc)
        t.start()
View Code

九 Python GIL(Global interpreter Lock)

首先,一些語言(java、c++、c)是支持同一個進程中的多個線程是可以應用多核CPU的,也就是我們會聽到的現在4核8核這種多核CPU技術的牛逼之處。那么我們之前說過應用多進程的時候如果有共享數據是不是會出現數據不安全的問題啊,就是多個進程同時一個文件中去搶這個數據,大家都把這個數據改了,但是還沒來得及去更新到原來的文件中,就被其他進程也計算了,導致數據不安全的問題啊,所以我們是不是通過加鎖可以解決啊,多線程大家想一下是不是一樣的,並發執行就是有這個問題。但是python最早期的時候對於多線程也加鎖,但是python比較極端的(在當時電腦cpu確實只有1核)加了一個GIL全局解釋鎖,是解釋器級別的,鎖的是整個線程,而不是線程里面的某些數據操作,每次只能有一個線程使用cpu,也就說多線程用不了多核,但是他不是python語言的問題,是CPython解釋器的特性,如果用Jpython解釋器是沒有這個問題的,Cpython是默認的,因為速度快,Jpython是java開發的,在Cpython里面就是沒辦法用多核,這是python的弊病,歷史問題,雖然眾多python團隊的大神在致力於改變這個情況,但是暫沒有解決。(這和解釋型語言(python,php)和編譯型語言有關系嗎???待定!,編譯型語言一般在編譯的過程中就幫你分配好了,解釋型要邊解釋邊執行,所以為了防止出現數據不安全的情況加上了這個鎖,這是所有解釋型語言的弊端??)

  

 

    但是有了這個鎖我們就不能並發了嗎?當我們的程序是偏計算的,也就是cpu占用率很高的程序(cpu一直在計算),就不行了,但是如果你的程序是I/O型的(一般你的程序都是這個)(input、訪問網址網絡延遲、打開/關閉文件讀寫),在什么情況下用的到高並發呢(金融計算會用到,人工智能(阿爾法狗),但是一般的業務場景用不到,爬網頁,多用戶網站、聊天軟件、處理文件),I/O型的操作很少占用CPU,那么多線程還是可以並發的,因為cpu只是快速的調度線程,而線程里面並沒有什么計算,就像一堆的網絡請求,我cpu非常快速的一個一個的將你的多線程調度出去,你的線程就去執行I/O操作了,

  詳細的GIL鎖介紹:https://www.cnblogs.com/jin-xin/articles/11232225.html

十 GIL鎖與Lock的關系

GIL VS Lock

    機智的同學可能會問到這個問題,就是既然你之前說過了,Python已經有一個GIL來保證同一時間只能有一個線程來執行了,為什么這里還需要lock? 

 首先我們需要達成共識:鎖的目的是為了保護共享的數據,同一時間只能有一個線程來修改共享的數據

    然后,我們可以得出結論:保護不同的數據就應該加不同的鎖。

 最后,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),后者是保護用戶自己開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock

過程分析:所有線程搶的是GIL鎖,或者說所有線程搶的是執行權限

  線程1搶到GIL鎖,拿到執行權限,開始執行,然后加了一把Lock,還沒有執行完畢,即線程1還未釋放Lock,有可能線程2搶到GIL鎖,開始執行,執行過程中發現Lock還沒有被線程1釋放,於是線程2進入阻塞,被奪走執行權限,有可能線程1拿到GIL,然后正常執行到釋放Lock。。。這就導致了串行運行的效果

  既然是串行,那我們執行

  t1.start()

  t1.join

  t2.start()

  t2.join()

  這也是串行執行啊,為何還要加Lock呢,需知join是等待t1所有的代碼執行完,相當於鎖住了t1的所有代碼,而Lock只是鎖住一部分操作共享數據的代碼。
復制代碼

詳解:

因為Python解釋器幫你自動定期進行內存回收,你可以理解為python解釋器里有一個獨立的線程,每過一段時間它起wake up做一次全局輪詢看看哪些內存數據是可以被清空的,此時你自己的程序 里的線程和 py解釋器自己的線程是並發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程中的clearing時刻,可能一個其它線程正好又重新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,為了解決類似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題,  這可以說是Python早期版本的遺留問題。

十一 Event(事件)

同進程的一樣

線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標志,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標志被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標志為假,那么這個線程將會被一直阻塞直至該標志為真。一個線程如果將一個Event對象的信號標志設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那么它將忽略這個事件, 繼續執行

復制代碼
event.isSet():返回event的狀態值;

event.wait():如果 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;

event.clear():恢復event的狀態值為False。
復制代碼

 

例如,有多個工作線程嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務正常才讓那些工作線程去連接MySQL服務器,如果連接不成功,都會去嘗試重新連接。那么我們就可以采用threading.Event機制來協調各個工作線程的連接操作

 

十二 條件Condition(了解)

使得線程等待,只有滿足某條件時,才釋放n個線程,看一下大概怎么用就可以啦~~

import time
from threading import Thread,RLock,Condition,current_thread

def func1(c):
    c.acquire(False) #固定格式
    # print(1111)

    c.wait()  #等待通知,
    time.sleep(3)  #通知完成后大家是串行執行的,這也看出了鎖的機制了
    print('%s執行了'%(current_thread().getName()))

    c.release()

if __name__ == '__main__':
    c = Condition()
    for i in range(5):
        t = Thread(target=func1,args=(c,))
        t.start()

    while True:
        num = int(input('請輸入你要通知的線程個數:'))
        c.acquire() #固定格式
        c.notify(num)  #通知num個線程別等待了,去執行吧
        c.release()

#結果分析: 
# 請輸入你要通知的線程個數:3
# 請輸入你要通知的線程個數:Thread-1執行了 #有時候你會發現的你結果打印在了你要輸入內容的地方,這是打印的問題,沒關系,不影響
# Thread-3執行了
# Thread-2執行了

示例代碼
View Code

 

十三 定時器(了解)

 定時器,指定n秒后執行某個操作,這個做定時任務的時候可能會用到。

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('鏈接超時')
        print('<%s>第%s次嘗試鏈接' % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print('<%s>鏈接成功' %threading.current_thread().getName())


def check_mysql():
    print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == '__main__':
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()
View Code

 

十四 線程隊列

  線程之間的通信我們列表行不行呢,當然行,那么隊列和列表有什么區別呢?

  queue隊列 :使用import queue,用法與進程Queue一樣

  queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

  class queue.Queue(maxsize=0) #先進先出
import queue #不需要通過threading模塊里面導入,直接import queue就可以了,這是python自帶的
#用法基本和我們進程multiprocess中的queue是一樣的
q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')
# q.put_nowait() #沒有數據就報錯,可以通過try來搞
print(q.get())
print(q.get())
print(q.get())
# q.get_nowait() #沒有數據就報錯,可以通過try來搞
'''
結果(先進先出):
first
second
third
'''

先進先出示例代碼
先進先出示例

  class queue.LifoQueue(maxsize=0) #last in fisrt out

import queue

q=queue.LifoQueue() #隊列,類似於棧,棧我們提過嗎,是不是先進后出的順序啊
q.put('first')
q.put('second')
q.put('third')
# q.put_nowait()

print(q.get())
print(q.get())
print(q.get())
# q.get_nowait()
'''
結果(后進先出):
third
second
first
'''

先進后出示例代碼
先進后出示例

  class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高
q.put((-10,'a'))
q.put((-5,'a'))  #負數也可以
# q.put((20,'ws'))  #如果兩個值的優先級一樣,那么按照后面的值的acsii碼順序來排序,如果字符串第一個數元素相同,比較第二個元素的acsii碼順序
# q.put((20,'wd'))
# q.put((20,{'a':11})) #TypeError: unorderable types: dict() < dict() 不能是字典
# q.put((20,('w',1)))  #優先級相同的兩個數據,他們后面的值必須是相同的數據類型才能比較,可以是元祖,也是通過元素的ascii碼順序來排序

q.put((20,'b'))
q.put((20,'a'))
q.put((0,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先級越高,優先級高的優先出隊):
'''

優先級隊列示例代碼
優先級隊列示例

這三種隊列都是線程安全的,不會出現多個線程搶占同一個資源或數據的情況。

 

十五 Python標准模塊--concurrent.futures

到這里就差我們的線程池沒有講了,我們用一個新的模塊給大家講,早期的時候我們沒有線程池,現在python提供了一個新的標准或者說內置的模塊,這個模塊里面提供了新的線程池和進程池,之前我們說的進程池是在multiprocessing里面的,現在這個在這個新的模塊里面,他倆用法上是一樣的。

為什么要將進程池和線程池放到一起呢,是為了統一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式一樣,而且只要通過這個concurrent.futures導入就可以直接用他們兩個了

復制代碼
concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用
Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
#submit(fn, *args, **kwargs)
異步提交任務

#map(func, *iterables, timeout=None, chunksize=1) 
取代for循環submit的操作

#shutdown(wait=True) 
相當於進程池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源后才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait參數為何值,整個程序都會等到所有任務執行完畢
submit和map必須在shutdown之前

#result(timeout=None)
取得結果

#add_done_callback(fn)
回調函數
復制代碼
import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
    time.sleep(2)
    print('%s打印的:'%(threading.get_ident()),n)
    return n*n
tpool = ThreadPoolExecutor(max_workers=5) #默認一般起線程的數據不超過CPU個數*5
# tpool = ProcessPoolExecutor(max_workers=5) #進程池的使用只需要將上面的ThreadPoolExecutor改為ProcessPoolExecutor就行了,其他都不用改
#異步執行
t_lst = []
for i in range(5):
    t = tpool.submit(func,i) #提交執行函數,返回一個結果對象,i作為任務函數的參數 def submit(self, fn, *args, **kwargs):  可以傳任意形式的參數
    t_lst.append(t)  #
    # print(t.result())
    #這個返回的結果對象t,不能直接去拿結果,不然又變成串行了,可以理解為拿到一個號碼,等所有線程的結果都出來之后,我們再去通過結果對象t獲取結果
tpool.shutdown() #起到原來的close阻止新任務進來 + join的作用,等待所有的線程執行完畢
print('主線程')
for ti in t_lst:
    print('>>>>',ti.result())

# 我們還可以不用shutdown(),用下面這種方式
# while 1:
#     for n,ti in enumerate(t_lst):
#         print('>>>>', ti.result(),n)
#     time.sleep(2) #每個兩秒去去一次結果,哪個有結果了,就可以取出哪一個,想表達的意思就是說不用等到所有的結果都出來再去取,可以輪詢着去取結果,因為你的任務需要執行的時間很長,那么你需要等很久才能拿到結果,通過這樣的方式可以將快速出來的結果先拿出來。如果有的結果對象里面還沒有執行結果,那么你什么也取不到,這一點要注意,不是空的,是什么也取不到,那怎么判斷我已經取出了哪一個的結果,可以通過枚舉enumerate來搞,記錄你是哪一個位置的結果對象的結果已經被取過了,取過的就不再取了

#結果分析: 打印的結果是沒有順序的,因為到了func函數中的sleep的時候線程會切換,誰先打印就沒准兒了,但是最后的我們通過結果對象取結果的時候拿到的是有序的,因為我們主線程進行for循環的時候,我們是按順序將結果對象添加到列表中的。
# 37220打印的: 0
# 32292打印的: 4
# 33444打印的: 1
# 30068打印的: 2
# 29884打印的: 3
# 主線程
# >>>> 0
# >>>> 1
# >>>> 4
# >>>> 9
# >>>> 16

ThreadPoolExecutor的簡單使用
ThreaPoolExecutor簡單使用

ProcessPoolExecutor的使用:

只需要將這一行代碼改為下面這一行就可以了,其他的代碼都不用變
tpool = ThreadPoolExecutor(max_workers=5) #默認一般起線程的數據不超過CPU個數*5
# tpool = ProcessPoolExecutor(max_workers=5)

你就會發現為什么將線程池和進程池都放到這一個模塊里面了,用法一樣
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import threading
import os,time,random
def task(n):
    print('%s is runing' %threading.get_ident())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    s = executor.map(task,range(1,5)) #map取代了for+submit
    print([i for i in s])

map的使用
map的使用
import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
    time.sleep(2)
    return n*n

def call_back(m):
    print('結果為:%s'%(m.result()))

tpool = ThreadPoolExecutor(max_workers=5)
t_lst = []
for i in range(5):
    t = tpool.submit(func,i).add_done_callback(call_back)

回調函數簡單應用
回調函數簡單使用
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<進程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<進程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,需要用obj.result()拿到結果

回調函數的應用,需要你自己去練習的
回調函數的簡單應用(練習)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM