並發編程


目錄

並發編程:

  • 程序:

    • 程序就是一堆文件
  • 進程:

    • 進程是分配資源的基本單位,為線程提供資源,一個程序可以開啟多個進程

  • 進程被誰運行:

    • CPU最終運行你的程序

      • 操作系統調度作用,將你磁盤上的程序加載到內存,然后交給CPU處理,一個CPU在運行的一個程序,就是開啟了一個進程

  • 操作系統:

    • 操作系統定義:
      • 操作系統是存在於硬件與軟件之間,管理,協調,控制軟件與硬件的交互

    • 操作系統的作用:
      • 如果沒有操作系統,去寫一個程序,你要完成兩層功能:

        • 第一層:你要學會底層硬件:CPU,內存,磁盤是如何工作使用的
        • 第二層:去調度這些底層的硬件
      • 操作系統兩個作用:

        • 1,將一些復雜的硬件操作封裝成簡單的接口,便於使用

        • 2,操作系統可以合理的調度分配多個進程與CPU的關系,讓其有序化

    • 操作系統(計算機)的發展史:
      • 第一代電子計算機:操作插線與你的程序結合

      • 第二代計算機:磁帶存儲,批處理系

      • 第三代計算機:集成電路,多道程序系統

知識點解析:

  • 多道技術解決的問題:

    • 多道技術是在不同任務間切換執行,由於計算機切換速度非常快,用戶是無感狀態

    • 時間復用:
      • 利用閑置時間,進行復用,一個進程占用cpu時間太長也會切換
    • 空間復用:
      • 一個內存可以加載多個進程,提高內存的利用率
    • 數據隔離:
      • 解決軟件之間的隔離,互不影響

進程:

  • 程序就是一堆代碼

  • 進程是分配資源的基本單位,為線程提供資源,一個程序可以開啟多個進程

  • 概念:
    • 串行:所有的進程有CPU一個一個解決
    • 並行:多個CPU,真正的同時運行多個進程
    • 並發:單個CPU,同時執行多個進程(來回切換),看起來像是同時運行,空間復用
    • 阻塞:遇到IO(recv,input)才會阻塞
    • 非阻塞:沒有IO
  • tail -f access.log |grep '404'
    執行程序tail,開啟一個子進程,執行程序grep,開啟另外一個子進程,兩個進程之間基於管道'|'通訊,將tail的結果作為grep的輸入。
    進程grep在等待輸入(即I/O)時的狀態稱為阻塞,此時grep命令都無法運行
    

  • 進程的創建:
    • 什么是開啟多個進程:socket:server,client兩個進程

    • python中,如果一次想開啟多個進程,必須是一個主進程,開啟多個子進程

    • linux,windows:有主進程開啟子進程

    • 相同點:主進程開啟子進程,兩個進程都有相互隔離的獨立空間,互不影響

    • 不同點:

      • linux:子進程空間的初始數據完全是從主(父)進程copy一份

      • windows:子進程空間初始數據完全是從主(父)進程copy一份,但是有所不同

創建進程的兩種方法:

  • 函數-創建進程:

  • #這樣的實例雖然創建了子進程,但是在生產環境中子進程結束的時間不定
    from multiprocessing import Process
    import time
    #當前py文件就是主進程,先運行主進程
    def task(name):
        print(f"{name}is running")
        time.sleep(3)  #阻塞
        print(f"{name}is done")
    
    if __name__ == '__main__':                  #windows開啟必須寫在mian下面
         p = Process(target=task,args=("海洋",)) #target要封裝的內容,對象args一定是個元祖形式
         p.start()      #子進程 通知操作系統在內存中開辟一個空間,將p這個進程放進去,讓cpu執行
         print("___主進程")
    

  • 類-創建進程(了解):

  • from multiprocessing import Process
    import time
    class MyProcess(Process):
    
        def __init__(self,name):
            super().__init__()      #放在最上面,必須要繼承父類init
            self.name = name
    
        def run(self):
            print(f"{self.name}is running")
            time.sleep(3)  #阻塞
            print(f"{self.name}is done")
    
    if __name__ == '__main__':
        p = MyProcess("海洋")
        p.start()
        print("====主進程")
    

進程PID:

  • tasklist | findstr pycharm win查看某個進程

  • import os print(os.getpid()) 查看當前的pid

  • import os print(os.getppid()) 查看父進程

進程之間數據隔離:

  • import time
    from  multiprocessing import  Process
    X = 1000
    
    def task():
        global x
        x = 2
    
    if __name__ == '__main__':
        p1 = Process(target = task,)
        p1.start()
        time.sleep(1)
        print(f"主進程{X}")
        print(f"主進程{X}")
    
    import time
    from  multiprocessing import  Process
    X = 256                                #滿足小數據池
    
    def task():
       print(f"子進程{id(X)}")
    
    if __name__ == '__main__':
        print(f"主進程{id(X)}")
        p1 = Process(target = task,)
        p1.start()
        time.sleep(1)
        print()
    

join方法:

  • join 主進程等待子進程結束之后,在執行

  • join開啟一個進程:

    • from multiprocessing import Process
      import time
      
      def task(name):
          time.sleep(1)
          print(f"{name}is running")
      
      if __name__ == '__main__':
           p = Process(target=task,args=("海洋",))
           p.start()
           p.join()           #告知主進程,p進程結束之后,主進程在結束,join有些阻塞的意思
           print("___主進程")
      
      #      p1.start()
      #      p2.start()       #p1,p2,p3三個子進程先后運行順序不定,start只是通知一下操作系統
      #      p3.start()       #操作系統調用cpu先運行誰,誰先執行
      
  • join串行:

    • from multiprocessing import Process
      import time
      
      def task(name,sec):
          time.sleep(sec)
          print(f"{name}is running")
      
      if __name__ == '__main__':
           p1 = Process(target=task, args=("海洋",1))
           p2 = Process(target=task, args=("俊麗",2))
           p3 = Process(target=task ,args=("寶寶",3))
           start_time = time.time()
      
           p1.start()
           p1.join()
           p2.start()
           p2.join()
           p3.start()
           p3.join()
      
           print(f"主進程{time.time() - start_time}")
      
  • join並發:

    • from multiprocessing import Process
      import time
      
      def task(sec):
          time.sleep(sec)
          print(f"is running")
      
      if __name__ == '__main__':
           start_time = time.time()
           list = []
      
           for i in range(1,4):
                p = Process(target=task, args=(i,))
                p.start()
                list.append(p)
      
           for i in list:
                i.join()
      
           print(f"主進程{time.time() - start_time}")
      

進程對象的其他屬性:

  • 屬性:

    • from multiprocessing import Process
      import time
      
      def task(name):
          print(f"{name}is running")
          time.sleep(3)
          print(f"{name}is done")
      
      if __name__ == '__main__':
           p = Process(target=task,args=("海洋",),name="俊麗")  #name給進程對象設置name屬性
           p.start()
           # print(p.pid)         #獲取到進程號
      
           time.sleep(1)          #睡一秒,子進程已經執行完成
           p.terminate()          #強制結束子進程,強制執行也會有執行時間
                                  #terminate跟start一樣工作原理,都要通知操作系統開啟子進程
                                  #內存終止或者開啟都要需要時間的
                      
           time.sleep(1)          #睡一秒,讓terminate殺死
           print(p.is_alive())    #判斷子進程是否存活,只是查看內存中p子進程是否還運行
           print("主進程")
      
  • 僵屍進程:

    • init是所有進程的父進程:
      
      僵屍進程,僵屍是什么,死而沒有消失
      
      主進程創建多個短暫周期的子進程,當子進程退出,是需要等待父進程處理,而父進程沒有及時對子進程回收,那么子進程的進程符仍然保存在系統中,這種進程就是僵死進程
      
      什么進程描述符:每一個進程都有描述符,io請求,數據指針
      
      from multiprocessing import Process
      import time
      import os
      
      def task(name):
          print(f"{name}is running")
          print(f"子進程開始了:{os.getpid()}")
          time.sleep(50)
      
      
      if __name__ == '__main__':
          for i in range(100):
              p = Process(target=task, args=("海洋",))
              p.start()
              print(f"___主進程:{os.getpid()}")
      
      

  • 孤兒進程:

    • 孤兒進程:孤兒進程是因為主進程的退出,他下面的所有子進程都變成孤兒進程了,init會對孤兒進行回收,釋		   放掉占用系統的資源,這種回收也是為了節省內存。
      
      孤兒進程無害,如果僵屍進程掛了,init會對孤兒進程回收,init是所有進程的祖進程,linux中為1,0系統
      
      

  • 守護進程:

    • 將一個子進程設置成守護進程,當父進程結束,子進程一定會結束,避免孤兒進程產生,應為回收機制

    • 父進程不能創建子進程

    • #守護進程會在主進程代碼執行結束后終止,守護進程內無法在開啟子進程
      
      from multiprocessing import Process
      import time
      import os
      
      def task(name):
          print(f"{name}is running")
          print(f"子進程開始了:{os.getpid()}")
          time.sleep(50)
      
      if __name__ == '__main__':
           p = Process(target=task,args=("海洋",))
           p.daemon = True  #將p子進程設置成守護進程,守護子進程,只要主進程結束
                            #子進程無論執行與否都馬上結束,daemon,開啟在start上面
           p.start()
           print(f"___主進程:{os.getpid()}")
      
      

進程之間的通信方式:

  • 第一種:基於文件+鎖的形式:效率低,麻煩

  • 第二種:基於隊列,推薦的使用形式

  • 第三種:基於管道,管道自己加鎖,底層可能會出現數據丟失損壞,隊列和管道都是將數據存放於內存中

互斥鎖:

  • 互斥鎖保證了每次只有一個線程進行寫入操作,只有當這個線程解鎖,在運行其他資源,上鎖和解鎖都需要自己添加

  • 三台電腦同時調用打印機去打印,開啟三個進程使用互斥鎖,實現公平搶占資源

    • #上鎖:
      #一定要是同一把鎖:只能按照這個規律,上鎖一次,解鎖一次
      
      #互斥鎖與join區別:
      #共同點:都是完成了進程之間的串行
      #區別:join認為控制進程的串行,互斥鎖是解決搶占的資源,保證公平性
      
      from multiprocessing import Process
      from multiprocessing import Lock
      import time
      import os
      import random
      
      def task1(lock):
          print("test1")                     #驗證CPU遇到IO切換
          lock.acquire()
          print("task1 開始打印")
          time.sleep(random.randint(1,3))
          print("task1 打印完成")
          lock.release()
      
      def task2(lock):
          print("test2")
          lock.acquire()                      #上鎖
          print("task2 開始打印")
          time.sleep(random.randint(1,3))#阻塞,cpu切換任務,別的任務都在鎖,回來繼續執行這個程序
          print("task2 打印完成")
          lock.release()                      #解鎖
      
      def task3(lock):
          print("test2")
          lock.acquire()
          # lock.acquire()                    #死鎖錯誤示例
          print("task3 開始打印")
          time.sleep(random.randint(1,3))
          print("task3 打印完成")
          lock.release()
      
      if __name__ == '__main__':
           lock = Lock()                              #一把鎖
      
           p1 = Process(target=task1,args=(lock,))    #三個進程哪個先到先執行
           p2 = Process(target=task2,args=(lock,))
           p3 = Process(target=task3,args=(lock,))
      
           p1.start()
           p2.start()
           p3.start()
      
      
  • 互斥鎖買票示例:

    • #買票系統:
      #買票之前先要查票,在你查票的同時,100個人也在查看此票
      #買票時,你要從服務端獲取到票數,票數>0 ,買票,然后服務端票數減一,中間有網絡延遲
      
      #多進程原則上是不能互相通信的,他們在內存級別是有數據隔離,不代表磁盤上的數據隔離,他們可以共同操作一個文件
      #多個進程搶占同一個資源,要想公平按照順序,只能串行
      
      from multiprocessing import Process
      from multiprocessing import Lock
      import random
      import json
      import time
      import os
      
      def search():
          time.sleep(random.random())  #一秒之內
          with open("db.json", encoding="utf-8") as f1:
              dic = json.load(f1)
          print(f"剩余票數{dic['count']}")
      
      def get():
          with open("db.json",encoding="utf-8") as f1:
              dic = json.load(f1)
          time.sleep(random.randint(1,3))  #時間延遲
      
          if dic['count'] > 0:
              dic['count'] -= 1
              with open("db.json",encoding="utf-8",mode="w") as f1:
                  json.dump(dic,f1)
              print(f'{os.getpid()}用戶購買成功')
          else:
              print("沒票了")
      
      def task(lock):
          search()
      
          lock.acquire()
          get()
          lock.release()
      
      if __name__ == '__main__':
          lock = Lock()
          for i in range(5):
              p = Process(target=task,args=(lock,))
              p.start()
      缺點:
      	1.操作文件效率低
          2.自己加鎖很麻煩,很容易出現死鎖,遞歸鎖
      
      

隊列:

  • 進程之間的通信最好的方式是基於隊列

  • 隊列是實現進程之間通信的工具,存在內存中的一個容器,最大的特點是符合先進先出的原則

  • 隊列模式:
    • 多個進程搶占一個資源:串行,有序以及數據安全,買票

    • 多個進程實現並發的效果:生產者消費模型

隊列參數:

  • from multiprocessing import Queue
    q = Queue(3)                #可以設置元素個數,當數據已經達到上限,在插入夯住
    
    def func():
        print("in func")
    
    q.put("海洋")               #插入數據
    q.put({"count":1})
    q.put(func)
    q.put("333",block=False)    #默認為True 當你插入的數據超過最大限度,默認阻塞
    # q.put(333,timeout=8)      #超過八秒在put不進數據,就會報錯
    
    print(q.get())
    print(q.get())
    ret = q.get()
    ret()
    # q.get()  #當你將數據取完,夯住,等待隊列put值,起另一個進程往隊列中插入數據
    
    #q.put()
    #1,maxsize()    #數據量不易過大,精簡的重要數據
    #2,put bolck    #默認為True阻塞 當你插入的數據超過最大限度,可以設置報錯
    #3,put timeout  #延時報錯,超過三秒在put不進數據,就會報錯
    
    #get
    #2,get bolck    #取值為空報錯
    #3,get timeout  #取值超過三秒報錯
    
    

搶售模型 (並行示例):

  • #小米:搶手機,預期發售10個,100人去搶
    from multiprocessing import Queue
    from multiprocessing import Process
    import os
    
    def task(q):
        try:
            q.put(f'{os.getpid()}')
        except Exception:
            return
    
    if __name__ == '__main__':
        q = Queue(10)             #創建隊列,可以存放十個人
    
        for i in range(100):
            p = Process(target=task,args=(q,))
            p.start()
    
        for i in range(1,11):  #數量超過隊列會取
            print(f'排名第{i}的用戶:{q.get()}') #獲取隊列中的信息,先進來的先取出來
    
    #利用隊列進行進程之間的通信:簡單,方便,不用自己手動加鎖,隊列自帶阻塞,可持續化取數據
    
    

生產者消費者模型(並發示例):

  • 利用隊列進行通信,生產者生產數據,消費者獲取數據使用,平衡了生產力和消費力,生產者和消費者是一種解耦合性(通過容器解決),可持續化取數據

  • 模型,設計模式,歸一化設計,理論等等,教給你一個編程的思路,以后遇到類似的情況,以后直接調用就即可

  • 生產者:生產數據的進程

  • 消費者:生產出來的數據進行處理

  • #吃包子:廚師生產包子,不可能直接給你喂到嘴里,放在一個盆里,消費者從盆中取出包子食用
    #三個主體:生產者(廚師),容器隊列(盤 緩沖區),消費者(人)
    
    #如果沒有容器,生產者與消費者強解耦性,不合理,所以我們要有一個容器,緩沖區平衡了生產力與消費力
    
    # 生產者消費者多應用於並發:
    from multiprocessing import Queue
    from multiprocessing import Process
    import time
    import random
    
    def producer(name,q):
        for i in range(1,6):
            time.sleep(random.randint(1,3))
            res = f'{i}號包子'
            q.put(res)
            print(f'生產者{name}:生產了{res}')
    
    def consumer(name,q):
        while 1:
            try:
                time.sleep(random.randint(1, 3))
                ret = q.get(timeout = 5)           #五秒還吃不到退出
                print(f'消費者{name}:吃了{ret}')
            except Exception:
                return
    
    if __name__ == '__main__':
        q = Queue()    #盆
    
        p1 = Process(target=producer,args=("俊麗",q,))  #生產
        p2 = Process(target=consumer,args=("海洋",q,))  #消費
    
        p1.start()
        p2.start()
    
    

線程:

  • 進程:進程是分配資源的基本單位,內存中開辟空間,為線程提供資源,一個程序可以開啟多個進程

  • 線程:CPU調度的最小單位,執行單位,線程也被稱作為輕量級的進程,動態的

    • 主線程是進程空間存活在內存中的一個必要條件

  • 開啟QQ:開啟一個進程,在內存中開辟空間加載數據,啟動一個線程執行代碼

  • 線程依賴進程的一個進程可以包含多個線程,但是一定有一個主線程,線程才是CPU執行的最小單元

  • 進程線程對比:

    • 1,開啟多進程開銷非常大,10-100倍,而開啟線程開銷非常小

    • 2.開啟多進程速度慢,開啟多線程速度快

    • 3.進程之間數據不共享,線程共享數據

  • 多線程應用場景:

    • 並發:一個CPU可以來回切換(線程之間切換),多進程並發,多線程的並發

    • 多進程並發:開啟多個進程,並發的執行

    • 多線程並發:開啟線程,並發的執行

    • 如果遇到並發:多線程居多

開啟線程的兩種方式:

  • 線程絕對要比進程開啟速度快

  • 函數開啟:
    • #先打印海洋,線程要比進程速度快,如果是進程先打印主線程
      from threading import Thread
      
      def task(name):
          print(f'{name} is running')
      
      if __name__ == '__main__':
          t = Thread(target=task,args=("海洋",))
          t.start()
          print("主線程")
          
      #子進程睡眠3秒,先運行主進程
      from threading import Thread
      import time
      x = 1000
      
      def task():
          time.sleep(3)
          print('子線程....')
      
      def main():
          print('111')
          print('222')
          print('333')
      
      if __name__ == '__main__':
          t = Thread(target=task)
          t.start()
          main()
      
      
  • 類開啟:
    • from threading import Thread
      
      class MyThread(Thread):
          def __init__(self,name):
              super().__init__()
              self.name = name
      
          def run(self):
              print(f'{self.name} is running')
      
      if __name__ == '__main__':
          t = MyThread("海洋")
          t.start()
          print("主線程")
      
      
  • 線程pid:
    • #主線程和子線程pid一樣
      from threading import Thread
      import os
      
      def task():
          print(f'子線程:{os.getpid()}')
      
      if __name__ == '__main__':
          t = Thread(target=task,)
          t.start()
          print(f"主線程:{os.getpid()}")
      
      
  • 線程之間數據共享:
    • from threading import Thread
      x = 1000
      def task():
          global x
          x = 0
      
      if __name__ == '__main__':
          t = Thread(target=task, )
          t.start()
          t.join()  # 告知主線程,等待子線程運行完畢在執行
          print(f'主線程:{x}')
      
      

線程的方法:

  • from threading import Thread
    import threading
    import time
    
    def task(name):
        time.sleep(1)
        print(f'{name} is running')
    
    if __name__ == '__main__':
        for i in range(5):
            t = Thread(target=task,args=("海洋",))
            t.start()              #線程對象的方法
        # print(t.is_alive())     #判斷線程是否存活
     			
        #threading模塊的方法
        print(threading.current_thread().name)  #返回線程對象.name
        print(threading.enumerate())            #返回列表,返回的是所有線程對象
        print(threading.active_count())         #獲取活躍的線程數量(包括主線程)
        print("主線程")
    
    

守護線程:

  • 守護線程必須等待主線程結束才結束,主線程必須等待所有的非守護線程結束才能結束,因為主線程的結束意味着進程的結束,這就是一個守護機制

  • 多線程是同一個空間,同一個進程,進程代表,空間,資源,靜態的:

  • from threading import Thread
    import time
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' %name)
    
    if __name__ == '__main__':
        t=Thread(target=sayhi,args=('egon',))
        t.setDaemon(True) #必須在t.start()之前設置
        t.start()
    
        print('主線程')
        print(t.is_alive())   #判斷進程是否存在也是主線程
    
    from threading import Thread
    import time
    
    def foo():
        print(123)
        time.sleep(3)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(1)
        print("end456")
    
    if __name__ == '__main__':
    
        t1=Thread(target=foo)
        t2=Thread(target=bar)
    
        t1.daemon = True
        t1.start()
        t2.start()		      #t2非守護線程,主線程等待子線程結束
        print("main-------")
    
    

線程互斥鎖:

  • join:
  • from threading import Thread
    import time
    x = 100
    
    def task(name):
        global x
        temp = x
        time.sleep(3)
        temp -= 1
        x = temp
    
    if __name__ == '__main__':
        t = Thread(target=task,args=("海洋",))
        t.start()
        t.join()
        print(f"主線程{x}")
        
    #多個線程搶占一個資源
    from threading import Thread
    import time
    x = 100
    
    def task(name):
        global x
        temp = x
        time.sleep(3)
        temp -= 1
        x = temp
    
    if __name__ == '__main__':
        tl = []
        for i in range(100):
            t = Thread(target=task,args=("海洋",))
            tl.append(t)
            t.start()
    
        for i in tl:
            i.join()
    
        print(f"主進程{x}")   #多個線程搶占一個資源
    
    

互斥鎖:

  • 所有線程串行執行,多個 線程共同搶占一個數據,保證了數據安全:

  • from threading import Thread
    from threading import Lock
    import time
    x = 100
    
    def task(lock):
        lock.acquire()
        global x
        temp = x
        time.sleep(0.1)
        temp -= 1
        x = temp
        lock.release()
    
    if __name__ == '__main__':
        lock = Lock()
        tl = []
        for i in range(100):
            t = Thread(target=task,args=(lock,))
            tl.append(t)
            t.start()
    
        for i in tl:
            i.join()
    
        print(f"主線程{x}")   #多個線程搶占一個資源,join讓主線程等待子線程執行完成在執行,結果0
    
    

線程死鎖現象:

  • 多個線程或者進程競爭資源,如果開啟的互斥鎖過多,遇到互相搶鎖造成互相等待情況,程序夯住,

  • 還有一種是給同時給一個線程或者進程連續加鎖多次,利用遞歸鎖解決Rlock

  • from threading import Thread
    from threading import Lock
    import time
    
    lock_A = Lock()
    lock_B = Lock()
    
    class Mtthread(Thread):
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            lock_A.acquire()
            print(f"{self.name}誰拿到A鎖")
    
            lock_B.acquire()
            print(f"{self.name}誰拿到B鎖")
            lock_B.release()
    
            lock_A.release()
    
        def f2(self):
            lock_B.acquire()
            print(f"{self.name}誰拿到B鎖")
    
            time.sleep(1)
            lock_A.acquire()
            print(f"{self.name}誰拿到A鎖")
            lock_A.release()
    
            lock_B.release()
    
    if __name__ == '__main__':
        t1 = Mtthread()
        t1.start()
    
        t2 = Mtthread()
        t2.start()
    
        t3 = Mtthread()
        t3.start()
        print(f"主進程")  
    
    

遞歸鎖:

  • 遞歸鎖上有引用次數,每次引用計數+1,解鎖計數-1,只有計數為0.在運行下個進程

  • #遞歸鎖:
    #遞歸鎖是一把鎖,鎖上有記錄,只要acquire一次,鎖上就計數一次,acquire2次就計數兩次
    #release 1次減一,只要遞歸鎖計數不為0,其他線程不能搶
    
    from threading import Thread
    from threading import RLock
    import time
    
    lock_A = lock_B = RLock()
    
    class Mtthread(Thread):
        def run(self):
            # lock_A.acquire()
            # lock_B.acquire()
            # print(111)
            # lock_A.release()
            # lock_B.release()
    
            self.f1()
            self.f2()
    
        def f1(self):
            lock_A.acquire()
            print(f"{self.name}誰拿到A鎖")
    
            lock_B.acquire()
            print(f"{self.name}誰拿到B鎖")
            lock_B.release()
    
            lock_A.release()
    
        def f2(self):
            lock_B.acquire()
            print(f"{self.name}誰拿到B鎖")
    
            time.sleep(1)
            lock_A.acquire()
            print(f"{self.name}誰拿到A鎖")
            lock_A.release()
    
            lock_B.release()
    
    if __name__ == '__main__':
        t1 = Mtthread()
        t1.start()
        t2 = Mtthread()
        t2.start()
        t3 = Mtthread()
        t3.start()
        print(f"主進程")  
    
    

信號量:

  • 信號量准許多個線程或者進程同時進入

  • from  threading import Thread
    from  threading import current_thread
    from  threading import Semaphore
    import time
    import random
    
    sm = Semaphore(4)
    
    def chi():
        sm.acquire()
        print(f"{current_thread().name}正在吃飯")
        time.sleep(random.randint(1,3))
        sm.release()
    
    if __name__ == '__main__':
        for i in range(20):
            t = Thread(target=chi)
            t.start()
    
    

GIL鎖:

  • 全局解釋器鎖,就是一把互斥鎖,將並發變成串行,同一時刻只能有一個線程進入解釋器,自動加鎖和釋放鎖,犧牲效率保護python解釋器內部數據安全

  • 優點:
    • 強行加鎖,保證解釋器里面的數據安全

  • 缺點:
    • 多進程可以利用多核,多進程的每個進程里面都有python解釋器程序

    • 單進程的多線程不能利用多核,python解釋器內部程序,不支持多線程同時解釋

  • 討論:
    • python-單核處理IO阻塞的多線程,java多核處理IO阻塞問題,效率差不多

    • 單核處理三個IO線程,多核處理三個IO線程,多核快些

  • 代碼的執行:
    • CPython獨有GIL鎖:
      • 將你的py文件當做實參傳送給解釋器傳換成c語言字節碼,在交給虛擬機轉換成010101機器碼,這些代碼都是線程執行,進程進行調度資源

    • lpython:交互式解釋器,可以補全代碼

    • Jpython:java語言字節碼,剩下的一樣

    • pypy:動態編譯,JAT技術,執行效率要比Cpython塊,但是技術還有缺陷bug

驗證Python開發效率:

  • 單核CPU:
    • 一核,都是單進程多線程並發快,因為單核開啟多進程也是串行。

  • 多核CPU:
    • 計算密集型:

      • 多進程的並行比多線程的並發執行效率高很多(因為不同進程運行在不同核心上,並行執行)

    • IO密集型:

      • 多線程要比多進程處理速度快,因為進程開銷大,而線程處理其實也是串行,只不過處理速度比進程更快些,線程一次只能處理一個事情(空間復用)

      • 開啟150個進程(開銷大,速度慢),執行IO任務耗時長

      • 開啟150個線程(開銷小,速度快),執行IO任務耗時短

  • 如果你的任務是io密集型並且任務數量大,用單進程下的多線程處理阻塞效率高

  • 計算密集型:
    • from multiprocessing import Process
      from threading import Thread
      import time
      import os
      # print(os.cpu_count())
      
      def task1():
          res = 1
          for i in range(1, 100000000):
              res += i
      def task2():
          res = 1
          for i in range(1, 100000000):
              res += i
      def task3():
          res = 1
          for i in range(1, 100000000):
              res += i
      def task4():
          res = 1
          for i in range(1, 100000000):
              res += i
      
      if __name__ == '__main__':
          # 四個進程 四個cpu 並行 效率
          start_time = time.time()
          p1 = Process(target=task1)
          p2 = Process(target=task2)
          p3 = Process(target=task3)
          p4 = Process(target=task4)
      
          p1.start()
          p2.start()
          p3.start()
          p4.start()
      
          p1.join()
          p2.join()
          p3.join()
          p4.join()
          print(f'主: {time.time() - start_time}')   # 10.125909328460693
      
      # 一個進程 四個線程
      #     start_time = time.time()
      #     p1 = Thread(target=task1)
      #     p2 = Thread(target=task2)
      #     p3 = Thread(target=task3)
      #     p4 = Thread(target=task4)
      #
      #     p1.start()
      #     p2.start()
      #     p3.start()
      #     p4.start()
      #
      #     p1.join()
      #     p2.join()
      #     p3.join()
      #     p4.join()
      #     print(f'主: {time.time() - start_time}')  # 22.927688121795654
      
      
  • 計算IO密集型:

    • from multiprocessing import Process
      from threading import Thread
      import time
      import os
      # print(os.cpu_count())
      
      def task1():
          res = 1
          time.sleep(3)
      
      if __name__ == '__main__':
      # 開啟150個進程(開銷大,速度慢),執行IO任務, 耗時 8.382229089736938
      #     start_time = time.time()
      #     l1 = []
      #     for i in range(150):
      #         p = Process(target=task1)
      #         l1.append(p)
      #         p.start()
      #     for i in l1:
      #         i.join()
      #     print(f'主: {time.time() - start_time}')
      
      # 開啟150個線程(開銷小,速度快),執行IO任務, 耗時 3.0261728763580322
      #     start_time = time.time()
      #     l1 = []
      #     for i in range(150):
      #         p = Thread(target=task1)
      #         l1.append(p)
      #         p.start()
      #     for i in l1:
      #         i.join()
      #     print(f'主: {time.time() - start_time}') 
      
      

GIL鎖和互斥鎖關系:

  • 線程計算密集型:
    • 當程序執行,開啟100個線程時,第一個線程先要拿到GIL鎖,然后拿到lock鎖,執行代碼,釋放lock鎖,最后釋放GIL鎖
  • 線程IO密集型:
    • 當程序執行,開啟100個線程時,第一個線程先要拿到GIL鎖,然后拿到lock鎖,遇到阻塞,CPU切走,GIL釋放,第一個線程掛起

    • 第二個線程執行,搶到GIL鎖,進入要搶lock,但是lock鎖還沒釋放,阻塞掛起

  • 自己加互斥鎖,一定要加在處理共享數據的地方,加的范圍不要擴大,范圍過大,影響並發

  • GIL鎖單進程的多線程不能利用多核,不能並行,但是可以並發

  • 互斥鎖:
    • GIL自動上鎖解鎖,文件中的互斥鎖Lock,手動上鎖解鎖

    • GIL鎖,保護解釋器的數據安全,互斥鎖是保護的文件的數據安全

線程池:

  • 線程池在系統啟動時創建了大量的空閑線程,線程執行直接調用線程池中已經開啟好的空閑線程,當線程執行結束,該線程不會死亡,而是將線程變成空閑狀態,放回進程池。

  • 線程池提高效率,資源復用

  • 進程池:放置進程的一個容器

  • 線程池:放置線程的一個容器

  • 完成一個簡單的socket通信,服務端必須與一個客戶端交流完畢,並且這個客戶端斷開連接之后,服務端才能接待下一個客戶:

  • #開啟進程池或者線程池:
    #線程池好還是進程池好:io阻塞或者計算密集型
    from  concurrent.futures import ProcessPoolExecutor
    from  concurrent.futures import ThreadPoolExecutor
    import time
    import os
    import random
    
    def task(name):
        # print(name)
        print(f"{os.getpid()}准備接客")
        time.sleep(random.randint(1,3))
    
    if __name__ == '__main__':
        # p = ProcessPoolExecutor(max_workers=5)  #限制進程數量,默認為cpu個數
        p = ThreadPoolExecutor()  				  #線程默認是CPU個數的五倍
    
        for i in range(23):
            p.submit(task,1)                      #給進程池放置任務啟動,1為傳參
    
    

阻塞,非阻塞:

  • 程序運行中的狀態,阻塞,運行,就緒

  • 阻塞:當你程序遇到IO阻塞掛起,CPU切換,等到IO結束之后再執行

  • 非阻塞:程序沒有IO,或者遇到IO通過某種手段讓cpu去執行其他任務,盡可能的占用CPU

同步:

  • 任務發出去之后等待,直到這個任務最終結束之后,給我一個返回值,發布下一個任務

  • 同步示例:
  • from concurrent.futures import ProcessPoolExecutor
    import os
    import time
    import random
    
    def task():
        print(f"{os.getpid()}is running")
        time.sleep(1)
        return f'{os.getpid()} is finish'
    
    if __name__ == '__main__':
        p = ProcessPoolExecutor(4)
    
        for i in range(10):
            obj = p.submit(task,)
            print(obj.result())      #同步等待一個進程內容全部執行完成在執行下一個
    
    

異步:

  • 將任務發給進程,不管任務如何,直接運行下一個

  • 異步示例:
  • from concurrent.futures import ProcessPoolExecutor
    import os
    import time
    import random
    
    def task():
        print(f'{os.getpid()} is running')
        time.sleep(random.randint(0,2))
        return f'{os.getpid()} is finish'
    
    if __name__ == '__main__':
        p = ProcessPoolExecutor(4)
        obj_l1 = []
        for i in range(10):
            obj = p.submit(task,)   # 異步發出.
            obj_l1.append(obj)
    
        # time.sleep(3)
        p.shutdown(wait=True)
        # 1. 阻止在向進程池投放新任務,
        # 2. wait = True 十個任務是10,一個任務完成了-1,直至為零.進行下一行.
        for i in obj_l1:
            print(i.result())
    
    

異步+回調機制:

  • 異步發布任務,就不管任務結果
  • 回調:
    • 回調是你異步發布任務執行完成后,將結果丟給回調函數add_done_callback,回調函數幫你分析結果,進程繼續完成下一個任務
    • 回調就是對特定的事件或者條件進行響應

  • 爬蟲:游覽器做的事情很簡單:
    • 瀏覽器 封裝頭部,發送一個請求--->www.taobao.com ----> 服務器獲取到請求信息,分析正確--->給你返回一個文件,--->游覽器將這個文件的代碼渲染,就成了你看的樣子

    • 爬蟲:利用reauests模塊功能模擬游覽器封裝頭,給服務器發送一個請求,騙過服務器之后,服務器也會給你返回一個文件,爬蟲拿到文件,進行數據清洗獲取到你想要的信息

  • 爬蟲分兩步:
    • 第一步:爬取服務器端的文件(IO阻塞)

    • 第二部:拿到文件,進行數據分析(非IO,IO極少)

  • 錯誤版本示例:
    • import requests
      from concurrent.futures import ProcessPoolExecutor
      from multiprocessing import Process
      import time
      import random
      import os
      
      def get(url):
          response = requests.get(url)
          print(f'{os.getpid()} 正在爬取:{url}')
          time.sleep(random.randint(1,3))
          if response.status_code == 200:
              return response.text
      
      def parse(text):
          print(f'{os.getpid()} 分析結果:{len(text)}')
      
      if __name__ == '__main__':
          url_list = [
              'http://www.taobao.com',
              'http://www.JD.com',
              'http://www.JD.com',
              'http://www.JD.com',
              'http://www.baidu.com',
              'https://www.cnblogs.com/jin-xin/articles/11232151.html',
              'https://www.cnblogs.com/jin-xin/articles/10078845.html',
              'http://www.sina.com.cn',
              'https://www.sohu.com',
              'https://www.youku.com',
          ]
          pool = ProcessPoolExecutor(4)
          obj_list = []
          for url in url_list:
              obj = pool.submit(get, url)
              obj_list.append(obj)
      
          pool.shutdown(wait=True)
      
          for obj in obj_list:          #抓取網頁是串行,輸出的結果
              parse(obj.result())
      
      #爬取一個網頁需要2s,並發爬取10個網頁:2.多s.
      #分析任務: 1s.    10s. 總共12.多秒.
       
      # 現在這個版本的過程:
      # 異步發出10個爬取網頁的任務,然后4個進程並發(並行)的先去完成4個爬取網頁的任務,然后誰先結束,誰進行下一個
      # 爬取任務,直至10個任務全部爬取成功.
      # 將10個爬取結果放在一個列表中,串行的分析.
      
      
      import requests
      from concurrent.futures import ProcessPoolExecutor
      from multiprocessing import Process
      import time
      import random
      import os
      
      def get(url):
          response = requests.get(url)
          print(f'{os.getpid()} 正在爬取:{url}')
          time.sleep(random.randint(1,3))
          if response.status_code == 200:
              parse(response.text)
      
      def parse(text):
          print(f'{os.getpid()} 分析結果:{len(text)}')
      
      if __name__ == '__main__':
          url_list = [
              'http://www.taobao.com',
              'http://www.JD.com',
              'http://www.JD.com',
              'http://www.JD.com',
              'http://www.baidu.com',
              'https://www.cnblogs.com/jin-xin/articles/11232151.html',
              'https://www.cnblogs.com/jin-xin/articles/10078845.html',
              'http://www.sina.com.cn',
              'https://www.sohu.com',
              'https://www.youku.com',
          ]
          pool = ProcessPoolExecutor(4)
          for url in url_list:
              obj = pool.submit(get, url)
      
          # pool.shutdown(wait=True)
          print('主')
      #異步發出10個 爬取網頁+分析 的任務,然后4個進程並發(並行)的先去完成4個爬取網頁+分析 的任務,
      #然后誰先結束,誰進行下一個 爬取+分析 任務,直至10個爬取+分析 任務全部完成成功.
      
      
      
      
  • 正確版本示例:
    • import requests
      from concurrent.futures import ProcessPoolExecutor
      from multiprocessing import Process
      import time
      import random
      import os
      
      def get(url):
          response = requests.get(url)
          print(f'{os.getpid()} 正在爬取:{url}')
          if response.status_code == 200:
              return response.text
      
      def parse(obj):
          time.sleep(1)
          print(f'{os.getpid()} 分析結果:{len(obj.result())}')
      
      if __name__ == '__main__':
      
          url_list = [
              'http://www.taobao.com',
              'http://www.JD.com',
              'http://www.JD.com',
              'http://www.JD.com',
              'http://www.baidu.com',
              'https://www.cnblogs.com/jin-xin/articles/11232151.html',
              'https://www.cnblogs.com/jin-xin/articles/10078845.html',
              'http://www.sina.com.cn',
              'https://www.sohu.com',
              'https://www.youku.com',
          ]
          start_time = time.time()
          pool = ProcessPoolExecutor(4)
          for url in url_list:
              obj = pool.submit(get, url)
              obj.add_done_callback(parse)
              # 增加一個回調函數
              # 現在的進程完成的還是網絡爬取的任務,拿到了返回值之后,結果丟給回調函數add_done_callback,
              # 回調函數幫助你分析結果
              # 進程繼續完成下一個任務.
          pool.shutdown(wait=True)   #阻止發布新的任務,代替join
      
          print(f'主: {time.time() - start_time}')
          
      # 回調函數是主進程幫助你實現的, 回調函數幫你進行分析任務. 明確了進程的任務: 只有一個網絡爬取.
      # 分析任務: 回調函數執行了.對函數之間解耦.
      
      # 極值情況: 如果回調函數是IO任務,那么由於你的回調函數是主進程做的,所以有可能影響效率.
      
      # 回調不是萬能的,如果回調的任務是IO,
      # 那么異步 + 回調機制 不好.此時如果你要效率只能犧牲開銷,再開一個線程進程池.
      
      

隊列模式:

  • FIFO 先進先出原則:
    • import queue
      q = queue.Queue(3)
      q.put(1)
      q.put(2)
      q.put('海洋')
      
      print(q.get())
      print(q.get())
      print(q.get())
      
      
  • LIFO 棧.-先進后出:
    • import queue
      q = queue.LifoQueue()
      q.put(1)
      q.put(3)
      q.put('海洋')
      
      print(q.get())
      print(q.get())
      print(q.get())
      
      
  • 優先級隊列:
    • # 需要元組的形式,(int,數據) int 代表優先級,數字越低,優先級越高.
      import queue
      q = queue.PriorityQueue(3)
      
      q.put((10, '垃圾消息'))
      q.put((-9, '緊急消息'))
      q.put((3, '一般消息'))
      
      print(q.get())
      print(q.get())
      print(q.get())
      
      

事件Event:

  • 並發的執行某個任務,多進程多線程,幾乎同時執行,一個線程執行到中間時,通知另一個線程開始執行

  • import time
    from threading import Thread
    from threading import current_thread
    from threading import Event
    			
    event = Event()  # 默認是False
    def task():
        print(f'{current_thread().name} 檢測服務器是否正常開啟....')
        time.sleep(3)   # 先運行task阻塞三秒,在將event修改為True
        event.set()     # 改成了True
    
    def task1():
        print(f'{current_thread().name} 正在嘗試連接服務器')
        # event.wait()  # 輪詢檢測event是否為True,當其為True,繼續下一行代碼. 阻塞
        event.wait(1)
        # 設置超時時間,如果1s中以內,event改成True,代碼繼續執行.
        # 設置超時時間,如果超過1s中,event沒做改變,代碼繼續執行.
        print(f'{current_thread().name} 連接成功')
        
    if __name__ == '__main__':
        t1 = Thread(target=task1,)
        t2 = Thread(target=task1,)
        t3 = Thread(target=task1,)
    
        t = Thread(target=task)
        t.start()
    
        t1.start()
        t2.start()
        t3.start()
    
    

協程:

  • 協程的本質也是一個線程,而使用協程目的是為了減少系統開銷,協程是我們通過程序來控制任務切換,協程速度比系統更快,最大限度的利用CPU,更加輕量級

  • 線程協程的區別:

    • 協程沒有鎖,協程又稱微線程
    • 線程和協程不同的是,線程是搶占式調度切換,而協程是需要自己調度
    • 線程和進程,調度是CPU決定的,而協程就是上帝,在一個線程中規定某個代碼塊的執行順序

  • 1,協程切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,更加輕量級

  • 2.單線程內就可以實現並發的效果,最大限度的利用CPU

  • 3.修改共享的數據不需要加鎖

  • 協程就像線程一樣也是在多任務間來回切換

  • 在其他語言中,協程的意義不大,多線程即可以解決I/O問題,在python中有GIL鎖,在同一時間只有一個線程在工作,所以一個線程里面IO操作特別多,協程比較適用

  • 串行:多個任務執行時,一個任務從開始執行,遇到IO等待,等待IO阻塞結束之后再執行下一個

  • 並行:多核多個線程或者進程同時執行,四個CPU同時執行四個任務

  • 並發:多個任務看起來是同時執行,CPU在多個任務之間來回切換,遇到IO阻塞,計算密集型執行時間過長

    • 並發本質:遇到IO阻塞,計算密集型執行時間過長,保持原來的狀態

  • 一個線程實現開發:

    • 多進程:操作系統控制,多個進程的多個任務切換 + 保持狀態

    • 多線程程:操作系統控制,多個線程的多個任務切換 + 保持狀態

    • 協程:程序控制一個線程的多個任務的切換以及保持狀態

      • 微並發,處理任務不宜過多

      • 協程他會調度CPU,如果協程管控的任務中,遇到阻塞,他會快速的(比操作系統快),切換到另一個任務,並且能將上一個任務掛起(保持狀態),讓操作系統以為CPU一直在工作

  • 串行和協程對比:
    • 密集型數據串行和協程對比,肯定串行速度快,因為協程運行還要來回切換
    • import time
      def task1():
          res = 1
          for i in range(1,100000):
              res += i
      
      def task2():
          res = 1
          for i in range(1,100000):
              res -= i
      
      start_time = time.time()
      task1()
      task2()
      print(f'串行消耗時間:{time.time()-start_time}')  # 串行消耗時間:0.012489557266235352
      
      
      def task1():
          res = 1
          for i in range(1, 100000):
              res += i
              yield res
      
      def task2():
          g = task1()
          res = 1
          for i in range(1, 100000):
              res -= i
              next(g)
      
      start_time = time.time()
      task2()
      print(f'協程消耗時間:{time.time() - start_time}')  # 協程消耗時間:0.02991938591003418
      
      
  • 開啟協程:
    • 遇到gevent阻塞切換:
    • import gevent
      import time
      def eat(name):
          print('%s eat 1' %name)     # 1
          gevent.sleep(2)              #協程識別gevent,可以進行IO切換
          # time.sleep(300)            #協程不識別切換不了,不可切換
          print('%s eat 2' %name)     # 4
      
      def play(name):
          print('%s play 1' %name)    # 2
          gevent.sleep(1)
          # time.sleep(3)
          print('%s play 2' %name)    # 3
      
      g1 = gevent.spawn(eat, '海洋')
      g2 = gevent.spawn(play, name='俊麗')   #協程異步發布任務
      # g1.join()
      # g2.join()
      #或者gevent.joinall([g1,g2])
      gevent.joinall([g1,g2])                #主線程等待協程執行完畢
      print('主')                            #5
      
      
    • 所有IO阻塞都可以切換:
    • import threading
      from gevent import monkey
      monkey.patch_all()         # 將你代碼中的所有的IO都標識.
      
      import gevent              # 直接導入即可
      import time
      
      def eat():
          print(f'線程1:{threading.current_thread().getName()}')    # 1
          print('eat food 1')                                      # 2
          time.sleep(3)          # 加上mokey就能夠識別到time模塊的sleep了
          print('eat food 2')                                      # 6
      
      def play():
          print(f'線程2:{threading.current_thread().getName()}')    # 3
          print('play 1')                                          # 4
          time.sleep(1)  
          # 來回切換,直到一個I/O的時間結束,這里都是我們個gevent做得,不再是控制不了的操作系統了。
          print('play 2')                                          # 5
      
      g1=gevent.spawn(eat)
      g2=gevent.spawn(play)
      gevent.joinall([g1,g2])
      print(f'主:{threading.current_thread().getName()}')          # 7
      


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM