Python-並發編程(線程)


  之前我們說了並發編程中的進程問題,幾天我們來聊聊並發編程中的線程問題.

一.背景知識

  1.進程

    之前我們已經了解了操作系統中進程的概念,程序並不能單獨運行,只有將程序裝載到內存中,系統為它分配資源才能運行,而這種執行的程序就稱之為進程。程序和進程的區別就在於:程序是指令的集合,它是進程運行的靜態描述文本;進程是程序的一次執行活動,屬於動態概念。在多道編程中,我們允許多個程序同時加載到內存中,在操作系統的調度下,可以實現並發地執行。這是這樣的設計,大大提高了CPU的利用率。進程的出現讓每個用戶感覺到自己獨享CPU,因此,進程就是為了在CPU上實現多道編程而提出的。

  2.有了進程為什么還要線程

#什么是線程:
#指的是一條流水線的工作過程,關鍵的一句話:一個進程內最少自帶一個線程,其實進程根本不能執行,進程不是執行單位,是資源的單位,分配資源的單位
#線程才是執行單位
#進程:做手機屏幕的工作過程,剛才講的
#我們的py文件在執行的時候,如果你站在資源單位的角度來看,我們稱為一個主進程,如果站在代碼執行的角度來看,它叫做主線程,只是一種形象的說法,其實整個代碼的執行過程成為線程,也就是干這個活兒的本身稱為線程,但是我們后面學習的時候,我們就稱為線程去執行某個任務,其實那某個任務的執行過程稱為一個線程,一條流水線的執行過程為線程

#進程vs線程
#1 同一個進程內的多個線程是共享該進程的資源的,不同進程內的線程資源肯定是隔離的
#2 創建線程的開銷比創建進程的開銷要小的多


#並發三個任務:1啟動三個進程:因為每個進程中有一個線程,但是我一個進程中開啟三個線程就夠了
#同一個程序中的三個任務需要執行,你是用三個進程好 ,還是三個線程好?
#例子:
    # pycharm 三個任務:鍵盤輸入  屏幕輸出  自動保存到硬盤
    #如果三個任務是同步的話,你鍵盤輸入的時候,屏幕看不到
    #咱們的pycharm是不是一邊輸入你邊看啊,就是將串行變為了三個並發的任務
    #解決方案:三個進程或者三個線程,哪個方案可行。如果是三個進程,進程的資源是不是隔離的並且開銷大,最致命的就是資源隔離,但是用戶輸入的數據還要給另外一個進程發送過去,進程之間能直接給數據嗎?你是不是copy一份給他或者通信啊,但是數據是同一份,我們有必要搞多個進程嗎,線程是不是共享資源的,我們是不是可以使用多線程來搞,你線程1輸入的數據,線程2能不能看到,你以后的場景還是應用多線程多,而且起線程我們說是不是很快啊,占用資源也小,還能共享同一個進程的資源,不需要將數據來回的copy!

    進程有很多優點,它提供了多道編程,讓我們感覺我們每個人都擁有自己的CPU和其他資源,可以提高計算機的利用率。很多人就不理解了,既然進程這么優秀,為什么還要線程呢?其實,仔細觀察就會發現進程還是有很多缺陷的,主要體現在兩點上:

      1.進程只能在一個時間干一件事,如果想同時干兩件事或多件事,進程就無能為力了。

      2.進程在執行的過程中如果阻塞,例如等待輸入,整個進程就會掛起,即使進程中有些工作不依賴於輸入的數據,也將無法執行。

    如果這兩個缺點理解比較困難的話,舉個現實的例子也許你就清楚了:如果把我們上課的過程看成一個進程的話,那么我們要做的是耳朵聽老師講課,手上還要記筆記,腦子還要思考問題,這樣才能高效的完成聽課的任務。而如果只提供進程這個機制的話,上面這三件事將不能同時執行,同一時間只能做一件事,聽的時候就不能記筆記,也不能用腦子思考,這是其一;如果老師在黑板上寫演算過程,我們開始記筆記,而老師突然有一步推不下去了,阻塞住了,他在那邊思考着,而我們呢,也不能干其他事,即使你想趁此時思考一下剛才沒聽懂的一個問題都不行,這是其二。

    現在你應該明白了進程的缺陷了,而解決的辦法很簡單,我們完全可以讓聽、寫、思三個獨立的過程,並行起來,這樣很明顯可以提高聽課的效率。而實際的操作系統中,也同樣引入了這種類似的機制——線程。

  3.線程的出現

    60年代,在OS中能擁有資源和獨立運行的基本單位是進程,然而隨着計算機技術的發展,進程出現了很多弊端,一是由於進程是資源擁有者,創建、撤消與切換存在較大的時空開銷,因此需要引入 輕型進程;二是由於對稱多處理機(SMP)出現, 可以滿足多個運行單位,而多個進程並行開銷過大。
      因此在80年代,出現了 能獨立運行的基本單位——線程(Threads)
      注意:進程是資源分配的最小單位,線程是CPU調度的最小單位.
       每一個進程中至少有一個線程。 

    在傳統操作系統中,每個進程有一個地址空間,而且默認就有一個控制線程

    線程顧名思義,就是一條流水線工作的過程,一條流水線必須屬於一個車間,一個車間的工作過程是一個進程

    車間負責把資源整合到一起,是一個資源單位,而一個車間內至少有一個流水線

    流水線的工作需要電源,電源就相當於cpu

    所以,進程只是用來把資源集中到一起(進程只是一個資源單位,或者說資源集合),而線程才是cpu上的執行單位。

    多線程(即多個控制線程)的概念是,在一個進程中存在多個控制線程,多個控制線程共享該進程的地址空間,相當於一個車間內有多條流水線,都共用一個車間的資源。

    例如,北京地鐵與上海地鐵是不同的進程,而北京地鐵里的13號線是一個線程,北京地鐵所有的線路共享北京地鐵所有的資源,比如所有的乘客可以被所有線路拉。

二.進程與線程的關系 

    線程與進程的區別可以歸納為以下4點:
      1)地址空間和其它資源(如打開文件):進程間相互獨立,同一進程的各線程間共享。某進程內的線程在其它進程不可見。
      2)通信:進程間通信IPC,線程間可以直接讀寫進程數據段(如全局變量)來進行通信——需要進程同步和互斥手段的輔助,以保證數據的一致性。(就類似進程中的鎖的作用)
      3)調度和切換:線程上下文切換比進程上下文切換要快得多。
      4)在多線程操作系統中(現在咱們用的系統基本都是多線程的操作系統),進程不是一個可執行的實體,真正去執行程序的不是進程,是線程,你可以理解進程就是一個線程的容器。

三.線程的特點

  先簡單了解一下線程有哪些特點,里面的堆棧啊主存區啊什么的后面會講,大家先大概了解一下就好啦。

  在多線程的操作系統中,通常是在一個進程中包括多個線程,每個線程都是作為利用CPU的基本單位,是花費最小開銷的實體。線程具有以下屬性。
    1)輕型實體
      線程中的實體基本上不擁有系統資源,只是有一些必不可少的、能保證獨立運行的資源。
      線程的實體包括程序、數據和TCB。線程是動態概念,它的動態特性由線程控制塊TCB(Thread Control Block)描述。
TCB包括以下信息:
(1)線程狀態。
(2)當線程不運行時,被保存的現場資源。
(3)一組執行堆棧。
(4)存放每個線程的局部變量主存區。
(5)訪問同一個進程中的主存和其它資源。
用於指示被執行指令序列的程序計數器、保留局部變量、少數狀態參數和返回地址等的一組寄存器和堆棧。

    2)獨立調度和分派的基本單位。

    在多線程OS中,線程是能獨立運行的基本單位,因而也是獨立調度和分派的基本單位。由於線程很“輕”,故線程的切換非常迅速且開銷小(在同一進程中的)。
    3)共享進程資源。
    線程在同一進程中的各個線程,都可以共享該進程所擁有的資源,這首先表現在:所有線程都具有相同的進程id,這意味着,線程可以訪問該進程的每一個內存資源;此外,還可以訪問進程所擁有的已打開文件、定時器、信號量機構等。由於同一個進程內的線程共享內存和文件,所以線程之間互相通信不必調用內核。
    4)可並發執行。
    在一個進程中的多個線程之間,可以並發執行,甚至允許在一個進程中所有線程都能並發執行;同樣,不同進程中的線程也能並發執行,充分利用和發揮了處理機與外圍設備並行工作的能力。

四.線程的實際應用場景

  

 

  開啟一個字處理軟件進程,該進程肯定需要辦不止一件事情,比如監聽鍵盤輸入,處理文字,定時自動將文字保存到硬盤,這三個任務操作的都是同一塊數據,因而不能用多進程。只能在一個進程里並發地開啟三個線程,如果是單線程,那就只能是,鍵盤輸入時,不能處理文字和自動保存,自動保存時又不能輸入和處理文字。

之前我們將的socket是不是通過多進程去實現過呀,如果有500個人同時和我聊天,那我是不是要起500進程啊,能行嗎?不好,對不對,那么怎么辦,我就可以開幾個進程,然后每個進程里面開多個線程來處理多個請求和通信。再舉例:我用qq是一個進程,然后我和一個人聊天的時候,是不是還可以去接收別人給我發的消息啊,這個是不是並行的啊,就類似我一個進程開了多個線程來幫我並發接收消息。

五.內存中的線程

    多個線程共享同一個進程的地址空間中的資源,是對一台計算機上多個進程的模擬,有時也稱線程為輕量級的進程。

    而對一台計算機上多個進程,則共享物理內存、磁盤、打印機等其他物理資源。多線程的運行也多進程的運行類似,是cpu在多個線程之間的快速切換。

    不同的進程之間是充滿敵意的,彼此是搶占、競爭cpu的關系,如果迅雷會和QQ搶資源。而同一個進程是由一個程序員的程序創建,所以同一進程內的線程是合作關系,一個線程可以訪問另外一個線程的內存地址,大家都是共享的,一個線程干死了另外一個線程的內存,那純屬程序員腦子有問題。

    類似於進程,每個線程也有自己的堆棧,不同於進程,線程庫無法利用時鍾中斷強制線程讓出CPU,可以調用thread_yield運行線程自動放棄cpu,讓另外一個線程運行。

    線程通常是有益的,但是帶來了不小程序設計難度,線程的問題是:

      1. 父進程有多個線程,那么開啟的子線程是否需要同樣多的線程

      2. 在同一個進程中,如果一個線程關閉了文件,而另外一個線程正准備往該文件內寫內容呢?

    因此,在多線程的代碼中,需要更多的心思來設計程序的邏輯、保護程序的數據。

六.python與線程

  1.全局解釋器鎖GIL(用一下threading模塊之后再來看~~)

    Python代碼的執行由Python虛擬機(也叫解釋器主循環)來控制。Python在設計之初就考慮到要在主循環中,同時只有一個線程在執行。雖然 Python 解釋器中可以“運行”多個線程,但在任意時刻只有一個線程在解釋器中運行。
      對Python虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。

      在多線程環境中,Python 虛擬機按以下方式執行:

        a、設置 GIL;

        b、切換到一個線程去運行;

        c、運行指定數量的字節碼指令或者線程主動讓出控制(可以調用 time.sleep(0));

        d、把線程設置為睡眠狀態;

        e、解鎖 GIL;

        d、再次重復以上所有步驟。
      在調用外部代碼(如 C/C++擴展函數)的時候,GIL將會被鎖定,直到這個函數結束為止(由於在這期間沒有Python的字節碼被運行,所以不會做線程切換)編寫擴展的程序員可以主動解鎖GIL。

   2.python線程模塊的選擇

    Python提供了幾個用於多線程編程的模塊,包括thread、threading和Queue等。thread和threading模塊允許程序員創建和管理線程。thread模塊提供了基本的線程和鎖的支持,threading提供了更高級別、功能更強的線程管理的功能。Queue模塊允許用戶創建一個可以用於多個線程之間共享數據的隊列數據結構。
    避免使用thread模塊,因為更高級別的threading模塊更為先進,對線程的支持更為完善,而且使用thread模塊里的屬性有可能會與threading出現沖突;其次低級別的thread模塊的同步原語很少(實際上只有一個),而threading模塊則有很多;再者,thread模塊中當主線程結束時,所有的線程都會被強制結束掉,沒有警告也不會有正常的清除工作,至少threading模塊能確保重要的子線程退出后進程才退出。 

    就像我們熟悉的time模塊,它比其他模塊更加接近底層,越是接近底層,用起來越麻煩,就像時間日期轉換之類的就比較麻煩,但是后面我們會學到一個datetime模塊,提供了更為簡便的時間日期處理方法,它是建立在time模塊的基礎上來的。又如socket和socketserver(底層還是用的socket)等等,這里的threading就是thread的高級模塊。

    thread模塊不支持守護線程,當主線程退出時,所有的子線程不論它們是否還在工作,都會被強行退出。而threading模塊支持守護線程,守護線程一般是一個等待客戶請求的服務器,如果沒有客戶提出請求它就在那等着,如果設定一個線程為守護線程,就表示這個線程是不重要的,在進程退出的時候,不用等待這個線程退出。

七.Threading模塊

  multiprocess模塊的完全模仿了threading模塊的接口,二者在使用層面,有很大的相似性,因而不再詳細介紹

  我們先簡單應用一下threading模塊來看看並發效果:

import time
from threading import Thread
#多線程並發,是不是看着和多進程很類似
def func(n):
    time.sleep(1)
    print(n)

#並發效果,1秒打印出了所有的數字
for i in range(10):
    t = Thread(target=func,args=(i,))
    t.start()

  1.線程創建方式

#方式1
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('太白',))
    t.start()
    print('主線程')
#方式2
import time
from threading import Thread
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('太白')
    t.start()
    print('主線程')

  2.多線程與多進程

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    #part1:在主進程下開啟多個線程,每個線程都跟主進程的pid一樣
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主線程/主進程pid',os.getpid())

    #part2:開多個進程,每個進程都有不同的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主線程/主進程pid',os.getpid())

  那么哪些東西存在進程里,那些東西存在線程里呢?

進程:導入的模塊、執行的python文件的文件所在位置、內置的函數、文件里面的這些代碼、全局變量等等,然后線程里面有自己的堆棧(類似於一個列表,后進先出)和寄存器,里面存着自己線程的變量,操作(add)等等,占用的空間很小。

from threading import Thread
from multiprocessing import Process
import os
import time
def work():
    print('hello')

if __name__ == '__main__':
    s1 = time.time()
    #在主進程下開啟線程
    t=Thread(target=work)
    t.start()
    t.join()
    t1 = time.time() - s1
    print('進程的執行時間:',t1)
    print('主線程/主進程')
    '''
    打印結果:
    hello
    進程的執行時間: 0.0
    主線程/主進程
    '''

    s2 = time.time()
    #在主進程下開啟子進程
    t=Process(target=work)
    t.start()
    t.join()
    t2 = time.time() - s2
    print('線程的執行時間:', t2)
    print('主線程/主進程')
    '''
    打印結果:
    hello
    線程的執行時間: 0.5216977596282959
    主線程/主進程
    '''

  上面是進程與線程開啟效率的比較.

  而線程中內存數據是共享的.

from  threading import Thread
from multiprocessing import Process
import os
def work():
    global n  #修改全局變量的值
    n=0

if __name__ == '__main__':
    # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('主',n) #毫無疑問子進程p已經將自己的全局的n改成了0,但改的僅僅是它自己的,查看父進程的n仍然為100


    n=1
    t=Thread(target=work)
    t.start()
    t.join()   #必須加join,因為主線程和子線程不一定誰快,一般都是主線程快一些,所有我們要等子線程執行完畢才能看出效果
    print('主',n) #查看結果為0,因為同一進程內的線程之間共享進程內的數據
# 通過一個global就實現了全局變量的使用,不需要進程的IPC通信方法

  在這里我們簡單總結一下:

    進程是最小的內存分配單位

    線程是操作系統調度的最小黨委

    線程被CPU執行了

    進程內至少含有一個線程

    進程中可以開啟多個線程 

      開啟一個線程所需要的時間要遠小於開啟一個進程

      多個線程內部有自己的數據棧,數據不共享

      全局變量在多個線程之間是共享的

  3.多線程實現socket

    server端

import multiprocessing
import threading

import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)

def action(conn):
    while True:
        data=conn.recv(1024)
        print(data)
        msg = input('服務端輸入:') #在多線程里面可以使用input輸入內容,那么就可以實現客戶端和服務端的聊天了,多進程不能輸入
        conn.send(bytes(msg,encoding='utf-8'))

if __name__ == '__main__':

    while True:
        conn,addr=s.accept()
        p=threading.Thread(target=action,args=(conn,))
        p.start()

    client端

import socket

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue

    s.send(msg.encode('utf-8'))
    data=s.recv(1024)
    print(data)

    在socket通信里面是不是有大量的I/O啊,recv、accept等等,我們使用多線程效率更高,因為開銷小。

  4.Thread類的其他方法

Thread實例對象的方法
  # isAlive(): 返回線程是否活動的。
  # getName(): 返回線程名。
  # setName(): 設置線程名。

threading模塊提供的一些方法:
  # threading.currentThread(): 返回當前的線程變量。
  # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
  # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果

    代碼示例

from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())

if __name__ == '__main__':
    #在主進程下開啟線程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread())#主線程對象
    print(threading.current_thread().getName()) #主線程名稱
    print(threading.current_thread().ident) #主線程ID
    print(threading.get_ident()) #主線程ID
    print(threading.enumerate()) #連同主線程在內有兩個運行的線程
    print(threading.active_count())
    print('主線程/主進程')

    '''
    打印結果:
    <_MainThread(MainThread, started 14104)>
    MainThread
    14104
    14104
    [<_MainThread(MainThread, started 14104)>, <Thread(Thread-1, started 17976)>]
    2
    主線程/主進程
    Thread-1
    '''

    join方法

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('太白',))
    t2=Thread(target=sayhi,args=('alex',))
    t.start()
    t2.start()
    t.join()  #因為這個線程用了join方法,主線程等待子線程的運行結束

    print('主線程')
    print(t.is_alive())  #所以t這個線程肯定是執行結束了,結果為False
    print(t2.is_alive()) #有可能是True,有可能是False,看子線程和主線程誰執行的快
    '''
    egon say hello
    主線程
    False
    '''

  5.守護線程

  無論是進程還是線程,都遵循:守護xx會等待主xx運行完畢后被銷毀。需要強調的是:運行完畢並非終止運行

  詳細解釋

#1 主進程在其代碼結束后就已經算運行完畢了(守護進程在此時就被回收),然后主進程會一直等非守護的子進程都運行完畢后回收子進程的資源(否則會產生僵屍進程),才會結束,
#2 主線程在其他非守護線程運行完畢后才算運行完畢(守護線程在此時就被回收)。因為主線程的結束意味着進程的結束,進程整體的資源都將被回收,而進程必須保證非守護線程都運行完畢后才能結束,因為進程執行結束是要回收資源的,所有必須確保你里面的非守護子線程全部執行完畢。

  守護線程示例1

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('taibai',))
    t.setDaemon(True) #必須在t.start()之前設置
    t.start()

    print('主線程')
    print(t.is_alive())
    '''
    主線程
    True
    '''

  示例2

from threading import Thread
from multiprocessing import Process
import time
def func1():
    while True:
        print(666)
        time.sleep(0.5)
def func2():
    print('hello')
    time.sleep(3)

if __name__ == '__main__':
    # t = Thread(target=func1,)
    # t.daemon = True  #主線程結束,守護線程隨之結束
    # # t.setDaemon(True) #兩種方式,和上面設置守護線程是一樣的
    # t.start()
    # t2 = Thread(target=func2,) #這個子線程要執行3秒,主線程的代碼雖然執行完了,但是一直等着子線程的任務執行完畢,主線程才算完畢,因為通過結果你會發現我主線程雖然代碼執行完畢了,\
    # 但是主線程的的守護線程t1還在執行,說明什么,說明我的主線程還沒有完畢,只不過是代碼執行完了,一直等着子線程t2執行完畢,我主線程的守護線程才停止,說明子線程執行完畢之后,我的主線程才執行完畢
    # t2.start()
    # print('主線程代碼執行完啦!')
    p = Process(target=func1,)
    p.daemon = True
    p.start()

    p2 = Process(target=func2,)
    p2.start()
    time.sleep(1) #讓主進程等1秒,為了能看到func1的打印效果
    print('主進程代碼執行完啦!') #通過結果你會發現,如果主進程的代碼運行完畢了,那么主進程就結束了,因為主進程的守護進程p隨着主進程的代碼結束而結束了,守護進程被回收了,這和線程是不一樣的,主線程的代碼完了並不代表主線程運行完畢了,需要等着所有其他的非守護的子線程執行完畢才算完畢

八.鎖

  1.GIL鎖(Global Interpreter Lock)

    首先,一些語言(java、c++、c)是支持同一個進程中的多個線程是可以應用多核CPU的,也就是我們會聽到的現在4核8核這種多核CPU技術的牛逼之處。那么我們之前說過應用多進程的時候如果有共享數據是不是會出現數據不安全的問題啊,就是多個進程同時一個文件中去搶這個數據,大家都把這個數據改了,但是還沒來得及去更新到原來的文件中,就被其他進程也計算了,導致數據不安全的問題啊,所以我們是不是通過加鎖可以解決啊,多線程大家想一下是不是一樣的,並發執行就是有這個問題。但是python最早期的時候對於多線程也加鎖,但是python比較極端的(在當時電腦cpu確實只有1核)加了一個GIL全局解釋鎖,是解釋器級別的,鎖的是整個線程,而不是線程里面的某些數據操作,每次只能有一個線程使用cpu,也就說多線程用不了多核,但是他不是python語言的問題,是CPython解釋器的特性,如果用Jpython解釋器是沒有這個問題的,Cpython是默認的,因為速度快,Jpython是java開發的,在Cpython里面就是沒辦法用多核,這是python的弊病,歷史問題,雖然眾多python團隊的大神在致力於改變這個情況,但是暫沒有解決。(這和解釋型語言(python,php)和編譯型語言有關系嗎???待定!,編譯型語言一般在編譯的過程中就幫你分配好了,解釋型要邊解釋邊執行,所以為了防止出現數據不安全的情況加上了這個鎖,這是所有解釋型語言的弊端??)

  

    但是有了這個鎖我們就不能並發了嗎?當我們的程序是偏計算的,也就是cpu占用率很高的程序(cpu一直在計算),就不行了,但是如果你的程序是I/O型的(一般你的程序都是這個)(input、訪問網址網絡延遲、打開/關閉文件讀寫),在什么情況下用的到高並發呢(金融計算會用到,人工智能(阿爾法狗),但是一般的業務場景用不到,爬網頁,多用戶網站、聊天軟件、處理文件),I/O型的操作很少占用CPU,那么多線程還是可以並發的,因為cpu只是快速的調度線程,而線程里面並沒有什么計算,就像一堆的網絡請求,我cpu非常快速的一個一個的將你的多線程調度出去,你的線程就去執行I/O操作了,

  2.同步鎖

三個需要注意的點:
#1.線程搶的是GIL鎖,GIL鎖相當於執行權限,拿到執行權限后才能拿到互斥鎖Lock,其他線程也可以搶到GIL,但如果發現Lock仍然沒有被釋放則阻塞,即便是拿到執行權限GIL也要立刻交出來

#2.join是等待所有,即整體串行,而鎖只是鎖住修改共享數據的部分,即部分串行,要想保證數據安全的根本原理在於讓並發變成串行,join與互斥鎖都可以實現,毫無疑問,互斥鎖的部分串行效率要更高

#3. 一定要看本小節最后的GIL與互斥鎖的經典分析

  

GIL VS Lock

    機智的同學可能會問到這個問題,就是既然你之前說過了,Python已經有一個GIL來保證同一時間只能有一個線程來執行了,為什么這里還需要lock? 

 首先我們需要達成共識:鎖的目的是為了保護共享的數據,同一時間只能有一個線程來修改共享的數據

    然后,我們可以得出結論:保護不同的數據就應該加不同的鎖。

 最后,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),后者是保護用戶自己開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock

過程分析:所有線程搶的是GIL鎖,或者說所有線程搶的是執行權限

  線程1搶到GIL鎖,拿到執行權限,開始執行,然后加了一把Lock,還沒有執行完畢,即線程1還未釋放Lock,有可能線程2搶到GIL鎖,開始執行,執行過程中發現Lock還沒有被線程1釋放,於是線程2進入阻塞,被奪走執行權限,有可能線程1拿到GIL,然后正常執行到釋放Lock。。。這就導致了串行運行的效果

  既然是串行,那我們執行

  t1.start()

  t1.join

  t2.start()

  t2.join()

  這也是串行執行啊,為何還要加Lock呢,需知join是等待t1所有的代碼執行完,相當於鎖住了t1的所有代碼,而Lock只是鎖住一部分操作共享數據的代碼。

  詳解:

因為Python解釋器幫你自動定期進行內存回收,你可以理解為python解釋器里有一個獨立的線程,
每過一段時間它起wake up做一次全局輪詢看看哪些內存數據是可以被清空的,此時你自己的程序 里的線程和
py解釋器自己的線程是並發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程中的clearing時刻,
可能一個其它線程正好又重新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,為了解決類似的問題,
python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題, 這可以說是Python早期版本的遺留問題。

  看一段代碼:解釋為什么要加鎖,如果下面代碼中work函數里面的那個time.sleep(0.005),我的電腦用的這個時間片段,每次運行都呈現不同的結果,我們可以改改時間試一下。

from threading import Thread,Lock
import os,time
def work():
    global n
    # lock.acquire() #加鎖
    temp=n
    time.sleep(0.1) #一會將下面循環的數據加大並且這里的時間改的更小試試
    n=temp-1
    # time.sleep(0.02)
    # n = n - 1
    '''如果這樣寫的話看不出來效果,因為這樣寫就相當於直接將n的指向改了,就好比從10,經過1次減1之后,n就直接指向了9,速度太快,看不出效果,那么我們怎么辦呢,找一個中間變量來接收n,然后對這個中間變量進行修改,然后再賦值給n,多一個給n賦值的過程,那么在這個過程中間,我們加上一點阻塞時間,來看效果,就像讀文件修改數據之后再寫回文件的過程。那么這個程序就會出現結果為9的情況,首先一個進程的全局變量對於所有線程是共享的,由於我們在程序給中間變量賦值,然后給n再次賦值的過程中我們加了一些I/O時間,遇到I/O就切換,那么每個線程都拿到了10,並對10減1了,然后大家都得到了9,然后再賦值給n,所有n等於了9'''
    # lock.release()
if __name__ == '__main__':
    lock=Lock()
    n=100
    l=[]
    # for i in range(10000):  #如果這里變成了10000,你在運行一下看看結果
    for i in range(100):  #如果這里變成了10000,你在運行一下看看結果
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結果肯定為0,由原來的並發執行變成串行,犧牲了執行效率保證了數據安全

  上面這個代碼示例,如果循環次數變成了10000,在我的電腦上就會出現不同的結果,因為在線程切換的那個time.sleep的時間內,有些線程還沒有被切換到,也就是有些線程還沒有拿到n的值,所以計算結果就沒准了。

   鎖通常被用來實現對共享資源的同步訪問。為每一個共享資源創建一個Lock對象,當你需要訪問該資源時,調用acquire方法來獲取鎖對象(如果其它線程已經獲得了該鎖,則當前線程需等待其被釋放),待資源訪問完后,再調用release方法釋放鎖:

import threading

R=threading.Lock()

R.acquire() #
#R.acquire()如果這里還有一個acquire,你會發現,程序就阻塞在這里了,因為上面的鎖已經被拿到了並且還沒有釋放的情況下,再去拿就阻塞住了
'''
對公共數據的操作
'''
R.release()

  通過上面的代碼示例1,我們看到多個線程搶占資源的情況,可以通過加鎖來解決,看代碼:

from threading import Thread,Lock
import os,time
def work():
    global n
    lock.acquire() #加鎖
    temp=n
    time.sleep(0.1)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結果肯定為0,由原來的並發執行變成串行,犧牲了執行效率保證了數據安全

  看上面代碼的圖形解釋:

  GIL鎖與互斥鎖綜合分析

分析:
    #1.100個線程去搶GIL鎖,即搶執行權限
    #2. 肯定有一個線程先搶到GIL(暫且稱為線程1),然后開始執行,一旦執行就會拿到lock.acquire()
    #3. 極有可能線程1還未運行完畢,就有另外一個線程2搶到GIL,然后開始運行,但線程2發現互斥鎖lock還未被線程1釋放,於是阻塞,被迫交出執行權限,即釋放GIL
    #4.直到線程1重新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,然后其他的線程再重復2 3 4的過程 

  互斥鎖與join的區別

#不加鎖:並發執行,速度快,數據不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''


#不加鎖:未加鎖部分並發執行,加鎖部分串行執行,速度慢,數據安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加鎖的代碼並發運行
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    #加鎖的代碼串行運行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''

#有的同學可能有疑問:既然加鎖會讓運行變成串行,那么我在start之后立即使用join,就不用加鎖了啊,也是串行的效果啊
#沒錯:在start之后立刻使用jion,肯定會將100個任務的執行變成串行,毫無疑問,最終n的結果也肯定是0,是安全的,但問題是
#start后立即join:任務內的所有代碼都是串行執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串行的
#單從保證數據安全方面,二者都可以實現,但很明顯是加鎖的效率更高.
from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗時是多么的恐怖
'''

  3.死鎖與遞歸鎖

    進程也有死鎖與遞歸鎖,在進程那里忘記說了,放到這里一切說了額,進程的死鎖和線程的是一樣的,而且一般情況下進程之間是數據不共享的,不需要加鎖,由於線程是對全局的數據共享的,所以對於全局的數據進行操作的時候,要加鎖。

    所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程,如下就是死鎖

from threading import Lock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()

  

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖>>>\033[0m' %self.name)
        mutexB.acquire()
        print('\033[42m%s 拿到B鎖>>>\033[0m' %self.name)
        mutexB.release()
        mutexA.release()

    def func2(self):
        mutexB.acquire()  
        print('\033[43m%s 拿到B鎖???\033[0m' %self.name)
        time.sleep(2)
        #分析:當線程1執行完func1,然后執行到這里的時候,拿到了B鎖,線程2執行func1的時候拿到了A鎖,那么線程2還要繼續執行func1里面的代碼,再去拿B鎖的時候,發現B鎖被人拿了,那么就一直等着別人把B鎖釋放,那么就一直等着,等到線程1的sleep時間用完之后,線程1繼續執行func2,需要拿A鎖了,但是A鎖被線程2拿着呢,還沒有釋放,因為他在等着B鎖被釋放,那么這倆人就尷尬了,你拿着我的老A,我拿着你的B,這就尷尬了,倆人就停在了原地

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖???\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

'''
Thread-1 拿到A鎖>>>
Thread-1 拿到B鎖>>>
Thread-1 拿到B鎖???
Thread-2 拿到A鎖>>>
然后就卡住,死鎖了
'''

    解決方法,遞歸鎖,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。

    這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:

from threading import RLock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()

  典型問題:科學家吃面 ,看下面代碼示例:

import time
from threading import Thread,Lock
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了面條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃面'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了面條' % name)
    print('%s 吃面' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['taibai','egon','wulaoban']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()

  遞歸鎖解決死鎖問題

import time
from threading import Thread,RLock
fork_lock = noodle_lock = RLock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了面條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃面'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1) 
    noodle_lock.acquire()
    print('%s 搶到了面條' % name)
    print('%s 吃面' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['taibai','wulaoban']:
    t1 = Thread(target=eat1,args=(name,))
    t1.start()
for name in ['alex','peiqi']:
    t2 = Thread(target=eat2,args=(name,))
    t2.start()

  遞歸鎖大致描述:  當我們的程序中需要兩把鎖的時候,你就要注意,別出現死鎖,最好就去用遞歸鎖。

 

九.線程隊列

  線程之間的通信我們列表行不行呢,當然行,那么隊列和列表有什么區別呢?

  queue隊列 :使用import queue,用法與進程Queue一樣

  queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

  class queue.Queue(maxsize=0) #先進先出
import queue #不需要通過threading模塊里面導入,直接import queue就可以了,這是python自帶的
#用法基本和我們進程multiprocess中的queue是一樣的
q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')
# q.put_nowait() #沒有數據就報錯,可以通過try來搞
print(q.get())
print(q.get())
print(q.get())
# q.get_nowait() #沒有數據就報錯,可以通過try來搞
'''
結果(先進先出):
first
second
third
'''

  class queue.LifoQueue(maxsize=0) #last in fisrt out

import queue

q=queue.LifoQueue() #隊列,類似於棧,棧我們提過嗎,是不是先進后出的順序啊
q.put('first')
q.put('second')
q.put('third')
# q.put_nowait()

print(q.get())
print(q.get())
print(q.get())
# q.get_nowait()
'''
結果(后進先出):
third
second
first
'''

  class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高
q.put((-10,'a'))
q.put((-5,'a'))  #負數也可以
# q.put((20,'ws'))  #如果兩個值的優先級一樣,那么按照后面的值的acsii碼順序來排序,如果字符串第一個數元素相同,比較第二個元素的acsii碼順序
# q.put((20,'wd'))
# q.put((20,{'a':11})) #TypeError: unorderable types: dict() < dict() 不能是字典
# q.put((20,('w',1)))  #優先級相同的兩個數據,他們后面的值必須是相同的數據類型才能比較,可以是元祖,也是通過元素的ascii碼順序來排序

q.put((20,'b'))
q.put((20,'a'))
q.put((0,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先級越高,優先級高的優先出隊):
'''

  這三種隊列都是線程安全的,不會出現多個線程搶占同一個資源或數據的情況。

十.python標准模塊--concurrent.futures

  到這里就差我們的線程池沒有講了,我們用一個新的模塊給大家講,早期的時候我們沒有線程池,現在python提供了一個新的標准或者說內置的模塊,這個模塊里面提供了新的線程池和進程池,之前我們說的進程池是在multiprocessing里面的,現在這個在這個新的模塊里面,他倆用法上是一樣的。

為什么要將進程池和線程池放到一起呢,是為了統一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式一樣,而且只要通過這個concurrent.futures導入就可以直接用他們兩個了

concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用
Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
#submit(fn, *args, **kwargs)
異步提交任務

#map(func, *iterables, timeout=None, chunksize=1) 
取代for循環submit的操作

#shutdown(wait=True) 
相當於進程池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源后才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait參數為何值,整個程序都會等到所有任務執行完畢
submit和map必須在shutdown之前

#result(timeout=None)
取得結果

#add_done_callback(fn)
回調函數

  ThreadPoolExecutor的簡單實用

import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
    time.sleep(2)
    print('%s打印的:'%(threading.get_ident()),n)
    return n*n
tpool = ThreadPoolExecutor(max_workers=5) #默認一般起線程的數據不超過CPU個數*5
# tpool = ProcessPoolExecutor(max_workers=5) #進程池的使用只需要將上面的ThreadPoolExecutor改為ProcessPoolExecutor就行了,其他都不用改
#異步執行
t_lst = []
for i in range(5):
    t = tpool.submit(func,i) #提交執行函數,返回一個結果對象,i作為任務函數的參數 def submit(self, fn, *args, **kwargs):  可以傳任意形式的參數
    t_lst.append(t)  #
    # print(t.result())
    #這個返回的結果對象t,不能直接去拿結果,不然又變成串行了,可以理解為拿到一個號碼,等所有線程的結果都出來之后,我們再去通過結果對象t獲取結果
tpool.shutdown() #起到原來的close阻止新任務進來 + join的作用,等待所有的線程執行完畢
print('主線程')
for ti in t_lst:
    print('>>>>',ti.result())

# 我們還可以不用shutdown(),用下面這種方式
# while 1:
#     for n,ti in enumerate(t_lst):
#         print('>>>>', ti.result(),n)
#     time.sleep(2) #每個兩秒去去一次結果,哪個有結果了,就可以取出哪一個,想表達的意思就是說不用等到所有的結果都出來再去取,可以輪詢着去取結果,因為你的任務需要執行的時間很長,那么你需要等很久才能拿到結果,通過這樣的方式可以將快速出來的結果先拿出來。如果有的結果對象里面還沒有執行結果,那么你什么也取不到,這一點要注意,不是空的,是什么也取不到,那怎么判斷我已經取出了哪一個的結果,可以通過枚舉enumerate來搞,記錄你是哪一個位置的結果對象的結果已經被取過了,取過的就不再取了

#結果分析: 打印的結果是沒有順序的,因為到了func函數中的sleep的時候線程會切換,誰先打印就沒准兒了,但是最后的我們通過結果對象取結果的時候拿到的是有序的,因為我們主線程進行for循環的時候,我們是按順序將結果對象添加到列表中的。
# 37220打印的: 0
# 32292打印的: 4
# 33444打印的: 1
# 30068打印的: 2
# 29884打印的: 3
# 主線程
# >>>> 0
# >>>> 1
# >>>> 4
# >>>> 9
# >>>> 16

  ProcessPoolExecutor的使用:

只需要將這一行代碼改為下面這一行就可以了,其他的代碼都不用變
tpool = ThreadPoolExecutor(max_workers=5) #默認一般起線程的數據不超過CPU個數*5
# tpool = ProcessPoolExecutor(max_workers=5)

你就會發現為什么將線程池和進程池都放到這一個模塊里面了,用法一樣

  map的使用

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import threading
import os,time,random
def task(n):
    print('%s is runing' %threading.get_ident())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    s = executor.map(task,range(1,5)) #map取代了for+submit
    print([i for i in s])

  回調函數簡單應用

import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
    time.sleep(2)
    return n*n

def call_back(m):
    print('結果為:%s'%(m.result()))

tpool = ThreadPoolExecutor(max_workers=5)
t_lst = []
for i in range(5):
    t = tpool.submit(func,i).add_done_callback(call_back)

  回調函數的應用,需要你自己去練習

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<進程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<進程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,需要用obj.result()拿到結果

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM