並發編程


Python並發編程

author:素心

本文比較長,繞的也比較快,需要慢慢跟着敲代碼並親自運行一遍,並發編程本身來說就是編程里面最為抽象的概念,單純的理論確實很枯燥,但這是基礎,基礎不牢,地洞山搖,在概念這節里面還需要好好的品味一番。如果跟着這篇文章敲代碼的話,推薦Python 3.6 版本以上

一、概念

Python並發的概念非常的抽象,但同時也非常的重要,因為這事關能不能准確的寫出高並發的質量性代碼。

進程:顧名思義,正在進行的一個過程。

背景:進程起源於操作系統,是操作系統最核心的概念,也是操作系統提供的最古老最重要最抽象的概念之一。

1. 單核並發

即使操作系統只有一個CPU,但是使用進程的概念也能使這一個CPU支持並發的能力,這種並發就稱之為偽並發

將一個CPU變成多個虛擬的CPU的技術就稱之為多道技術。而多道技術又分為時間多路復用和空間多路復用(當然,必須硬件支持隔離),所以就有這樣一個理論,沒有進程的抽象,現代計算機將不復存在。

2. 操作系統

在進程並發上,操作系統起着非常重要的作用,它隱藏了復雜的硬件接口,提供了良好的抽象接口,與此同時,它更是對進程的管理與調度起着不可或缺的作用,因為如果沒有操作系統來管理進程,多個進程就會變得雜亂無章,使得計算機資源嚴重浪費。

3. 多道技術

前邊提到了多道技術,不難發現,它就是針對單核的計算機實現並發。

(1) 空間復用

舉個例子:

假設現在雙擊桌面上的IDM快捷方式,然后呈現軟件界面。其實后台運行了很多的復雜的IO操作。

  • 雙擊快捷方式
  • 快捷方式會告訴操作系統一個資源路徑,也就是快捷方式所對應的應用程序的路徑
  • 操作系統從硬盤讀取文件內容並克隆到內存中
  • CPU從內存中讀取數據,然后執行

而這里的多道技術可以理解為多個程序,使用空間復用就是在內存中同時跑多個程序,夠簡單了吧。

(2) 時間復用

同樣舉例說明:

我們知道有一款下載軟件叫IDM,下載速度非常的快,它的原理其實就是將一個文件切割成非常多的細小文件下載,假設現在下載一張圖片,它會將一張圖片分為10塊,現在有2個進程同時下載,在遇到阻塞的情況下,就切換到其他小塊下載,下載完之后再回到這個小塊上試試能不能繼續下載,如果不能,就再切換到其他的小塊,如此往復循環,直到一個完整的文件下載完畢。

時間上的復用,其實就是復用CPU上的一個時間片,進程在執行的時候,遇到IO就切換,占用CPU時間過長的時候也切,值得注意的是,在切換之前,會保存進程的狀態,這樣才能保證下次切換回來的時候繼續上次停頓的地方。

二、多進程

多進程的概念網上比比皆是,簡單來講,就是正在進行的一個過程或者一個任務,而負責執行任務的是CPU。

需要注意的是,進程和程序之間是有區別的,兩者絕對不能混淆。

進程與程序之間的區別:

程序僅僅是一堆代碼,而進程是這堆代碼的執行過程

看到這里還在迷糊不要緊,舉個例子就能很明白了

栗子:

假設M正在織毛衣,組成毛衣的毛線就是一堆堆的代碼,而M就是CPU正在執行織毛衣的過程。

那么現在M的丈夫回來了,丈夫說好餓,這個時候就需要考慮哪件事情相對來說重要,M考慮一下,覺得先做飯比較重要,這就是優先級,然后M記錄下自己織的毛衣織到哪里了,再去做飯,這種切換就是處理其他優先級高的任務,每個進程擁有各自的程序,就是菜和毛線,當M做完飯后又回來織毛衣,從離開任務的地方繼續執行。

這里需要注意一點:同一個程序執行多次,也就是多個進程,比如上邊的IDM,我啟動兩次,就既可以下載蒼老師,也可以下載波老師,兩個進程之間互不影響。

1. 並發與並行

並行:多個程序同時運行

並發:偽並行,看起來是同時運行,其實質是利用了多道技術

無論是並行還是並發,在用戶眼里看起來都是同時運行的,不管是線程還是進程,都只是一個任務,真正干活的CPU,而同一個CPU在同一時刻只能執行一個任務。

2. 進程的創建

我們將可以跑很多應用程序的系統稱之為通用系統,那么對於通用系統來說,創建進程有4中形式

  • 系統的初始化,在linux中查看進程ps,windows中使用任務管理器。在前台運行的進程負責與用戶發生交互,后台進程與用戶無關,而有些時候,用戶需要去喚醒后台的進程與之發生交互,這種類似於睡眠的后台進程就稱之為守護進程
  • 一個進程在開啟的時候必須開啟子進程工作。比如python的fork方法
  • 用戶的交互式請求使得操作系統創建一個新的進程。如雙擊IDM
  • 批處理作業的初始化。這種情況只會在大型機中出現

不管是哪一種,創建新進程都是由已經存在的進程調用系統創建進程的接口來實現的。

  • linux中的系統調用是fork,它會創建一個與本身進程一模一樣的副本進程,這個被創建的進程就是子進程,二者具有相同的存儲映像、相同的環境字符串和相同的打開文件。比如說,在shell解釋器中,每執行一個命令就會創建一個子進程
  • windows中調用是createProcess,它會有兩種作用,既創建進程,還會將程序裝進新的進程

以上兩種操作系統創建進程並不完全一樣

  • 相同點:進程在創建之后,兩個進程都各自有不同的地址空間,任何一個進程的地址空間的修改都不會影響其他進程
  • 不同點:在linux中,子進程的初始地址空間是父進程的一個副本,它的子進程和父進程之間是可以存在只讀的共享內存區;windows中,從一開始兩個進程的地址空間完全不同

所以說,學習Python推薦在linux上學習,便於后期進程之間的通信,而mac其核心也是linux,所以linux的任何python代碼在mac上都是可行的

3. 進程的終止

進程有四種退出方式

  • 正常退出:用戶行為退出。比如點擊IDM界面的X號關掉,在linux中使用exit或者quit
  • 報錯退出:用戶行為報錯。比如現在執行命令python demo.py,而該路徑下並沒有demo.py文件
  • 告警退出:系統本身出錯。比如執行 pa -aux,而系統本身是沒有該命令
  • 殺死退出:其他進程殺死。比如常用的kil -9

4. 進程的層其結構

  • 在linux中,所有的進程都是以init進程為根,組成樹形結構。父進程共同組成進程組。
  • 在windows中沒有進程層次之分,所有進程地位相同。

值得注意的是:

在windows創建進程的時候,父進程會得到一個特別的令牌,這個令牌就是句柄,這個句柄可以控制子進程,這時進程就有了層次的概念,但是windows中的父進程有權把句柄傳遞給其他子進程,這樣一來,進程又沒有了層次的概念。

5. 進程的狀態

進程有三種狀態:就緒、阻塞、運行

舉個例子:

現在執行linux命令tail -f web.log | grep '404'

執行程序tail,則會開啟一個子進程,而grep又會另外開啟一個子進程,這兩個進程基於管道|來進行通訊,也就是將tail出來的內容交給grep處理。在這個過程中,grep等待tail的結果,這種現象就是阻塞,如果tail一直阻塞,則grep將無法執行。

實質上總結出一下兩點情況:

  • 進程掛起是自身原因,在遇到IO阻塞,就讓出CPU給其他進程,這樣下來就保證了CPU一直處於工作的狀態
  • 在遇到CPU占用時間過長或者處理優先級較高的進程

6. 進程並發的實現

之所以在這里添加進程表的概念,是為了更好的理解操作系統與進程之間的關聯,了解即可

所謂進程的並發,無非是硬件中斷一個正在進行的進程,然后保存當前進程的狀態。

操作系統會維護一張表格,我們稱之為進程表,每一個進程都會占用一個進程表項,這個進程表項就稱之為進程控制塊

這張表大致分四塊:進程描述信息、進程控制信息、CPU現場保護結構

(1) 進程描述信息

  • 進程名或者進程標識號:每個進程都有自己唯一的進程名或標識號
  • 用戶名或者用戶識別號:每個進程隸屬於某個用戶,有利於資源共享和保護
  • 家族關系:有的系統中的所有進程互成家族關系。比如常見的linux系統就是以init為根的家族樹

(2) 進程控制信息

  • 進程當前狀態:說明進程當前處於什么狀態,就是前邊提到的就緒、運行、阻塞
  • 進程優先級:用於選取進程占有CPU。與優先級有關的PCB表項還有占有CPU時間、進程優先級偏移、占據內存時間等等
  • 程序開始地址:用來規定該進程以此地開始進行
  • 各種計時信息:給出進程占有和利用資源的情況
  • 通信信息:說明該進程在執行過程中與其他進程之間發生的信息交換
  • 資源管理信息:占用內存大小以及其管理用數據結構指針

(3) CPU現場保護結構

寄存器值:通用、程序計數器PC、狀態PSW、地址包括棧指針

7. 開啟子進程

如果你順利的看到了這里,則已經掌握了進程的理論知識,現在開始最為精彩的代碼小節

(1) multiprocessing模塊

Python中的多線程是無法利用計算多核的優勢,如果需要充分的使用多核資源,在Python中大部分使用多進程。

multiprocessing模塊用來開啟子進程,並且在子進程中執行指定的任務。

該模塊功能諸多:支持子進程、通信、數據共享、執行不同形式的同步,更是提供了 Process、Queue、Pipe、Lock等組件

這里一定要注意:與線程不同,進程沒有任何的共享狀態,進程修改的數據、改動僅限於該進程之內

(2) Process類

創建進程的類:Process([group [, target [, name [, args [, kwargs]]]]])

由該類實例化得到的對象,表示一個子進程中的任務。

注意:

  • 這里的參數必須使用關鍵字來指定
  • args為傳給target函數的位置參數,必須以元組的形式傳入

參數

group:值始終為None

target:調用的對象,即子進程需要執行的任務

args:位置參數元組,按照位置傳參

kwargs:按照字典傳參

name:子進程名稱

方法

假設p = Process()

p.start():啟動進程

p.run():進程啟動時運行的方法,用於調用target來指定需要執行的函數

p.terminate():強制終止進程p,並不會做任何的清理操作,如果p下還創建了子進程,那么這個子進程並沒有父進程處理,這個子進程就稱之為僵屍進程

p.is_alive():判斷p是否還在運行,如果還在運行則返回True

p.join([timeout]):主線程等待p終止,也可以理解為回收計算機資源,值得注意的是,主線程從始至終處於等待的狀態,而p是處於運行的狀態,join只能作用於start的進程,而不能作用於run的進程

屬性介紹:

p.daemon:默認值為False,如果設置為True,則代表p為后台的守護進程,也就是前邊提到的后台運行的守護進程等待用戶與之發生交互。當p的父進程終止的時候,p也隨之終止 ,並且設定為True之后是不能創建自己的新進程的,改設置必須在start之前設置

p.name:進程的名稱

p.pid:進程的PID,和linux的PID類似

(3) 代碼實現

from multiprocessing import Process
import time

def fun(name):
    print("{} 正在執行。。。".format(name))
    time.sleep(2)
    print("{} 執行完畢。。。".format(name))

if __name__ == '__main__':
    p = Process(target=fun, args=('Chancey',)) # 這里傳參必須是以元組的形式傳參
    p.start()
    print("主線程啟動。。。")

上述例子非常簡單,不過,這只是開啟了一個進程,接下來開啟4個進程

from multiprocessing import Process
import time

def fun(name):
    print("{} 正在執行。。。".format(name))
    time.sleep(2)
    print("{} 執行完畢。。。".format(name))

if __name__ == '__main__':
    # 實例化以得到四個對象
    p1 = Process(target=fun, args=('Chancey',))
    p2 = Process(target=fun, args=('Waller',))
    p3 = Process(target=fun, args=('Mary',))
    p4 = Process(target=fun, args=('Arry',))

    # 調用方法,開啟四個進程
    p1.start()
    p2.start()
    p3.start()
    p4.start()

    print("主線程啟動。。。")

運行上述代碼的時候會發現,四個進程同時進行,同時結束,這是因為設定了sleep的時間。

還有一種啟動方式,就是面向對象的三大特征之一的繼承,我們通過繼承process類並重寫父類方法以達到我們的需求

from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self, name):
        super().__init__() # 重用父類的init方法
        self.name = name

    def run(self): # 重寫父類方法
        print("{} 正在執行。。。".format(self.name))
        time.sleep(2)
        print("{} 執行完畢。。。".format(self.name))

if __name__ == '__main__':
    p = MyProcess('Chancey')
    p.start()
    print('主進程')

那么開啟多進程就變成了

from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self, name):
        super().__init__() # 重用父類的init
        self.name = name

    def run(self): # 重寫父類方法
        print("{} 正在執行。。。".format(self.name))
        time.sleep(2)
        print("{} 執行完畢。。。".format(self.name))

if __name__ == '__main__':
    p1 = MyProcess('Chancey')
    p2 = MyProcess('Waller')
    p3 = MyProcess('Mary')
    p4 = MyProcess('Arry')

    p1.start()
    p2.start()
    p3.start()
    p4.start()

    print('主進程')

(4) 查看進程的信息

pid用來查看父進程的ID

ppid用來查看子進程的ID

from multiprocessing import Process
import time
import os

class MyProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print("{} 正在執行。。。子進程ID:{},父進程ID:{}".format(self.name, os.getpid(), os.getppid()))
        time.sleep(2)
        print("{} 執行完畢。。。子進程ID:{},父進程ID:{}".format(self.name, os.getpid(), os.getppid()))

if __name__ == '__main__':
    p1 = MyProcess('Chancey')
    p2 = MyProcess('Waller')
    p3 = MyProcess('Mary')

    p1.start()
    p2.start()
    p3.start()

    print("主進程", os.getppid(), os.getppid())

非常的簡單,同一個父進程下邊有三個子進程工作

(5) 其他屬性

在Python的多進程編程中,還有其他很重要的Process對象屬性

join方法

在主進程運行的過程中如果想並發的執行其他任務,就需要開啟子進程,這時就有兩種情況

  • 如果主進程的任務和子進程的任務彼此獨立,主進程在完成執行任務之后等待子進程執行完畢,然后統一回收資源;
  • 如果主進程在執行到某一個階段需要子進程執行完畢之后才能繼續,屆時就需要一種機制來檢測子進程是否執行完畢,而這種檢測機制正是join的作用。如果子進程沒有執行完畢,就需要阻塞等待

簡單示例:

from multiprocessing import Process
import time
import os

class MyProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print("{} 正在執行。。。子進程ID:{},父進程ID:{}".format(self.name, os.getpid(), os.getppid()))
        time.sleep(2)
        print("{} 執行完畢。。。子進程ID:{},父進程ID:{}".format(self.name, os.getpid(), os.getppid()))

if __name__ == '__main__':
    p1 = MyProcess('Chancey')
    p2 = MyProcess('Waller')

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print('主進程', '父進程ID:', os.getpid(), '子進程ID:', os.getppid())

運行截圖:

在這里就可以很容易的發現,主進程從原來的最先執行變為了最后,這正是因為使用join使得主進程等待子進程執行完畢才回收,那么,這樣下來會不會有僵屍進程的存在,有關僵屍進程忘記的,請移步至第7節的(2)Process類。只需要在上邊的代碼結尾加上print(p.pid())即可查看

事實證明,這里確實存在了僵屍進程。

多個進程同時進行就是並發,如果多個進程是每一個都等待上一個進程執行完畢之后才執行,這種執行方式就是串行

對上邊代碼稍微改動一下:

from multiprocessing import Process
import time
import os

class MyProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print("{} 正在執行。。。子進程ID:{},父進程ID:{}".format(self.name, os.getpid(), os.getppid()))
        time.sleep(2)
        print("{} 執行完畢。。。子進程ID:{},父進程ID:{}".format(self.name, os.getpid(), os.getppid()))

if __name__ == '__main__':
    p1 = MyProcess('Chancey')
    p2 = MyProcess('Waller')

    p1.start()
    p1.join()

    p2.start()
    p2.join()

    print('主進程', '父進程ID:', os.getpid(), '子進程ID:', os.getppid())

    print(p1.pid)
    print(p2.pid)

貼上運行截圖

不難發現,其實每個進程都是分時間段進行的,在同一時間並沒同時進行,正是所謂的串行。

is_alive方法

該方法是用於查看進程是否存活,如果存活則返回True,反之亦然。

同時這里還有個屬性name,用於給進程起名

from multiprocessing import Process
import time
import os

def fun(name):
    print("{} 正在執行。。。子進程ID:{},父進程ID:{}".format(name, os.getpid(), os.getppid()))
    time.sleep(2)
    print("{} 執行完畢。。。子進程ID:{},父進程ID:{}".format(name, os.getpid(), os.getppid()))

if __name__ == '__main__':
    p = Process(target=fun, args=('Chancey',))
    print(p.is_alive())
    p.start()
    print(p.is_alive())
    print(p.name)
    print("主進程, 父進程ID:{},子進程ID:{}".format(os.getpid(), os.getppid()))

    print('='*60)

    p = Process(target=fun, args=('Waller',), name="Cute_Process")
    print(p.is_alive())
    p.start()
    print(p.is_alive())
    p.terminate() # 強制殺死該進程
    print(p.is_alive())
    time.sleep(3)
    print(p.is_alive())
    print(p.name)
    print("主進程, 父進程ID:{},子進程ID:{}".format(os.getpid(), os.getppid()))

上邊的代碼中p.terminate()是為了后邊驗證是否存活的使用

這里可以清晰的看到,在剛剛執行了p.terminate()之后還是返回了True,但是在sleep了3秒之后,又變成了False,所以說,在殺死一個進程的時候並不會立即回收空間;沒有命名的進程默認Process-1,取名之后為自己定義的那個值。

在函數中我們都知道如果要在一個函數中使用另外一個函數中的變量,就可以這么寫

n = 0

def fun1():
    global n
    n = 100
    print(n)

def fun2():
    print(n)

if __name__ == '__main__':
    fun1()
    fun2()

這樣一來,在fun2中輸出的n就是fun1中的值,那么進程中同樣使用global關鍵字試試

from multiprocessing import Process

n = 0

def fun1():
    global n
    n = 100
    print('子進程中的變量:', n)


if __name__ == '__main__':
    p = Process(target=fun1)
    p.start()
    print('主進程中的變量:', n)

非常容易就能發現,其實關鍵字並沒有起作用,這就證明,進程之間的內存空間是隔離的

(6) 守護進程

前邊有介紹過什么是守護進程

那么定義守護進程必須在start之前使用daemon定義

寫個未使用守護進程的例子:

from multiprocessing import Process
import time

def fun(name):
    print('{}正在執行'.format(name))
    p = Process(target=time.sleep, args=(3,))
    p.start()

if __name__ == '__main__':
    p = Process(target=fun, args=("Chancey", ))
    p.start()
    p.join()
    print('主進程')

運行上邊的代碼不會出現任何問題,現在將其改為守護進程

只需要在創建實例的時候添加參數daemon = True即可

動手能力強的人可能已經跑了一遍修改過的代碼了,毫無疑問,這段代碼有錯誤,看一下官方文檔

這個說明正好對應上了上邊修改過代碼的報錯

沒有猜錯,它就是說守護進程的子進程不能再次創建子進程,再次修改代碼

from multiprocessing import Process
import time

def fun(name):
    print('{}正在執行'.format(name))
    # p = Process(target=time.sleep, args=(3,))
    # p.start()
    time.sleep(3)

if __name__ == '__main__':
    p = Process(target=fun, args=("Chancey", ), daemon=True)
    p.start()
    p.join()
    print('主進程')

這樣一來,主進程總是等待子進程執行完畢才執行,那如果不做join呢,去掉上邊代碼中的p.join(),執行發現,主進程並沒有等待子進程執行完畢,而是直接退出,這也就使得子進程的任務並沒有被執行就被迫退出,這就是守護進程存在的意義。

來個多進程的實例:

from multiprocessing import Process

import time

def game(name):
   print('%s 正在玩游戲。。。' % name)
   time.sleep(3)
   print('%s 玩完游戲了。。。' % name)

def sing(name):
   print('%s 正在唱歌。。。' % name)
   time.sleep(3)
   print('%s 唱完歌了。。。' % name)

if __name__ == '__main__':
   p1 = Process(target=game, args=('Chancey', ), daemon=True)
   p2 = Process(target=sing, args=('Chancey', ))

   p1.start()
   p2.start()

   print("進程一:", p1.name)
   print("進程二:", p2.name)

主進程結束后,只會讓子進程跟着結束,但是其他的子進程會依舊執行,這就是為什么game沒有執行而sing執行的原因,如果在這里join一下呢,答案是肯定的,game和sing都會執行。

(7) 互斥鎖

現在有一段代碼

from multiprocessing import Process
import time

def foo(name):
    print("進程{}輸出: 1".format(name))
    time.sleep(2)
    print("進程{}輸出: 2".format(name))
    time.sleep(3)
    print("進程{}輸出: 3".format(name))

if __name__ == '__main__':
    for i in range(3):
        p = Process(target=foo, args=(i, ))
        p.start()

先不要運行,分析一下邏輯,當運行一個進程,函數中的代碼自上而下的執行,所以它應該是這樣輸出的

現在運行一下代碼

並不是想象那樣輸出,輸出步驟完全紊亂,要是用這樣的操作數據庫的話,后果不堪設想,估計就得跑路了。

OK,為了解決這種出現數據紊亂的情況,就出現了Lock()互斥鎖,它會在運行的時候鎖住資源,從而使得其他進程並不會使用該資源。

通俗點講:現在有一群工人,他們要搶一間房子,當一個工人搶到之后就給房間上一把鎖,然后執行任務,這時其他工人在外邊等候,當這個搶到房子並完成任務開鎖之后,其他工人才能進入執行任務。

這里的工人就是進程,房子就是計算機資源,而門鎖就是互斥鎖,正是因為有了互斥鎖,才保證了共享數據的完整性

語法

mutex = Lock():實例化一個互斥鎖

mutex.acquire():上鎖

mutex.release():解鎖

使用

from multiprocessing import Process, Lock
import time

def foo(i, mutex):
    mutex.acquire()
    print("進程{}輸出: 1".format(i))
    time.sleep(2)
    print("進程{}輸出: 2".format(i))
    time.sleep(1)
    print("進程{}輸出: 3".format(i))
    mutex.release()

if __name__ == '__main__':
    mutex = Lock()
    for i in range(3):
        p = Process(target=foo, args=(i, mutex))
        p.start()

然后這里輸出正常了,但是,這里變成了串行, 因為使用時占用時間,影響其他進程等待,所以盡量修改處理塊的數據后立即釋放鎖。

用當下最為火熱的搶票過程演示一下互斥鎖的使用場景

查詢余票是並發的,而購票只能確保一個人成功,所以購票的方法應該使用互斥鎖

from multiprocessing import Process, Lock
import time

def search(name):
    time.sleep(1)
    with open('data.txt') as f:
        count = int(f.read())
    print('<%s> 查看到剩余票數【%s】' % (name, count))

def get(name):
    time.sleep(1)
    f = open('data.txt')
    count = f.read()
    f.close()
    count = int(count)
    if count > 0:
        count -= 1
        time.sleep(1)
        f = open('data.txt', 'w')
        f.write(str(count))
        f.close()
        print('<%s> 購票成功' % name)


def start(name, mutex):
    search(name)
    mutex.acquire()
    get(name)
    mutex.release()


if __name__ == '__main__':
    mutex = Lock()
    for i in range(5):
        p = Process(target=start, args=('顧客%s' % i, mutex))
        p.start()

這里的data.txt里只有余票,就是數字2,通過運行,多人並發查詢,一人購票

這里想到了join一下里面的get或者search,原理上就會出現所有程序的串行運行,極大的降低了程序的運行效率。

(8) 隊列

前邊有提到,每個進程的空間都是互相隔離的,如果想要在進程之間進行通信,就需要的手段。通常情況下,需要在內存中開辟一塊空間,然后多個進程使用同一空間進行IO操作,這個空間就是管道空間

按照作用可以分為兩種:

  • 雙向管道:全雙工,所有進程均可讀寫(默認)
  • 單向管道:半雙工,一個只讀,一個只寫

管道均由Pipe()實現

但是如果多個進程同時進入管道的話,數據依舊會亂,那么這個時候,還是需要加鎖,而multiprocessing還提供了Queue,即隊列,隊列就是管道加鎖的體現。

Queue

Queue():參數為隊列的最大項數,不設置則不限制大小

注意:隊列里面存放的消息而不是數據,另外,隊列占用的是內存空間,所以參數即使不設置也會受限於內存大小

方法

q.put():插入數據至隊列

q.get():從隊列中獲取一個數據並刪除

q.full():判斷隊列是否已滿,已滿的話返回True,反之亦然

q.empty():判斷隊列是否為空,空則返回True,反之亦然

q.nowait(obj):相當於put(obj, Flase)

q.size() :返回隊列的大致長度,不夠准確,甚至在linux平台報NotImplementedError

例如:

from multiprocessing import Queue

q = Queue()

for i in range(5):
    q.put(i)
    print('隊列是否已滿:', q.full())
    print('隊列大小:', q.qsize())

for i in range(5):
    q.get()
    print('取出一個消息')
    print('隊列是否為空:', q.empty())

這里並沒有限制隊列的大小,所以隊列一直沒滿,而在取出的時候,直到取完最后一個消息的時候才返回True

現在設置一下隊列大小,將q = Queue()改為q = Queue(3)

可以看到,當隊列大小達到3的時候,進程阻塞,因為設置了隊列的大小使得消息添加不進去,但是取出的方法還沒有執行,所以就一直阻塞

(9) 生產者和消費者

上邊例子中,put添加消息,get取出消息,實際上就是生產者與消費者的關系,一個負責生產數據,一個負責消費數據。

在並發編程中,如果生產者的處理速度非常快,而消費者處理速度慢,這時生產者就需要等待消費者消費完隊列的數據才再次生產。同樣的道理,如果消費者的處理速度快於生產者的速度,消費者也是要等到生產者生產出數據才能繼續執行任務。

實質上,生產者和消費者模式是通過一個容器來來解決生產者和消費者之間的耦合度問題。他們之間彼此不直接通信。而當生產者或者消費者積累一定消息的時候,彼此無法執行,所以當生產者生產完數據的時候,直接扔進一個地方,然后消費者去那個地方那數據,有一個緩沖的作用,被稱之為阻塞隊列,阻塞隊列平衡了生產者和消費者的處理能力,用於耦合他們。

示例:

from multiprocessing import Process, Queue
import time

# 生產程序
def production(q, number):
    for i in range(3):
        msg = 'URL%s' % i
        time.sleep(2)
        print("生產者%s生產了" % number, msg)

        q.put(msg)

# 消費程序
def consumption(q, number):
    while True:
        msg = q.get()
        if msg is None : break
        time.sleep(2)
        print('消費者%s爬取了%s' % (number, msg))

if __name__ == '__main__':
    q = Queue()

    # 生產者們
    p1 = Process(target=production, args=(q, 1))
    p2 = Process(target=production, args=(q, 2))
    p3 = Process(target=production, args=(q, 3))

    # 消費者們
    c1 = Process(target=consumption, args=(q, 1))
    c2 = Process(target=consumption, args=(q, 2))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()


    p1.join()
    p2.join()
    p3.join()

    q.put(None)
    q.put(None)

    print("主線程")

以上生產者充當URL生產器,而消費者則為爬蟲,消耗URL來爬取數據,這也正是上次爬蟲博文中采用並發的方式

8. JoinableQueue

查看官方文檔可以看到,除了前邊提到的使用Queue來處理隊列,這里還有JoinableQueue,其實JoinableQueue就像是一個Queue對象,但是隊列允許項目的消費者來通知生產者已經成功處理,通知進程是通過共享的信號和條件。

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue是Queue的子類,額外添加了task_done()join()方法。

參數介紹

task_done():指出之前進入隊列的任務已經完成。

由隊列的消費者進程使用。對於每次調用get() 獲取的任務,執行完成后調用 task_done() 告訴隊列該任務已經處理完成;如果join()方法正在阻塞之中,該方法會在所有對象都被處理完的時候返回 (即對之前使用put()放進隊列中的所有對象都已經返回了對應的task_done() ) ;如果被調用的次數多於放入隊列中的項目數量,將引發ValueError 異常 。

join():阻塞至隊列中所有的元素都被接收和處理完畢。

當條目添加到隊列的時候,未完成任務的計數就會增加。每當消費者進程調用task_done() 表示這個條目已經被回收,該條目所有工作已經完成,未完成計數就會減少。當未完成計數降到零的時候, join() 阻塞被解除。

from multiprocessing import Process, JoinableQueue
import time

def producer(q):
    for i in range(3):
        msg = 'URL %s' % i
        time.sleep(1)
        print('生產者生產了 %s' % msg)

        q.put(msg)
    q.join()

def consumer(q):
    while True:
         msg = q.get()
         if msg is None : break
         time.sleep(2)
         print('消費者消耗了 %s' % msg)
         q.task_done()

if __name__ == '__main__':
    q = JoinableQueue()

    p1 = Process(target=producer, args=(q, ))
    p2 = Process(target=producer, args=(q, ))
    p3 = Process(target=producer, args=(q, ))
    c1 = Process(target=consumer, args=(q, ), daemon=True)
    c2 = Process(target=consumer, args=(q, ), daemon=True)

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

    print('主線程')

三、多線程

線程和進程就是上下級的關系,相互依賴,許許多多的線程共同組成了進程,而一個進程至少包含一個線程,在這里將會談到在節省開支的條件下,達到使用資源的最大化。進程只是將資源集合到一起,而線程才是CPU上的執行單位。

多線程,即多個控制線程,需要注意的是,多個線程是共享進程的地址空間的。

1. 開啟線程

開啟線程有兩種方式,分別是函數式和OOP式

第一種

使用threading模塊開啟

from threading import Thread
import time

def MyThread(name):
    print('%s 正在執行。。。。' % name)
    time.sleep(2)
    print('%s 執行完畢。。。。' % name)

if __name__ == '__main__':
    t1 = Thread(target=MyThread, args=('chancey', ))
    t1.start()
    print("主線程")

第二種

通過繼承Thread類並重寫run方法開啟

from threading import Thread
import time

class MyThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print('%s 正在執行。。。。' % self.name)
        time.sleep(2)
        print('%s 執行完畢。。。。。' % self.name)

if __name__ == '__main__':
    t = MyThread('chancey')
    t.start()

    print("主線程")

2. 進程與線程對比

在選用並發模型上必須對症下葯,切記亂投醫,不僅會造成資源上的浪費,還會影響程序的執行效率

2.1 開銷

在主進程下開啟線程

from threading import Thread
import time

'''
在主進程下開啟線程,這里的主進程就是pycharm
'''

def run(name):
    print('%s 正在執行。。。。' % name)
    time.sleep(2)
    print('%s 執行完畢。。。。' % name)

if __name__ == '__main__':
    start = time.time()
    t1 = Thread(target=run, args=('chancey', ))
    t1.start()
    t1.join()
    print('主線程')
    end = time.time()
    print(end - start)

在主進程下開啟子進程

from multiprocessing import Process
from threading import Thread
import time

def run(name):
    print('%s 正在執行。。。。' % name)
    time.sleep(2)
    print('%s 執行完畢。。。。' % name)

if __name__ == '__main__':
    start = time.time()
    p1 = Process(target=run, args=('chancey', ))
    p1.start()
    p1.join()
    print("主進程")
    end = time.time()
    print(end - start)

可以很清楚的看到,在啟動線程的時候耗時2.0009秒,而啟動進程耗時2.1451秒,這說明線程啟動的速度非常快。這是因為,在開啟進程的時候,p.start()會向操作系統發送一個信號,然后操作系統要申請內存空間以讓父進程的地址空間拷貝到子進程,開銷遠遠大於線程。

2.2 PID

在前邊介紹進程並發的時候,發現每一個進程的PID都不相同,再看下多線程里面(忘記前邊內容的朋友可以再去前邊跑一遍代碼)

from threading import Thread
import time
import os

def run(name):
    print('%s 正在執行。。。。' % name, os.getpid())
    time.sleep(2)
    print('%s 執行完畢。。。。' % name)

if __name__ == '__main__':
    t1 = Thread(target=run, args=('chancey', ))
    t2 = Thread(target=run, args=('waller', ))

    t1.start()
    t2.start()

    print('主進程', os.getpid())

這里很明顯就能看到,所有的線程的PID都和主進程的PID一樣。

2.3 地址空間

前邊多進程中講過,父進程和子進程之間的地址空間是相互隔離的,父進程和子進程並沒有共享內存空間

from multiprocessing import Process

p = 100
def run():
    global p
    p = 0

if __name__ == '__main__':
    p1 = Process(target=run,)
    p2 = Process(target=run,)

    p1.start()
    p2.start()

    print("主進程", p)

這里主進程輸出的是100,說明進程之間沒有共享內存空間

from threading import Thread

p = 100
def run():
    global p
    p = 0

if __name__ == '__main__':
    p1 = Thread(target=run,)
    p2 = Thread(target=run,)

    p1.start()
    p2.start()

    print("主進程", p)

將其換成線程后輸出的是0,這就說明同一進程下的所有線程之間是共享該進程的數據

這里稍作總結:

  • 啟動線程的速度要比啟動進程的速度快很多,啟動進程的開銷更大
  • 在主進程下面開啟的多個線程,每個線程都和主進程的pid(進程的id)一致
  • 在主進程下開啟多個子進程,每個進程都有不一樣的pid
  • 同一進程內的多個線程共享該進程的地址空間
  • 父進程與子進程不共享地址空間,表明進程之間的地址空間是隔離的

3. Thread對象

貼上官方文檔

Threading模塊的方法:

  • active_count():返回當前存活的線程類對象
  • current_thread(): 返回當前對應調用者的控制線程的對象。如果調用者的控制線程不是利用 threading創建,會返回一個功能受限的虛擬線程對象
  • get_ident():返回當前線程的線程標識符
  • enumerate():返回所有線程存活對象,與前邊的active_count()返回一致
  • main_thread():返回主線程對象,一般情況下,主線程是Python解釋器創建的對象
  • 而在3.4版本以后還添加了settrace(func) setprofile(func)stack_size([size])功能分別為追蹤函數、性能測試函數、阻塞函數(一般情況下用不到,二般情況下再考慮)

構造函數的關鍵字:

class Threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

  • group:默認為None,為了日后擴展ThreadGroup類而保留的
  • target:要調用的方法,如果傳入方法名則為其開辟內存空間;相同地,傳入方法對象則立即執行
  • name:線程名稱,默認以Thread-N命名,N為當前線程數
  • args:參數
  • kwargs:參數的另一種傳入方式
  • daemon:守護模式

實例化對象的方法:

  • is_alive():返回線程是否活動的。一但線程活動開始,該線程就被認定是存活的,如果被t.run()結束或者拋出異常,都被認定為死亡線程;
  • getName():返回線程名,可以使用setName()來重新命名
  • ident():線程標識符,如果線程未開始則為None,非零整數。如果一個線程退出的同時另一個線程被創建,則標識符會被復用
  • 其他的參數與進程中參數目的相同:start()run()join(timeout=None)daemon()isDaemon以及setDaemon
from threading import Thread, currentThread, active_count, enumerate
import time

def run(name):
    print('%s 正在執行。。。。當前線程名:' % name, currentThread().getName())
    time.sleep(2)
    print('%s 執行完畢。。。。當前線程名:' % name, currentThread().getName())

if __name__ == '__main__':
    t = Thread(target=run, args=('Chancey', ), name="我是可愛的子線程")
    t.start()
    t.setName("我是酷酷的子線程")
    t.join()
    print('當前活躍的線程數:', active_count())
    print('子線程的名字:', t.getName())
    currentThread().setName("我是窮逼")
    print('成功修改子線程')
    print("查看線程是否存活:", t.is_alive())
    print('主線程名字:', currentThread().getName())

    t.join()
    print("再次join一下")
    print("活躍的線程數:", active_count())
    print("當前活躍的線程:", enumerate())

這里也就幾個參數,沒什么邏輯上比較燒腦的,就不作贅述了

4. 守護線程

前邊在進程並發上討論的守護進程,這里的守護線程也差不多。

這里切記,運行完畢不是終止運行:

  • 對於主進程來說,運行完畢指的是主進程代碼執行完畢
  • 對於主線程來說,運行完畢指的是線程所在的進程之內的所有非守護線程全部運行完畢,屆時才算主線程運行完畢

下邊就這兩個點展開討論

4.1 結論一

詳細解釋一下:對於進程,只要主進程運行完畢就稱之為執行完畢,而此刻的守護進程也會被回收,然后主進程就會等待非守護進程運行完畢后才回收所有子進程的資源,這樣下來就有效的避免了僵屍進程的產生。

from threading import Thread
import time


def run(name):
    print('%s 正在執行。。。。' % name)
    time.sleep(2)
    print('%s 執行完畢。。。。' % name)

if __name__ == '__main__':
    t = Thread(target=run, args=('chancey', ), daemon=True)
    t.start()

    print('主進程')
    print('線程是否存活:', t.is_alive())

這里可以看到,只打印了sleep之前的信息,這也正是驗證了當主線程結束的時候,守護線程也跟着結束,所以就出現了不完全執行的現象。

4.2 結論二

from threading import Thread
import time

def fun1():
    print('方法一開始運行')
    time.sleep(1)
    print('方法一結束運行')

def fun2():
    print('方法二開始運行')
    time.sleep(0.5)
    print('方法二結束運行')

if __name__ == '__main__':
    t1 = Thread(target=fun1, daemon=True)
    t2 = Thread(target=fun2, )

    t1.start()
    t2.start()

    print('主進程')

通過運行發現,定義的fun1也是不完全運行,因為在start之前設置了守護線程,當主線程結束的時候,該子線程隨之結束,而fun2因為沒有設置守護線程,所以會等待非守護線程運行完畢才回收。

由此得出結論(划重點):

只要是有其他守護線程還沒有運行完畢,守護線程就不會被回收,進程只有當非守護線程全部運行完畢才會結束

5. 互斥鎖

前邊有談進程的互斥鎖,實際上就是將並發的進程變成了串行,從而使的效率大打折扣,但是數據變得安全。

而對於線程來說,一個進程內的所有線程是共享地址空間的,所以在數據上依然亂掉。

from threading import Thread, Lock
import time

n = 100
def fun():
    global n
    tmp = n
    time.sleep(0.1)
    n = tmp - 1

if __name__ == '__main__':
    start = time.time()
    t_list = []
    for i in range(100):
        t = Thread(target=fun, )
        t_list.append(t)
        t.start()

    for t in t_list:
        t.join()

    print('主進程', n)
    end = time.time()
    print(end - start)

理論上這里開辟100個線程,分別減n,最終結果為0

這里變成了99,但是運行效率非常的快,現在為該方法加鎖再次運行

from threading import Thread, Lock
import time

n = 100
def fun():
    global n
    mutex.acquire()
    tmp = n
    time.sleep(0.1)
    n = tmp - 1
    mutex.release()

if __name__ == '__main__':
    mutex = Lock()

    start = time.time()
    t_list = []
    for i in range(100):
        t = Thread(target=fun, )
        t_list.append(t)
        t.start()

    for t in t_list:
        t.join()

    print('主進程', n)
    end = time.time()
    print(end - start)

可以看到,這里的數據並沒有錯亂,但是執行速度由原來的0.1秒變成了現在的10秒,在實際的開發中還需要對症下葯。

6. GIL

Python的線程並發有一個特性,就是使用單核,並且同一時刻只有一個線程在執行,這就無法充分的使用多核計算機的資源了。

6.1 介紹

在線程的並發的時候其實就是幾個線程來回折騰,給用戶的感覺像是同時進行,本質上是在一個線程進行的時候,Python就會將整個解釋器鎖掉,從而使得其他線程無法執行,這種機制就是cPython著名的GIL全局解釋器鎖

注意:這種機制在jPython中是沒有的,所以說,GIL並不是Python的特性。

而將並發變成串行的,有互斥鎖,同樣的,GIL也是一種互斥鎖,只不過GIL保護的是解釋器級別的數據,而普通的互斥鎖是保護應用程序的數據。

import os
import time

print(os.getcwd())
time.sleep(120)

分別在windows和linux運行該代碼並查看進程

可以看到,在一個Python進程內,不僅有demo文件的線程,還有Python解釋器級別的垃圾回收機制的線程在運行。但是所有線程都在同一個進程之內。

如果多個線程的target都是某一個函數,那么這多個線程首先訪問解釋器的代碼,即拿到執行權限,然后把target的代碼交予解釋器的代碼去執行。

在一個進程中,所有數據都是共享的,解釋器代碼也不例外,所以垃圾回收線程可以通過訪問解釋器代碼而執行,這就直接導致了一個紊亂數據的問題:對於同一個數據100,可能線程1執行x = 100的同時,垃圾回收線程回收100。數據直接亂掉。解決該問題只有加鎖處理。

6.2 GIL與Lock

很清楚,鎖的目的就是通過降低效率來保證數據的安全,使得在同一時間只能有一個線程修改。

這里需要區分GIL與Lock:

  • GIL保護解釋器級別的數據,而Lock保護應用程序

6.3 GIL與多線程

有了GIL的存在,同一時刻同一進程中只有一個線程被執行,進程可以利用多核,但是開銷大,而python的多線程開銷小,但卻無法利用多核優勢。

  • 對計算來說,cpu越多越好,但是對於I/O來說,再多的cpu也沒用
  • 當然對運行一個程序來說,隨着cpu的增多執行效率肯定會有所提高(不管提高幅度多大,總會有所提高),這是因為一個程序基本上不會是純計算或者純I/O,只能相對的去看一個程序到底是計算密集型還是I/O密集型,從而進一步分析python的多線程到底有無用武之地

現在的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提升,甚至不如串行(沒有大量切換),但是,對於IO密集型的任務效率還是有顯著提升的。

對於選取並發模型上,文章后邊會提到

6.3.1 死鎖

如果多個線程相互爭搶資源,任何線程都沒有拿到,屆時就會相互等待,從而造成堵塞,這種情況的現象就是死鎖,如果沒有外來的因素,則會一直阻塞下去

from threading import Thread, Lock
import time

mutex1 = Lock()
mutex2 = Lock()

class MyThread(Thread):
    def run(self, ):
        self.fun1()
        self.fun2()

    def fun1(self, ):
        mutex1.acquire()
        print("%s 拿到了1鎖" % self.name)

        mutex2.acquire()
        print('%s 拿到了2鎖' % self.name)
        mutex2.release()

        mutex1.release()
    
    def fun2(self, ):
        mutex2.acquire()
        print("%s 拿到了2鎖" % self.name)
        time.sleep(1)

        mutex1.acquire()
        print("%s 拿到了1鎖" % self.name)
        mutex1.release()

        mutex2.release()

if __name__ == "__main__":
    for i in range(10):
        t = MyThread()
        t.start()

解讀一下上邊的代碼:

由於線程開銷極小,所以啟動速度非常的快,thread-1拿到1鎖之后解鎖,此時thread-2還沒拿到1鎖,而在thread-1拿到2鎖的時候,thread-2拿到了2鎖,屆時,thread-1需要1鎖,而thread-2需要2鎖,所以在這里就相互等待。

要解決這種情況的出現,就需要一把能夠連續acqurie多次,這種鎖就是遞歸鎖

Rlock內部維護了一個Lock和一個count變量,count記錄了acquire的次數,從而使得資源可以被多次acquire。直到一個線程的所有acquire都被release之后才允許其他的線程獲得資源。更改上邊的代碼,將lock更換為Rlock

from threading import Thread, RLock
import time

mutex1 = mutex2 = RLock()

class MyThread(Thread):
    def run(self, ):
        self.fun1()
        self.fun2()

    def fun1(self, ):
        mutex1.acquire()
        print("%s 拿到了1鎖" % self.name)

        mutex2.acquire()
        print('%s 拿到了2鎖' % self.name)
        mutex2.release()

        mutex1.release()
    
    def fun2(self, ):
        mutex2.acquire()
        print("%s 拿到了2鎖" % self.name)
        time.sleep(1)

        mutex1.acquire()
        print("%s 拿到了1鎖" % self.name)
        mutex1.release()

        mutex2.release()

if __name__ == "__main__":
    for i in range(10):
        t = MyThread()
        t.start()
# 執行結果
Thread-1 拿到了1鎖
Thread-1 拿到了2鎖
Thread-1 拿到了2鎖
Thread-1 拿到了1鎖
Thread-2 拿到了1鎖
Thread-2 拿到了2鎖
Thread-3 拿到了1鎖
Thread-3 拿到了2鎖
Thread-3 拿到了2鎖
Thread-3 拿到了1鎖
Thread-5 拿到了1鎖
Thread-5 拿到了2鎖
Thread-5 拿到了2鎖
Thread-5 拿到了1鎖
Thread-7 拿到了1鎖
Thread-7 拿到了2鎖
Thread-7 拿到了2鎖
Thread-7 拿到了1鎖
Thread-9 拿到了1鎖
Thread-9 拿到了2鎖
Thread-9 拿到了2鎖
Thread-9 拿到了1鎖
Thread-2 拿到了2鎖
Thread-2 拿到了1鎖
Thread-4 拿到了1鎖
Thread-4 拿到了2鎖
Thread-4 拿到了2鎖
Thread-4 拿到了1鎖
Thread-8 拿到了1鎖
Thread-8 拿到了2鎖
Thread-8 拿到了2鎖
Thread-8 拿到了1鎖
Thread-6 拿到了1鎖
Thread-6 拿到了2鎖
Thread-6 拿到了2鎖
Thread-6 拿到了1鎖
Thread-10 拿到了1鎖
Thread-10 拿到了2鎖
Thread-10 拿到了2鎖
Thread-10 拿到了1鎖
PS D:\code\並發>

可以看到這里並沒有發生永久性的阻塞,這就是遞歸鎖的使用

6.3.2 信號量

如果一把鎖將程序的執行效率變得非常慢,就可以在這里設置同一把鎖讓多個線程同時拿去執行任務,這個參數就是信號量。而指定的大小就是同時拿鎖的線程數量。

這是計算機科學史上最古老的同步原語之一,計數器的值一定是大於零,它會因acquire()的調用而遞減1,當acquire()發現值為0時就阻塞,直到其他線程調用release()

創建

class Threading.Semaphore(value=1): 可選參數 value 賦予內部計數器初始值,默認值為 1 。如果 value 被賦予小於0的值,將會引發 ValueError異常。

對象屬性

acquire(blocking=True, timeout=None):獲取一個信號量,blocking為false時不會阻塞,timeout為阻塞延時

release():釋放信號量

from threading import Thread, Semaphore, currentThread
import time
import random

sem = Semaphore(3) # 設置信號量大小為3

def fun():
    sem.acquire()
    print('%s 執行' % currentThread().getName())
    sem.release()
    time.sleep(random.randint(1, 2))

if __name__ == "__main__":
    for i in range(5):
        t = Thread(target=fun, )
        t.start()

執行過程中可以發現,線程1,2,3同時執行,之后才加入4,5

6.3.3 Event

一個線程發出事件信號,而其他線程等待該信號,這也是線程之間最簡單的通信方式之一。

一個事件對象管理一個內部標志,調用set()可以將其設置為True,而設置為False則使用clear,調用wait()方法將會進入阻塞,直到標志為True。

關鍵字

class threading.Event

對象屬性

is_set():當且僅當內部標志為True時返回True

set():將內部標志設置為True。這時所有等待該事件線程將會被喚醒,並且當標志為true的時候調用wait()不會阻塞

clear():將內部標志設置為False。這時調用wait()將會被阻塞,一直等待調用set()

wait(timeout=None):一直阻塞線程,直到內部變量為True。如果調用set()則立即返回。否則一直阻塞或者到達timeout時間。這里的timeout是一個浮點數。很明顯,wait()返回的值一直是None。

以連接數據庫為例:

現在管理一堆線程去連接數據庫,但是必須有一個線程先去嘗試連接,測試數據庫Server是否正常活動,這就用到了事件信號,即Event()來協調各個線程之間的工作。

from threading import Thread, Event, currentThread
import time

event = Event()

def connect():
    n = 0
    while not event.is_set():
        if n == 3:
            print('%s 連接超時。。。' % currentThread().getName)
            return 
        print('%s 嘗試連接 <%s>'% (currentThread().getName, n))
        event.wait(0.5)
        n += 1

    print('%s 已連接' %currentThread().getName)

def check():
    print('%s 可以正常連接了'% currentThread().getName)
    time.sleep(2)
    event.set()

if __name__ == "__main__":
    for i in range(3):
        t = Thread(target=connect)
        t.start()

    t = Thread(target=check)
    t.start()
<bound method Thread.getName of <Thread(Thread-1, started 5852)>> 嘗試連接 <0>
<bound method Thread.getName of <Thread(Thread-2, started 5196)>> 嘗試連接 <0>
<bound method Thread.getName of <Thread(Thread-3, started 3012)>> 嘗試連接 <0>
<bound method Thread.getName of <Thread(Thread-4, started 8744)>> 可以正常連接了
<bound method Thread.getName of <Thread(Thread-1, started 5852)>> 嘗試連接 <1>
<bound method Thread.getName of <Thread(Thread-2, started 5196)>> 嘗試連接 <1>
<bound method Thread.getName of <Thread(Thread-3, started 3012)>> 嘗試連接 <1>
<bound method Thread.getName of <Thread(Thread-2, started 5196)>> 嘗試連接 <2>
<bound method Thread.getName of <Thread(Thread-1, started 5852)>> 嘗試連接 <2>
<bound method Thread.getName of <Thread(Thread-3, started 3012)>> 嘗試連接 <2>
<bound method Thread.getName of <Thread(Thread-2, started 5196)>> 連接超時。。。
<bound method Thread.getName of <Thread(Thread-1, started 5852)>> 連接超時。。。
<bound method Thread.getName of <Thread(Thread-3, started 3012)>> 連接超時。。。
6.3.4 定時器

顧名思義,就是在等待N秒時候執行某操作

對象創建

class Threading.Timer(interval, function, args=None, kwargs=None):創建一個定時器,在經過interval秒之后,就是用args和kwargs參數調用function

對象屬性

cancel():停止計時器,並取消當前執行的操作。只有計時器處於等待狀態下才有效

from threading import Timer

def demo(name):
    print('%s 說:hello' % name)

t = Timer(1, demo, args=('chancey', ))
t.start()

非常的簡單,就是等待某一段時間

from threading import Timer
import random

class Code:
    def __init__(self):
        self.make_cache()

    def make_cache(self, interval=10):
        self.cache = self.make_code()
        print('\n', self.cache)
        self.t = Timer(interval, self.make_cache)
        self.t.start()

    def make_code(self, n=4):
        res = ''
        for i in range(n):
            s1 = str(random.randint(0, 9))  # 隨機取出ASCII表里面數字,並轉為字符,方便后面拼接
            s2 = chr(random.randint(65, 90))  # 隨機取出ASCII表中大小寫字母
            res += random.choice([s1, s2])
        return res

    def check(self):
        while True:
            code = input('請輸入你的驗證碼>>: ').strip()
            if code.upper() == self.cache:
                print('驗證碼輸入正確')
                self.t.cancel()
                break

obj = Code()
obj.check()

可以看到這里在等待10秒之后刷新驗證碼

6.3.5 柵欄對象

在Python 3.2 以上版本中還添加了柵欄對象。略作了解即可,在實際項目中並不常用,反正我做了兩年爬蟲一次都沒用過。

當固定數量的線程需要彼此相互等待時就需要用到柵欄類。線程調用wait()方法后將會阻塞,一直阻塞到所有的線程都調用wait()方法,屆時所有的線程都將被釋放。

創建對象

class threading.Barrier(parties, action=None, timeout=None)

  • parties:線程的數量,值為幾就有幾個該線程的柵欄對象
  • action:可調用對象,它會在所有的線程被釋放的時候在其中的一個線程中自動調用
  • timeout:超時時間

對象屬性

  • wait(timeout=None):沖出柵欄。當所有的線程都被調用了wait()方法就會被統一釋放,這里的timeout參數優先於創建對象的timeout參數
  • reset():重置柵欄為默認的初始狀態。如果柵欄中仍有等待釋放的線程,將會引發異常
  • abort():損壞柵欄。如果正好有需要調用wait()方法的線程,則會引發BrokenPipeError異常,如果需要終止某個線程,可以調用該方法來避免死鎖。不過最好在創建柵欄的時候指定超時時間

實例:模擬開門,假設只有當人數達到3人的時候開門

from threading import Thread, Barrier

def open():
    print('人數夠了,開門')

barrier = Barrier(parties=3, action=open)

class Game(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.n = 3

    def run(self):
        while self.n > 0:
            self.n -= 1
            print('%s 正在等待開門' % self.name)
            try:
                barrier.wait(timeout=2)
            except BrokenPipeError:
                pass
            print('已開門')

if __name__ == '__main__':
    names = ['Chancey', 'Waller', 'Mary']
    for i in range(3):
        t = Game(name=names[i])
        t.start()
C:\Users\chancey\AppData\Local\Programs\Python\Python36\python.exe D:/code/並發/線程/event介紹/柵欄對象.py
Chancey 正在等待開門
Waller 正在等待開門
Mary 正在等待開門
人數夠了,開門
已開門
已開門
Chancey 正在等待開門
已開門
Mary 正在等待開門
Waller 正在等待開門
人數夠了,開門
已開門
已開門
Chancey 正在等待開門
已開門
Mary 正在等待開門
Waller 正在等待開門
人數夠了,開門
已開門
已開門
已開門

Process finished with exit code 0

6.4 隊列

在線程中隊列的方法有三種

  • Queue
  • LifoQueue
  • PriorityQueue

上述三種方法里面,Queue的方法在進程並發中已經詳細做了介紹,這里就不贅述了,而后邊的LifoQueuePriorityQueue的對象屬性和Queue是一樣的,他們之間都是通用的,像什么qsize()empty()put()get()都是通用的

6.4.1 Queue

在線程中的Queue的用法和進程中的使用方法一樣,而在線程中正是人們口中經常說的先進先出

其實很好理解,就是先進去隊列的對象先行出來,引用方法名就是先put()進去的先get()出來。

import queue

q = queue.Queue(3)

q_list = []
for i in range(3):
    q.put(i)
    q_list.append(q)

for q in q_list:
    print(q.get())

6.4.2 LifoQueue

Queue正好相反,它是先進后出,這也就是著名的堆棧

還是上邊代碼稍作改動

import queue

q = queue.LifoQueue(3)

q_list = []
for i in range(3):
    q.put(i)
    q_list.append(q)

for q in q_list:
    print(q.get())

6.4.3 PriorityQueue

指定優先級,put()方法使用一樣,指定優先級即可

import queue

q = queue.PriorityQueue(3)

q1 = q.put((2, 'Chancey'))
q2 = q.put((3, 'Waller'))
q3 = q.put((1, 'Mary'))

print(q.get())
print(q.get())
print(q.get())

可以看到mary是最后入隊列的,但是其優先級高於所有,所以先行出隊列。

判斷優先級是看值的大小,值越小優先級就越高咯,灰常滴簡單

四、並發池

對於線程池和進程池的構造和使用,在Python中也處於一種比較高階的技術。這里會着重講解並發池的使用以及注意事項

1. 進程池

如果一個項目里面只需要開啟幾個或者幾十個進程,就可用Process手動創建或者for循環創建,但是如果進程量很高呢,這就用到了進程池,它可以減少進程創建和釋放的開銷,極大的降低了計算機資源的浪費。

舉個例子:早上洗臉,如果水一滴滴的落在臉上洗臉,是不是很慢?而找個盆子把水裝起來洗臉就會方便很多,進程池也一樣,就是將諸多的進程裝進容器,等待調用。

進程池,顧名思義,一種特殊的容器,用來存儲進程,而在進程數量的選擇上,並不是越多越好,應該綜合計算機軟硬件的條件來設置。

特點:非常清楚,一個進程創建和銷毀是需要大量的時間和計算機資源,如果有十萬個進程在這里重復的開辟內存空間、釋放內存,那這個計算機估計瀕臨滅亡(我指的是普通的家用計算機,高性能服務器自行測試),而有了進程池,假設進程池限制3個進程,那么在運行的時候,只創建3個進程,然后循環利用,最后統一回收。所以說,進程池可以極大的減少計算機資源的浪費。

對象創建

multiprocessing.Pool(processes)

  • processes:允許入池的最大進程數

對象屬性

  • apply():傳遞不定參數,主進程會被阻塞直到函數執行結束,在Python以后已經沒有了該方法
  • apply_async():與上述apply()一樣,但是非阻塞,且支持結果返回進行回調
  • map(func, iterable[, chunksize=None]):與內置的map函數用法行為基本一致,它會使進程阻塞直到返回結果。注意,雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒后,程序才會運行子進程
  • close():關閉進程池
  • terminate():結束進程池,未處理的任務也不會再處理
  • join():主進程阻塞等待子進程退出,該方法必須在close()或者terminate()之后
import multiprocessing
import time
import os
import random


def hello(name):
    start = time.time()
    print('%s 開始執行,進程號:%s' % (name, os.getpid()))
    time.sleep(random.random()*2)
    end = time.time()
    print('%s 結束執行,進程號:%s,耗時%0.2fS' % (name, os.getpid(), end-start))

if __name__ == '__main__':
    p = multiprocessing.Pool()
    for i in range(4):
        p.apply_async(hello, (i, ))
    print('*'*30, '程序開始', '*'*30)
    p.close()
    p.join()
    print('*'*30, '程序結束', '*'*30)

可以清楚的看到,同時進行的只有3個進程,而且在進程池中的某一個進程處理完任務后不會回收,而是新入池一個任務繼續進行,知道所有的任務執行完畢,進程才統一回收。

2. 線程池

線程池的創建有三種方式

  • threadpool
  • concurrent.futures該方法也可用來創建進程池,后邊會做介紹
  • 重寫threadpool或者concurrent.futures.ThreadPoolExecutor

這里只介紹第一種,后邊會詳細介紹concurrent.futures來創建

在python 2.7 以上包括Python 3.x ,支持第三方庫threadpool

注意:該庫現已停止官方支持,僅作為舊項目的支持

網上對於threadpool的介紹少之又少,作為從來規矩上網的我,官方文檔怎么也看不明白,所以就剖其源碼研究了一番

源碼介紹:

"""Easy to use object-oriented thread pool framework.

A thread pool is an object that maintains a pool of worker threads to perform
time consuming operations in parallel. It assigns jobs to the threads
by putting them in a work request queue, where they are picked up by the
next available thread. This then performs the requested operation in the
background and puts the results in another queue.

The thread pool object can then collect the results from all threads from
this queue as soon as they become available or after all threads have
finished their work. It's also possible, to define callbacks to handle
each result as it comes in.

The basic concept and some code was taken from the book "Python in a Nutshell,
2nd edition" by Alex Martelli, O'Reilly 2006, ISBN 0-596-10046-9, from section
14.5 "Threaded Program Architecture". I wrapped the main program logic in the
ThreadPool class, added the WorkRequest class and the callback system and
tweaked the code here and there. Kudos also to Florent Aide for the exception
handling mechanism.

Basic usage::

    >>> pool = ThreadPool(poolsize)
    >>> requests = makeRequests(some_callable, list_of_args, callback)
    >>> [pool.putRequest(req) for req in requests]
    >>> pool.wait()

See the end of the module code for a brief, annotated usage example.

Website : http://chrisarndt.de/projects/threadpool/

"""

大致意思就是介紹了該庫的運行原理,和基本用法,就是Basic usage的內容

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()

分行解釋代碼:

  • 定義線程池,參數為最大的線程數

  • 調用makeRequests()創建了要開啟多線程的函數或者方法,后邊的list_of_args為該函數的參數,默認為None,callback為回調函數。也就是說,只需要兩個參數即可開啟

  • 將線程扔進線程池。等同於

    for item in requests:
        pool.putRequest(item)
    
  • 等待所有的線程完成任務后退出

import threadpool
import time

def hello(name):
    print('%s 說 hello' % name)
    time.sleep(1)
    print('%s 說 bye' % name)

if __name__ == '__main__':
    names = ['Chancey', 'Wanger', 'Mary', 'Alex', 'Guido']

    start = time.time()
    pool = threadpool.ThreadPool(3)
    requests = threadpool.makeRequests(hello, names)
    [pool.putRequest(req) for req in requests]
    pool.wait()
    print('總共耗時:%0.2f' % (time.time() - start))

如果開啟多線程的函數有比較多的參數的話,函數調用時第一個解包list,然后解包dict。這樣的話就兩種方法傳參,一種是列表,一種字典。

  • 列表傳參
# 多個參數
import threadpool
import time

def counts(a, b, c):
    print('%d+%d+%d=%d' % (a, b, c, a+b+c))
    time.sleep(2)

if __name__ == '__main__':

    # 構造參數
    number_one = [1, 2 ,3]
    number_two = [4, 5, 6]
    number_three = [7, 8, 9]
    params = [(number_one, None), (number_two, None), (number_three, None)]

    # 創建線程池
    start = time.time()
    pool = threadpool.ThreadPool(2)
    requests = threadpool.makeRequests(counts, params)
    [pool.putRequest(req) for req in requests]
    pool.wait()
    print('總共耗時%0.2f' % (time.time() - start))

  • 字典傳參
# 多個參數
import threadpool
import time

def counts(a, b, c):
    print('%d+%d+%d=%d' % (a, b, c, a+b+c))
    time.sleep(2)

if __name__ == '__main__':

    # 構造參數
    number_one = {'a':1, 'b':2, 'c':3}
    number_two = {'a':4, 'b':5, 'c':6}
    number_three = {'a':7, 'b':8, 'c':9}
    params = [(None, number_one), (None, number_two), (None, number_three)]

    # 創建線程池
    start = time.time()
    pool = threadpool.ThreadPool(2)
    requests = threadpool.makeRequests(counts, params)
    [pool.putRequest(req) for req in requests]
    pool.wait()
    print('總共耗時%0.2f' % (time.time() - start))

依舊可以完美運行

不過threadpool並不建議在新項目中使用,官方是這樣聲明的:

This module is OBSOLETE and is only provided on PyPI to support old projects that still use it. Please DO NOT USE IT FOR NEW PROJECTS!

該模塊已經過時,但仍在PyPi中提供,以支持仍然使用它的舊項目。請勿用於新項目!

3. 並發池

這里命名並發池是我自己想的,因為concurrent支持多進程和多線程。這里也將是本文的亮點。

該模塊是Python3自帶包,而Python2.7以上也可安裝使用。concurrent包下只有一個模塊futures,模塊下最常用的就是Executor類,它下邊有兩個子類,分別是ThreadPoolExecutorProcessPoolExecutor,顧名思義,分別支持多線程和多進程。

3.1 ThreadPoolExecutor

該類專為多線程提供支持

創建對象

class concurrent.futures.ThreadPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

  • max_workes:指定線程數。在Python 3.5 以上的版本中,為None或者沒有指定的時候開啟和計算機CPU相同數量的線程,並且在Windows上必須小於61,附上源碼

    if max_workers is None:
        # Use this number because ThreadPoolExecutor is often
        # used to overlap I/O instead of CPU work.
        max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")
    
  • map_context:允許用戶控制由進程池創建給工作者進程的開始方法

  • initializerinitargs:在每個工作者線程開始處調用的一個可選可調用對象。 initargs 是傳遞給初始化器的元組參數。任何向池提交更多工作的嘗試, initializer 都將引發一個異常,當前所有等待的工作都會引發一個 BrokenThreadPool。該功能在python 3.8 版本以上提供

對象屬性

抽象類提供異步執行調用方法。要通過它的子類調用,而不是直接調用。

注意:下邊介紹的對象方法是通用的

  • submit(fn, *args, **kwargs):異步提交,傳參的方式依舊是元組
  • map(func, *iterables, timeout=None, chunksize=1)):類似於 map(),也就是將submit() for

循環了

  • shutdown(wait=True):類似於進程池中的pool.close()pool.join()的結合體。當wait為True時等待池子內所有任務完畢后釋放,反之亦然,默認為True。

注意:不論wait為何值,整個程序都會等到所有任務執行完畢

  • result(timeout=None):獲取結果

  • add_done_callback(fn):回調函數

from concurrent.futures import ThreadPoolExecutor
import os
import time

def say(name):
    print('%s 說 hello,我的PID:%s' % (name, os.getpid()))
    time.sleep(2)
    print('%s 說 bye,我的PID:%s' % (name, os.getpid()))

if __name__ == '__main__':
    pool = ThreadPoolExecutor(2)
    names = ['Chancey', 'Wanger', 'SuXin', 'Mark', 'Alex']

    start = time.time()
    for i in names:
        pool.submit(say, i)

    pool.shutdown(wait=True)

    print('耗時:%0.2f' % (time.time() - start))

3.2 ProcessPoolExecutor

用法相同,不再贅述。只不過換成ProcessPoolExecutor,里面所有創建對象和對象方法都是一樣的。

3.3 提交任務

任務提交有兩種方式:

  • 同步調用:提交任務后等待任務執行完畢,拿到結果后在執行下一步,這樣下來的話,程序就變成了串行
  • 異步調用:提交任務后不用等待

前邊在介紹ThreadPoolExecutor或者ProcessPoolExecutor時提到了add_done_callback(fn),這個就是回調機制。異步調用和回調機制都是提交任務的方式。

以爬蟲的方式寫一下提交任務的方式

說明:這里構造兩個函數,一個負責構造URL,一個負責爬取數據

先看一下同步提交的方式:

from concurrent.futures import ThreadPoolExecutor
import time

def get_url(keyword):
    url = 'https://www.suxin.site/%s' % keyword
    time.sleep(1)
    print('%s URL構造成功' % keyword)
    return url

def get_html(url):
    html = '<html>%s</html>' % url
    time.sleep(2)
    print('%s HTML獲取成功' % url)
    return html

if __name__ == '__main__':
    pool = ThreadPoolExecutor(2)
    keyword_list = ['Chancey', 'Wanger', 'SuXin', 'Mark', 'Alex']

    start = time.time()
    for keyword in keyword_list:
        msg = pool.submit(get_url, keyword).result()
        get_html(msg)

    pool.shutdown(wait=True)
    print('耗時:%0.2f' % (time.time() - start))

再看下異步提交

from concurrent.futures import ThreadPoolExecutor
import time

def get_url(keyword):
    url = 'https://www.suxin.site/%s' % keyword
    time.sleep(1)
    print('%s URL構造成功' % keyword)
    return url

def get_html(url):
    url = url.result()
    html = '<html>%s</html>' % url
    time.sleep(2)
    print('%s HTML獲取成功' % url)
    return html

if __name__ == '__main__':
    pool = ThreadPoolExecutor(2)
    keyword_list = ['Chancey', 'Wanger', 'SuXin', 'Mark', 'Alex']

    start = time.time()
    for keyword in keyword_list:
        pool.submit(get_url, keyword).add_done_callback(get_html)

    pool.shutdown(wait=True)
    print('耗時:%0.2f' % (time.time() - start))

效率一目了然

4. 重寫

本來這里是打算將concurrent.fautres單獨拿出來說一說,但是發現本文已經夠長了,所以在下篇詳細討論如何重寫ThreadPoolExecutor

前邊的線程池中有講到可以通過重寫concurrent或者fautres來使用並發池,這里需要詳細的了解源碼以及運行原理,建議對current源碼有研究的朋友們可以琢磨一下,考慮到篇幅這里就不說了。

5. 自構並發池

網絡上對自構並發池大多千篇一律,然並卵,所以這里介紹一下,不過上邊的方法已經足夠了項目的使用,極特別的需求可能用到,有興趣可以看看,沒興趣的直接跳到協程去看。

5.1 構思

python里面的Queue類似於並發,可以說是低配版的並發

  • 在隊列中加入任務
    • 創建隊列
    • 設置大小
    • 真實創建的線程
  • 處理任務
    • 創建線程
      • 判斷空閑線程的數量,等於0的時候不再創建
      • 線程數不能超過線程池大小
      • 根據任務的數量判斷要創建線程的數量
    • 執行任務
      • 獲取任務,每取出一個就剔除那個
      • 判斷任務是否為空

5.2 實現

大致的思路就這些,接下來就是精彩的代碼,里面有詳細的注釋,不必慌

import threading
import time
import queue

stop = object() # 這個是用來標志任務停止

class ThreadPoolChancey(object):
    def __init__(self, max_thread=None):
        self.queue = queue.Queue() # 創建的隊列可以放無限制的任務
        self.max_thread = max_thread # 指定的最大線程數,默認為None
        self.terminal = False # 停止標志
        self.create_thread_list = [] # 真實創建的線程數,這里以列表的方式存儲,方便判斷線程數量
        self.free_thread_list = [] # 空閑線程數

    def run(self, function, args, callback=None):
        '''
        :param function : 執行函數
        :param args : 要執行的函數的參數,定義為元組傳參
        :param callback : 回調函數,T or F 的返回值
        :return :
        '''

        # 判斷是否創建真實線程
        if len(self.free_thread_list) == 0 and len(self.create_thread_list) < self.max_thread: # 如果空閑線程為0並且創建的真實線程沒有達到最大限度就創建
            self.create_thread()
        task = (function, args, callback)
        self.queue.put(task)

    def callback(self):
        '''回調函數:用以循環獲取任務並執行'''

        current_thread = threading.current_thread() # 獲取當前線程
        self.create_thread_list.append(current_thread) # 添加到線程列表里面
        event = self.queue.get() # 獲取一個任務並執行

        while event != stop: # 用以判斷是否終止任務
            function, args, callback = event # 解開任務包,該包包含了執行函數、參數、回調函數
            try: # 執行函數運行的結果,該判斷執行成功,故狀態為True
                message = function(*args)
                state = True
            except Exception as err: # 執行異常,狀態為False
                message = err
                state = False
            if callback is not None: # 不為空則表示執行完畢
                try:
                    callback(state, message) # 執行回調函數
                except Exception as err:
                    print(err) # 拋出異常
            else:
                pass

            if not self.terminal:
                self.free_thread_list.append(current_thread) # 有終止任務的時候就添加一個新任務
                event = self.queue.get()
                self.free_thread_list.remove(current_thread) # 這里添加了任務,線程有一個占用,剔除空閑
            else:
                event = stop # 停止put

        else:
            self.create_thread_list.remove(current_thread) # 剔除執行完畢的任務

    def create_thread(self):
        '''創建線程'''
        t = threading.Thread(target=self.callback, )
        t.start()

    def terminal(self):
        '''終止任務,無論隊列是否還有任務'''
        self.terminal = True

    def close(self) :
        '''關閉線程池'''
        num = len(self.create_thread_list) # 將真實的線程全部添加進線程池
        self.queue.empty()
        while num:
            self.queue.put(stop)
            num -= 1
            

將其放置在同級目錄下並作為第三方模塊導入試用一下:

這里模擬連接數據庫示例

from DiyThreadPool import ThreadPoolChancey

import time
import random

def connect(name):
    db = random.randint(10, 20)
    time.sleep(1)
    print('%s 連接到了數據庫%s' % (name, db))
    return db


pool = ThreadPoolChancey(2)
name_list = ['Chancey', 'Wanger', 'SuXin', 'Mark', 'Alex']

for name in name_list:
    pool.run(function=connect, args=(name, ))

# pool.terminal()
pool.close()

五、協程

協程,又叫微線程或者纖程。它是比線程更為細小的線程,微線程的名字由此得來。只支持python 3.4以上的版本,不過建議使用python 3.6版本,因為我的代碼都是跑在3.6上的,出錯找都找不見報錯原因

優點:

  • 使用高並發、高擴展、低性能的;一個CPU支持上萬的協程都不是問題。所以很適合用於高並發處理
  • 無需上下文的切換開銷

缺點:

  • 無法利用計算機多核優勢

一般情況下,實現協程並發有三種方式

  • yield(簡單協程)

  • asyncio(Python自帶)

  • greenlet(第三方庫)

  • gevent(第三方庫)

偉大的Scrapy框架就是基於asycio做了異步IO框架,而下載器是多線程的,所以以后千萬不要說scrapy是多線程框架,雖然感覺沒什么毛病,但總有刁難的人死鑽牛角尖。

這里會介紹兩種方式並行執行,不過我個人更喜歡使用gevent第三方庫,使用更加方便,理解也比較容易

1. yield

學過Python基礎的朋友們都知道,函數的返回值有兩種方式,一種是最常用的return,還有一種是yeild,雖然它是起到掛起的作用,但是依舊能返回值。

基本思路就是創建生成器然后獲取生成器並執行

import time

def func1():
    while True:
        print('正在執行 func1')
        yield
        time.sleep(1)

def func2():
    while True:
        print('正在執行 func2')
        yield
        time.sleep(1)

if __name__ == '__main__':
    f1 = func1()
    f2 = func2()

    while True:
        next(f1)
        next(f2)

這就是最為簡單的協程的實現,異步IO的實現

在不開啟線程的基礎上,實現多個任務,協程是一個特殊的生成器

實現過程:

  • func1 生成器
  • func2 生成器
  • 獲取生成器
  • 運行生成器

2.asyncio

在實際的開發中,為了實現更高的並發有很多的方案,比如多進程、多線程。但是無論是多進程還是多線程,IO的調度更多的取決於操作系統,而協程的方式,其調度確是來自於用戶,用戶在函數中yield一個狀態。使用協程可以實現高效的並發任務。

最簡單的示例

import asyncio
import time

async def say(name):
    print('%s 開始執行' % name)
    time.sleep(2)
    print('%s 執行完畢' % name)

loop = asyncio.get_event_loop()
loop.run_until_complete(say('chancey'))

接下來詳細介紹一下它的使用

基本流程

  • 通過關鍵字async定義一個協程對象
  • 協程不能直接運行,所以要丟進事件循環loop,由loop在適當的時候調用
  • asycio.get_event_loop創建一個事件循環
  • run_until_complete注冊協程到事件循環並啟動

2.1 創建任務

協程對象在注冊到循環事件的時候,也就是在調用run_until_complete之后將協程對象打包成一個任務對象。所謂的任務對象其本質就是一個Future類的子類。它會保存運行后的狀態,用於獲取該協程執行的結果。

介紹一下常用的方法:

  • event_loop:事件循環。開啟一個事件循環,只需要將函數注冊到事件循環,在條件滿足的時候調用
  • coroutione:協程對象,使用關鍵字async聲明的函數不會立即執行,而是返回一個協程對象。協程對象就是原生可以掛起的函數
  • task:任務對象。將協程對象進一步封裝,就變成了任務,它包含各種任務的狀態
  • future:任務結果。不管是將來執行還是沒有執行的任務,它都代表這個任務的結果。和task並沒有本質上的區別
  • async/await:關鍵字。前者用於定義一個協程,后者用於掛起阻塞的異步調用
import asyncio
import time

# 使用關鍵字修飾對象,則這個對象就變成了協程對象
async def say(name):
    print('%s 開始執行' % name)
    time.sleep(2)
    print('%s 執行完畢' % name)

now = lambda : time.time()

start = now()

# 創建協程對象
result = say('Chancey')

# 創建事件循環
loop = asyncio.get_event_loop()

# 創建任務對象,生成任務包
task = loop.create_task(result)
print(task)

# 注冊協程對象到事件循環,並執行
loop.run_until_complete(task)
print(task)

print('耗時:%0.2f' % (now() - start))

可以看到,在get_event_loop之后,在加入事件循環之前處於pending狀態,在run_until_complete之后,其狀態變成了finished

創建協程對象如果用gather的話,后邊await的返回值就是協程對象的執行結果,這里提一下,后邊詳細探討。

上邊的代碼task還可以通過asyncio.ensure_future(coroutine)來創建,run_until_complete參數就是future對象,在傳入協程之后封裝成task,而task是future的子類,可以使用inistance函數檢驗

2.2 獲取執行結果

獲取協程對象的執行結果有兩種方法,一種是通過回調獲取,一種是直接result。

2.2.1 綁定回調

在task執行完畢后可以獲取結果,回調的最后一個參數為future對象,可以通過這個對象來獲取協程的返回值,這也就是協程里面常說的綁定回調

import asyncio
import time

async def say(name):
    print('%s 開始執行' % name)
    time.sleep(2)
    print('%s 執行完畢' % name)
    return '%s 已執行完畢' % name

def callback(future):
    print('callback:', future.result())

now = lambda : time.time()

start = now()

result = say('chancey')
loop = asyncio.get_event_loop() # 事件循環
task = asyncio.ensure_future(result) # 打包任務
task.add_done_callback(callback) # 回調函數
loop.run_until_complete(task)

print('耗時:%0.2f' % (now() - start))

# 執行結果
chancey 開始執行
chancey 執行完畢
callback: chancey 已執行完畢
耗時:2.00

但是如果回調需要多個參數的話怎么辦?學過python基礎的都知道,偏函數正好能解決該類問題。將future作為固定參數,極大的減少了編程成本,也非常好的遵循了DRY原則。

假設上述代碼中的callback函數需要再傳入一個時間參數,就可以這么做

from functools import partial

import asyncio
import time

async def say(name):
    print('%s 開始執行' % name)
    time.sleep(2)
    print('%s 執行完畢' % name)
    return '%s 已執行完畢' % name

def callback(now, future):
    print('callback:%s, 當前時間:%s' % (future.result(), now))

now = lambda : time.time()

start = now()

result = say('chancey')
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(result)
task.add_done_callback(partial(callback, time.ctime()))
loop.run_until_complete(task)

print('耗時:%0.2f' % (now() - start))

2.2.2 直接獲取

task調用result方法即可

import asyncio
import time

async def say(name):
    print('%s start' % name)
    time.sleep(1)
    print('%s end' % name)
    return name

# 創建協程對象
coroutine = say('Chancey')
# 創建事件循環
loop = asyncio.get_event_loop()
# 創建任務對象
task = loop.create_task(coroutine)
# 注冊任務對象到事件循環
loop.run_until_complete(task)

print(task.result())

2.3 阻塞

當某個協程在執行開銷較大或者耗時的IO操作時,進入阻塞,屆時使用await即可將函數掛起,類似於函數中yeild的功能,只有這樣,同步的IO操作也就異步化了

import asyncio
import time

async def say(name):
    print('%s 開始執行' % name)
    await asyncio.sleep(2)
    print('%s 執行完畢' % name)

now = lambda : time.time()

start = now()
coroutine = say('Chancey')
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)

loop.run_until_complete(task)
print('耗時:%0.2f' % (now() - start))

單協程貌似也看不出來什么,下邊在探討並發協程的時候效果就明顯了

2.4 並發

同樣的,協程並發和並行也是有區別的,同文章開頭的介紹,接下來創建多個協程

import asyncio
import time

async def say(name, hour):
    print('%s 等待%d秒'% (name, hour))
    await asyncio.sleep(hour)

name_list = ['Chancey', 'Wanger', 'Mary', 'SuXin']
now = lambda : time.time()

start = now()
# 創建協程對象
coroutine_list = []
for i in range(1, 5):
    name = name_list[i - 1]
    hour = i
    coroutine_list.append(say(name=name, hour=hour))

# 創建事件循環
loop = asyncio.get_event_loop()

# 創建任務對象
task_list = []
for item in coroutine_list:
    task_list.append(loop.create_task(item))

# 注冊任務對象
for task in task_list:
    loop.run_until_complete(task)

print('耗時:%0.2f' % (now() - start))

如果單協程就應該是耗時1+2+3+4=10秒,這里做了異步化,所以在遇到阻塞的時候掛起去執行其他的任務,因而在阻塞4秒的時候足夠其他的協程執行,所以僅僅耗時4秒

2.5 嵌套

在一般的爬蟲中,涉及的IO操作諸多,從網絡請求到磁盤寫入數據,都是需要大量的時間成本,那么,如果封裝大量的IO操作過程,就會非常明顯的提高效率,這個方式就是協程嵌套,可以通過在一個協程中await其他協程來實現嵌套

以獲取執行結果為例:

2.5.1 第一種獲取方式
import asyncio
import time

async def wait(name, hour):
    print('%s 延時 %d秒' % (name, hour))
    await asyncio.sleep(hour)
    return '%s 執行完成' % name

async def run():
    name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx']

    # 封裝協程對象的列表
    coroutine_list = []
    for hour in range(1, 5):
        coroutine_list.append(wait(name=name_list[hour - 1], hour=hour))

    # 封裝任務對象列表
    task_list = []
    for coroutine in coroutine_list:
        task_list.append(asyncio.ensure_future(coroutine))
    
    # 獲取協程對象的執行結果,一下的代碼會有改動
    dones, pendings = await asyncio.wait(task_list) # 這里返回一個元組,dones是返回的執行結果
    for task in dones:
        print('執行結果:', task.result())

# 把run協程對象添加到事件循環中
if __name__ == '__main__':
    now = lambda : time.time()
    start = now()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    print('耗時:%0.2f' % (now() - start))

2.5.2 第二種獲取方式

前邊有提到使用gather創建協程對象,那么,await的返回值就是協程對象運行的結果,對上述代碼稍微改動

results = await asyncio.gather(*task_list)
for result in results:
    print('執行結果:', result)
2.5.3 第三種獲取方式

不僅如此,不在run函數里面處理結果,直接返回await的內容,那么最外層的run_until_complete將會返回run協程的結果,也就說,現在不在協程對象中獲取執行結果了,而是在事件循環中獲取

import asyncio
import time

async def wait(name, hour):
    print('%s 延時 %d秒' % (name, hour))
    await asyncio.sleep(hour)
    return '%s 執行完成' % name

async def run():
    name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx']
    # 封裝協程對象列表
    coroutine_list = []
    for hour in range(1, 5):
        coroutine_list.append(wait(name=name_list[hour-1], hour=hour))

    # 封裝任務對象列表
    task_list = []
    for coroutine in coroutine_list:
        task_list.append(asyncio.gather(coroutine))

    # asyncio.gather返回的是一個元組
    return await asyncio.gather(*task_list)

if __name__ == '__main__':
    now = lambda : time.time()

    start = now()

    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(run())

    for result in results:
        print('執行結果:', result[0]) # 上邊提醒的,返回對象是一個元組
2.5.4 第四種獲取方式

還可以使用asyncio.wait掛起協程

import asyncio
import time

async def wait(name, hour):
    print('%s 延時 %d秒' % (name, hour))
    await asyncio.sleep(hour)
    return '%s 執行完成' % name

async def run():
    name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx']
    # 封裝協程對象列表
    coroutine_list = []
    for hour in range(1, 5):
        coroutine_list.append(wait(name=name_list[hour-1], hour=hour))

    # 封裝任務對象列表
    task_list = []
    for coroutine in coroutine_list:
        task_list.append(asyncio.gather(coroutine))

    return await asyncio.wait(task_list)

if __name__ == '__main__':
    now = lambda : time.time()
    start = now()
    loop = asyncio.get_event_loop()

    # 依舊返回一個元組,分別接收
    results, pending = loop.run_until_complete(run())
    for result in results:
        print('執行結果:', result.result()[0])
2.5.5 第五種獲取方式

使用as_completed方法,該方法和線程池中的task的功能一樣

import asyncio
import time

async def wait(name, hour):
    print('%s 延時 %d秒' % (name, hour))
    await asyncio.sleep(hour)
    return '%s 執行完成' % name

async def run():
    name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx']
    # 封裝協程對象列表
    coroutine_list = []
    for hour in range(1, 5):
        coroutine_list.append(wait(name=name_list[hour-1], hour=hour))

    # 封裝任務對象列表
    task_list = []
    for coroutine in coroutine_list:
        task_list.append(asyncio.gather(coroutine))

    for task in asyncio.as_completed(task_list):
        result = await task
        print('執行結果:', result)

if __name__ == '__main__':
    now = lambda : time.time()

    start = now()

    loop = asyncio.get_event_loop()

    # 依舊返回一個元組
    loop.run_until_complete(run())
    print('耗時:%0.2f' % (now() - start))

由此可見,協程的調用和組合是非常的靈活。單單對於執行結果的獲取就有5種方法,所以說,對於協程並發的設計,還需要更多的經驗。

2.6 協程停止

future對象,也就是協程對象有4種狀態,前邊有提到Pending和Finish狀態

  • Pending:未執行
  • Running:正在執行
  • Done:執行完畢
  • Cancelled:停止

不難理解,停止協程就是將狀態修改為cancelled,這就用到了asyncio.Tasks以獲取事件循環的任務。

要停止事件循環,需要先取消task,然后停止協程,切記在停止之后還要開啟,不然會拋出異常

import asyncio
import time

async def wait(name, hour):
    print('%s 延時%0.2f秒' % (name, hour))
    await asyncio.sleep(hour)
    print('%s 執行完畢' % name)

if __name__ == '__main__':
    now = lambda : time.time()

    name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx']
    # 創建協程對象
    coroutine_list = []
    for i in range(1, 5):
        coroutine_list.append(wait(name=name_list[i - 1], hour=i), )

    # 創建任務對象
    task_list = []
    for coroutine in coroutine_list:
        task_list.append(asyncio.ensure_future(coroutine))

    start = now()
    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(asyncio.wait(task_list))
    except KeyboardInterrupt as err:
        # 獲取事件循環中所有的任務列表
        for task in asyncio.Task.all_tasks():
            print(task.cancel()) # 返回True代表任務已取消

        loop.stop()
        loop.run_forever()
    finally:
        loop.close()

    print("耗時:%2.0f" % (now() - start))

可以看到,這里的chancey協程對象執行完畢,所以在后邊取消的時候返回False


除了上邊的方法,還可將task列表封裝進run函數中,然后run函數對外調用事件循環。屆時,run相當於最外層的task,這時只需要處理包裝過的task也就是run函數即可

import asyncio
import time

async def work(name, hour):
    print('%s 延時%s秒' % (name, hour))
    await asyncio.sleep(hour)
    return '%s 執行完畢' % name

async def run():
    name_list = ['Chancey', 'Wanger', 'SuXin', 'Zxx']
    coroutine_list = []
    for hour in range(1, 5):
        hour = hour
        name = name_list[hour - 1]
        coroutine_list.append(work(name=name, hour=hour))

    task_list = []
    for coroutine in coroutine_list:
        task_list.append(asyncio.ensure_future(coroutine))

    done, pending = await asyncio.wait(task_list)
    for task in done:
        print('Task ret: ', task.result())

if __name__ == '__main__':
    now = lambda: time.time()
    start = now()

    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(run())

    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt as e:
        print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()

3. greenlet

該模塊旨在提供可自行調度的微線程,在greenlet中,target.switch(value)可以切換到指定的協程,從一個協程切換到另一個協程需要顯式指定。

使用前請安裝pip install greenlet

步驟

  • 創建任務
  • 創建greenlet對象
  • 手動switch切換任務
from greenlet import greenlet
import time

def func1():
    while True:
        print('正在執行 func1')
        time.sleep(1)
        f2.switch()

def func2():
    while True:
        print('正在執行 func2')
        time.sleep(1)
        f1.switch()

if __name__ == '__main__':
    # 創建任務對象 greenlet(函數名)
    f1 = greenlet(func1)
    f2 = greenlet(func2)

    # 手動切換任務
    f1.switch() # 執行func1

因為greenlet對象本身就是協程,它已經有了yeild的特性。而在函數里面手動切換任務,即使用greenlet().switch()來實現,這時的運行依然沒有開啟線程。

這樣下來所有的調度全部交由greenlet實現,確實很方便,還有更方便的

4.gevent

前邊使用greenlet發現調度不需要手動實現了,但是要手動切換任務,那么,gevent彌補了之前的不足,它可以實現自動切換任務的功能。

依舊是第三方庫,需要安裝pip install gevent -i https://pypi.douban.com/simple

原理

當一個greenlet遇到IO阻塞的時候,就自動切換到其他的greenlet執行,等到IO操作完成的,在適當的時候切換回來繼續執行。由於IO操作非常耗時,經常使程序處於等待狀態,有了gevent自動切換任務,就保證了總有greenlet在運行。

步驟

  • 指派任務
import gevent
import time

def func1(name):
    while True:
        print('%s 正在執行 func1' % name)
        time.sleep(1)

def func2():
    while True:
        print('%s 正在執行 func2' % name)
        time.sleep(1)

if __name__ == '__main__':
    names = ['Chancey', 'Wanger', 'SuXin']
    # 指派任務
    task_list = []
    for name in names:
        task_list.append(gevent.spawn(func1, name))
        task_list.append(gevent.spawn(func2, name))

    for task in task_list:
        task.join()

奇怪,沒有切換任務????我自己也研究好長時間,后來在官方文檔中看到

原來這里的time.sleep()並不能被gevent識別,需要用自己的方法,gevent.sleep()來延時

import gevent
import time

def func1(name):
    while True:
        print('%s 正在執行 func1' % name)
        gevent.sleep(1)

def func2(name):
    while True:
        print('%s 正在執行 func2' % name)
        gevent.sleep(1)

if __name__ == '__main__':
    # 指派任務
    f1 = gevent.spawn(func1, 'Chancey')
    f2 = gevent.spawn(func2, 'Wanger')

    f1.join()
    f2.join()

這里就有個問題,項目中的代碼封裝好了不能改怎么辦?屆時就可以用打補丁的方式讓gevent能夠識別到time.sleep()阻塞。

打補丁

在不修改源代碼的前提下,增加新的功能,這就用到了monkey

步驟

  • from gevent import monkey
  • monkey.patch_all():破解


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM