《Cython系列》8. 使用 Cython 釋放 GIL 實現並行執行


楔子

在前面的章節中,我們看到 Cython 可以將 Python 的性能提升 10 倍、100 倍、甚至 1000 倍,而這些性能的提升只需要我們做一些簡單的修改即可。並且我們還了解了 Cython 的類型化 memoryview,通過類型化 memoryview,我們實現了一個比內置的 sum 函數快了 100 多倍的算法。

但以上的這些改進都是基於單線程的,這一次我們來學習 Cython 的多線程特性,如何在 Cython 中並行執行。而在 Cython 中有一個 prange 函數,它可以輕松地幫我們將普通的 for 循環轉成使用多個線程的循環,接入所有可用的 CPU 核心。使用的時候我們會看到,平常令人尷尬的 CPU 並行操作,通過 prange 會有很好的表現。

不過在介紹 prange 之前,我們必須要先了解 Python 的運行時(runtime)和本機線程的某些交互,這部分會涉及到全局解釋器鎖(GIL)

線程並行和全局解釋器鎖

在討論 CPython 基於線程的並行時,全局解釋器鎖(GIL)是一個繞不開的話題。根據 Python 的官方文檔,我們知道 GIL 是一個互斥鎖,用於防止本機多個線程同時執行字節碼。換句話說 ,GIL 確保 CPython 在程序執行期間,同一時刻只會使用操作系統的一個線程。不管你的 CPU 是多少核,以及你開了多少個線程,但是同一時刻只會使用操作系統的一個線程、去調度一個 CPU。而且 GIL 不僅影響 Python 代碼,也會影響 Python/C API。

首先我們來分析一下為什么會有 GIL 這個東西存在?看兩行代碼:

import dis

dis.dis("del name")
"""
  1           0 DELETE_NAME              0 (name)
              2 LOAD_CONST               0 (None)
              4 RETURN_VALUE

"""

當我們使用 del 刪除一個變量的時候,對應的指令是 DELETE_NAME,這個指令對應的源碼有興趣可以自己去查看一下。總之這條指令做的事情就是通過宏 Py_DECREF 減少一個對象的引用計數,並且判斷減少之后其引用計數是否為 0,如果為 0 就進行回收。偽代碼如下:

--obj->ob_refcnt
if (obj -> ob_refcnt == 0){
	銷毀obj
}

所以總共是兩步:第一步先將對象的引用計數減 1;第二步判斷引用計數是否為 0,為 0 則進行銷毀。那么問題來了,假設有兩個線程 A 和 B,內部都引用了全局變量 obj,此時 obj 指向的對象的引用計數為 2,然后讓兩個線程都執行 del obj 這行代碼。

其中 A 線程先執行,如果 A 線程在執行完 --obj->ob_refcnt 之后,會將對象的引用計數減一,但不幸的是這個時候調度機制將 A 掛起了,喚醒了 B。而 B 也執行 del obj,但是它比較幸運,將兩步都一塊執行完了。但由於之前 A 已經將引用計數減1,所以 B 線程再減 1 之后會發現對象的引用計數為0,從而執行了對象的銷毀動作,內存被釋放。

然后 A 又被喚醒了,此時開始執行第二個步驟,但由於 obj->ob_refcnt 已經被減少到 0,所以條件滿足,那么 A 依舊會對 obj 指向的對象進行釋放,但是這個對象所占內存已經被釋放了,所以 obj 此時就成了懸空指針。如果再對 obj 指向的對象進行釋放,最終會引發什么結果,只有天知道。

關鍵來了,所以 Python 引入了 GIL,GIL 施加在是解釋器層面上的一把超級大鎖,它是字節碼級別的互斥鎖,作用就是:在同時一刻,只讓一個線程執行字節碼,並且保證每一條字節碼在執行的時候都不會被打斷。

所以由於 GIL 的存在,會使得線程只有把當前的某條字節碼指令執行完畢之后才有可能會發生調度。因此無論是 A 還是 B,線程調度時,要么發生在 DELETE_NAME 這條指令執行之前,要么發生在 DELETE_NAME 這條指令執行完畢之后,但是不存在一條指令(不僅是DELETE_NAME,而是所有指令)執行到一半的時候發生調度。

因此 GIL 才被稱之為是字節碼級別的互斥鎖,它是保護字節碼指令只有在執行完畢之后才會發生線程調度。所以回到上面那個 del obj 這個例子中來。由於引入了 GIL,所以就不存在我們之前說的:在 A 將引用計數減一之后,掛起 A、喚醒 B 這一過程。因為A已經開始了 DELETE_NAME 這條指令的執行,所以在沒執行完之前是不會發生線程調度的,此時就不會發生懸空指針的問題了。

事實上,GIL 在單核時代,其最初的目的就是為了解決引用計數的安全性問題,只不過 Python 的作者龜叔沒想到多核會發展的這么快。

那么,GIL是否會被移除呢?因為對於現在的多核 CPU 來說,Python 的GIL無疑是進行了限制。

關於能否移除 GIL,從我個人的角度來說不太可能,這都幾十年了,能移除早就移除了。

而且事實上,在 Python 誕生沒多久,就有人發現了這一詭異之處,因為當時的程序猿發現使用多線程在計算上居然沒有任何性能上的提升,反而還比單線程慢了一點。當時 Python 的官方人員直接回復:不要使用多線程,而是使用多進程。

此時站在上帝視角的我們知道,因為 GIL 的存在使得同一時刻只有一個核被使用,所以對於純計算的代碼來說,理論上多線程和單線程是沒有區別的。但是由於多線程涉及上下文的切換,會額外有一些開銷,所以反而還慢一些。

因此在得知 GIL 的存在之后,有兩位勇士站了出來表示要移除 GIL,當時 Python 還處於1.5 的版本,非常的古老了。當它們在去掉 GIL 的時候,發現多線程的效率相比之前確實提升了,但是單線程的效率只有原來的一半,這顯然是不能接受的。因為把 GIL 去掉了,就意味着需要更細粒度的鎖,這就會導致大量的加鎖、解鎖,而加鎖、解鎖對於操作系統來說是一個比較重量級的操作,所以 GIL 的移除是極其困難的。

另外還有一個關鍵,就是當 GIL 被移除之后,會使得擴展模塊的編寫難度大大增加。像很多現有的 C 擴展,在很大程度上依賴 GIL 提供的解決方案,如果要移除 GIL,就需要重新解決這些庫的線程安全性問題。比如:我們熟知的 Numpy,Pandas、Scipy,甚至是 TensorFlow、Torch 等深度學習框架,如果GIL被移除,那么這些框架也沒法用了。

所以我們可以看到,如果 GIL 被移除了,那么很多知名的 Python 第三方庫可能就要重新洗牌了。因此在 2020 年的今天,生態如此成熟的 Python,幾乎是不可能擺脫 GIL 了。

然后是 Python 中的線程模型,我們知道 Python 啟動一個線程,底層會啟動一個 C 線程,最終啟動一個操作系統的線程。所以 Python 中的線程實際上是封裝了 C 的線程,進而封裝了 OS 線程,因此一個 Python 線程最終對應一個 OS 線程。實際執行的肯定是 OS 線程,而 OS 線程 Python 解釋器是沒有權限控制的,它能控制的只是 Python 的線程。假設有 4 個 Python 線程,那么肯定對應 4 個 OS 線程,但是 Python 解釋器每次只讓一個 Python 線程去調用 OS 線程執行,其它的線程只能干等着,只有當前的 Python 線程將 GIL 釋放了,其它的某個線程在拿到 GIL 時,才可以調用相應的 OS 線程去執行。

所以 Python 線程是調用 C 的線程、進而調用操作系統的 OS 線程,而每個線程在執行過程中 Python 解釋器是控制不了的,因為 Python 的控制范圍只有在解釋器這一層,Python 無權干預 C 的線程、更無權干預 OS 線程。

但是注意:GIL 並不是 Python 語言的特性,它是 CPython 解釋器開發人員為了方便內存管理才加上去的,只不過我們大部分用的都是 CPython 解釋器,所以很多人認為 GIL 是 Python 語言的一個特性,但其實不是的。Python 只是一門語言,而 CPython 是對使用 Python 語言編寫的源代碼進行解釋執行的一個解釋器。而解釋器不止 CPython 一種,還有 JPython、PyPy 等等,而 JPython解釋器就沒有GIL。因此 Python 語言本身是和 GIL 無關的,只不過我們平時在說 Python 的 GIL 的時候,指的都是 CPython 解釋器里面的 GIL,這一點要注意。

所以就類似於上圖的結果,一個線程執行一會兒,另一個線程執行一會兒。

因此我們知道,對於 Python 而言,解釋執行字節碼是 Python 的核心所在,所以 CPython 通過 GIL 來互斥不同線程執行字節碼。如果一個線程想要執行,就必須拿到 GIL,而一旦拿到 GIL,其他線程就無法執行了,如果想執行,那么只能等當前線程將 GIL 釋放、被自己獲取之后才可以執行。然而實際上,GIL 保護的不僅僅是 Python 的解釋器,同樣還有 Python 的 C API,在 Python 和 C/C++ 混合開發時,如果涉及到原生線程和 Python 線程相互合作,也需要通過 GIL 進行互斥。

那么 Python 會在什么情況下釋放鎖?

關於 GIL 的釋放 Python 有一個自己的調度機制:

  • 1. 當遇見 io 阻塞的時候會把鎖釋放, 因為 io 阻塞是不耗費 CPU 的, 所以此時虛擬機會把該線程的鎖釋放;
  • 2. 即便是耗費 CPU 的運算等等, 也不會一直執行, 會在執行一小段時間之后釋放鎖, 為了保證其他線程都有機會執行, 就類似於 CPU 的時間片輪轉的方式;

在 Python 的多線程機制中,這兩個問題是分別由不同的層次解決的,對於何時進行線程調度問題,是由 Python 自身決定的。考慮一下操作系統是如何進行進程切換的,當一個進程運行了一段時間之后,發生了時鍾中斷,操作系統響應時鍾,並開始進行進程的調度。同樣,Python 中也是模擬了這樣的時鍾中斷,來激活線程的調度。我們知道 Python 解釋字節碼的原理就是按照指令的順序一條一條執行,而 Python 內部維護着一個數值,這個數值就是 Python 內部的時鍾。在 Python2 中如果一個線程執行的字節碼指令數達到了這個值,那么會進行線程切換,並且這個值在 Python3 中仍然存在。

import sys
# 我們看到默認是執行 100 條字節碼啟動線程調度機制,進行切換
# 這個方法 Python2、3中 都存在
print(sys.getcheckinterval())  # 100

# 但是在 Python3 中,我們更應該使用這個函數,表示線程切換的時間間隔。
# 表示一個線程在執行 0.005s 之后進行切換
print(sys.getswitchinterval())  # 0.005

# 上面的方法我們都可以手動設置
# 通過sys.setcheckinterval(N)和sys.setswitchinterval(N)設置即可

但是在 Python3.8 的時候,使用 sys.getcheckinterval 和 sys.setcheckinterval 會被警告,表示這兩個方法已經廢棄了。

所以現在我們知道 Python 會在何時切換線程了,那下面就是在切換的時候 Python 會從那些眾多等待的線程中選擇哪一個呢?其實對於這一點我們無需關心,因為我們控制不了,Python 是借用了底層操作系統所提供的調度機制來決定下一個進入 Python 解釋器的線程究竟是誰。

所以目前為止可以得到如下結論:

  • GIL 對於 Python 對象的內存管理來說是不可或缺的;
  • GIL 和 Python 語言本身沒有什么關系, 它只是 CPython 解釋器為了方便管理內存所引入的一個實現, 但是對於其它的 Python 解釋器則不一定需要 GIL, 比如 JPython;

目前我們介紹了很多關於 Python GIL 的問題,主要是為了解釋 GIL 到底是個什么東西(底層就是一個結構體實例),以及為什么要有 GIL。然后重點來了:在 Cython 中我們是可以釋放 GIL 的,因為 GIL 是為了解決 Python 的內存管理而引入的,但如果是那些不需要和 Python 代碼一起工作的 C 代碼,那么是可以在沒有 GIL 的情況下運行的。

因為 GIL 是字節碼級別的互斥鎖,顯然這是在解釋器解釋執行字節碼的時候所施加的。而且不僅是 GIL,還有 Python 的動態性,都是在解釋器解釋字節碼的時候所賜予的。而  Cython 代碼經過編譯之后直接指向了 C 一級的結構,所以它相當於繞過了解釋器解釋執行這一步,因此也就是失去了相應動態特性(換來的是速度的提升)。那么同理,既然能繞過解釋執行這一步,那么就意味着也能繞過 GIL 的限制,因為 GIL 是在解釋執行字節碼的時候施加的。

因此當我們在 Cython 中創建了不綁定任何 Python 對象的 C 級結構時,也就是在處理 Cython 的 C-only 部分時,我們可以將全局解釋器鎖給釋放掉。換句話說,我們可以使用 Cython 繞過 GIL,實現基於線程的並行。

注意:GIL 是為了保護 Python 對象的內存管理而設置的,如果我們嘗試釋放 GIL 時,那么一定一定一定不能和 Python 對象發生任何的交互,必須是純 C 的數據結構。

那么在 Cython 中要如何管理 GIL 呢?為此 Cython 提供了兩種機制:nogil 函數屬性和 with nogil 上下文管理器。

nogil 函數屬性

我們可以告訴 Cython,在 GIL 釋放的情況下應該並行調用 C 級函數,一般這個函數來自於外部庫或者使用 cdef、cpdef 聲明。但是注意,def 函數不可以沒有 GIL,因為它是 Python 函數。

然后我們來看看如何釋放:

cdef int func(int a, double b) nogil:
    pass

我們只需要在函數的結尾 (冒號之前)加上 nogil 即可,然后在調用的時候就可以通過並行的方式調用,但是注意:在函數中我們不可以創建任何 Python 的對象,記住是任何 Python 對象。在編譯時,Cython 盡其所能確保 nogil 函數不接收 Python 中的對象,或者以其它的方式與之交互。在實踐中,這方面做得很好,如果和 Python 對象發生了交互,那么編譯時會報出錯誤。不過話雖如此,但 cython 編譯器並不能保證它可以百分百做到精確捕捉每一個這樣的錯誤(事實上除非你是刻意不想讓 cython 編譯器捕捉,否則 cython 編譯器都能捕捉到),因此我們在編寫 nogil 函數時需要時刻保持警惕。例如我們可以將 Python 對象轉成 void *,從而將其偷運到 nogil 函數中(但這么做明顯就是故意而為之)。

我們也可以將外部庫的 C、C++ 函數聲明為 nogil 的形式:

cdef extern from "math.h":
    double sin(double x) nogil
    double cos(double x) nogil
    double tan(double x) nogil

通常情況下,外部庫的函數不會與 Python 對象交互,因此我們聲明 nogil 函數還有另一種方式:

cdef extern from "math.h" nogil:
    double sin(double x)
    double cos(double x)
    double tan(double x)

注意:我們以上只是聲明了一個可以不需要 GIL 的函數,然后我們調用的話,還需要借助 with nogil 上下文管理器才能真正擺脫 GIL。

nogil 上下文管理器

為了釋放和獲取 GIL,Cython 必須生成合適的 Python/C API 調用。而一旦 GIL 被釋放,那么便可以獨立地執行 C 代碼,而之后要重新和 Python 對象交互,則再度獲取GIL,因此這個過程我們很自然的想到了上下文管理器。

cdef double func(int a, double b) nogil:
    return <double> a + b


def add(int a, double b):
    
    cdef double res 
    # 進入 with nogil 上下文時,會釋放 GIL, 所以內部必須是不能和 Python 有任何交互的純 C 操作
    with nogil:  
        # res 的賦值均不涉及 Python /C API,所以它們是 C 操作,可以放在 with nogil 上下文中
        res = 0.0
        res = 3.14

        # 如果在 with nogil: 里面如果出現了函數調用,那么該函數必須是使用 nogil 聲明的函數
        # 而使用 nogil 聲明的函數,其內部都是純 C 操作、不涉及 Python,否則是編譯不過去的
        # 但如果定義函數不使用 nogil 聲明,那么即使內部不涉及 Python,也不可以在 with nogil: 上下文中調用
        # 而這里的 func 是一個 nogil 函數,因此它可以在里面被調用
        res = func(a, b)
        
    # 運行結束之后, 會再度獲取 GIL
    return res 
import pyximport
pyximport.install(language_level=3)

import cython_test
print(cython_test.add(1, 2.0))  # 3.0

我們看到結果上沒有任何問題,在調用 func 這個 nogil 函數之前釋放掉 GIL,然后當函數執行完畢、退出上下文管理之后,再獲取 GIL。而且我們的參數和返回值都要是 C 的類型,並且在 with nogil: 這個上下文管理器中也不可以使用 Python 對象,否則會編譯錯誤。比如:我們里面加上一個 print,那么 Cython 就會很生氣,因為 print 會將內部的參數強制轉換為 PyObject *。

並且我們看到在 res = func(a, b) 之前,我們先在外面聲明了一個 res,但如果不聲明會怎么樣?答案是會出現編譯錯誤,因為如果不在外面聲明的話,那么 res 就是一個 Python 變量了,因此會將結果(C 的浮點數)轉成 PyFloatObject,返回其 PyObject *,這樣就會涉及和 Python 的交互。那么將變量的聲明寫在 with nogil: 內部可以嗎?答案也是不行的,因為 cdef 不允許出現在 nogil 上下文管理器中。

我們實際演示一下:

# 返回值如果不寫的話默認是 object,所以必須指定一個 C 的返回值
cpdef int func(int a, int b) nogil:
    return a + b

# 我們不在 with nogil 上下文中調用也是可以的,只不過此時函數聲明為 nogil 就沒有太大意義了
print(func(1, 2))

# 我們也可以在全局進行調用
cdef int res
with nogil:
    res = func(22, 33)
print(res) 

with nogil 上下文管理器的一個用途是在阻塞操作期間釋放GIL,從而允許其它 Python 線程執行另一個代價昂貴的操作。

另外,如果里面出現了除法該怎么辦呢?

cpdef double func(int a, int b) nogil:
    return a / b
>>> import cython_test
>>> cython_test.func(22, 11)
2.0
>>> 
>>> cython_test.func(22, 0)
ZeroDivisionError: float division
Exception ignored in: 'cython_test.func'
ZeroDivisionError: float division
0.0
>>> 

我們看到並沒有出現異常,但我們希望在出現異常的時候能夠拋出,該怎么做呢?還記得之前說的方法嗎?

cpdef double func(int a, int b) nogil except ? -1:  # except ? -1要寫在nogil的后面
    return a / b
>>> import cython_test
>>> cython_test.func(22, 0)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "cython_test.pyx", line 1, in cython_test.func
    cpdef double func(int a, int b) nogil except ? -1:
  File "cython_test.pyx", line 2, in cython_test.func
    return a / b
ZeroDivisionError: float division
>>> 

如果我們是在 with nogil 中出現了除零錯誤,那么 Cython 會生成正確的錯誤處理代碼,並且任何錯誤都會在重新獲取 GIL 之后進行傳播。

但如果一個 nogil 函數里面大部分都是純 C 代碼,只有一小部分是 Python 代碼,那么我們可以在執行到 Python 代碼時獲取 GIL,舉個栗子:

cpdef int func(int a, int b) nogil:
    # 由於 print("-------") 涉及到 Python / C API,那么就不可以在沒有 GIL 的時候執行
    # 因此該函數本來不可以聲明為 nogil 函數的,但可以通過 with gil 上下文,讓其獲取 GIL
    # 所以對於一個 nogil 函數,如果里面出現了涉及 Python / C API,那么應該放在 with gil 上下文中
    # 否則的話,編譯報錯,因為 nogil 函數中出現了需要 GIL 的操作
    cdef int res = a + b;
    with gil:
        print("-------")
    return res

cdef int res
with nogil:
    res = func(11, 22)
    # 同理在 with nogil 中如果涉及了 Python / C API,我們也可以使用 with gil
    with gil:
        print(res)
        # 在 with gil 中如果有不需要 Python / C API 的操作,那么也可以繼續 with nogil
        with nogil:
            res = 666
            # 同理
            with gil:
                print(res)
"""
-------
33
666
"""

# 當然上面的做法有點神經病了,因為進入 with nogil 上下文會釋放 GIL、上下文結束會獲取 GIL
# 進入 with gil 上下文會獲取 GIL,上下文結束會釋放 GIL。所以應該寫成下面這種方式:
with nogil:
    res = func(11, 22)  # 並行操作
    with gil:
        print(res)
    res = 666  # 並行操作
    with gil:
        print(res)
"""
-------
33
666
"""

所以 Cython 支持我們自由操控 GIL,但需要注意的是:with nogil 上下文必須在已經持有 GIL 的情況下使用,表示要釋放 GIL;with gil 上下文必須在已經釋放 GIL 的情況下使用,表示要持有 GIL。比如下面的代碼就是不合法的:

with gil:
    pass
"""
Trying to acquire the GIL while it is already held.
"""
# 編譯會報錯,因為當前已經是處於 GIL 持有的狀態下的,而 with gil 又會獲取 GIL
# 所以 with gil 上下文只能出現在 nogil 函數中、或者 with nogil 上下文中


with nogil:
    with nogil:
        pass
"""
Trying to release the GIL while it was previously released.
"""
# 同樣的道理,因為外層的 with nogil 已經把 GIL 釋放了,此時已經不再持有 GIL 了
# 而內層的 with nogil 會再次嘗試釋放 GIL,因此報錯

自由操控 GIL 的感覺還是蠻爽的,但是不建議亂用,因為 GIL 的獲取和釋放是一個阻塞的線程同步操作,比較昂貴。如果只是簡單的 C 計算,沒有必要特意釋放,只有在遇到大量的 C 計算時,才建議這么做。

可能有人覺得 Cython 咋這么 牛B 嘞,它說釋放就釋放,它說獲取就獲取,下面我們就來解釋一下。

下面會涉及解釋器相關的知識,可以不用看,因為這不影響我們使用 Cython 並行執行,因為 Cython 幫我們屏蔽了很多的內部細節。但如果你想知道更多細節的話,那么非常推薦。

GIL 在 C 的層面要如何釋放?

首先我們必須要澄清一點,GIL 只有在多線程的情況下才會出現,如果是單線程,那么 CPython 解釋器是不會創建 GIL 的。一旦我們啟動了多線程,那么 GIL 就被創建了。而線程如果想安全地訪問 Python 對象,那么必須要持有全局解釋器鎖(GIL),如果沒有這個鎖,那么多線程基本上算是廢了,即便是最簡單的操作都有可能發生問題。例如兩個線程同時引用了一個對象,那么這個對象的引用計數應該增加 2,但可能出現只增加 1 的情況。

因此存在一個鐵打不動的規則:單線程除外,如果是多線程,那么只有獲得了 GIL 的線程才能操作 Python 對象或者調用 Python / C API。而為了保證每個線程都能有機會執行,解釋器有着自己的一套規則,可以定期迫使線程釋放 GIL,讓其它線程有機會執行,因為線程都是搶占式的。但當出現了 IO 阻塞,會立即強制釋放。

而 Python 為維護 OS 線程執行的狀態信息,提供了一個線程狀態對象:PyThreadState。雖然真正用來執行的線程以及狀態肯定是由操作系統進行維護的,但 Python 虛擬機在運行的時候總需要其它的一些與線程相關的狀態和信息,比如:是否發生了異常等等,這些信息顯然操作系統沒有辦法提供。所以 PyThreadState 對象正是 Python 為 OS 線程准備的,在虛擬機層面保存其狀態信息的對象,也就是線程狀態對象。在 Python 中,當前活動的 OS 線程對應的 PyThreadState 對象可以通過調用 PyThreadState_GET 獲得,有了線程狀態對象之后,就可以設置一些額外信息了。

並且 Python 底層有一個全局變量,保存了當前正在活動的 PyThreadState 對象的指針。

因此以上都是一些概念性的東西,下面來看看底層是怎么做的呢?如果用大白話解釋的話:

將線程狀態對象保存在變量中
釋放全局解釋器鎖
... 做一些耗時的純 C 操作 ....
獲取全局解釋器鎖
從變量中重新獲取線程狀態對象

以上在編寫擴展模塊的時候非常常用,因此 Python 底層提供了兩個宏:

// 從名字上來看, 直譯就是開始允許多線程(並行執行)
// 這一步就是釋放 GIL, 意思就是這 GIL 不要也罷
Py_BEGIN_ALLOW_THREADS
    
/* 做一些耗時的純 C 操作, 當然 IO 操作也是如此, 只不過它是解釋器自動調度的, 而我們使用這兩個宏很明顯是為了耗時的 C 操作 */    
    
// 執行完畢之后, 如果要和 Python 對象進行交互, 那么必須要再度獲取 GIL, 相當於結束多線程的並行執行
Py_END_ALLOW_THREADS    

Py_BEGIN_ALLOW_THREADS 宏會打開一個新的 block 並且定義一個隱藏的局部變量;Py_END_ALLOW_THREADS 宏則是關閉這個 block 。這兩個宏還有一個高級的用途:如果 Python 編譯為不支持線程的版本(幾乎沒見過),他們定義為空,因此保存線程狀態並鎖定操作。

如果支持線程,那么這個 block 會進行展開:

PyThreadState *_save;     
_save = PyEval_SaveThread();  
//...  ...     
PyEval_RestoreThread(_save); 

我們也可以使用更低級的 API 來實現這一點:

PyThreadState *_save;     
_save = PyThreadState_Swap(NULL);     
PyEval_ReleaseLock();     
//...  ...     
PyEval_AcquireLock();     
PyThreadState_Swap(_save);

當然低級的一些 API 會有一些微妙的差異,因為鎖操作不一定保持全局變量的一致性,而 PyEval_RestoreThread 可以對這個全局變量進行保存和恢復。同樣,如果是不支持線程的解釋器,那么PyEval_SaveThread() 和 PyEval_RestoreThread() 就會不操作鎖,在這種情況下 PyEval_ReleaseLock() 和 PyEval_AcquireLock() 不可用,這使得不支持線程的解釋器可以動態加載支持線程的擴展。

總之全局解釋器鎖用於保護當前線程狀態對象的指針,當釋放鎖並保存狀態的時候,當前線程狀態對象的指針必須在鎖釋放之前獲取(因為另一個線程會獲取鎖,全局變量會保存新的線程狀態對象的指針)。相反,在獲取鎖並恢復線程狀態對象(將全局變量設置為其指針)時,鎖必須要先獲取。

但是注意了,如果直接從 C 中創建線程(pthread)的時候,它們沒有對應的線程狀態對象。這些線程在它們使用 Python/C API 之前必須自舉,首先應該要創建線程狀態對象,然后獲取鎖,最后保存它們的線程狀態對象的指針。而從 Python2.3 之后,C 中的 pthread 可以通過 PyGILState_* 系列函數自動完成以上所有步驟。

// 為當前 C 線程創建一個線程狀態對象
PyGILState_STATE gstate;  
gstate = PyGILState_Ensure();  

/* 執行你的 Python 操作*/  

/* 釋放線程狀態對象, 而下面就不能再有任何的 Python/C API 了 */  
PyGILState_Release(gstate);  

注意:PyGILState_* 系列函數假定只有一個進程(由 Py_Initialize() 自動創建),如果是多進程則是不支持的,因此我們也能看出多進程之間是可以利用多核的。但很明顯,進程之間的通信又是一件麻煩的事情。

而我們說 Cython 干的事情本質上和這是一樣的,都是編寫擴展模塊,只不過我們寫的是 Cython 代碼,而 cython 編譯器可以將 Cython 代碼翻譯成 C 代碼。而 cython 編譯在翻譯成 C 之后,with nogil 上下文管理器同樣會被翻譯成釋放、獲取 GIL 的 Python/C API。至於它用的到底是什么 API 顯然我們並不需要關心,我們只需要知道整體的脈絡即可。總之:通過底層的 Python/C API,我們可以顯式地控制 GIL。但如果你真的對細節感興趣,那么不妨將這個 pyx 文件編譯成擴展模塊,同時會生成一個對應的 C 文件,而這個 C 文件就是 cython 編譯器對 pyx 文件的翻譯結果,里面包含了所有的細節。而擴展模塊正是根據這個 C 文件進行編譯的,也就是編譯成擴展模塊的第二步,而 cython 編譯器將 pyx 文件翻譯成 C 文件則是第一步;如果你對自己的 C 語言水平和 Python/C API 的掌握很有自信,想要自己把握一切,那么你也可以不借助 Cython,而是自己實現第一步,也就是直接編寫 C 代碼。

cython 編譯器是一個翻譯官,將 pyx 文件轉成優化的 C 文件不是一件容易的事,因為 cython 編譯器要考慮很多很多事情,所以翻譯之后的 C 文件內容會非常多,但是這並不影響它的效率,況且這也不是我們需要關注的點。

那么問題來了,這么做究竟能不能有效利用多核呢?我們來驗證一下:

# cython_test.pyx
cdef int func() nogil:
    cdef int a = 0
    # 開啟死循環, 執行計算操作
    while 0 < 1:
        a += 1
    return a

def py_func():
    # 一個包裝器, 一旦進入了 with nogil: , 那么此線程的 GIL 就會被釋放掉, 被其它線程獲取
    # 此時兩個線程會並行執行
    with nogil:
        res = func()
    return res
import pyximport
pyximport.install(language_level=3)

import threading
import cython_test

# 開啟一個線程, 執行 cython_test.py_func()
t1 = threading.Thread(target=lambda : cython_test.py_func())
t1.start()

# 主線程同樣開啟死循環, 執行純計算邏輯
a = 0
while True:
    a += 1

然后我在我的阿里雲服務器(CentOS)上測試一下,CPU 是兩核心,選擇在 CentOS 上測試是因為查看 CPU 利用率會很方便,一個 top 命令即可。

我們看到兩個核心基本上都跑滿了,證明確實是利用了多核,如果我們不使用 with nogil 的話。

cdef int func() nogil:
    cdef int a = 0
    while 0 < 1:
        a += 1
    return a

def py_func():
    # 直接調用, 此時是不會釋放 GIL 的, 雖然 func 是一個 nogil 函數, 但我們需要通過 with nogil 上下文管理, 才能釋放它
    res = func()
    return res

其它代碼不變,再來測試一下:

我們看到只用了一個核心。

所以如果想利用多核,那么使用 Python/C API 主動釋放,而在 Cython 中可以通過 with nogil: 上下文管理來實現。進入上下文,釋放 GIL,獨立執行,完事了再獲取 GIL 退出上下文。雖然釋放掉 GIL 之后,理論上該線程是無法繼續執行的,必須等待自己再次獲取之后才能執行。但我們說這是擴展模塊,它是 C 編譯之后的二進制碼,如果是通過 Python/C API 主動釋放 GIL 的話,那么它就不再受 Python 解釋器的制約了,因為它繞過了解釋執行這一步。只有當自己再主動獲取 GIL 之后,才會回到正常的 GIL 調度中來。

因此當我們需要執行一個耗時的純 C 函數,那么便可以將其申明為 nogil 函數,然后通過 with nogil 的方式實現並行執行。我們只需要做少量額外的工作,便能夠獲取性能上的收益。

所以理解 GIL 以及如何管理 GIL 是非常有必要的,目前為止我們算是知道了如何在 Cython 中釋放 GIL 達到並行執行的效果。但是這還不夠,假設有一個循環,需要遍歷 4 次,而我們的機器正好有 4 個核,我們希望這 4 層循環能夠並行執行該怎么辦呢?雖然我們也可以通過上面的方式實現,但明顯會比較麻煩。

而我們今天的主角是 prange 便可以實現這一點,只不過為了引出它做了大量的准備工作,但這一切都是值得的。

使用 prange 並行執行循環

下面來介紹 prange,我們通過 prange 可以實現循環的並行執行。這個 prange 的特殊功能是 Cython 獨一無二的,並且 prange 只能與 for 循環搭配使用,不能獨立存在。

Cython 使用 OpenMP API 實現 prange,用於多平台共享內存的處理。但 OpenMP 需要 C 或者 C++ 編譯器支持,並且編譯時需要指定特定的編譯參數來啟動。例如:當我們使用 gcc 時,必須在編譯和連接二進制文件的時候指定一個 -fopenmp,以確保啟用 OpenMP。許多編譯器均 OpenMP ,包括免費的和商業的。但 Clang/LLVM 則是一個最顯著的例外,它只在一個單獨的分支中得到了初步的支持,而為它完全實現的 OpenMP 還在開發當中。

而使用 prange,我們需要從 cython.parallel 中進行導入(cimport)。但是在這之前,我們先來看一個栗子:

import numpy as np
from cython cimport boundscheck, wraparound

cdef inline double norm2(double complex z) nogil:
    """
    接收一個復數 z, 計算它的模的平方, 它要被下面的 escape 函數多次調用, 這里通過 inline 聲明成內聯函數
    :param z: 
    :return: 
    """
    return z.real * z.real + z.imag * z.imag


cdef int escape(double complex z,
                double complex c,
                double z_max,
                int n_max) nogil:
    """
    這個函數具體做什么, 不是我們的重點, 我們不需要關心
    """
    cdef:
        int i = 0
        double z_max2 = z_max * z_max
    while norm2(z) < z_max2 and i < n_max:
        z = z * z + c
        i += 1
    return i


@boundscheck(False)
@wraparound(False)
def calc_julia(int resolution,
               double complex c,
               double bound=1.5,
               double z_max=4.0,
               int n_max=1000):
    """
    我們將要在 Python 中調用的函數
    """
    cdef:
        double step = 2.0 * bound / resolution
        int i, j
        double complex z
        double real, imag
        int[:, :: 1] counts
    counts = np.zeros((resolution + 1, resolution + 1), dtype="int32")

    for i in range(resolution + 1):
        real = -bound + i * step
        for j in range(resolution + 1):
            imag = -bound + j * step
            z = real + imag * 1j
            counts[i, j] = escape(z, c, z_max, n_max)

    return np.array(counts, copy=False)

我們來調用一下這個 calc_julia 函數,這個函數做什么不需要關系,我們只需要將注意力放在那兩層 for 循環(准確的說是外層循環)上即可,這里我們采用編譯的形式。

import cython_test
import numpy as np
import matplotlib.pyplot as plt
arr = cython_test.calc_julia(1000, 0.322 + 0.05j)
plt.imshow(np.log(arr))
plt.show()

那么對於 calc_julia 這個函數,耗時多少呢?我們來測試一下:

使用 prange

所以接下來我們的 prange 就閃亮登場了,很明顯對於外層循環而言,它里面的邏輯是彼此獨立的,當前循環不依賴上一層循環的結果,因此這非常適合並行執行。我們只需要做簡單的修改即可:

# distutils: extra_compile_args = -fopenmp
# distutils: extra_link_args = -fopenmp

import numpy as np
from cython cimport boundscheck, wraparound
from cython.parallel cimport prange  # 導入 prange

cdef inline double norm2(double complex z) nogil:
    return z.real * z.real + z.imag * z.imag


cdef int escape(double complex z,
                double complex c,
                double z_max,
                int n_max) nogil:
    cdef:
        int i = 0
        double z_max2 = z_max * z_max
    while norm2(z) < z_max2 and i < n_max:
        z = z * z + c
        i += 1
    return i


@boundscheck(False)
@wraparound(False)
def calc_julia(int resolution,
               double complex c,
               double bound=1.5,
               double z_max=4.0,
               int n_max=1000):
    cdef:
        double step = 2.0 * bound / resolution
        int i, j
        double complex z
        double real, imag
        int[:, :: 1] counts
    counts = np.zeros((resolution + 1, resolution + 1), dtype="int32")
	
    # 外層循環使用 prange
    for i in prange(resolution + 1, nogil=True):
        real = -bound + i * step
        for j in range(resolution + 1):
            imag = -bound + j * step
            z = real + imag * 1j
            counts[i, j] = escape(z, c, z_max, n_max)

    return np.array(counts, copy=False)

我們只需要將外層循環的 range 換成 prange 即可,里面指定 nogil=True,至於這個函數的其它參數以及用法后面會說。通過 prange 便可以實現並行循環的效果,而之前也提到,一旦使用了 prange,那么必須確保在編譯的時候啟用 OpenMP,像 gcc 這樣的編譯器的鏈接標志是 -fopenmp。而在 Cython 中我們可以通過注釋的方式的方式指定,直接寫在 cython_test.pyx 的頂部即可,這樣編譯的時候會自動將將 -fopenmp 標志帶過去。

編譯方式和之前一樣,然后導入調用時會輸出一樣的結果,但問題是效率上有多少提升呢?我們再來測一下。

我們看到效率大概是提升了兩倍,因為我 Windows 上使用的不是 gcc,所以這里是在 CentOS 上演示的,而我的 CentOS 服務器只有兩個核,因此效率提升大概兩倍左右。

所以我們只是做了一些非常簡單的修改,便可帶來如此巨大的性能提升,簡直妙啊。所以 prange 是要搭配 for 循環來使用的,如果內部的邏輯彼此獨立,即第二層循環不依賴第一層循環的某些結果,那么不妨使用 prange 吧。

注意還沒完,我們還能做得更好,下面就來看看 prange 里面的其它的參數,這樣我們能更好利用 prange 的並行特性。

prange 的其它參數

prange 函數的原型如下:

# 第一個參數 self 我們不需要管, 這個函數實際上是在一個類里面
def prange(self, start=0, stop=None, step=1, nogil=False, schedule=None, chunksize=None, num_threads=None):

我們先來看前三個參數,start、stop、step

  • prange(3): 相當於 start=0、stop=3
  • prange(1, 3): 相當於 start=1、stop=3
  • prange(1, 3, 2): 相當於 start=1、stop=3、step=2

類似於 range,同樣不包含結尾 stop,然后是第四個參數 nogil,它默認是 False,但事實上我們必須將其設置為 True,否則會報出編譯錯誤。

然后剩下的三個參數:schedule、chunksize、num_threads,如果我們不指定的話,那么 cython編譯器采取的策略是將整個循環分成多個大小相同的連續塊,然后給每一個可用線程一個塊。然而整個策略實際上並不是最好的,因為每一層循環用的時間不一定一樣,如果一個線程很快就完成了,那么不就造成資源上的浪費了嗎?

我們再來修改一下,將 schedule 指定為 "static",chunksize 指定為 1:

for i in prange(resolution + 1, nogil=True, schedule="static", chunksize=1):

其它地方不變,只是加兩個參數,然后我們來重新測試一下。

我們看到效率上並沒有什么提升,原因在我的機器只有兩個核,如果核數再多一些的話,那么會發生速度就明顯的提升。

那么我們就來解釋一下剩余的三個參數的含義,首先是 schedule,它有以下幾個選項:

1. static

整個循環在編譯時會以一種固定的方式分配給多個線程,如果 chunksize 沒有指定,那么會分成 num_threads 個連續塊,一個線程一個塊。如果指定了 chunksize,那么每一塊會以輪詢調度算法(Round Robin)交給線程進行處理,適用於任務均勻分布的情況。

2. dynamic

線程在運行時動態地向調度器申請下一個塊,chunksize 默認為 1,當任務負載不均時,動態調度是最佳的選擇。

3. guided

塊是動態分布的,就像 dynamic 一樣,但這與 dynamic 還不同,chunksize 的比例不是固定的,而是和 剩余迭代次數 / 線程數 成比例關系。

4. runtime

不常用。

控制 schedule 和 chunksize 可以方便地探索不同的並行執行策略、以及工作負載分配,通常指定 schedule 為 "static",加上設置一個合適的 chunksize 是最好的選擇。而 dynamic 和 guided 適用於動態變化的執行上下文,但會導致運行時開銷。

當然還有最后一個參數 num_threads,很明顯不需要解釋,就是使用的線程數量。如果不指定,那么 prange 會使用盡可能多的線程。

所以我們只是做了一點修改,便可以帶來巨大的性能提升,這種性能提升與 Cython 在純 Python 上帶來的性能提升成倍增關系。

在 reductions 操作上使用 prange

我們經常會循環遍歷數組計算它們的累和、累積等等,這種數據量減少的操作我們稱之為 reduction 操作。而 prange 對這樣的操作也是支持並行執行的,我們舉個栗子:

from cython cimport boundscheck, wraparound
from cython.parallel cimport prange

@boundscheck(False)
@wraparound(False)
def calc_julia(int [:, :: 1] counts,
               int val):
    cdef:
        int total = 0
        int i, j, M, N
    N, M = counts.shape[: 2]
    for i in range(M):
        for j in range(N):
            if counts[i, j] == val:
                total += 1

    return total / float(counts.size)

顯然我們是希望計算一個數組中值 val 的元素的個數,下面測試一下:

我們看到大概 15.1 毫秒,如果我們改成 prange 的話,看看會有什么效果呢?

# distutils: extra_compile_args = -fopenmp
# distutils: extra_link_args = -fopenmp
from cython cimport boundscheck, wraparound
from cython.parallel cimport prange

@boundscheck(False)
@wraparound(False)
def calc_julia(int [:, :: 1] counts,
               int val):
    cdef:
        int total = 0
        int i, j, M, N
    N, M = counts.shape[: 2]
    for i in prange(M, nogil=True):
        for j in range(N):
            if counts[i, j] == val:
                total += 1

    return total / float(counts.size)

速度比原來快了一倍多,還是很可觀的,如果你的 CPU 是多核的,那么效率會更明顯。

這里我們沒有使用 schedule 和 chunksize 參數,你也可以加上去。當然啦,如果占用內存過大的話,它可能無法像預期的一樣顯著地提升性能,因為 prange 的優化重點是在 CPU 上面。

但是可能有人會有疑問,多個線程同時對 total 變量進行自增操作,這么做不會造成沖突嗎?答案是不會的,因為加法是可交換的,即無論是 a + b 還是 b + a,結果都是相同的。Cython(通過 OpenMP)生成線程代碼,每個線程計算循環子集的和,然后所有線程再將各自的和匯總在一起。

如果是交給 Numpy 來做的話,那么等價於如下:

np.sum(counts == val) / float(counts.size)

但是效率如何呢?我們來對比一下:

我們采用並行計算用的是 6.47 毫秒,Numpy 用的是 22 毫秒,看樣子是我們贏了,並且 CPU 核心數越多,差距越明顯,這便是並行計算的威力。當然對於這種算法來說,還是直接交給 Numpy 吧,畢竟人家都幫你封裝好了,一個函數調用就可以解決了。

因此有效利用計算機硬件資源確實是最直接的辦法。

並行編程的局限性

雖然 Cython 的 prange 容易使用,但其實還是有局限性的,當然這個局限性和 Cython無關,因為理想化的並行擴展本身就是一個難以實現的事情。我們舉個栗子:

def filter(...):
    for i in range(nrows):
        for j in range(ncols):
            b[i, j] = (a[i, j] + a[i - 1, j] + a[i + 1, j] +
                       a[i, j - 1] + a[i, j + 1]) / 5.0)

假設我們要做一個過濾器,計算每一個點、加上它周圍的四個點的平均值。但如果這里將外層的 range 換成 prange,那么它的整體性能不會得到提升。因為內層循環訪問的是不連續的數組元素,由於缺乏數據本地性,CPU 的緩存無法生效,從而導致 prange 變慢。

那么我們什么時候使用 prange 呢?遵循以下法則即可:

  • 1. prange 能夠很好的利用 CPU 並行操作, 這一點我們已經說過了
  • 2. 非本地讀寫的那些和內存綁定的操作很難提高速度
  • 3. 用較少的線程更容易實現加速, 因為對於 CPU 密集而言, 即便指定了超越核心數的線程也是沒有意義的
  • 4. 使用優化的線程並行庫是將 CPU 所有核心都用於常規的計算的最佳方式

當然,其實我們在開發的時候是可以隨時使用 prange 的,只要循環體不和 Python 對象進行交互即可。

總結

Cython 允許我們繞過全局解釋器鎖,只要我們把和 Python 無關的代碼分離出來即可。對於那些不需要和 Python 交互的 C 代碼,可以輕松的使用 prange 實現基於線程的並行。

而在其它語言中,基於線程的並行很容易出錯,並且難以正確處理。而 Cython 的 prange 則不需要我們在這方面費心,能夠輕松地處理很多性能瓶頸。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM