並發和性能


一、並發和並行

1、多任務

  • 多任務的概念
    • 簡單的說,就事操作系統可以同時運行多個任務
  • CPU與多任務的關系:
  • 單核CPU可不可以多任務?
    • 也可以執行多任務,由於CPU執行代碼都是順序執行的,那么單核CPU是怎么執行多任務的呢?
    • 答案就是操作系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,在切換到任務3,執行0.01秒.......這樣反復下去,表面上看,每個任務都是交替執行的,但是,由於CPU的執行速度實在是太快了,我們感覺就像所有任務都在同時執行一樣。
  • 真正的並行執行多任務只能在多核CPU上實現,但是由於任務數量遠遠多於CPU的核心數量,所以,操作系統也會自動把多任務輪流調度到每個核心上執行。

2、並發和並行

  • 並發: 指的是任務數多於CPU核數,通過操作系統的各種任務調度算法,實現用多任務“一起”執行(實際上總有一些任務不在執行,因為切換任務的速度相當快,看上去一起在執行而已)
  • 並行: 指的是任務數小於等於CPU核數,即任務真的在一起執行的
  • 並發

  • 並行

3、同步和異步

  • 同步(同步協調): 是指線程在訪問某一資源時,獲得了資源的返回結果之后才會執行其他操作,(先做某件事,再做某件事)
  • 異步: 與同步相對,是指線程再訪問某一資源時,無論是否取得返回結果,都進行下一步操作;當有了資源返回結果時,系統自會通知線程。

二、線程

關於線程得官方文檔

  • 問題:
    • 當前有兩件事情,做事情1需要5秒,做事情2需要6秒
def func1():
    for i in range(5):
        print("-----正再做事情1------")
        time.sleep(1)


def func2():
    for i in range(6):
        print("-----正再做事情2------")
        time.sleep(1)
  • 單任務
    • 先做事情一
    • 在做事情二
  • 多任務
    • 兩個事情同時做
    • 怎么才能同時做呢?
      • 多線程執行

1、Threadting模塊介紹

Python的_thread模塊是比較底層的模塊,Python的threading模塊是懟thread模塊做了一層包裝的,可以更加方便的使用

  • 創建多線程:threading.Thread(target=func1)
  • 參數target指定線程執行的任務(函數)

Thread類提供了以下方法

  • run() :用以表示線程活動的方法
  • start() :啟動線程活動
  • join(timeout) :等待子線程的方法,默認等待子線程執行結束
  • lsAlive() :返回線程是否活動的
  • getName() :返回線程名
  • setName() :設置線程名

threading提供的以下方法

  • threading.current_thread() :返回當前執行的線程
  • threading.enumerate() :返回正在運行的所有線程(list)
  • threading.active_count() :返回正在運行的線程數量

2、多線線程實現多任務

  • 利用threading模塊實現
import threading
import time


def fun_1():
    for i in range(5):
        print("線程{}:{}".format(threading.current_thread(), i))
        time.sleep(1)


def fun_2():
    for i in range(6):
        print("線程{}:{}".format(threading.current_thread(), i))
        time.sleep(1)


def main():
    # 建立線程1 執行函數1
    t1 = threading.Thread(target=fun_1, name="th_1")
    # 建立線程2 執行函數2
    t2 = threading.Thread(target=fun_2, name="th_2")
    s_time = time.time()
    # fun_1()
    # fun_2()
    # 設置名字
    t1.setName("TH_1")
    print(t1.getName())
    # 開始
    t1.start()
    t2.start()
    # 查看線程集合,和線程數
    print(threading.enumerate())
    print(threading.active_count())
    # 等待子線程執行完畢
    t1.join()
    t2.join()
    e_time = time.time()
    print("時間:", e_time - s_time)


if __name__ == '__main__':
    main()

  • 重寫run方法實現多線程
    • 線程啟動start方法就是運行了run方法
    • 通過繼承Thread類,重寫run方法,來創見線程
    • 創建線程時不需要在傳任務函數
    • 如果任務函數需要參數,需要重寫init方法,並返回父類init方法
import threading
import time
import requests

# 計算時間裝飾器
def add_time(fun):
    def inner(*args):
        start_time = time.time()
        fun(*args)
        end_time = time.time()
        print("---------------線程:{},執行時間:{}-----------------".format(threading.current_thread(), end_time - start_time))
        return end_time - start_time

    return inner


# 重寫run方法創建多線程
class RequestThread(threading.Thread):
    def __init__(self, url):
        self.url = url
        super().__init__()

    @add_time
    def run(self):
        for i in range(100):
            res = requests.get(self.url).status_code
            print("線程{}得執行結果為:{}".format(threading.current_thread(), res))


@add_time
def main():
    t_list = []  # 線程列表
    # 建立線程
    for i in range(10):
        t = RequestThread("http://httpbin.org/post")
        t_list.append(t)
    # 開啟線程
    for th in t_list:
        th.start()
    # 等待線程結束
    for th in t_list:
        th.join()


if __name__ == '__main__':
    total_time = main()
    print("每個接口平均時間:", total_time / 1000)

3、多線程-共享全局變量

  • 問題:1000000次的bug
    • 兩個線程完成對全局變量的2百萬次修改
    • 一個線程修改1百萬次
    # 最后結果
    # 線程2修改完a = 1269517
    # 線程1修改完a = 1302318
    # 最后修改完a = 1302318
  • 可見增加了兩百萬次,a只有一百多萬
    • 因為多線程對全局變量的修改時不安全的
      • 比如線程1獲取了a的值為10w,此時切換到線程2
      • 線程2修改到20w去換到線程1
      • 此時線程1對a的修改不是從20w開始,而是10w,所以會導致a被重新覆蓋了,導致全局變量不安全的情況發生

4、同步&互斥鎖

  • 上面的bug如何解決?
    • 控制線程的執行,避免同時獲取數據
  • 互斥鎖
    • 線程同步能夠保證保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖
    • 互斥鎖為資源設定一個狀態:鎖定/非鎖定
    • 某個線程要更改共享數據時,先將其鎖定,此時資源的狀態為"鎖定",其他線程不能更改直到該線程釋放了資源,將資源狀態變成“非鎖定”,其它線程才可以去獲取鎖,然后再次鎖定該資源
    • 互斥鎖保證了每次只有一個線程進行寫入操作,從而保證了多線程情況下數據的正確性
  • threading模塊中定義了Lock類,可以方便的處理鎖定
    • 創建鎖:lock = threading.Lock()
    • 鎖定 : lock.acquire()
    • 釋放鎖 :lock.release()
  • 注意
    • 如果這個鎖之前沒有上鎖的,那么acquire不會阻塞
    • 如果在調用acpuire對這個鎖上鎖之前,它被其他線程上了鎖,那么此時acpuire會阻塞,直到這個鎖被其他線程解鎖為止
    • 上鎖之后多線程執行肯定會變慢
      • 上鎖,釋放鎖需要時間
      • 上了鎖的代碼塊等於同步執行,必須執行完鎖內的代碼塊,並釋放了鎖,其他線程才有機會獲取鎖去執行。等於多線程鎖內的代碼是同步執行的,在只考慮鎖內代碼塊的執行效率,甚至比單線程還差
  • 案例
import threading

# 全局變量,利用多線程
# 對a進行+1 ,兩千萬次,一個線程加已一千萬次
import time

a = 0


def fun_1(lock):
    global a
    for i in range(1000000):
        # 修改前上鎖
        lock.acquire()
        a += 1
        # 修改完釋放鎖
        lock.release()
    print("線程1修改完a={}".format(a))


def fun_2(lock):
    global a
    for i in range(1000000):
        # 修改前上鎖
        lock.acquire()
        a += 1
        # 修改完釋放鎖
        lock.release()
    print("線程2修改完a={}".format(a))


def main():
    # 創建鎖
    lock = threading.Lock()

    # 建立兩個線程,把鎖傳進任務上鎖
    s_time = time.time()
    t1 = threading.Thread(target=fun_1, args=(lock,))
    t2 = threading.Thread(target=fun_2, args=(lock,))

    t1.start()
    t2.start()

    # 等待子線程執行完畢
    t1.join()
    t2.join()
    e_time = time.time()
    print("最后修改完a={}".format(a))
    print("最后修改完得時間:{}".format(e_time - s_time))

    #
    # 鎖后時間:


if __name__ == '__main__':
    main()
    """
    上鎖前:
    線程2修改完a = 1269517
    線程1修改完a = 1302318
    最后修改完a = 1302318
    時間:0.15441060066223145
    
    上鎖后:
    線程2修改完a=1976899
    線程1修改完a=2000000
    最后修改完a=2000000
    時間:1.1480515003204346
    
    """

5、死鎖

  • 如果多線程存在多把鎖
  • 線程1獲取了鎖1,還要在獲取了鎖2才能執行
  • 線程2獲取了鎖2,還要在獲取鎖1才能執行
  • 此時,任務1就等着任務2去釋放了鎖2,才有機會執行
  • 任務2就等着任務1釋放了鎖1,才有機會執行
  • 最后導致互相等對方釋放鎖,導致死鎖

6、GIL全局解釋器鎖(擴展)

  • 控制線程切換運行得鎖-解釋器默認得鎖
  • 利用一個時間得閾值去切換線程
  • 或者遇到IO阻塞得時候去切換
  • 多線程之間快速切換執行大大縮短時間就是因為任務在等待,但是CPU不會等待線程,它會去執行別得任務,多線程並發就是大大縮短了等待得時間
  • 具體請參考官方文檔--線程
  • 注意:
    • Python語言和GIL沒有半毛錢關系,僅僅是由於歷史原因在Cpython解釋器,難以移除GIL。
    • GIL:全局解釋器鎖,每個線程在執行得過程都需要先獲取GIL,保證同一時刻只有一個線程可以執行代碼。
    • Python使用多線程只能只用一個CPU是因為,最一開始設計Python解釋器的人沒有想到多核的情況,后面代碼設計越多越多,導致很難去修改了,一直到現在。
    • 有人曾經刪除過了GIL鎖,但是效果非常差,甚至不如單核;---參考文檔
    • Python使用多進程是可以利用多核CPU資源的

7、單線程和多線程

  • 單線程和多線程到底誰更快
    • IO密集型多線程會比單線程快很多,充分省去了IO等待的時間
    • CPU密集型,計算非常多的這種任務,對CPU消耗非常大的,單線程理論上比多線程要快一點,但是計算不大的這種,單線程和多線程消耗的時間其實差不多的

三、列隊

  • Python的Queue模塊中提供了同步的,線程安全的隊列類,這些隊列都實現了鎖源語,能夠在多線程中直接使用,可以使用隊列來實現線程間的同步
    • FIFO :(先入先出)隊列Queue
    • LIFO :(后入先出)隊列LifoQueue
    • 優先級隊列:PriorityQueue

1、Queue隊列的方法

from queue import Queue

q = Queue(3) # 創建隊列 
# 操作方法就可以了

  • Queue.qsize() : 返回當前隊列包含的消息數量
  • Queue.empty() : 隊列為空返回True,反之False
  • Queue.full() : 隊列滿了返回True,反之False
  • Queue.get(self,block=True,timeout=None) : 獲取隊列中的值
    • timeout (block=True) :隊列內為空,get值的等待時間,時間內等不到報錯,默認一直等
    • block : True:隊列為空等待獲取值,False:隊列為空報錯停止執行線程
  • Queue.put(self,block=True,timeout=None) : 寫入隊列
    • timeout(block=True) :隊列滿了寫入隊列值的等待時間,等待時間后還是滿的報錯,默認一直等待
    • block : True:隊列滿了等待寫入值,False:隊列滿了報錯停止執行線程
  • Queue.get_nowait() : 相當於Queue.get(False)
  • Queue.put_nowait() : 相當於Queue.put(False)
  • Queue.task_done() : 在完成一項工作之后,使用該方法,可以向隊列發送一個信號,表示該任務執行完畢
  • Queue.join() 實際上意味着等待隊列種多有的任務執行完之后,在往下執行,否者一直等待
    • 任務執行完畢,意味着着收到了Queue.task_done()這個信號
    • 如果用了join方法,沒有發送信號會一直等待
    • put了多少個數據,在get使用之后,就的發送多少個Queue.task_done() 使用完畢的信號,不然也會一直等待
    • 如果發送的Queue.task_done()信號,比put的個數多,會報錯

2、LifoQueue隊列的方法

from queue import LifoQueue

q = LifoQueue(3) # 創建隊列 
# 調用操作方法就可以了
  • 繼承的Queue,和Queue的方法LifoQueue都有
  • 唯一不同的就是先進入的后出來
  • 內部重寫了Queue的幾個私有方法實現的先入后出,並沒有擴展新方法

3、PriorityQueue隊列的方法

from queue import PriorityQueue

q = PriorityQueue(3) # 創建隊列 
# 調用操作方法就可以了
# 不同點,生產值的時候,需要傳元組
q.put((1,333)
  • 繼承的Queue,和Queue的方法PriorityQueue都有
  • 唯一不同的就是生產值的時候,需要傳元組
    • 元組第一個元素,是優先級數值
    • get值的時候,優先get出數值最小的值
    • get來出來的也是元組
  • 內部重寫了Queue的幾個私有方法實現的優先級,並沒有擴展新方法

4、生產者消費者模式實現

  • 為什么要使用生產者和消費模式

    • 在線程世界里,生產者是就是生產數據的線程;消費者就是消費數據的線程
    • 如果生產者處理速度很快,而消費者處理速度很慢,那么消費者必須等待消費者處理完,才能繼續生產數據,同理返回來消費者消費的快,消費者必須等待生產
    • 為了解決這個問題於是引入了生產和消費者模式
  • 什么是生產者和消費者模式

    • 生產者消費者模式是通過一個容器來解決生產者和消費者的強偶問題
    • 生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊
    • 所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列中去,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力
  • 小案例

# 1、用一個隊列來存儲商品
# 2、創建一個專門生產商品的線程類,當商品數量少於50時,開始生產商品,每次生產200個商品,沒生產完一輪 暫停1秒
# 3、創建一個專門消費商品的線程類,當商品數量大於10時就開始消費,,循環消費,每次消費3個。當商品實例少於10的時候,暫停2秒
# 4、創建一個線程生產商品,5個線程消費商品
import queue
import time
import threading


q = queue.Queue(250)


class ProGoods(threading.Thread):
    """生存商品"""

    def run(self):
        while True:
            if q.qsize() < 50:
                for j in range(200):
                    q.put("商品{}".format(str(j)))
                time.sleep(1)


class ConGoods(threading.Thread):
    """消費商品"""

    def run(self):
        while True:
            if q.qsize() > 10:
                for j in range(3):
                    q.get()
                    print(q.qsize())
            else:
                time.sleep(2)


def main():
    # 創建一個線程生產商品
    pro_thread = ProGoods()

    # 5個線程消費商品
    con_thread_list = []
    for i in range(5):
        con_thread = ConGoods()
        con_thread_list.append(con_thread)

    # 開啟生存
    pro_thread.start()

    # 開啟消費
    for con_thread in con_thread_list:
        con_thread.start()

    # 等待子線程
    pro_thread.join()
    for con_thread in con_thread_list:
        con_thread.join()

    print("最后剩余商品:{}".format(q.qsize()))


if __name__ == '__main__':
    main()

四、進程

1、進程介紹

  • 什么是進程
    • 程序:例如xxx.py這是程序,是一個靜態的
    • 進程:一個程序運行起來,代碼+用到的資源稱之為進程,它是操作系統分配資源的基本單位
    • 不僅可以通過線程來完成多任務,進程也是可以的
  • 進程的狀態
    • 工作中,任務數往往大於CPU的核數,即一定有一些任務在等待CPU執行,因此有了不同的狀態
    • 就緒狀態:運行的條件都已經滿足了,正在等待CPU執行
    • 執行狀態:CPU正在執行其功能
    • 等待狀態:等待某些條件滿足,例如一個程序sleep了,此時就處於等待狀態

2、進程、線程對比

  • 功能
    • 進程,能夠完成多任務,比如在一台電腦上能夠同時運行多個軟件
    • 線程,能夠完成多任務,比如一個QQ中的多個聊天窗口
  • 定義不同
    • 進程是系統進行資源分配和調度的一個獨立單位
    • 線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位,線程自己基本上不能擁有系統資源,只擁有一點在運行中必不可少的資源,但是它可與同屬於一個進程的其他線程共享進程所擁有的全部資源
  • 區別
    • 一個程序至少擁有一個進程,一個進程至少擁有一個線程
    • 線程的划分適度小於進程(資源比進程小),使用多進程成功的並發性高
    • 進程在執行過程中擁有獨立的內存單元,而多個線程共享內存,從而極大的提高了程序的運行效率
    • 線程不能獨立執行,必須依存在進程中
    • 可以將進程理解為一個工廠的流水線,而其中的線程就是這個流水線上的工人
  • 優缺點
    • 線程個進程在使用上各有優缺點:線程執行開銷小,但不利於資源的管理和保護,而進程正相反

3、multiprocessing模塊

  • Process(group=None, target=None, name=None, args=(), kwargs={})
    • target: 給子進程傳遞執行的任務代碼
    • args:給target指定的函數傳遞參數,元組方式
    • kwargs:給target指定的函數傳遞參數,字段方式
    • name:給進程設置一個名字
    • group:指定進程組,大多數情況下用不到
  • Process實例的常用方法
    • start() :啟動子進程實例
    • is_alive() :判斷子進程是否還在活着
    • jion(timeout) :是否等待子進程執行結束,或者等待多少
    • terminate() :不管任務是否完成,立即終止子進程
  • Process實例對象常用的屬性
    • name :當前進程的名字
    • pid :當前進程的pid(進程號)
      • 主進程中可以進程p.pid獲取
      • 子進程中用os.getpid()獲取

4、進程之間的通信 -multiprocessing.Queue

  • 進程 multiprocessing.Queue 和線程 queue.Queue的區別
    • 方法一摸一樣
    • queue.Queue :是進程內多線程之間非阻塞隊列
    • multiprocessing.Queue :是跨進程通信隊列

1、進程中的Queue的使用

  • 可以使用multiprocessing模塊中的Queue實現多進程之間的數據傳遞,Queue本身是一個消息隊列程序,首先用一個實例來演示一下Queue的工作原理:
    • 在父進程中創建兩個子進程,
    • 主往Queue中寫數據,兩個子進程在Queue中讀數據
  • 注意進程之間的Queue要當作參數傳給任務函數(不共享全局變量)
import os
import time
import requests
from multiprocessing import Process, Queue

a = 0


def work1(q):
    global a
    while True:
        a += 1
        try:
            url = q.get_nowait()
            print("進程{},a ={},url ={}:".format(os.getpid(), a, url))
            requests.get(url)
        except queue.Empty:
            break


def work2(q):
    global a
    while True:
        a += 1
        try:
            url = q.get_nowait()
            print("進程{},a ={},url ={}:".format(os.getpid(), a, url))
            requests.get(url)
        except queue.Empty:
            break


if __name__ == '__main__':

    q = Queue(100)
    for i in range(10):
        q.put("http://127.0.0.1:5000")
    # 將隊列對象參數傳進任務函數
    p1 = Process(target=work1, args=(q,))
    p2 = Process(target=work2, args=(q,))
    # 開啟
    st = time.time()
    p1.start()
    p2.start()
    # print(p1.name)
    # print(p1.pid)
    # 等待
    p1.join()
    p2.join()
    et = time.time()
    print("時間:", et - st)

5、進程池Pool

  • 當需要創建的子進程數量過多的時候,我們可以利用進程池來創建
  • 初始化Pool的時候,可以指定一個最大進程數,當前新的請求提交到Pool中中時,如果池中還沒有滿,那么就會創建一個新的進程用來執行該請求,但是如果池中的進程數已經達到最大值,那么該請求就會等待,直到池中有進程結束,才會用之前的進程來執行新的任務
  • Pool常用的方法
  • apply_async(func,args=(),kwds={},callback=None,error_callback=None): 使用非阻塞的方式調用func(並行執行,堵塞方式必須等待上一個進程退出才能執行下一個進程)
  • close() : 關閉進程池
  • terminate():不管子進程是否結束,立即終止
  • join() :主進程阻塞,等待子進程結束,必須在close或terminate之后使用

6、進程池中的隊列

  • multiprocessing模塊中Manager來創建隊列
    • q = Manager().Queue()

案例

import os
import queue
import time
import requests
from multiprocessing import Pool, Manager

a = 0


def work(q):
    global a
    while True:
        a += 1
        try:
            url = q.get_nowait()
            print("進程{},a ={},url ={}:".format(os.getpid(), a, url))
            requests.get(url)
        except queue.Empty:
            break


if __name__ == '__main__':
    # 進程池隊列
    q = Manager().Queue(100)
    for i in range(100):
        q.put("http://127.0.0.1:5000")
    # 進程池
    p = Pool(5)
    # 開啟5個進程消費
    st = time.time()
    for i in range(5):
        p.apply_async(func=work, args=(q,))

    # 關閉進程池
    p.close()
    # 等待子進程結束,必須在關閉進程池后使用
    # 如果不等待,主進程結束,子進程也會跟着結束
    p.join()
    et = time.time()
    print("時間:", et - st)

五 、協程

1、利用yield機制來實現多任務

  • 內容回顧
    • 什么是生成器
    • 生成器證明定義
  • 案例
"""
生成器
生成器表達式
在函數中使用yield關鍵字:生成器
協程是通過生成器實現的多任務,在任務間不停的切換
"""
import time


def work1():
    for i in range(10):
        time.sleep(0.1)
        print("---work1----{}".format(i))
        yield


def work2():
    for i in range(10):
        time.sleep(0.1)
        print("---work2----{}".format(i))
        yield


# 創建兩個生成器對象,實現多任務
g1 = work1()
g2 = work2()

while True:
    try:
        next(g1)
        next(g2)
    except StopIteration:
        break
        
# 協程:微線程
"""
協程的本質上是單任務
協程依賴於線程
協程相對與線程來說,占用的資源更少,幾乎不要占用什么資源
"""

2、什么是協程

  • 協程是Python中另外一種實現多任務的方式,只不過比線程更小的占用執行單元,為啥說它是一個執行單元?因為它自帶CPU上下文,這樣只要在合適gr的時機,我們可以把一個協程切換到另一個協程,只要這個過程中保存或恢復CPU上下文那么程序還是可以運行的
  • 通俗的理解:在一個線程中的某個函數,可以在任何地方保存當前函數的一些臨時變量信息,然后切換到另外一個函數中執行,注意不是通過調用函數的方式做到的,並且切換的次數一級什么時候在切換到原來的函數都由開發自己確定

3、協程和線程的差異

  • 線程:在實現多任務時,切換從系統層面遠不止保存和恢復CPU上下文這么簡單,操作系統為了程序運行的高效性,每個線程都有自己緩存Chche等數據,操作系統還會幫你做這些數據的恢復操作,所以線程的切換比較耗性能
  • 協程:協程的切換只是單純的操作CPU的上下文,所以一秒切換個上百萬次都系統都抗的住

4、greenlet模塊

為了更好使用協程來完成多任務,Python中的greenlet模塊對其封裝,從而使得切換任務變得更加簡單

  • 安裝方式使用
    • 安裝: pip install greenlet
    • 使用:創建多任務的greenlet對象
    • 切換:switch()方法傳參和任務切換
import greenlet


def work1():
    for i in range(10):
        print("---work1----{}".format(i))
        # 標記切換g2任務
        g2.switch()


def work2():
    for i in range(10):
        print("---work2----{}".format(i))
         # 標記切換g1任務
        g1.switch()
        
# 創建兩個greenlet對象
g1 = greenlet.greenlet(work1)
g2 = greenlet.greenlet(work2)

# 切進任務1,有參數可以傳參數 *args, **kwargs
g1.switch()

5、gevent模塊

  • 安裝:
    • pip install gevent
  • gevent.spawn(*args, **kwargs)
    • 創建並開啟協程
    • 第一個參數傳任務函數
    • 其他參數依次傳入即可
  • gevent.sleep()
    • 切換的標志
    • gevent里等待方法
  • gevent().join()
    • 線程等待協程的方法
import gevent

"""
協程:gevent,
對greenlet的再次封裝
協程存在於線程之中,線程默認不會等等待協程執行的
"""


def work1(a):
    for i in range(10):
        print("---work{}----{}".format(a, i))
        # 切換的標志
        gevent.sleep(0.001)


def work2(a):
    for i in range(10):
        print("---work{}----{}".format(a, i))
        # 切換的標志
        gevent.sleep(0.001)


# 創建兩個gevent對象
# 參數*args, **kwargs
# 默認就已經開啟執行了
g1 = gevent.spawn(work1, a=1)
g2 = gevent.spawn(work2, a=2)

# 線程等待協程執行完
g1.join()
g2.join()

  • gevent切換還是要主動用自己的等待標志才會切換,還不夠強大
    • gevent有一個補丁可以智能的切換,在IO阻塞的時候自動切換
    • 導入補丁:from gevent import monkey
    • 線程中調用:monkey.patch_all()
    • 此時只要有耗時操作就會自動切換
    • 注意: 一個進程內調用一次monkey.patch_all()方法即可
      • 多進程內每個子進程內調用,不能在主進程中調用
      • 多線程主線程內調用,不能每個子線程都調用
from gevent import monkey

# 放在導包之前,會改系統的環境
# 不然會有警告
monkey.patch_all()
import time
import requests
import gevent


def work1(a):
    for i in range(10):
        res = requests.get("http://www.baidu.com").status_code
        print("---work{}----{}的結果:{}".format(a, i, res))


def work2(a):
    for i in range(10):
        res = requests.get("http://www.baidu.com").status_code
        print("---work{}----{}的結果:{}".format(a, i, res))


# 創建兩個gevent對象
# 參數*args, **kwargs
# 默認就已經開啟執行了
st = time.time()
g1 = gevent.spawn(work1, a=1)
g2 = gevent.spawn(work2, a=2)

# 線程等待協程執行完
g1.join()
g2.join()
# work1(1)
# work2(2)
et = time.time()
print("時間:", et - st)

# 兩個協程:時間: 0.4506347179412842
# 單線程 : 時間: 0.7228543758392334
  • 協程的隊列
    • 線程、進程、進程池的隊列都可以共享使用

六、總結

1、綜合練習

開啟多個進程,每個進程開啟多線程,每個線程開啟多個協程消費

import warnings
import time
import requests
import threading
import gevent
from gevent import monkey
from multiprocessing import Pool, Manager

warnings.filterwarnings("ignore")


# 10000個請求,開啟2個進程,每個進程中實現3個線程,每個線程中實現5個協程去處理 計算時間

def consume_request(q):
    """
    取隊列的url請求
    """
    a = 0
    while q.qsize() > 0:
        url = q.get()
        requests.get(url)
        a += 1
    print("協程完成消費次數:",a)


def gevents(q):
    """
    每個線程創建5個協程
    """
    gev_list = []
    for i in range(5):
        gev = gevent.spawn(consume_request, q)
        gev_list.append(gev)
    for gev in gev_list:
        gev.join()


def threads(pro_q):
    """
    創建三個線程
    """
    # 每個子進程調用一次協程補丁
    monkey.patch_all()

    th_list = []
    for i in range(3):
        th = threading.Thread(target=gevents, args=(pro_q,))
        th_list.append(th)
    for th in th_list:
        th.start()
    for th in th_list:
        th.join()


def create_process():
    """創建兩個進程"""
    pro_q = Manager().Queue()
    for i in range(1000):
        pro_q.put("http://127.0.0.1:5000")
    s_time = time.time()
    pool = Pool(2)
    for i in range(2):
        pool.apply_async(func=threads, args=(pro_q,))
    pool.close()
    pool.join()
    e_time = time.time()
    print("耗時:", e_time - s_time)


if __name__ == '__main__':
    create_process()

2、簡單總結

  • 進程是資源分配的資源
  • 線程是操作系統調度的單位
  • 進程切換需要的資源最大,效率最低
  • 線程切換需要的資源一般,效率一般(當然是了在不考慮GIL鎖的情況下)
  • 協程切換任務資源很小,效率高
  • 對進程、多線程根據CPU核數不一樣可能是並行,但是協程是在一個線程中,所以是並發
  • Python中的線程由於GIL鎖的存在,並不能實現並行


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM