Python多線程編程


本文大綱

  1. 進程與線程
  2. Python的GIL
  3. 多線程編程及線程間通信

進程與線程

程序就是一堆代碼也就是在磁盤上的一個或多個文件。當程序運行起來也就被加載到內存中開始執行它的指令這時候才是真正的進程。運行中的QQ、Word就是一個進程。

那線程又是什么呢?無論怎么說一個進程至少包含一個線程作為它的指令執行體,線程你可以理解為進程中要執行的一個任務,那么其實進程可以包含多個任務,可是在單核心CPU的時代這些任務只能順序執行,哪怕這些任務是獨立的。比如Word的自動保存功能,你在編寫文檔時定期會自動保存這時候其實就是一個線程執行了這個任務。

進程管理資源而將線程分配到某個cup上執行,也就是說線程是CPU調度的最小單位。一個進程可以擁有多個線程,如果進程運行在多核心CPU上,那就可以把多個線程分配到多個核心上去執行,最大化並行處理。就算是單核心CPU也可以通過模擬出來並行(CPU時間片概念),雖然這帶來更多上下文切換但是線程的切換比進程的切換開銷要小的多,因為除了CPU資源之外其他的資源進程內的線程都是共享的。

注意:線程分為內核線程和用戶線程,區分標准就是線程的調度者在核心內部還是外部。內核線程更利於並發使用多核心處理器的資源,而用戶線程更多考慮的是上下文切換開銷。目前主流操作系統中都是兩者結合使用只是組合會有差異。我們這里說的多線程或者多進程其實是用戶線程,無論你是用Python還是Java編寫的程序。

場景描述

單進程模型:
比如一個WEB服務器,進程正在運行,監聽在某一個端口,這時候如果有一個用戶請求,那么是不是應該讓這個進程來處理呢?可以,但是一般不會這么做,因為如果來了第二個請求,顯然之前那個進程還在處理第一個請求,根本無法處理第二個。所以不會讓那個進程來處理,而是基於它產生一個子進程,讓子進程來處理,這樣看似問題解決了,但是你想,來10個請求我可以產生10個子進程,如果來1萬呢?比如一個子進程消耗1M內存,那么這就要消耗將近10G內存,這顯然不可接受,這還沒有考慮產生一個子進程的其他系統開銷,如果每個進程都訪問主頁,那么就要把主頁數據加載到內存,如果一個主頁消耗2M內存,那么1個請求就要占用1萬個2M內存空間,加上之前的1M,總量就將近30G。如果CPU只有一顆的話,那性能將會非常糟糕,會有頻繁的CPU切換。

單進程多線程模型:
上面是進程模型,如果改用線程模型的話效率會大大提高,一個父進程生產一個子進程,一個子進程內部產生多個線程,因為產生一個線程基本沒有什么內存占用或者是非常小(CentOS的默認線程棧8K),而且線程是可以共享進程資源的,還是上面的例子,只需要打開一次2M的主頁,1萬個請求共享這一個,這就大大節約了內存空間。但是如果你還是只有一個CPU的話,那么還是無法實現多個線程同時運行,依然存在切換問題。如果是多核心則會更好,如果還沒有資源征用(典型的場景就是A線程要讀取一個正在被B線程寫入的文件)那么效果會更好。

任務可以分為計算密集型和IO密集型。前者主要是大量占用CPU后者主要是訪問磁盤或者網絡請求等。上面的兩個例子其實是IO密集型,會有大量用戶請求發送到服務器,這種任務類型如果不使用多線程那么就需要使用多路復用。典型的就是Nginx。

Python中的GIL

Python代碼的執行是由python解釋器(解釋器主循環)進行控制的。在主循環中同時只能有一個控制線程在執行,就像單核心CPU中的多進程一樣。盡管Python解釋器中可以運行多個線程,但是在任意某一時刻只能有一個線程在運行。

GIL:全局解釋器鎖,這個是進程級別的,進程里面任何的線程要想被執行都要經過這個鎖,這就意味着進程內多個線程同一時刻只能有一個線程被執行其他都睡眠或者等待IO,哪怕你有多個核心的CPU也不行,簡單來說就是Python中的線程不是並發的,在JAVA中線程可以並發。這個鎖曾經嘗試去掉過但是去掉之后導致單線程程序性能下降很多,雖然去掉會帶來多線程的性能提升但是經過權衡覺得不值得,最后還是加上了。Python為什么一開始不設計真正的多線程?這個跟年代有關Python誕生在1989年,那個年代INTEL才推出80486其實就是80386的加強版,別多多核心,也就是剛剛加入了浮點運算,當然同期摩托羅拉的CPU也不是多核心。而JAVA是1995年誕生,那時候雖然也沒有多核心但是性能要高的多,單核心性能高了我們就可以利用CPU時間片概念模擬多線程並發;另外就是我們常用的Python是CPython解釋器是C寫的它的多線程依賴操作系統,這種Python有GIL,如果是JPython或者PYPY則沒有GIL限制,所以所GIL不是語言決定的,而是執行這個語言的解釋器決定的。另外Python的進程也是依賴操作系統的,調用操作系統的庫函數實現的,進程沒有GIL限制。

那么這里就涉及到一個問題,如果一個多線程程序,其中一個線程等待IO(網絡或者磁盤)時間很長那么其他線程就一直阻塞在哪里嗎?顯然不是,那如何處理的呢?一個線程無論任何時候開始睡眠或者等待IO,其他線程都有機會獲取GIL然后執行自己的Python代碼,這就是協同式多任務處理,另外Python還支持搶占式多任務。

協同時多任務

比如當一個線程進行網絡IO操作時,其實無法估計這個IO需要多久,此時這個線程就阻塞在這里並且沒有運行任何Python代碼,那么這個線程就會釋放GIL(被解釋器強制釋放),從而讓其他線程獲取GIL來運行他們的Python代碼。這就是協同式多任務,它允許並發;多個線程同時等待不同事件。
比如2個線程打開2個套接字,兩個線程同一時刻只能有一個執行Python,但一旦線程開始連接它就會放棄GIL,這樣其他線程就可以運行,這就意味着2個線程並發等待套接字連接。其實你可以看到它允許順序的並發等待,當IO有返回時也就是它需要重新獲得鎖來執行Python代碼的時候,在這個時間片里只能有一個線程運行。

協同時多任務也叫做協作式多任務,本意是指程序或者線程自己控制執行時間,A線程執行完了就通知B線程,看起來很完美但是如果A線程代碼有問題導致無法正常執行完畢那后面的線程就都卡死了。在Python中解釋器就充當了那個中間人的作用它保證某個線程卡死不會一直占用CPU。

搶占式多任務

搶占式顧名思義就是進程或者線程可以在執行時被任意打斷也就是被別的線程或者進程搶去它的執行時間。搶的機制避免了代碼問題線程卡死但是也帶了其他問題就是所有線程或者進程的執行時間無法公平。

上面說的是線程主動放棄鎖,那么對應的就有主動獲取鎖,當多個線程都有返回的時候(所有線程都處在一個可以繼續執行的狀態),就會發生對鎖的爭搶(搶GIL)。總之最終只有一個線程被運行。假設有一個獲取鎖的線程繼續執行但是它的數據有問題導致線程變成死循環那后面的線程是不是就無法運行了呢?答案是否定的。Python代碼執行首先會被翻譯成二進制字節碼,然后解釋器函數讀取這些二進制碼然后執行,在Python 2 中檢測間隔為在虛擬機中運行的字節碼數量達到一定值就強制線程釋放GIL,在Python 3.2及以后改變了GIL的釋放和獲取機制,默認是0.005秒也就是5毫秒換句話說如果一個線程5毫秒沒有釋放鎖就被強制釋放,也就是說結解釋器會主動定期輪詢所有線程而不會讓一個線程一直運行。那這個GIL能保證線程安全嗎?

Python的線程安全

一個線程可以隨時失去GIL那么可以得出GIL是無法保證線程安全的,從開發這角度來說程序是一條一條執行,但是翻譯成字節碼我們看起來的一條語句可能是多條尤其是對於原子性操作。比如 += 這樣的操作,看下圖

無論是GIL是按照字節流多少還是按照時間,假設剛執行了上面0、3操作就需要釋放GIL,那么后去的6、7則無法執行,相當於 += 操作沒有完成。所以作為開發人員仍然需要手動加鎖。我這里的例子並沒有加循環進行累計雖然發生概率不高,但是也要引起注意。下圖為手動加鎖:

那么對於原子性操作比如sort()方法,是不能被中斷的即使到了該是否GIL的時候。因為sort是單字節碼,因此線程沒有機會在調用期間抓取GIL。有時候你分不清哪些是原子的哪些不是,所以可以遵循一個原則:始終圍繞共享可變狀態的讀取和寫入加鎖。反正threading.Lock是廉價的。

如果你使用JAVA語言則需要程序員自行加、解鎖來保證線程安全。在Python中是粗粒度鎖,也就是語言本身就提供一種機制來保證線程安全。所以在Python中除了特殊需要一般情況下沒必要使用更加細粒度的線程鎖。
從上面來看可以更加詳細的了解Python多線程,而不是一味的認為它的多線程沒有意義。所以如果真的需要並行計算呢?答案就是多進程,那么到底具體什么場景需要使用多線程什么時候多進程呢?

計算密集型:要用多進程,大量使用CPU計算能力的程序或者任務,而進程本身不太去訪問IO設備。比如計算圓周率、對視頻進行解碼、解壓縮和壓縮、加密解密。雖然多進程會更叫消耗資源,但是可以更多的利用CPU核心的計算能力。
IO密集型:要用多線程,因為IO操作不占用CPU能力,所以不用利用CPU的多核心。IO分為網絡IO和磁盤IO,就是CPU的等待時間主要消耗在等待輸入和輸出上,比如網絡爬蟲這種類型屬於IO密集型,在一個線程等待IO的同時會放棄GIL,然后讓后面的線程工作,這顯然比單線程要效率高。

從運算結果來看GIL不能保證線程安全

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: rex.cheny
# E-mail: rex.cheny@outlook.com
import multiprocessing
import threading

import time


def foo(n):
    # print(n)
    global count
    count += 1
    time.sleep(1)
    count -= 1
    print("子進程 %d 執行完畢。" % n)


count = 0
def main():
    thList = []
    for i in range(200):
        t = threading.Thread(target=foo, args=(i,))
        t.start()
        thList.append(t)

    for t in thList:
        t.join()

    print(count)
    print("主線程執行完畢")


if __name__ == '__main__':
    main()

如果是線程安全的那么結果就是0,可是真的結果呢?不一定是多少,我這次執行的是6,如果你不在count += 1和 -= 1之間加睡眠你是看不到效果的。如果你沒有count -= 1這一步,你是看不到效果的,因為雖然你用sleep模擬了執行時間長來觸發線程之間的GIL釋放和獲取但是你要明白 count += 1這個是一個完整操作在睡眠前就已經執行完畢了,所以是否睡眠與結果毫無相關,所以必須是兩次對同一數據進行操作然后兩次操作中間增加睡眠時間來觸發線程釋放GIL才能有直觀的效果。

如果要想保證線程安全怎么辦呢?用鎖

這樣就保證了結果,上例子中應該去掉sleep,因為這樣程序就是串行的了,但是如果面對這種根本無法觸發釋放GIL鎖的操作,你不加睡眠的話人為加鎖對執行結果沒有影響。鎖和信號量會在后面講到。

多線程編程及線程間通信

在Python中有兩個模塊可以實現線程,就是thread模塊和threading模塊的Thread類。thread可以實現的功能threading也可以實現,可以把threading看做高級的thread,在做多線程編程的時候推薦使用threading,所以我這里也只用這個來舉例。

我們先說一下創建線程的方法:

  • 創建線程實例,然后傳給它一個函數
  • 創建線程實例,然后傳遞給它一個可調用的類(也就是實現了類里的__call__方法)
  • 派生線程子類,實現run()方法,然后基於子類創建實例並運行它

三種方法用哪一個呢?總之不推薦使用第二種因為難以閱讀。如果是簡單任務第一個種就夠了,如果是復雜程序就要遵循面向對象編程思想那么第三種更合適。

我們先用第一種方法來創建

(例子1)

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import sys
from time import sleep
from threading import Thread


# 測試使用 被線程執行的方法
def foo1(arg):
    print("子線程任務-獲取的參數值為:", arg)
    sleep(2)
    print("子線程任務執行完畢。")

def simpleExample():
    """
    創建一個線程實例,然后傳遞一個需要線程執行的函數 target=foo1
    傳遞一組參數給需要執行的函數,形式為元祖,如果沒有參數就使用空元祖 args=(1,)
    還可以設置線程名稱,當然不是必須的,如果不設置線程名稱以 Thread-開頭后面跟一個數字,默認從1開始
    設置線程名稱還可以使用 Thread.name = "子線程" 不推薦使用 .setName和.getName來設置和獲取線程名稱
    """
    t1 = Thread(target=foo1, args=(1,), name="子線程")

    # 啟動線程
    t1.start()
    # 獲取線程名稱
    print(t1.name)

    """
    獲取線程是否還存活,線程執行完畢后,也就是返回了,那么這個值就是False。線程一旦start()那么就是活着的,
    所以線程是在start到返回之間是存活的。
    """
    print("線程是否還活着:", t1.isAlive())

    print("主線程代碼執行完畢。")


def main():
    simpleExample()


if __name__ == "__main__":
    sys.exit(main())

這個程序很簡單就是傳遞一個函數去執行。當線程執行時,主線程繼續執行直到執行完所有主線程代碼,從上圖可以看出“子線程任務執行完畢”是最后一個輸出,這個是子線程輸出的。這時候你可能回想,上面的主線程不等子線程完成它自己的代碼就執行完了,如果我的主線程需要獲取子線程執行結果然后再繼續運行主線程怎么辦?看下面的代碼

下面的代碼是創建5個子線程,然后每個子線程都在一個列表里寫上自己的名字,然后主線程顯示這個列表。

(例子2)

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import sys
from time import sleep
from threading import Thread

def foo2(arg, name):
    arg.append(name)
    sleep(2)
    print("子線程: %s 任務執行完畢。" % name)

def aboutJoin():
    namelist = ["Thread-A", "Thread-B", "Thread-C", "Thread-D", "Thread-E"]
    threadlist = []

    testlist = []
    for i in namelist:
        t = Thread(target=foo2, args=(testlist, i), name="子線程-" + str(i))
        t.start()
        threadlist.append(t)

    # for i in namelist:
    #     threadlist[namelist.index(i)].start()

    for i in namelist:
        """
        線程啟動后阻塞主線程,主線程需要等待線程完成后才可以繼續執行(join后面的語句),你在線程調用的方法中加入sleep效果最明顯。
        你不加阻塞其實不影響運行,只是輸出信息會亂因為主線程會繼續執行,線程也會執行,那具體調度執行順序由系統控制,
        所以不加阻塞每次輸出的信息前后順序會不一樣。
        join(timeout),它可以加一個超時,不加就是等待線程執行完畢,加超時則是等待多久無論線程是否執行完畢都向后繼續執行主線程。
        """
        threadlist[namelist.index(i)].join()

    for i in testlist:
        print(i)
    print("主線程代碼執行完畢")

def main():
    aboutJoin()


if __name__ == "__main__":
    sys.exit(main())

說明:上面那一段注釋的代碼是啟動線程,我寫的是創建時線程實例就啟動,其實你可以先全部創建(創建后並不馬上調用start),然后在啟動(用注釋的代碼)。

從執行結果來看,主線程等待所有子進程完畢,然后輸出那個列表,當主線程執行完最后一條代碼后,程序退出。

守護線程

thread模塊不支持守護線程,所以我們必須是用threading模塊。守護線程的意思是當主線程退出時所有子線程也將終止,無論子線程的任務是否完成。守護線程一般用來做那些不重要的任務,比如等待客戶端請求,如果沒有請求那么這個線程就是空閑的。它只負責接收請求然后把請求轉發給其他線程來處理,然后自己繼續等待請求。所以對於這種不具有重要邏輯處理的線程可以作為守護線程。換句話說主線程活着守護線程就活着,主線程沒了守護線程也就沒了。

有人說把處理客戶請求的線程弄成守護線程,這也是可以的,但是有個問題,如果要重啟應用或者手動停止服務,當前沒有處理完請求的那些線程難道直接終止嗎?留給大家思考,不過很多人也應該聽過優化停機這種說法,在這種場景下執行了停止服務操作,可以不再接收新的請求,但是要處理完老的請求。

(例子3)

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import sys
from time import sleep
from threading import Thread

def foo3(arg, name):
    sleep(2)
    arg.append(name)
    print("子線程: %s 任務執行完畢。" % name)


def demoForDaemon():
    namelist = ["Thread-A", "Thread-B", "Thread-C", "Thread-D", "Thread-E"]
    threadlist = []
    """
    創建5個線程,然后它們設置為守護線程,這時候你會發現主線程很快執行完畢然后程序退出
    這時候你看不到任何線程輸出內容。因為所有線程都隨主線程的退出而退出了。但是如果你注釋掉 subT.daemon = True
    你會發雖然主線程代碼執行完畢但是不會退出,它會等着線程,然后你就看到輸出的列表.
    如果把線程設置為守護線程則表示這個線程其實不重要,進程退出時可以不需要管這個線程是否執行完了自己的代碼。
    """
    for name in namelist:
        subT = Thread(target=foo3, args=(threadlist, name))

        """
        這個值默認是False,這個是設置子線程為守護模式,就是說主線程是不是等待線程執行完畢在退出。
        比如你設置了True,也就是主線程不等待,那么如果主線程執行代碼需要1秒鍾,而線程執行它里面的代碼
        需要10秒,那1秒過后主線程執行完畢退出,線程也會隨之退出這時候線程的代碼根本沒有執行完。通常來講
        主線程產生的所有線程都應該隨主線程退出而銷毀,這樣就可以避免線程出現死循環主線程退出了子線程還在繼續執行的情況。
        必須在start()方法之前設置。
        """
        # subT.daemon = True
        subT.start()

    # 這個指令是為了演示雖然把主線程設置為所有線程的守護進程,但是因為睡眠了所以主線程沒有退出,這時候
    # 你就會看到線程的正常輸出,至於線程是否執行完畢則取決於主線程什么時候退出,這里設置睡眠可以理解為
    # join()操作。只不過你不能真的當join()來使用,畢竟真實場景中子線程的執行時間不可預知。
    # sleep(5)
    print(threadlist)
    print("主線程代碼執行完畢。")


def main():
    demoForDaemon()


if __name__ == "__main__":
    sys.exit(main())

下面開啟daemon

(例子4)

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import sys
from time import sleep
from threading import Thread

def foo3(arg, name):
    sleep(2)
    arg.append(name)
    print("子線程: %s 任務執行完畢。" % name)


def demoForDaemon():
    namelist = ["Thread-A", "Thread-B", "Thread-C", "Thread-D", "Thread-E"]
    threadlist = []
    """
    創建5個線程,然后它們設置為守護線程,這時候你會發現主線程很快執行完畢然后程序退出
    這時候你看不到任何線程輸出內容。因為所有線程都隨主線程的退出而退出了。但是如果你注釋掉 subT.daemon = True
    你會發雖然主線程代碼執行完畢但是不會退出,它會等着線程,然后你就看到輸出的列表.
    如果把線程設置為守護線程則表示這個線程其實不重要,進程退出時可以不需要管這個線程是否執行完了自己的代碼。
    """
    for name in namelist:
        subT = Thread(target=foo3, args=(threadlist, name))

        """
        這個值默認是False,這個是設置子線程為守護模式,就是說主線程是不是等待線程執行完畢在退出。
        比如你設置了True,也就是主線程不等待,那么如果主線程執行代碼需要1秒鍾,而線程執行它里面的代碼
        需要10秒,那1秒過后主線程執行完畢退出,線程也會隨之退出這時候線程的代碼根本沒有執行完。通常來講
        主線程產生的所有線程都應該隨主線程退出而銷毀,這樣就可以避免線程出現死循環主線程退出了子線程還在繼續執行的情況。
        必須在start()方法之前設置。  set/getDaemon() 方法已經過時,不推薦使用了.
        """
        subT.daemon = True
        subT.start()

    # 這個指令是為了演示雖然把主線程設置為所有線程的守護進程,但是因為睡眠了所以主線程沒有退出,這時候
    # 你就會看到線程的正常輸出,至於線程是否執行完畢則取決於主線程什么時候退出,這里設置睡眠可以理解為
    # join()操作。只不過你不能真的當join()來使用,畢竟真實場景中子線程的執行時間不可預知。
    # sleep(5)
    print(threadlist)
    print("主線程代碼執行完畢。")


def main():
    demoForDaemon()


if __name__ == "__main__":
    sys.exit(main())

線程安全

之前說過GIL並不能保證線程安全,那在某些場景下又需要線程安全,那應該怎么做呢?通常有兩種方式,鎖和信號量。在說這兩個東西之前,先說兩個概念

  • 臨界資源:是同一時刻僅允許一個進程/線程使用的共享資源。各進程/線程采取互斥的方式,實現共享的資源稱作臨界資源。臨界資源是資源。
  • 臨界區:每個進程/線程中訪問臨界資源的那段代碼稱為臨界區。臨界區是代碼。

簡單來說多個線程/進程同一時刻可能對同一個資源又添加又修改或者是僅修改可是每次修改的數量不同。典型場景就是購物,多個人購買一個商品,但是商品庫存是一定的,可是每個人買的數量不同。控制不好可能會出現銷售數量大於庫存數量。

模擬用戶搶購商品,在扣減庫存的時候進行鎖定,扣減完畢釋放鎖

(例子5)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: rex.cheny
# E-mail: rex.cheny@outlook.com

import random
from threading import Thread
from threading import Lock


# 庫存數量,這個就屬於臨界資源
STOCK = 100
#
LOCK = Lock()


def buy(username, amount):
    """
    購買商品方法,從 LOCK.acquire() 到 LOCK.release() 之間屬於臨界區,因為這中間的代碼就開始對STOCK進行操作了
    :param username: 用戶名稱
    :param amount: 購買數量
    :return:
    """
    LOCK.acquire()
    global STOCK
    print("用戶 %s 下單購買 %d 個商品" % (username, amount))
    if STOCK >= amount:
        STOCK = STOCK - amount

        print("%s 成功的購買了 %d 個商品。" % (username, amount))
        print("當前庫存數量:%d" % STOCK)
    else:
        print("用戶 %s 您好,當前庫存不足,您最多可以購買 %d 個商品, 請重新下單" % (username, STOCK))
    LOCK.release()


def main():
    print("庫存數量:%d" % STOCK)
    userlist = ['User1', 'User2', 'User3', 'User4', 'User5', 'User6', 'User7', 'User8', 'User9']
    userthreadlist = []
    for user in userlist:
        userT = Thread(target=buy, args=(user, random.randint(10, 100)))
        userT.start()
        userthreadlist.append(userT)

    for userT in userthreadlist:
        userT.join()

    print("搶購后當前庫存數量:%d" % STOCK)


if __name__ == '__main__':
    main()

執行效果

這個程序只是為了展示鎖如何使用以及效果,所以程序還有很多不完整的地方,比如不能重新下單直到庫存為0,有興趣可以自己去完善。更加簡潔的使用鎖的方式

# -*- coding: utf-8 -*-
# @Time    : 2020/2/13 14:48
# @Author  : rex.chen
# @Email   : rex.cheny@outlook.com
# @File    : lockTest.py
# @Software: PyCharm

import threading
import time

locker = threading.Lock()
cups = []


def produce_cups(count=100):
    while True:
        """
        這里等同於那種 with open() as 語句,通過上下文的方式使用鎖,進入with語法塊說明獲取所成功,代碼執行完畢自動釋放。
        它的原理就是調用__enter__語句獲取鎖,最后調用__exit__語句釋放鎖。 RLock里面也是這么實現的。
        """
        with locker:
            if len(cups) < count:
                time.sleep(0.001)
                cups.append(1)

        """
        這里跳出循環為什么不寫在 with 語句塊里呢?因為在with 語句里面直接break后就跳出去了,它不會執行__exit__
        所以也就不會釋放鎖。
        """
        if len(cups) == count:
            break


def main():
    t_list = []
    for i in range(20):
        t = threading.Thread(target=produce_cups, args=(1000,))
        t_list.append(t)
        t.start()

    for t in t_list:
        t.join()

    print(len(cups))


if __name__ == '__main__':
    main()

 

信號量

下面我演示的是和上面一樣的功能只是使用了信號量,我這里用的是二進制信號量。信號量解釋網上很多,我這里摘抄一個。

信號量是線程的同步機制,它其實就是一個計數器,當資源消耗時遞減,當資源釋放是遞增。
進程或者線程可以共享信號量,如果一個線程或者進程執行獲取信號量操作時,它將得到信號量,然后就可以執行,而后一個進程獲取信號量時如果是0就會
被阻塞,那么它必須等待前一個進程釋放。所以如果信號量不是0那么該進程就可以執行。這里說的是二進制信號量,也就是0或者1.

(例子6)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: rex.cheny
# E-mail: rex.cheny@outlook.com

import random
from threading import Thread, BoundedSemaphore


# 庫存數量,這個就屬於臨界資源
STOCK = 100
# 實例化一個信號量對象,BoundedSemaphore 的一個功能就是計數器永不會超過初始值,為了防止釋放次數多余獲得次數
candytray = BoundedSemaphore(2)


def buy(username, amount):
    """
    購買商品方法,從 candytray.acquire() 到 candytray.release() 之間屬於臨界區,因為這中間的代碼就開始對STOCK進行操作了
    :param username: 用戶名稱
    :param amount: 購買數量
    :return:
    """
    # 下面的語句是信號量遞減,如果某個線程執行到這里發現信號量是0,則會被阻塞並等待其他線程執行release()操作
    candytray.acquire()
    global STOCK
    print("用戶 %s 下單購買 %d 個商品" % (username, amount))
    if STOCK >= amount:
        STOCK = STOCK - amount

        print("%s 成功的購買了 %d 個商品。" % (username, amount))
        print("當前庫存數量:%d" % STOCK)
    else:
        print("用戶 %s 您好,當前庫存不足,您最多可以購買 %d 個商品, 請重新下單" % (username, STOCK))
    # 下面的語句是信號量遞增
    candytray.release()


def main():
    print("庫存數量:%d" % STOCK)
    userlist = ['User1', 'User2', 'User3', 'User4', 'User5', 'User6', 'User7', 'User8', 'User9']
    userthreadlist = []
    for user in userlist:
        userT = Thread(target=buy, args=(user, random.randint(1, 100)))
        # userT.start()
        userthreadlist.append(userT)

    for userT in userthreadlist:
        userT.start()

    for userT in userthreadlist:
        userT.join()

    print("搶購后當前庫存數量:%d" % STOCK)


if __name__ == '__main__':
    main()

執行效果

對於非二進制信號量來說就是一個更大的整數。

所以如果為了控制對共享資源的訪問那么通常會把信號量設置為1.如果你設置大於1,則表示針對該資源將有可能出現同一時刻被多於1個線程或進程訪問。
那么有沒有需要讓信號量大於1的時候呢?當然有,比如購買火車票,售票大廳容納500人,售票窗口肯定比500少,這時候就需要兩種信號量,一種代表
售票大廳的容納人數其信號量就為500、另一種代表售票窗口,如果售票窗口就1個,那么此時代表售票窗口的信號量就為1.

其實很多時候我們並不需要用信號量來控制線程或者進程的多少,這個可以用池來解決。通常使用信號量是為了避免多個線程或者進程對同一資源訪問從而
造成數據不一致。當然解決這種問題除了使用信號量之外最常規的就是使用鎖來實現。

線程通信

在本節(例子2)其實就演示主線程和線程通信的效果,就是傳遞一個列表到所有線程,每個子線程在列表中添加自己的名字,然后在主線程中顯示。當然這個並不算是嚴謹的線程間通信雖然它也通信了,雖然使用列表或者字典等傳遞給線程也可以共享數據但是這不是線程安全的。其實線程通信比較容易,因為每個線程共享進程的內存空間,所以這也就為什么會有線程安全的問題。因為畢竟每個進程都是獨立的內存空間是隔離的。為了讓線程安全通信我們之前說了鎖和信號量,不過在python標准庫中還有一個叫做Queue的對象。這個模塊可以提供線程共享數據而且是線程安全的。

(例子7)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: rex.cheny
# E-mail: rex.cheny@outlook.com

import random
from threading import Thread
from queue import Queue, LifoQueue, PriorityQueue
from time import sleep

"""
Queue本身就包含了鎖,所以線程可以安全的傳遞數據。
q = Queue(10) 創建一個先進先出的隊列,如果設置了最大隊列長度,那么在隊列滿的時候將被阻塞,否則隊列長度則沒有限制
    LifoQueue(MAX_SIZE) 后入先出隊列
    PriorityQueue(MAX_SIZE) 優先級隊列  put((優先級,數據)), 優先級可以是字母也可以是數字,數字越小優先級越高

常用方法:
    qsize() 獲取隊列當前長度
    empty() 判斷當前隊列是否為空,如果隊列為空則返回True
    full() 判斷隊列是否已經滿了,如果滿了則返回 True
    put(DATA) 將數據放入隊列
    get(DATA) 從隊列中取出一個數據

"""


def productmessage(queue):
    count = 1
    while True:
        if count >= 10:
            print("寫入消息退出命令 EXIT。")
            queue.put("EXIT")
            break
        msg = random.randint(50, 100)
        print("寫入消息 %d" % msg)
        queue.put(msg)
        print("當前隊列長度:%d" % queue.qsize())
        count += 1
        sleep(random.randint(1, 3))


def consumemessage(queue):
    while True:
        msg = queue.get(1)
        if msg == "EXIT":
            print("收到退出命令 EXIT。")
            break
        print("線程讀取了一條數據:%s 當前隊列長度:%d" % (msg, queue.qsize()))
        sleep(random.randint(1, 3))


def main():
    # 創建隊列並設置隊列長度
    q = Queue(10)

    wT = Thread(target=productmessage, args=(q, ))
    rT = Thread(target=consumemessage, args=(q, ))

    wT.start()
    rT.start()


if __name__ == '__main__':
    main()

在使用put和get的時候它有一個非同步方式。因為如果使用同步方式,那么當隊列空了你還繼續get將會阻塞,默認是這種方式,同理put也是,如果隊列沒有空余位置,那么就會等待這個等待時間由timeout設置。put方法默認是阻塞的。這里我們使用了Queue,它其實本身使用的是dqueue,而這個東西在字節碼的層面上就已經實現了線程安全。

事件通知

Event也是線程通信的一種,它使用起來比較簡單它不能傳遞具體數據只能用於發送通知。這種在某些場景下需要。

(例子7)

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
from threading import Event
from threading import Thread
import time

"""
線程的事件驅動
服務器和客戶端同時會監聽一個標志,默認是Fasle,如果有事件觸發,則觸發的一方會把標志設置為True,這時候需要處理的一方
就知道有事件,那么它將處理這個事情。
"""


def Server(event):
    print("Server: 等活兒中。。。。。")
    # 這個是事件驅動的等待,屬於阻塞的等待,這就是服務器去等待這個標志,如果不要阻塞的就用 event.isSet() 判斷是否為True,不是True就干其他的事情
    event.wait()
    print("Server: 客戶端觸發了一個事件。")
    print("Server: 服務器開始處理。。。。。。")

    # 這個是把標准位恢復為False,因為客戶端一點調用event.set(),那么標志就是True,如果這里不清空,那么就一直是True,
    # 那么下面Client里面 event.wait()就會立刻獲取True,而不會等待服務器下面的處理。
    event.clear()
    # time.sleep(3)

    print("Server: 服務器處理完畢。")
    # 服務器也要去更新這個標志,表示已經處理完了,用於通知客戶端,因為客戶端此時在關注這個事件
    event.set()


def Client(event):
    print("Client: 我去派個活兒。。。。。")
    # 這個是事件驅動的通知,就是服務器在等待的時候,客戶端調用這個方法表示去更新那個標志
    event.set()

    print("Client: 活兒已派等待服務器處理事件:")
    # time.sleep(1)
    # 客戶端等待服務器更新標志位
    event.wait()   # 這里要想執行就必須等待上面 Server 里面的 event.set()執行完了才可以
    print("Client: 謝謝服務器。")


def main():
    # 創建一個事件對象
    event = Event()

    srvT = Thread(target=Server, args=(event,))
    srvT.start()

    cliT = Thread(target=Client, args=(event,))
    cliT.start()


if __name__ == "__main__":
    try:
        main()
    finally:
        sys.exit()

通過派生子類的方式創建線程

 通過派生子類創建線程比較簡單,子類的使用方式和直接使用Thread是一樣的,不過至少要重寫run方法,這里就是具體運行你自己的邏輯,構造方法可以省略。

(例子8)

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
from threading import Thread


"""
這里演示通過集成Thread類來運行線程
之前是通過  t = Thread(target=FUN, args(X,)) 這種方式把需要讓線程執行的方法或者說是FUN傳遞進去,然后通過t.start()來運行線程
實際上 Thread類里的 start()方法 調用的是 Thread中的run()方法,這個方法里面非常簡單就是單純的執行我們傳遞進去的方法和參數,那么
如果我們自己來寫一個類,這個類從Thread繼承,然后通過重寫run()也是可以的,看下面實例。
"""


class MyThread(Thread):
    # 如果你自己重寫了構造方法,那么你就必須在這個方法里調用父類的構造方法,否則初始化會失敗
    def __init__(self, name):
        # 調用父類的構造方法,至於是否傳遞參數根據需要,我這里只是傳遞一個名字,其實也可以省略。
        super(MyThread, self).__init__(name=name)

    def run(self):
        print("我的線程運行了。")


def main():
    mt = MyThread("AAA")
    mt.start()
    print(mt.getName())


if __name__ == "__main__":
    try:
        main()
    finally:
        sys.exit()

為什么main函數的代碼是主線程呢?

加入該py文件名稱為a.py。你通過python ./a.py運行,這里的進程是python解釋器或者是虛擬機,a.py里面的代碼就是該進程主要執行的代碼段,這就是一項任務,也就是主線程。

如果我運行20個線程你看看效果,在休眠20秒內,你查看系統只有一個進程存在,也就是Python解釋器。

如果改為20個進程呢?

再次運行,這次我們把進程號打印出來,你主程序是19311,其他20個的父進程都是19311,這20個都是fork出來的子進程。在類Unix操作系統有fork,在Windows上沒有,它應該是通過其他方式產生的子進程。

真的有20個,其實是21個,為什么?還是一樣的道理,20個是你生成的,那這20個是由誰生成的呢?當然是你上面這段代碼,這段代碼由誰執行呢,當然是Python解釋器啊,所以先由解釋器運行起來執行這段代碼(一個進程),然后產生的20個進程。所以是21個。

這里是為什么是22個,哈哈,本條命令本身就包含關鍵詞python,所以也算一個統計值但是它並不算是進程就是一個關鍵字而已。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM