Python並發編程二(多線程、協程、IO模型)


1.python並發編程之多線程(理論)

1.1線程概念

在傳統操作系統中,每個進程有一個地址空間,而且默認就有一個控制線程

線程顧名思義,就是一條流水線工作的過程(流水線的工作需要電源,電源就相當於cpu),而一條流水線必須屬於一個車間,一個車間的工作過程是一個進程,車間負責把資源整合到一起,是一個資源單位,而一個車間內至少有一條流水線。

所以,進程只是用來把資源集中到一起(進程只是一個資源單位,或者說資源集合),而線程才是cpu上的執行單位。

多線程(即多個控制線程)的概念是,在一個進程中存在多個線程,多個線程共享該進程的地址空間,相當於一個車間內有多條流水線,都共用一個車間的資源。例如,北京地鐵與上海地鐵是不同的進程,而北京地鐵里的13號線是一個線程,北京地鐵所有的線路共享北京地鐵所有的資源,比如所有的乘客可以被所有線路拉。

 

1.2線程與進程的區別

  1. Threads share the address space of the process that created it; processes have their own address space.
  2. Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
  3. Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
  4. New threads are easily created; new processes require duplication of the parent process.
  5. Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
  6. Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.

總結上述區別,無非兩個關鍵點,這也是我們在特定的場景下需要使用多線程的原因:

  1. 同一個進程內的多個線程共享該進程內的地址資源
  2. 創建線程的開銷要遠小於創建進程的開銷(創建一個進程,就是創建一個車間,涉及到申請空間,而且在該空間內建至少一條流水線,但創建線程,就只是在一個車間內造一條流水線,無需申請空間,所以創建開銷小)

 

1.3為何使用多線程

多線程指的是,在一個進程中開啟多個線程,簡單的講:如果多個任務共用一塊地址空間,那么必須在一個進程內開啟多個線程。詳細的講分為4點:

  1. 多線程共享一個進程的地址空間

      2. 線程比進程更輕量級,線程比進程更容易創建可撤銷,在許多操作系統中,創建一個線程比創建一個進程要快10-100倍,在有大量線程需要動態和快速修改時,這一特性很有用

      3. 若多個線程都是cpu密集型的,那么並不能獲得性能上的增強,但是如果存在大量的計算和大量的I/O處理,擁有多個線程允許這些活動彼此重疊運行,從而會加快程序執行的速度。

      4. 在多cpu系統中,為了最大限度的利用多核,可以開啟多個線程,比開進程開銷要小的多。(這一條並不適用於python)

 

1.4多線程應用舉例

開啟一個字處理軟件進程,該進程肯定需要辦不止一件事情,比如監聽鍵盤輸入,處理文字,定時自動將文字保存到硬盤,這三個任務操作的都是同一塊數據,因而不能用多進程。只能在一個進程里並發地開啟三個線程,如果是單線程,那就只能是,鍵盤輸入時,不能處理文字和自動保存,自動保存時又不能輸入和處理文字。

 

1.5在用戶空間實現的線程

 線程的實現可以分為兩類:用戶級線程(User-Level Thread)和內核級線程(Kernel-Level Thread),后者又稱為內核支持的線程或輕量級進程。在多線程操作系統中,各個系統的實現方式並不相同,在有的系統中實現了用戶級線程,有的系統中實現了內核級線程。

 用戶級線程內核的切換由用戶態程序自己控制內核切換,不需要內核干涉,少了進出內核態的消耗,但不能很好的利用多核Cpu,目前Linux pthread大體是這么做的。

在用戶空間模擬操作系統對進程的調度,來調用一個進程中的線程,每個進程中都會有一個運行時系統,用來調度線程。此時當該進程獲取cpu時,進程內再調度出一個線程去執行,同一時刻只有一個線程執行。

 

1.6在內核空間實現的線程

內核級線程:切換由內核控制,當線程進行切換的時候,由用戶態轉化為內核態。切換完畢要從內核態返回用戶態;可以很好的利用smp,即利用多核cpu。windows線程就是這樣的。

 

1.7 用戶級與內核級線程的對比

 一: 以下是用戶級線程和內核級線程的區別:

  1. 內核支持線程是OS內核可感知的,而用戶級線程是OS內核不可感知的。
  2. 用戶級線程的創建、撤消和調度不需要OS內核的支持,是在語言(如Java)這一級處理的;而內核支持線程的創建、撤消和調度都需OS內核提供支持,而且與進程的創建、撤消和調度大體是相同的。
  3. 用戶級線程執行系統調用指令時將導致其所屬進程被中斷,而內核支持線程執行系統調用指令時,只導致該線程被中斷。
  4. 在只有用戶級線程的系統內,CPU調度還是以進程為單位,處於運行狀態的進程中的多個線程,由用戶程序控制線程的輪換運行;在有內核支持線程的系統內,CPU調度則以線程為單位,由OS的線程調度程序負責線程的調度。
  5. 用戶級線程的程序實體是運行在用戶態下的程序,而內核支持線程的程序實體則是可以運行在任何狀態下的程序。

    二: 內核線程的優缺點

  優點:

  1. 當有多個處理機時,一個進程的多個線程可以同時執行。

  缺點:

  1. 由內核進行調度。

    三: 用戶進程的優缺點

  優點:

  1. 線程的調度不需要內核直接參與,控制簡單。
  2. 可以在不支持線程的操作系統中實現。
  3. 創建和銷毀線程、線程切換代價等線程管理的代價比內核線程少得多。
  4. 允許每個進程定制自己的調度算法,線程管理比較靈活。
  5. 線程能夠利用的表空間和堆棧空間比內核級線程多。
  6. 同一進程中只能同時有一個線程在運行,如果有一個線程使用了系統調用而阻塞,那么整個進程都會被掛起。另外,頁面失效也會產生同樣的問題。

  缺點:

  1. 資源調度按照進程進行,多個處理機下,同一個進程中的線程只能在同一個處理機下分時復用

 

1.8混合實現

 用戶級與內核級的多路復用,內核同一調度內核線程,每個內核線程對應n個用戶線程

 

 

2.python並發編程之多線程(python實現)

2.1線程實現的兩種方式

2.1.1第一種方式:面向過程

import time
import random
from threading import Thread


def eat(name):
    print('%s eating' % name)
    time.sleep(random.randrange(1, 5))
    print('%s eat end' % name)


if __name__ == '__main__':
    t1 = Thread(target=eat, args=('jack',))
    t1.start()
    print('')

 

2.1.2第二種方式:面向對象

#方式二:面向對象
class Mythread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print('%s eating' % self.name)
        time.sleep(random.randrange(1, 5))
        print('%s eat end' % self.name)


if __name__ == '__main__':
    t1 = Mythread('th1')
    t2 = Mythread('th2')
    t1.start()
    t2.start()
    print("")

 

2.2多線程與多進程的區別

  1. 開啟速度:線程遠遠快過進程

  2. pid:線程的pid與主進程一致而進程間的pid互不相同

  3. 地址空間:進程之間地址空間是隔離的,同一進程內開啟的多個線程是共享該進程地址空間的

 

2.3Thread對象的其他屬性或方法

2.3.1Thread實例對象的方法:

  • isAlive(): 返回線程是否活動的
  • getName(): 返回線程名
  • setName(): 設置線程名

2.3.2threading模塊提供的一些方法:

  • threading.currentThread(): 返回當前的線程變量
  • threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程
  • threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果

2.3.3驗證

from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主進程下開啟線程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) #主線程
    print(threading.enumerate()) #連同主線程在內有兩個運行的線程
    print(threading.active_count())
    print('主線程/主進程')
驗證

 

2.4守護線程

無論是進程還是線程,都遵循:守護xxx會等待主xxx運行完畢后被銷毀

需要強調的是:運行完畢並非終止運行

1、對主進程來說,運行完畢指的是主進程代碼運行完畢

2、對主線程來說,運行完畢指的是主線程所在的進程內所有非守護線程統統運行完畢,主線程才算運行完畢

 即主進程執行完后會死掉(不管子進程),而主線程運行完后會等所有非守護線程執行完畢。

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #必須在t.start()之前設置,此處用法為setDaemon(True),進程中是daemon=True
    t.start()

    print('主線程')
    print(t.is_alive())

 

2.5GIL全局解釋器鎖

GIL本質就是一把互斥鎖,既然是互斥鎖,所有互斥鎖的本質都一樣,都是將並發運行變成串行,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。

可以肯定的一點是:保護不同的數據的安全,就應該加不同的鎖。

首先確定一點:每次執行python程序,都會產生一個獨立的進程。例如python test.py,python aaa.py,python bbb.py會產生n個不同的python進程,在一個python的進程內,不僅有test.py的主線程或者由該主線程開啟的其他線程,還有解釋器開啟的垃圾回收等解釋器級別的線程,總之,所有線程都運行在這一個進程內,毫無疑問

  1. 所有數據都是共享的,這其中,代碼作為一種數據也是被所有線程共享的(test.py的所有代碼以及Cpython解釋器的所有代碼) 例如:test.py定義一個函數work,在進程內所有線程都能訪問到work的代碼,於是我們可以開啟m個線程然后target都指向該代碼,能訪問到意味着就是可以執行。

  2. 所有線程的任務,都需要將任務的代碼當做參數傳給解釋器的代碼去執行,即所有的線程要想運行自己的任務,首先需要解決的是能夠訪問到解釋器的代碼。

  3. 如果多個線程的target=work,那么執行流程是多個線程先訪問到解釋器的代碼,即拿到執行權限,然后將target的代碼交給解釋器的代碼去執行。

解釋器的代碼是所有線程共享的,所以垃圾回收線程也可能訪問到解釋器的代碼而去執行,這就導致了一個問題:對於同一個數據100,可能線程1執行x=100的同時,而垃圾回收執行的是回收100的操作,解決這種問題沒有什么高明的方法,就是加鎖處理,如下圖的GIL,保證python解釋器同一時間只能執行一個任務的代碼。

 

2.5.1GIL與Lock

Python已經有一個GIL來保證同一時間只能有一個線程來執行了,為什么這里還需要lock?

首先,我們需要達成共識:鎖的目的是為了保護共享的數據,同一時間只能有一個線程來修改共享的數據

然后,我們可以得出結論:保護不同的數據就應該加不同的鎖。

最后,問題就很明朗了,GIL與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),后者保護用戶自己開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock,如下圖

分析:

1、100個線程去搶GIL鎖,即搶執行權限 2、肯定有一個線程先搶到GIL(暫且稱為線程1),然后開始執行,一旦執行就會拿到lock.acquire() 3、極有可能線程1還未運行完畢,就有另外一個線程2搶到GIL,然后開始運行,但線程2發現互斥鎖lock還未被線程1釋放,於是阻塞,被迫交出執行權限,即釋放GIL 4、直到線程1重新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,然后其他的線程再重復2 3 4的過程

 

2.5.2GIL與多線程

有了GIL的存在,同一時刻同一進程中只有一個線程被執行。也就是說,進程可以利用多核,但是開銷大,而python的多線程開銷小,但卻無法利用多核優勢。

要解決這個問題,我們需要在幾個點上達成一致:

  1. cpu到底是用來做計算的,還是用來做I/O的?
  2. 多cpu,意味着可以有多個核並行完成計算,所以多核提升的是計算性能
  3. 每個cpu一旦遇到I/O阻塞,仍然需要等待,所以多核對I/O操作沒什么用處
#分析:
我們有四個任務需要處理,處理方式肯定是要玩出並發的效果,解決方案可以是:
方案一:開啟四個進程
方案二:一個進程下,開啟四個線程

#單核情況下,分析結果: 
  如果四個任務是計算密集型,沒有多核來並行計算,方案一徒增了創建進程的開銷,方案二勝
  如果四個任務是I/O密集型,方案一創建進程的開銷大,且進程的切換速度遠不如線程,方案二勝

#多核情況下,分析結果:
  如果四個任務是計算密集型,多核意味着並行計算,在python中一個進程中同一時刻只有一個線程執行用不上多核,方案一勝
  如果四個任務是I/O密集型,再多的核也解決不了I/O問題,方案二勝

 
#結論:現在的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提升,甚至不如串行(沒有大量切換),但是,對於IO密集型的任務效率還是有顯著提升的

應用場景:

  • 多線程用於IO密集型,如socket,爬蟲,web
  • 多進程用於計算密集型,如金融分析

 

 

2.6死鎖現象與遞歸鎖  

2.6.1死鎖現象

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程,如下就是死鎖

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()
死鎖現象

執行效果:

Thread-1 拿到A鎖
Thread-1 拿到B鎖
Thread-1 拿到B鎖
Thread-2 拿到A鎖 #出現死鎖,整個程序阻塞住

 

2.6.2遞歸鎖

針對死鎖現象的解決方案,遞歸鎖,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖,二者的區別是:遞歸鎖可以連續acquire多次,而互斥鎖只能acquire一次。

from threading import Thread,RLock
import time

mutexA=mutexB=RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的情況,則counter繼續加1,這期間所有其他線程都只能等待,等待該線程釋放所有鎖,即counter遞減到0為止

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()
遞歸鎖

 

2.7信號量Semaphore

 信號量也是一把鎖,可以指定信號量為5,對比互斥鎖同一時間只能有一個任務搶到鎖去執行,信號量同一時間可以有5個任務拿到鎖去執行,如果說互斥鎖是合租房屋的人去搶一個廁所,那么信號量就相當於一群路人爭搶公共廁所,公共廁所有多個坑位,這意味着同一時間可以有多個人上公共廁所,但公共廁所容納的人數是一定的,這便是信號量的大小。

from threading import Thread,Semaphore import threading import time def func(): sm.acquire() print('%s get sm' %threading.current_thread().getName()) time.sleep(3) sm.release() if __name__ == '__main__': sm=Semaphore(5) for i in range(23): t=Thread(target=func) t.start()
信號量

解析:

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。

 

2.8Event

線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標志,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標志被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標志為假,那么這個線程將會被一直阻塞直至該標志為真。一個線程如果將一個Event對象的信號標志設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那么它將忽略這個事件, 繼續執行。

from threading import Event

event.isSet():返回event的狀態值;

event.wait():如果 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;

event.clear():恢復event的狀態值為False。
event

 

 例如,有多個工作線程嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務正常才讓那些工作線程去連接MySQL服務器,如果連接不成功,都會去嘗試重新連接。那么我們就可以采用threading.Event機制來協調各個工作線程的連接操作

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('鏈接超時')
        print('<%s>第%s次嘗試鏈接' % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print('<%s>鏈接成功' %threading.current_thread().getName())


def check_mysql():
    print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == '__main__':
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()
event應用

 

2.9定時器

定時器,指定n秒后執行某操作

from threading import Timer

def hello():
    print("hello, world")

t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed
定時器

 

2.10線程queue

有三種不同的queue:

  1. class queue.Queue(maxsize=0)  #隊列:先進先出

  2. class queue.LifoQueue(maxsize=0)  #堆棧:last in fisrt out

  3. class queue.PriorityQueue(maxsize=0)   #優先級隊列:存儲數據時可設置優先級的隊列

 

2.11進程池與線程池

       在剛開始學多進程或多線程時,我便迫不及待地想基於多進程或多線程實現並發的套接字通信,然而這種實現方式的致命缺陷是:服務的開啟的進程數或線程數都會隨着並發的客戶端數目地增多而增多,這會對服務端主機帶來巨大的壓力,甚至於不堪重負而癱瘓,於是我們必須對服務端開啟的進程數或線程數加以控制,讓機器在一個自己可以承受的范圍內運行,這就是進程池或線程池的用途,例如進程池,就是用來存放進程的池子,本質還是基於多進程,只不過是對開啟進程的數目加上了限制。

介紹:

  • 官網:https://docs.python.org/dev/library/concurrent.futures.html

  • concurrent.futures模塊提供了高度封裝的異步調用接口

  • ThreadPoolExecutor:線程池,提供異步調用

  • ProcessPoolExecutor: 進程池,提供異步調用

  • Both implement the same interface, which is defined by the abstract Executor class.

 基本方法:

1、submit(fn, *args, **kwargs)
異步提交任務

2、map(func, *iterables, timeout=None, chunksize=1) 
取代for循環submit的操作

3、shutdown(wait=True) 
相當於進程池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源后才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait參數為何值,整個程序都會等到所有任務執行完畢
submit和map必須在shutdown之前

4、result(timeout=None)
取得結果

5、add_done_callback(fn)
回調函數

 

2.11.1用法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())
進程池用法

 

把ProcessPoolExecutor換成ThreadPoolExecutor,其余用法全部相同
線程池用法

 

2.11.2map方法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12)) #map取代了for+submit
map方法

 

2.11.3回調函數

可以為進程池或線程池內的每個進程或線程綁定一個函數,該函數在進程或線程的任務執行完畢后自動觸發,並接收任務的返回值當作參數,該函數稱為回調函數

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<進程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<進程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,需要用obj.result()拿到結果
回調函數

 

 

3.python並發編程之協程

對於單線程下,我們不可避免程序中出現io操作,但如果我們能在自己的程序中(即用戶程序級別,而非操作系統級別)控制單線程下的多個任務能在一個任務遇到io阻塞時就切換到另外一個任務去計算,這樣就保證了該線程能夠最大限度地處於就緒態,即隨時都可以被cpu執行的狀態,相當於我們在用戶程序級別將自己的io操作最大限度地隱藏起來,從而可以迷惑操作系統,讓其看到:該線程好像是一直在計算,io比較少,從而更多的將cpu的執行權限分配給我們的線程。

協程的本質就是在單線程下,由用戶自己控制一個任務遇到io阻塞了就切換另外一個任務去執行,以此來提升效率。為了實現它,我們需要找尋一種可以同時滿足以下條件的解決方案:

  1. 可以控制多個任務之間的切換,切換之前將任務的狀態保存下來,以便重新運行時,可以基於暫停的位置繼續執行。
  2. 作為1的補充:可以檢測io操作,在遇到io操作的情況下才發生切換。

3.1協程介紹

協程:是單線程下的並發,又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。

需要強調的是:

  1. python的線程屬於內核級別的,即由操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運行)

  2. 單線程內開啟協程,一旦遇到io,就會從應用程序級別(而非操作系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)

 

對比操作系統控制線程的切換,用戶在單線程內控制協程的切換

優點如下:

  1. 協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級

  2. 單線程內就可以實現並發的效果,最大限度地利用cpu

缺點如下:

  1. 協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程

  2. 協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程

總結協程特點:

  1. 必須在只有一個單線程里實現並發
  2. 修改共享數據不需加鎖
  3. 用戶程序里自己保存多個控制流的上下文棧
  4. 附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模塊(select機制))

 

3.2greenlet模塊

如果我們在單個線程內有20個任務,要想實現在多個任務之間切換,使用yield生成器的方式過於麻煩(需要先得到初始化一次的生成器,然后再調用send。。。非常麻煩),而使用greenlet模塊可以非常簡單地實現這20個任務直接的切換

from greenlet import greenlet

def eat(name):
    print('%s eat 1' %name)
    g2.switch('egon')  # 2.切換到play函數執行並傳入參數egon
    print('%s eat 2' %name)
    g2.switch()  # 4.切換回eat函數執行
def play(name):
    print('%s play 1' %name)
    g1.switch()  # 3.切換回eat函數執行
    print('%s play 2' %name)

g1=greenlet(eat)
g2=greenlet(play)

g1.switch('egon')  # 1.可以在第一次switch時傳入參數,以后都不需要
greenlet模塊

greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。

單線程里的這20個任務的代碼通常會既有計算操作又有阻塞操作,我們完全可以在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。。如此,才能提高效率,這就用到了Gevent模塊。

 

3.3Gevent模塊

Gevent 是一個第三方庫,可以輕松通過gevent實現並發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。

#用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)創建一個協程對象g1,spawn括號內第一個參數是函數名,如eat,后面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的

g2=gevent.spawn(func2)

g1.join() #等待g1結束

g2.join() #等待g2結束

#或者上述兩步合作一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

遇到IO阻塞時會自動切換任務

import gevent
def eat(name):
    print('%s eat 1' %name)
    gevent.sleep(2)
    print('%s eat 2' %name)

def play(name):
    print('%s play 1' %name)
    gevent.sleep(1)
    print('%s play 2' %name)


g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,name='egon')
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print('')
gevent遇到阻塞

上例gevent.sleep(2)模擬的是gevent可以識別的io阻塞,而time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行代碼,打補丁,就可以識別了

from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊之前

或者我們干脆記憶成:要用gevent,需要將from gevent import monkey;monkey.patch_all()放到文件的開頭

import gevent
import time
def eat():
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')

def play():
    print('play 1')
    time.sleep(1)
    print('play 2')

g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print('')
適用於所有阻塞

 

4.python並發編程之IO模型

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM