說到Barrier,很多語言中已經是標准庫中自帶的概念,一般情況下,只需要直接使用就行了。而最近一些機緣巧合的機會,我需要在c++中使用這么個玩意兒。但是c++標准庫里還沒有這個概念,只有boost里面有這樣現成的東西,而我又不想為了這么一個小東西引入個boost。所以,我借着這個機會研究了下,發現其實這些多線程/並發中的東西還是蠻有意思的。
閱讀本文你可能需要如下的一些知識:
-
多線程編程的概念。
-
c++的基本語法和有關多線程的語法。
第二條可能也沒有那么重要,因為如果理解了多線程的這些東西,什么語言都可以實現其核心概念。好了,廢話少扯,進入正題。
一、什么是Barrier?
首先,得介紹下Barrier的概念,Barrier從字面理解是屏障的意思,主要是用作集合線程,然后再一起往下執行。再具體一點,在Barrier之前,若干個thread各自執行,然后到了Barrier的時候停下,等待規定數目的所有的其他線程到達這個Barrier,之后再一起通過這個Barrier各自干自己的事情。
這個概念特別像小時候集體活動的過程,大家從各自的家里到學校集合,待人數都到齊之后,之后再一起坐車出去,到達指定地點后一起行動或者各自行動。
而在計算機的世界里,Barrier可以解決的問題很多,比如,一個程序有若干個線程並發的從網站上下載一個大型xml文件,這個過程可以相互獨立,因為一個文件的各個部分並不相關。而在處理這個文件的時候,可能需要一個完整的文件,所以,需要有一條虛擬的線讓這些並發的部分集合一下從而可以拼接成為一個完整的文件,可能是為了后續處理也可能是為了計算hash值來驗證文件的完整性。而后,再交由下一步處理。
二、如何實現一個Barrier?
並發的很多東西都擁有一個壞處就是你很難證明某種實現不是錯誤的,因為很多時候確實情況太多了,無論是死鎖,飢餓對於人腦都是太大的負擔。而反過來,對於我扯這篇文章,也是一個好處,正因為很難證明不是錯誤的,所以我的扯淡可以更放心一點。
在研究Barrier的實現中,我查閱了蠻多的資料的。說實話,其實現方式挺多的。在剔除了一些我能明確證明其有可能是錯誤的,我選擇了我自己覺得最容易理解的一種。
第一節說過,barrier很像是以前的班級集合,站在一個老師的角度,你需要知道的東西至少有這兩個:
-
班級有多少人。
-
目前已經到了多少人。
只有當目前已經到了的人等於班級人數之后才能出發。
所以如果按照這個類比,實現一個barrier至少需要以下的幾個變量:
-
需要同時在barrier等待的線程的個數。
-
當前到達barrier的線程的個數。
而按照barrier的邏輯,主要應該有這些操作:
-
當一個線程到達barrier的時候,增加計數。
-
如果個數不等於當前需要等待的線程個數,等待。
-
如果個數達到了需要等待的線程個數,通知/喚醒所有等待的進程,讓所有進程通過barrier。
在不考慮加鎖的情況下,按照上面的邏輯,偽代碼大概應該像這樣:
thread_count = n; <-- n是需要一起等待的線程的個數 arrived_count = 0; <-- 到達線程的個數 ------------------------------------------------------------- 以上是全局變量,只會初始化一次,以下是barrier開始的代碼 ------------------------------------------------------------- arrived_count += 1; if(arrived_count == thread_count) notify_all_threads_and_unblok(); else block_and_wait();
而在多線程環境下,很明顯arrived_count這種全局變量更新需要加鎖。所以,對於這個代碼,綜合稍微再改動一下,偽代碼可以更新下成為這樣:
thread_count = n; <-- n是需要一起等待的線程的個數 arrived_count = 0; <-- 到達線程的個數 ------------------------------------------------------------- 以上是全局變量,只會初始化一次,以下是barrier開始的代碼 ------------------------------------------------------------- lock(); arrived_count += 1; unlock(); if(arrived_count == thread_count) notify_all_threads_and_unblok(); else block_and_wait();
這里,在有的語言中,鎖的粒度可能小了點,取決於notify_all_threads和wait在這個語言中的定義,但是作為偽代碼,為了可能展示起來比較方便。
而如果你有並發編程的知識,你應該敏感的認識到notify_all_threads_and_unblock,block_and_wait這種在這里雖然是簡單的幾個單詞,但是其包含的操作步驟明顯不止一個,更別說背后的機器指令了。所以作為一個並發概念下運行的程序,不可以簡單的就放這樣一個操作在這里,如果都是任何函數,指令,代碼都是自帶原子性的,那么寫多線程/並發程序也沒有啥好研究的了。所以對於這兩個操作,我們必須具體的擴展下。
對於notify_all_threads_and_unblock和block_and_wait包含相當多的操作,所以下面,得把這兩個操作具體的展開。
1 thread_count = n; <-- n是需要一起等待的線程的個數 2 arrived_count = 0; <-- 到達線程的個數 3 could_release = false; 4 ------------------------------------------------------------- 5 以上是全局變量,只會初始化一次,以下是barrier開始的代碼 6 ------------------------------------------------------------- 7 lock(); 8 if(arrived_count == 0) 9 could_release = false; 10 11 arrived_count += 1; 12 unlock(); 13 if(arrived_count == thread_count) 14 could_realse = true; 15 arrived_count = 0; 16 else 17 while(could_release == false) 18 spin()
這里多了一個變量could_release完成上面說的兩個操作。原理也很簡單,如果等待的個數沒有到達指定數目,這個值始終是false,在代碼中使用循環讓線程阻塞在spin處(當然,假設spin是原子性的)。如果到達了thread_count,改變could_release的值,這樣循環條件不滿足,代碼可以繼續執行。而在13行的if里面把arrived_count重新設置為0是因為如果不這樣做,那么這個barrier就只能用一次,因為沒有地方再把這個表示到達線程數目變量的初始值重新設置了。
我覺得這里需要停一下,來思一下上面的代碼,首先,這個代碼有很多看起來很像有問題的地方。比如對於could_release和arrived_count的重置處,這都是賦值,而在並發程序中,任何寫操作都需要仔細思考是否需要加鎖,在這里,加鎖當然沒問題。但是盲目的加鎖會導致性能損失。
多線程程序最可怕的就是陷入細節,所以,我一般都是整體的思考下是不是有問題。對於一個barrier,錯誤就是指沒有等所有的線程都到達了就停止了等待,人沒來齊就發車了。而怎么會導致這樣的情況呢?只有當arrived_count值在兩個線程不同步才會導致錯誤。秉承這個原則,看看上面的代碼,arrived_count的更新是加鎖的,所以在到達if之前其值是可以信賴的。而if這段判斷本身是讀操作,其判斷就是可以信賴的,因為arrived_count的值更新是可靠的,所以進來的線程要么進入if,要么進入else。不存在線程1更新了arrived_count的值而線程2讀到了arrived_count的值而導致沒有到thread_count就更新了could_release的情況。
沒辦法,這類的程序就是很繞,所以我一般都不陷入細節。
現在看起來,一切都很完美,但是多線程程序最惡心的地方就是可能的死鎖,飢餓等等。而這些又很難證明,而上面這段代碼,在某些情況下就是會導致死鎖。考慮thread_count等於2,也就是這個barrier需要等待兩個線程一起通過。
現在有兩個線程,t1和t2,t1先執行直到17行,卡住,這時候t2獲得寶貴的cpu機會。很明顯,這時會進入14行,更新could_release的值。如果這個時候t1獲得執行機會,萬事大吉,t1會離開while區域,繼續執行。直到下次再次到達這個barrier。
但是如果這個時候t1並沒有獲得執行機會,t2一直執行,雖然喚醒了could_relase,但是t1會一直停留在18行。要知道,這個含有barrier的代碼可能是在一個循環之中,如果t2再次到達barrier的區域,這時候arrived_count等於0(因為arrived_count在上一次t2進入13行之后重置了),這個時候could_relase會變成false。現在t1,t2都在18行了,沒有人有機會去更新could_relase的值,線程死鎖了。
怎么辦?仔細思考下,是喚醒機制有問題,很明顯,如果能夠在喚醒的時候原子式的喚醒所有的線程,那么上面所說的問題就不存在了。在很多語言里都有這樣的方法可以完成上面說的原子性的喚醒所有線程,比如c++里面的notify_all。但是,如果沒有這個函數,該如何實現呢?
上面死鎖問題的誕生在於一個線程不恰當的更新了全局的could_relase,導致全部的判斷條件跟着錯誤的改變。解決這樣的問題,需要的是一個只有每個線程各自能看到,可以獨立更新,互相不干擾而又能被使用的變量。幸好,在設計多線程概念時,有一個概念叫做thread local,剛好能夠滿足這個要求。而運用這樣的變量,上述的概念可以表述成為:
1 thread_count = n; <-- n是需要一起等待的線程的個數 2 arrived_count = 0; <-- 到達線程的個數 3 could_release = false; 4 thread_local_flag = could_release; <-- 線程局部變量,每個線程獨立更新 5 ------------------------------------------------------------- 6 以上是全局變量,只會初始化一次,以下是barrier開始的代碼 7 ------------------------------------------------------------- 8 thread_local_flag = !thread_local_flag 9 lock(); 10 arrived_count += 1; 11 unlock(); 12 if(arrived_count == thread_count) 13 could_realse = thread_local_flag; 14 arrived_count = 0; 15 else 16 while(could_release != thread_local_flag) 17 spin()
這里要着重解釋下,為什么不會死鎖,由於thread_local_flag是每個線程獨立更新的,所以很明顯,其是不用加鎖的。其余代碼和上面的偽代碼類似,不同的是,如果發生上面一樣的情況,t2更新thread_local_flag的時候,只有其局部的變量會被置反而不會影響其余的線程的變量,而因為could_realse是全局變量,在t2第一次執行到13行的時候已經設置成thread_local_flag一樣的值了。這個時候, 哪怕t2再次執行到16行也會因為其內部變量已經被置反而阻塞在這個while循環之中。而t1只要獲得執行機會,就可以通過這個barrier。
有點繞,但是仔細想想還是蠻有意思的。
三、如何運用c++實現Barrier?
雖然上面說了那么多,但是c++中實現Barrier不需要這么復雜,這要感謝c++ 11中已經自帶了很多原子性的操作,比如上面說的notify_all。所以,代碼就沒有那么復雜了,當然,c++也有thread_local,如果不畏勞苦,可以真的從最基礎的寫起。
#include <iostream> #include <condition_variable> #include <thread> #include <chrono> using namespace std; class TestBarrier{ public: TestBarrier(int nThreadCount): m_threadCount(nThreadCount), m_count(0), m_release(0) {} void wait1(){ unique_lock<mutex> lk(m_lock); if(m_count == 0){ m_release = 0; } m_count++; if(m_count == m_threadCount){ m_count = 0; m_release = 1; m_cv.notify_all(); } else{ m_cv.wait(lk, [&]{return m_release == 1;}); } } private: mutex m_lock; condition_variable m_cv; unsigned int m_threadCount; unsigned int m_count; unsigned int m_release; };
這里多虧了c++標准庫中引進的condition_variable,使得上面的概念可以簡單高效而又放心的實現,你也不需要操心什么線程局部量。而關於c++並發相關的種種知識可能需要專門的若干篇幅才能說清楚,如果你並不熟悉c++,可以跳過這些不知所雲的部分。驗證上述代碼可以使用如下代碼:
unsigned int threadWaiting = 5; TestBarrier barrier(5); void func1(){ this_thread::sleep_for(chrono::seconds(3)); cout<<"func1"<<endl; barrier.wait1(); cout<<"func1 has awakended!"<<endl; } void func2(){ cout<<"func2"<<endl; barrier.wait1(); cout<<"func2 has awakended!"<<endl; } void func3(){ this_thread::sleep_for(chrono::seconds(1)); cout<<"func3"<<endl; barrier.wait1(); cout<<"func3 has awakended!"<<endl; } int main(){ for(int i = 0; i < 5; i++){ thread t1(func1); thread t2(func3); thread t3(func2); thread t4(func3); thread t5(func2); t1.join(); t2.join(); t3.join(); t4.join(); t5.join(); } }
好了,在我機器上的運行結果是這樣的,由於輸出沒有同步,所以輸出可能並沒有想象的那么整潔。但是不影響整體結果,可以看到,所有線程到齊之后才各自執行各自后面的代碼:
這篇文章也在我的公眾號同步發表,我的這個公眾號嘛,佛系更新,當然,本質上是想到一個話題不容易(懶的好借口),歡迎關注哦: