1. ucore lab7介紹
ucore在前面的實驗中實現了進程/線程機制,並在lab6中實現了搶占式的線程調度機制。基於中斷的搶占式線程調度機制使得線程在執行的過程中隨時可能被操作系統打斷,被阻塞掛起而令其它的線程獲得CPU。多個線程並發的執行,大大提升了非cpu密集型應用程序的cpu吞吐量,使得計算機系統中寶貴的cpu硬件資源得到了充分利用。
操作系統提供的內核線程並發機制的優點是明顯的,但同時也帶來了一些問題,其中首當其沖的便是線程安全問題。
並發帶來的線程安全問題
線程安全指的是在擁有共享數據的多條線程並行執行的程序中,線程安全的代碼會通過同步機制保證各個線程都可以正常且正確的執行,不會出現數據污染等意外情況。
舉一個經典的例子:在高級語言中對於某一共享整型變量i(假設i=5)進行的i++操作,在最終的機器代碼中會被分解為幾個更細致的機器指令:
1. 從內存的對應地址中讀取出變量i的值(高級語言的變量在機器層面表現為一個內存地址),寫入cpu的寄存器中(假設是edx)
2. 對寄存器edx進行+1運算(運算后edx寄存器中的值為5+1=6)
3. 將edx的值寫入變量i對應的內存空間中(在高級語言層面看,寫入edx中的新值后i變成了6)
通過之前lab5/lab6的學習,我們知道在i++具體的機器指令序列執行的每一步過程中,操作系統都可能通過時鍾中斷打斷對應線程的執行,進行線程的上下文切換。機器指令是原子性的,但高級語言中的一條指令底層可能對應多個機器指令,在執行的過程中可能被中斷介入,無法保證執行的連貫性。
線程安全問題舉例
例如,存在兩個並發執行的線程a、線程b,都對線程間的共享變量i(i=5)進行了i++操作。
兩個線程的執行i++時的機器指令流按照時間順序依次為:
1. 線程a讀取內存中變量i的值,寫入寄存器edx(此時內存中i的值為5,edx的值為5)。
2. 線程a令edx進行+1運算,此時寄存器edx的值為6。
3. 操作系統處理時鍾中斷,發現線程a的時間片已經用完,將其掛起,保存線程a的上下文(此時線程a的寄存器上下文中edx=6);並調度線程b開始獲取cpu執行。
4. 線程b讀取內存中變量i的值,寫入寄存器edx(此時內存中i的值為5,edx的值為5)。
5. 線程b令edx進行+1運算,此時寄存器edx的值為6。
6. 操作系統處理時鍾中斷,發現線程b的時間片已經用完,將其掛起,保存線程b的上下文(此時線程b的寄存器上下文中edx=6);並調度線程a開始獲取cpu執行。
7. 線程a恢復現場繼續往下執行,將現場恢復后edx的值寫回內存中變量i對應的內存地址中,寫回后變量i=6。
8. 操作系統處理時鍾中斷,發現線程a的時間片已經用完,將其掛起;並調度線程a開始獲取cpu執行。
9. 線程b恢復現場繼續往下執行,將現場恢復后edx的值寫會內存中變量i對應的內存地址中,寫回后變量i=6。
上述的例子中,由於操作系統的搶占式調度以及高級語言中i++操作的非原子性,使得原本初始值為5的變量i,在執行兩次i++之后得到的並不是預期的7,而是錯誤的6。這還僅僅是兩個並發線程對於一個共享變量的操作問題,實際的程序中會涉及到更多的並發線程和共享變量,使得所編寫的多線程並發程序正確性無法得到保證。
操作系統解決線程安全問題的手段
在絕大多數情況下,程序的正確性都比性能重要的多。操作系統在引入搶占式調度的線程並發機制的同時,也需要提供相應的手段來解決線程安全問題。
解決線程安全問題,主要有兩個思路:一是消除程序的並發性;二是阻止多個線程並發的訪問共享資源(共享內存、共享文件、共享外設等等)的訪問,即互斥:使得一個線程在訪問某一共享資源時,其它的線程不能進行同樣的操作。
第一種思路被一些I/O密集型的應用程序所使用,即整個程序(進程)中只有一個線程在工作,通過操作系統底層提供的i/o多路復用機制進行工作,早期的redis以及nodeJS就是工作在單線程模型下的。單線程工作的應用程序由於不存在多個線程並發執行的場景,消除了線程的並發性,自然也不需要處理線程安全問題了。
而操作系統解決線程安全問題的方式采用的是第二種思路(通用操作系統是用於同時為大量進程、線程服務的,因此不能再回過頭來禁止並發),通過一些機制限制並發線程同時訪問會引起線程安全問題的共享變量,保證訪問的互斥性。
在關於操作系統原理的理論書籍中介紹了很多用於實現互斥機制的辦法,而ucore在lab7中主要實現了“信號量”和“條件變量”這兩種效率較高的、主流的、基於休眠/喚醒機制的同步互斥機制。lab7中也以哲學家就餐問題為例,通過信號量Semaphore以及管程Monitor解決並發領域中很經典的線程同步問題。
通過lab7的學習,將能夠深入學習操作系統底層實現線程同步、互斥機制,理解信號量、條件變量、管程等同步互斥機制的工作原理;也可以對更上層的如java中的synchronized、AQS悲觀鎖、管程monitor、notify/wait等線程並發同步機制有更深的理解。
lab7是建立在之前實驗的基礎之上的,需要先理解之前的實驗才能更好的理解lab7中的內容。
可以參考一下我關於前面實驗的博客:
1. ucore操作系統學習(一) ucore lab1系統啟動流程分析
2. ucore操作系統學習(二) ucore lab2物理內存管理分析
3. ucore操作系統學習(三) ucore lab3虛擬內存管理分析
4. ucore操作系統學習(四) ucore lab4內核線程管理
5. ucore操作系統學習(五) ucore lab5用戶進程管理
6. ucore操作系統學習(六) ucore lab6線程調度器
2. ucore lab7實驗細節分析
ucore在lab7中的內容大致分為以下幾個部分:
1. 實現等待隊列
2. 實現信號量
3. 使用信號量解決哲學家就餐問題
4. 基於信號量實現條件變量
5. 基於信號量和條件變量實現管程
6. 使用管程解決哲學家就餐問題
2.1 實現等待隊列
等待隊列介紹
前面提到,ucore在lab7中實現的同步機制是基於休眠/喚醒機制的。為了保證線程對於臨界區訪問的互斥性,在前一個線程已經進入了臨界區后,后續要訪問臨界區的線程會被阻塞以等待前一個線程離開臨界區,在之前進入臨界區的線程離開臨界區后被阻塞的線程會被再次喚醒獲得進入臨界區的資格。
在有許多線程並發時,可能會有不止一個線程被阻塞在對應的臨界區,為此抽象出了等待隊列結構(wait_queue)用於維護這一被阻塞線程的集合。當線程由於互斥而被阻塞在臨界區時,將其加入等待隊列並放棄cpu進入阻塞態;當之前獲得臨界區訪問權限的線程離開后,再從對應的等待隊列中選擇一個被阻塞、處於等待狀態的線程喚醒,被喚醒的線程能接着進入臨界區。
利用等待隊列,使得自始至終都只有最多一個線程在臨界區中,保證了互斥性;而線程在等待隊列中的休眠(阻塞)/喚醒動作,則實現了線程之間對於臨界區訪問的同步。
等待隊列當然並不只適用於線程並發同步,當線程進入等待狀態以等待某一特定完成事件時(定時休眠一段時間、等待阻塞IO讀寫完成等等事件),底層都可以使用等待隊列來實現。
等待隊列實現
ucore在/kern/sync目錄下的wait.c、wait.h中實現了等待隊列wait_queue、等待隊列節點項wait_t以及相關的函數。
ucore的等待隊列底層是通過雙向鏈表結構實現的。和前面的實驗類似的,提供了一個宏定義le2wait用於訪問wait_link節點項對應的wait_t結構。
等待隊列結構:
/** * 等待隊列 * */ typedef struct { // 等待隊列的頭結點(哨兵節點) list_entry_t wait_head; } wait_queue_t; struct proc_struct; /** * 等待隊列節點項 * */ typedef struct { // 關聯的線程 struct proc_struct *proc; // 喚醒標識 uint32_t wakeup_flags; // 該節點所屬的等待隊列 wait_queue_t *wait_queue; // 等待隊列節點 list_entry_t wait_link; } wait_t; #define le2wait(le, member) \ to_struct((le), wait_t, member)
等待隊列結構底層操作:
// 初始化wait_t等待隊列項 void wait_init(wait_t *wait, struct proc_struct *proc); // 初始化等待隊列 void wait_queue_init(wait_queue_t *queue); // 將wait節點項插入等待隊列 void wait_queue_add(wait_queue_t *queue, wait_t *wait); // 將wait項從等待隊列中移除 void wait_queue_del(wait_queue_t *queue, wait_t *wait); // 獲取等待隊列中wait節點的下一項 wait_t *wait_queue_next(wait_queue_t *queue, wait_t *wait); // 獲取等待隊列中wait節點的前一項 wait_t *wait_queue_prev(wait_queue_t *queue, wait_t *wait); // 獲取等待隊列的第一項 wait_t *wait_queue_first(wait_queue_t *queue); // 獲取等待隊列的最后一項 wait_t *wait_queue_last(wait_queue_t *queue); // 等待隊列是否為空 bool wait_queue_empty(wait_queue_t *queue); // wait項是否在等待隊列中 bool wait_in_queue(wait_t *wait);
// 將wait項從等待隊列中刪除(如果存在的話) #define wait_current_del(queue, wait) \ do { \ if (wait_in_queue(wait)) { \ wait_queue_del(queue, wait); \ } \ } while (0) #endif /* !__KERN_SYNC_WAIT_H__ */

/** * 初始化wait_t等待隊列項 * */ void wait_init(wait_t *wait, struct proc_struct *proc) { // wait項與proc建立關聯 wait->proc = proc; // 等待的狀態 wait->wakeup_flags = WT_INTERRUPTED; // 加入等待隊列 list_init(&(wait->wait_link)); } /** * 初始化等待隊列 * */ void wait_queue_init(wait_queue_t *queue) { // 等待隊列頭結點初始化 list_init(&(queue->wait_head)); } /** * 將wait節點項插入等待隊列 * */ void wait_queue_add(wait_queue_t *queue, wait_t *wait) { assert(list_empty(&(wait->wait_link)) && wait->proc != NULL); // wait項與等待隊列建立關聯 wait->wait_queue = queue; // 將wait項插入頭結點前 list_add_before(&(queue->wait_head), &(wait->wait_link)); } /** * 將wait項從等待隊列中移除 * */ void wait_queue_del(wait_queue_t *queue, wait_t *wait) { assert(!list_empty(&(wait->wait_link)) && wait->wait_queue == queue); list_del_init(&(wait->wait_link)); } /** * 獲取等待隊列中wait節點的下一項 * */ wait_t * wait_queue_next(wait_queue_t *queue, wait_t *wait) { assert(!list_empty(&(wait->wait_link)) && wait->wait_queue == queue); list_entry_t *le = list_next(&(wait->wait_link)); if (le != &(queue->wait_head)) { // *wait的下一項不是頭結點,將其返回 return le2wait(le, wait_link); } return NULL; } /** * 獲取等待隊列中wait節點的前一項 * */ wait_t * wait_queue_prev(wait_queue_t *queue, wait_t *wait) { assert(!list_empty(&(wait->wait_link)) && wait->wait_queue == queue); list_entry_t *le = list_prev(&(wait->wait_link)); if (le != &(queue->wait_head)) { // *wait的前一項不是頭結點,將其返回 return le2wait(le, wait_link); } return NULL; } /** * 獲取等待隊列的第一項 * */ wait_t * wait_queue_first(wait_queue_t *queue) { // 獲取頭結點的下一項 list_entry_t *le = list_next(&(queue->wait_head)); if (le != &(queue->wait_head)) { // 頭結點的下一項不是頭結點,將其返回 return le2wait(le, wait_link); } // 頭結點的下一項還是頭結點,說明等待隊列為空(只有一個wait_head哨兵節點) return NULL; } /** * 獲取等待隊列的最后一項 * */ wait_t * wait_queue_last(wait_queue_t *queue) { // 獲取頭結點的前一項 list_entry_t *le = list_prev(&(queue->wait_head)); if (le != &(queue->wait_head)) { // 頭結點的前一項不是頭結點,將其返回 return le2wait(le, wait_link); } // 頭結點的前一項還是頭結點,說明等待隊列為空(只有一個wait_head哨兵節點) return NULL; } /** * 等待隊列是否為空 * */ bool wait_queue_empty(wait_queue_t *queue) { return list_empty(&(queue->wait_head)); } /** * wait項是否在等待隊列中 * */ bool wait_in_queue(wait_t *wait) { return !list_empty(&(wait->wait_link)); }
等待隊列休眠/喚醒等高層操作:
等待隊列對於線程的休眠、喚醒對應的高級操作依賴於上面介紹的、底層的等待隊列增刪改查操作。
// 將等待隊列中的wait項對應的線程喚醒 void wakeup_wait(wait_queue_t *queue, wait_t *wait, uint32_t wakeup_flags, bool del); // 將等待隊列中的第一項對應的線程喚醒 void wakeup_first(wait_queue_t *queue, uint32_t wakeup_flags, bool del); // 將等待隊列中的所有項對應的線程全部喚醒 void wakeup_queue(wait_queue_t *queue, uint32_t wakeup_flags, bool del); // 令對應wait項加入當前等待隊列;令當前線程阻塞休眠,掛載在該等待隊列中 void wait_current_set(wait_queue_t *queue, wait_t *wait, uint32_t wait_state);

/** * 將等待隊列中的wait項對應的線程喚醒 * */ void wakeup_wait(wait_queue_t *queue, wait_t *wait, uint32_t wakeup_flags, bool del) { if (del) { // 將wait項從等待隊列中刪除 wait_queue_del(queue, wait); } // 設置喚醒的原因標識 wait->wakeup_flags = wakeup_flags; // 喚醒對應線程 wakeup_proc(wait->proc); } /** * 將等待隊列中的第一項對應的線程喚醒 * */ void wakeup_first(wait_queue_t *queue, uint32_t wakeup_flags, bool del) { wait_t *wait; if ((wait = wait_queue_first(queue)) != NULL) { wakeup_wait(queue, wait, wakeup_flags, del); } } /** * 將等待隊列中的所有項對應的線程全部喚醒 * */ void wakeup_queue(wait_queue_t *queue, uint32_t wakeup_flags, bool del) { wait_t *wait; if ((wait = wait_queue_first(queue)) != NULL) { if (del) { do { wakeup_wait(queue, wait, wakeup_flags, 1); } while ((wait = wait_queue_first(queue)) != NULL); } else { do { wakeup_wait(queue, wait, wakeup_flags, 0); } while ((wait = wait_queue_next(queue, wait)) != NULL); } } } /** * 令對應wait項加入當前等待隊列;令當前線程阻塞休眠,掛載在該等待隊列中 * */ void wait_current_set(wait_queue_t *queue, wait_t *wait, uint32_t wait_state) { assert(current != NULL); wait_init(wait, current); current->state = PROC_SLEEPING; current->wait_state = wait_state; wait_queue_add(queue, wait); }
2.2 實現信號量
信號量是一種同步互斥機制的實現,普遍存在於現在的各種操作系統內核里,最早是由著名計算機科學家Dijkstra提出。
ucore信號量定義:
信號量的定義和使用非常簡單和基礎,包含了一個信號量的值value以及用於線程同步的等待隊列。
/** * 信號量 * */ typedef struct { // 信號量值 int value; // 信號量對應的等待隊列 wait_queue_t wait_queue; } semaphore_t; /** * 初始化信號量 * */ void sem_init(semaphore_t *sem, int value) { sem->value = value; // 初始化等待隊列 wait_queue_init(&(sem->wait_queue)); }
信號量的主要操作分別是down和up,對應於Dijkstra提出信號量時提出的P/V操作。
信號量作為同步互斥的基本結構,其down/up操作必須是原子性的,無法被打斷發生上下文切換。令軟件程序表現出原子性的方法有很多,由於ucore是運行在單核的80386cpu上的,簡單起見便直接使用關閉中斷的方式來實現信號量操作的原子性(多核cpu的情況下,關閉單核的中斷是不夠的,而關閉所有核心的中斷則性能損失太大,需要采取鎖總線等其它手段來實現軟件原子性)。
信號量的down操作:
信號量的down操作,是請求獲取一個信號量。
當信號量的value值大於0時,說明還能容納當前線程進入臨界區。
當信號量的value值等於0時,說明已經無法容納更多的線程了,此時需要將當前線程阻塞在信號量的等待隊列上,等待信號量的up操作將其喚醒。
/** * 信號量down操作 扣減信號量 * 當信號量value不足時將當前線程阻塞在信號量上,等待其它線程up操作時將其喚醒 * */ static __noinline uint32_t __down(semaphore_t *sem, uint32_t wait_state) { bool intr_flag; // 暫時關閉中斷,保證信號量的down操作是原子操作 local_intr_save(intr_flag); if (sem->value > 0) { // 信號量對應的value大於0,還有權使用 sem->value --; local_intr_restore(intr_flag); return 0; } // 信號量對應的value小於等於0,需要阻塞當前線程 wait_t __wait, *wait = &__wait; // 令當前線程掛在信號量的阻塞隊列中 wait_current_set(&(sem->wait_queue), wait, wait_state); // 恢復中斷,原子操作結束 local_intr_restore(intr_flag); // 當前線程進入阻塞狀態了,進行一次調度 schedule(); local_intr_save(intr_flag); // 喚醒后,原子操作將當前項從信號量的等待隊列中刪除 wait_current_del(&(sem->wait_queue), wait); local_intr_restore(intr_flag); if (wait->wakeup_flags != wait_state) { // 如果等待線程喚醒的標識與之前設置的參數wait_state不一致,將其狀態返回給調用方做進一步判斷 return wait->wakeup_flags; } return 0; }
信號量的up操作:
信號量的up操作,是增加一個信號量中的值。
當增加信號量值時發現當前信號量的等待隊列為空時,則說明當前沒有線程被阻塞、需要進入信號量管制的臨界區中,簡單的將信號量值加1。
當增加信號量時發現等待隊列不為空,則說明存在線程想要進入臨界區中,卻由於沒有滿足信號量的條件,被阻塞在了臨界區外。此時便從信號量的等待隊列中挑選出最早被阻塞的線程,將其喚醒,使得其得以進入臨界區。
/** * 信號量up操作 增加信號量或喚醒被阻塞在信號量上的一個線程(如果有的話) * */ static __noinline void __up(semaphore_t *sem, uint32_t wait_state) { bool intr_flag; // 暫時關閉中斷,保證信號量的up操作是原子操作 local_intr_save(intr_flag); { wait_t *wait; if ((wait = wait_queue_first(&(sem->wait_queue))) == NULL) { // 信號量的等待隊列為空,說明沒有線程等待在該信號量上 // 信號量value加1 sem->value ++; } else { assert(wait->proc->wait_state == wait_state); // 將等待隊列中的對應等待線程喚醒 wakeup_wait(&(sem->wait_queue), wait, wait_state, 1); } } local_intr_restore(intr_flag); }
信號量的down與up操作關系十分緊密,互相對照着看可以更好的理解其工作原理。
互斥信號量:
value值被初始化為1的信號量比較特殊,稱為二元信號量,也被叫做互斥信號量。互斥信號量能夠作為mutex互斥鎖,用於保證臨界區中數據不會被線程並發的訪問。
2.3 使用信號量解決哲學家就餐問題
哲學家就餐問題介紹
哲學家就餐問題是Dijkstra提出的一個經典的多線程同步問題。大概場景是在一個環形的圓桌上,坐着五個哲學家,而桌上有五把叉子和五個碗。一個哲學家平時進行思考,飢餓時便試圖取用其左右最靠近他的叉子,只有在他拿到兩只叉子時才能進餐。進餐完畢,放下叉子繼續思考。
解決哲學家就餐問題的基本思路是使用線程模擬哲學家,每個線程對應一個活動着的哲學家。但是由於5個並發活動的哲學家線程爭搶僅有的5把叉子,且哲學家只有在同時拿到兩根叉子時才能進餐,如果沒有良好的同步機制對這5個哲學家線程進行協調,那么哲學家線程互相之間容易發生死鎖(例如,五個哲學家線程同時拿起了自己左手邊的叉子,都無法拿起自己右邊的叉子,互相等待着。哲學家之間將永遠無法進餐,紛紛餓死)。
使用Dijkstra提出的信號量機制可以很好的解決哲學家就餐問題,下面看看ucore中是如何使用信號量解決哲學家就餐問題的。
哲學家線程主體執行邏輯:
ucore的lab7中的check_sync函數是整個lab7實驗的總控函數。在check_sync的前半部分使用kern_thread函數創建了N(N=5)個哲學家內核線程,用於執行philosopher_using_semaphore,模擬哲學家就餐問題。
philosopher_using_semaphore中哲學家循環往復的進行如下操作:
1. 哲學家進行思考(通過do_sleep系統調用進行休眠阻塞,模擬哲學家思考)
2. 通過phi_take_forks_sema函數嘗試着同時拿起左右兩個叉子(如果無法拿到左右叉子,則會陷入阻塞狀態)
3. 哲學家進行就餐(通過do_sleep系統調用進行休眠阻塞,模擬哲學家就餐)
4. 通過phi_put_forks_sema函數同時放下左右兩個叉子
拿起叉子的phi_take_forks_sema函數和放下叉子phi_put_forks_sema的函數內部都是通過信號量進行同步的,在下面進行更進一步的分析。
#define N 5 /* 哲學家數目 */ #define LEFT (i-1+N)%N /* i的左鄰號碼 */ #define RIGHT (i+1)%N /* i的右鄰號碼 */ #define THINKING 0 /* 哲學家正在思考 */ #define HUNGRY 1 /* 哲學家想取得叉子 */ #define EATING 2 /* 哲學家正在吃面 */ #define TIMES 4 /* 吃4次飯 */ #define SLEEP_TIME 10 void check_sync(void){ int i; //check semaphore 信號量解決哲學家就餐問題 sem_init(&mutex, 1); for(i=0;i<N;i++){ sem_init(&s[i], 0); int pid = kernel_thread(philosopher_using_semaphore, (void *)i, 0); if (pid <= 0) { panic("create No.%d philosopher_using_semaphore failed.\n"); } philosopher_proc_sema[i] = find_proc(pid); set_proc_name(philosopher_proc_sema[i], "philosopher_sema_proc"); } 。。。 條件變量(管程)解決哲學家就餐問題(暫時忽略) } //---------- philosophers problem using semaphore ---------------------- int state_sema[N]; /* 記錄每個人狀態的數組 */ /* 信號量是一個特殊的整型變量 */ semaphore_t mutex; /* 臨界區互斥 */ semaphore_t s[N]; /* 每個哲學家一個信號量 */ /** * 哲學家線程主體執行邏輯 * */ int philosopher_using_semaphore(void * arg) /* i:哲學家號碼,從0到N-1 */ { int i, iter=0; i=(int)arg; cprintf("I am No.%d philosopher_sema\n",i); while(iter++<TIMES) { /* 無限循環 */ cprintf("Iter %d, No.%d philosopher_sema is thinking\n",iter,i); /* 哲學家正在思考 */ // 使用休眠阻塞來模擬思考(哲學家線程阻塞N秒) do_sleep(SLEEP_TIME); // 哲學家嘗試着去拿左右兩邊的叉子(如果沒拿到會阻塞) phi_take_forks_sema(i); /* 需要兩只叉子,或者阻塞 */ cprintf("Iter %d, No.%d philosopher_sema is eating\n",iter,i); /* 進餐 */ // 使用休眠阻塞來模擬進餐(哲學家線程阻塞N秒) do_sleep(SLEEP_TIME); // 哲學家就餐結束,將叉子放回桌子。 // 當發現之前有臨近的哲學家嘗試着拿左右叉子就餐時卻沒有成功拿到,嘗試着喚醒對應的哲學家 phi_put_forks_sema(i); /* 把兩把叉子同時放回桌子 */ } cprintf("No.%d philosopher_sema quit\n",i); return 0; }
拿起叉子/放下叉子函數分析:
phi_take_forks_sema函數表示哲學家嘗試着拿起左右叉子想要就餐;而phi_put_forks_sema函數表示哲學家進餐結束后放下左右叉子。
兩者都是通過全局的互斥信號量mutex的down操作進行全局的互斥,保證在同一時刻只有一個哲學家線程能夠進入臨界區,對臨界區的資源叉子進行拿起/放下操作。對不同的哲學家線程進行互斥,保證查看左右叉子的狀態時不會出現並發問題。在后面對mutex的up操作用於釋放mutex互斥信號量,以離開臨界區,喚醒可能阻塞在mutex信號量中的其它哲學家線程,讓阻塞在mutex信號量中的另一個線程得以進入臨界區。
phi_take_forks_sema函數分析:
在執行phi_take_forks_sema拿叉子時,通過關鍵的phi_test_sema函數進行條件的判斷,判斷當前哲學家線程i的左右哲學家線程是否都未就餐。
如果條件滿足(在拿叉子時,phi_test_sema前哲學家i已經被預先設置為HUNGRY飢餓狀態了),則代表當前哲學家i可以進餐(其拿起了左右叉子,也代表着其相鄰的左右哲學家無法就餐)。
而如果條件不滿足則會在phi_take_forks_sema的最后,被down(&s[i])阻塞在信號量s[i]上,等待其被左右兩旁就餐完畢的哲學家將其喚醒。
phi_put_forks_sema函數分析:
在執行phi_put_forks_sema放下叉子時,首先通過設置state_sema[i]的狀態為Thinking,代表哲學家i已經就餐完畢重新進入思考狀態。同時哲學家i在放下叉子后,通過phi_test_sema(LEFT)和phi_test_sema(RIGHT)來判斷相鄰的哲學家在自己就餐的這段時間是否也陷入了飢餓狀態,卻由於暫時拿不到叉子而被阻塞了(LEFT和RIGHT宏利用取模,解決下標回環計算的問題)。如果確實存在這種情況,通過phi_test_sema函數嘗試着令相鄰的哲學家進行就餐(也許被阻塞哲學家的隔壁另一邊的哲學家依然在就餐,那么此時依然無法將其喚醒就餐;而需要等到另一邊的哲學家就餐完畢來嘗試將其喚醒)。
通過互斥信號量mutex實現哲學家線程就餐對臨界區資源-叉子訪問的互斥性,避免了並發時對叉子狀態判斷不准確的情況產生;同時利用信號量數組semaphore_t s[N]對哲學家拿取、放下叉子的操作進行同步,使得哲學家們在叉子資源有限、沖突的情況下有序的就餐,不會出死鎖、飢餓等現象。
/** * 哲學家i拿起左右叉子 */ void phi_take_forks_sema(int i) /* i:哲學家號碼從0到N-1 */ { // 拿叉子時需要通過mutex信號量進行互斥,防止並發問題(進入臨界區) down(&mutex); // 記錄下哲學家i飢餓的事實(執行phi_take_forks_sema嘗試拿叉子,說明哲學家i進入了HUNGRY飢餓狀態) state_sema[i]=HUNGRY; // 試圖同時得到左右兩只叉子 phi_test_sema(i); // 離開臨界區(喚醒可能阻塞在mutex上的其它線程) up(&mutex); // phi_test_sema中如果成功拿到叉子進入了就餐狀態,會先執行up(&s[i]),再執行down(&s[i])時便不會阻塞 // 反之,如果phi_test_sema中沒有拿到叉子,則down(&s[i])將會令哲學家i阻塞在信號量s[i]上 down(&s[i]); } /** * 哲學家i放下左右叉子 */ void phi_put_forks_sema(int i) /* i:哲學家號碼從0到N-1 */ { // 放叉子時需要通過mutex信號量進行互斥,防止並發問題(進入臨界區) down(&mutex); /* 進入臨界區 */ // 哲學家進餐結束(執行phi_put_forks_sema放下叉子,說明哲學家已經就餐完畢,重新進入THINKING思考狀態) state_sema[i]=THINKING; // 當哲學家i就餐結束,放下叉子時。需要判斷左、右臨近的哲學家在自己就餐的這段時間內是否也進入了飢餓狀態,卻因為自己就餐拿走了叉子而無法同時獲得左右兩個叉子。 // 為此哲學家i在放下叉子后需要嘗試着判斷在自己放下叉子后,左/右臨近的、處於飢餓的哲學家能否進行就餐,如果可以就喚醒阻塞的哲學家線程,並令其進入就餐狀態(EATING) phi_test_sema(LEFT); /* 看一下左鄰居現在是否能進餐 */ phi_test_sema(RIGHT); /* 看一下右鄰居現在是否能進餐 */ up(&mutex); /* 離開臨界區q(喚醒可能阻塞在mutex上的其它線程) */ } /** * 判斷哲學家i是否可以拿起左右叉子 */ void phi_test_sema(i) /* i:哲學家號碼從0到N-1 */ { // 當哲學家i處於飢餓狀態(HUNGRY),且其左右臨近的哲學家都沒有在就餐狀態(EATING) if(state_sema[i]==HUNGRY&&state_sema[LEFT]!=EATING &&state_sema[RIGHT]!=EATING) { // 哲學家i餓了(HUNGRY),且左右兩邊的叉子都沒人用。 // 令哲學家進入就餐狀態(EATING) state_sema[i]=EATING; // 喚醒阻塞在對應信號量上的哲學家線程(當是哲學家線程i自己執行phi_test_sema(i)時,則信號量直接加1,抵消掉phi_take_forks_sema中的down操作,代表直接拿起叉子就餐成功而不用進入阻塞態) up(&s[i]); } }
2.4 實現條件變量和管程
條件變量和信號量的功能很相似,條件變量也提供了類似的線程同步機制,和信號量的down/up操作對應的是wait和signal操作。在原始的定義中條件變量可以用信號量作為基礎實現;反過來信號量也能用已經實現的條件變量來實現。
ucore中的條件變量是基於信號量實現的,同時條件變量也作為管程Monitor結構的重要組成部分。
條件變量condvar結構定義:
/** * 條件變量 * */ typedef struct condvar{ // 條件變量相關的信號量,用於阻塞/喚醒線程 semaphore_t sem; // the sem semaphore is used to down the waiting proc, and the signaling proc should up the waiting proc // 等待在條件變量之上的線程數 int count; // the number of waiters on condvar // 擁有該條件變量的monitor管程 monitor_t * owner; // the owner(monitor) of this condvar } condvar_t;
管程monitor結構定義:
/** * 管程 * */ typedef struct monitor{ // 管程控制並發的互斥鎖(應該被初始化為1的互斥信號量) semaphore_t mutex; // the mutex lock for going into the routines in monitor, should be initialized to 1 // 管程內部協調各並發線程的信號量(線程可以通過該信號量掛起自己,其它並發線程或者被喚醒的線程可以反過來喚醒它) semaphore_t next; // the next semaphore is used to down the signaling proc itself, and the other OR wakeuped waiting proc should wake up the sleeped signaling proc. // 休眠在next信號量中的線程個數 int next_count; // the number of of sleeped signaling proc // 管程所屬的條件變量(可以是數組,對應n個條件變量) condvar_t *cv; // the condvars in monitor } monitor_t;
/** * 初始化管程 * */ void monitor_init (monitor_t * mtp, size_t num_cv) { int i; assert(num_cv>0); mtp->next_count = 0; mtp->cv = NULL; // 管程的互斥信號量值設為1(初始化時未被鎖住) sem_init(&(mtp->mutex), 1); //unlocked // 管程的協調信號量設為0,當任何一個線程發現不滿足條件時,立即阻塞在該信號量上 sem_init(&(mtp->next), 0); // 為條件變量分配內存空間(參數num_cv指定管程所擁有的條件變量的個數) mtp->cv =(condvar_t *) kmalloc(sizeof(condvar_t)*num_cv); assert(mtp->cv!=NULL); // 構造對應個數的條件變量 for(i=0; i<num_cv; i++){ mtp->cv[i].count=0; // 條件變量信號量初始化時設置為0,當任何一個線程發現不滿足條件時,立即阻塞在該信號量上 sem_init(&(mtp->cv[i].sem),0); mtp->cv[i].owner=mtp; } }
條件變量的等待操作實現:
cond_wait函數實現條件變量的wait操作。條件變量的wait操作和信號量的down功能類似。當條件變量對應的條件不滿足時,通過信號量的down操作,令當前線程阻塞、等待在條件變量所屬的信號量上。
// Suspend calling thread on a condition variable waiting for condition Atomically unlocks // mutex and suspends calling thread on conditional variable after waking up locks mutex. Notice: mp is mutex semaphore for monitor's procedures /** * 條件變量阻塞等待操作 * 令當前線程阻塞在該條件變量上,等待其它線程將其通過cond_signal將其喚醒。 * */ void cond_wait (condvar_t *cvp) { //LAB7 EXERCISE1: YOUR CODE cprintf("cond_wait begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count); /* * cv.count ++; * if(mt.next_count>0) * signal(mt.next) * else * signal(mt.mutex); * wait(cv.sem); * cv.count --; */ // 阻塞在當前條件變量上的線程數加1 cvp->count++; if(cvp->owner->next_count > 0) // 對應管程中存在被阻塞的其它線程 // 喚醒阻塞在對應管程協調信號量next中的線程 up(&(cvp->owner->next)); else // 如果對應管程中不存在被阻塞的其它線程 // 釋放對應管程的mutex二元信號量 up(&(cvp->owner->mutex)); // 令當前線程阻塞在條件變量上 down(&(cvp->sem)); // down返回,說明已經被再次喚醒,條件變量count減1 cvp->count --; cprintf("cond_wait end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count); }
條件變量的喚醒操作實現:
cond_signal函數用於實現條件變量的signal操作。條件變量的signal操作和信號量的up功能類似。當條件變量對應的條件滿足時,通過信號量的up操作,喚醒阻塞在對應條件變量中的線程。
// Unlock one of threads waiting on the condition variable. /** * 條件變量喚醒操作 * 解鎖(喚醒)一個等待在當前條件變量上的線程 * */ void cond_signal (condvar_t *cvp) { //LAB7 EXERCISE1: YOUR CODE cprintf("cond_signal begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count); /* * cond_signal(cv) { * if(cv.count>0) { * mt.next_count ++; * signal(cv.sem); * wait(mt.next); * mt.next_count--; * } * } */ // 如果等待在條件變量上的線程數大於0 if(cvp->count>0) { // 需要將當前線程阻塞在管程的協調信號量next上,next_count加1 cvp->owner->next_count ++; // 令阻塞在條件變量上的線程進行up操作,喚醒線程 up(&(cvp->sem)); // 令當前線程阻塞在管程的協調信號量next上 // 保證管程臨界區中只有一個活動線程,先令自己阻塞在next信號量上;等待被喚醒的線程在離開臨界區后來反過來將自己從next信號量上喚醒 down(&(cvp->owner->next)); // 當前線程被其它線程喚醒從down函數中返回,next_count減1 cvp->owner->next_count --; } cprintf("cond_signal end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count); }
條件變量與管程的交互:
仔細比對條件變量與信號量的實現,會發現大致的實現思路是一致的。但ucore中實現的條件變量是作為管程的一部分工作的,因此在wait和signal操作中都額外耦合了與對應管程owner交互的地方。
在管程中進入臨界區的線程發現條件不滿足而進行條件變量的wait操作時,需要釋放管程中臨界區的鎖,在wait操作掛起自身時令其它想要進入管程內的線程獲得臨界區的訪問權限。
在管程中臨界區的線程發現某一條件得到滿足時,將執行對應條件變量的signal操作以喚醒等待在其上的某一個線程。但是由於管程臨界區的互斥性,不能允許臨界區內有超過一個的線程在其中運行,因此執行signal操作的線程需要首先將自己阻塞掛起在管程的next信號量上,使得被喚醒的那一個線程獨占臨界區資源。當被喚醒的線程離開臨界區時,也會及時的喚醒掛起在管程next信號量上的對應線程。
2.5 使用管程解決哲學家就餐問題
由於並發環境下多個線程通過條件變量等同步機制交替的休眠/喚醒,邏輯執行流並不是連貫的,因此條件變量和管程的實現顯得比較繞,令人費解。通過學習如何用管程解決哲學家就餐問題,看看使用管程/條件變量是如何進行線程同步互斥的,加深對條件變量、管程工作機制的理解。
在checkSync函數的后半部分,是關於如何使用管程解決哲學家就餐問題。在check_sync的后半部分創建了N(N=5)個哲學家內核線程,用於執行philosopher_using_condvar函數,模擬哲學家就餐問題。
philosopher_using_condvar中哲學家循環往復的進行如下操作(整體流程和信號量的實現大體一致):
1. 哲學家進行思考(通過do_sleep系統調用進行休眠阻塞,模擬哲學家思考)
2. 通過phi_take_forks_condvar函數嘗試着同時拿起左右兩個叉子(如果沒有拿到左右叉子,陷入阻塞)
3. 哲學家進行就餐(通過do_sleep系統調用進行休眠阻塞,模擬哲學家就餐)
4. 通過phi_put_forks_condvar函數同時放下左右兩個叉子,回到思考狀態
拿起叉子的phi_take_forks_condvar函數和放下叉子phi_put_forks_condvar的函數內部都是通過條件變量進行同步的,在下面進行更進一步的分析。
checkSync函數:
void check_sync(void){ int i; //check semaphore sem_init(&mutex, 1); for(i=0;i<N;i++){ sem_init(&s[i], 0); int pid = kernel_thread(philosopher_using_semaphore, (void *)i, 0); if (pid <= 0) { panic("create No.%d philosopher_using_semaphore failed.\n"); } philosopher_proc_sema[i] = find_proc(pid); set_proc_name(philosopher_proc_sema[i], "philosopher_sema_proc"); } //check condition variable monitor_init(&mt, N); for(i=0;i<N;i++){ state_condvar[i]=THINKING; int pid = kernel_thread(philosopher_using_condvar, (void *)i, 0); if (pid <= 0) { panic("create No.%d philosopher_using_condvar failed.\n"); } philosopher_proc_condvar[i] = find_proc(pid); set_proc_name(philosopher_proc_condvar[i], "philosopher_condvar_proc"); } }
管程實現中拿起叉子/放下叉子函數分析:
phi_take_forks_condvar函數表達哲學家嘗試着拿起左右叉子想要就餐;而phi_put_forks_condvar函數表達哲學家進餐結束后放下左右叉子。
兩者通過管程中的互斥信號量mutex的down操作進行全局的互斥,保證在同一時刻只有一個哲學家線程能夠進入臨界區,對臨界區的資源叉子進行拿起/放下操作,對不同的哲學家線程進行互斥,保證查看左右叉子的狀態時不會出現並發問題。
在離開管程的臨界區時(注釋into routine in monitor和leave routine in monitor之間為管程的臨界區代碼),當前線程會根據管程內是否存在其它線程(mtp->next_count>0)而有不同的操作。當發現管程中的next信號量上存在其它線程阻塞在上面時,優先喚醒next信號量上的線程(阻塞在next上的線程是由於要喚醒等待在某一條件變量上的線程,為了保證臨界區互斥自願被阻塞的,因此被喚醒的線程在離開臨界區后需要第一時間將其喚醒);而如果next信號量中不存在休眠的線程,那么就和信號量的實現類似,釋放mutex互斥鎖,喚醒可能等待在其上的某一線程。
上述ucore實現的管程其線程交互的邏輯是基於Hoare語義的,此外還存在MESA語義的管程和Hansen語義的管程(MESA管程和Hansen管程實現類似前面的信號量實現哲學家就餐)。
Hoare管程在signal喚醒其它線程時會令自己陷入休眠,嚴格的保證了臨界區線程的互斥,理論上更加可靠,常見於教科書的理論中。由於Hoare語義的管程需要額外引入一個等待隊列(next信號量),因此其性能並不如其它兩種語義的管程,現實中被使用的地方很少。
phi_take_forks_condvar函數(同時拿起左右叉子):
void phi_take_forks_condvar(int i) { // 拿叉子時需要通過mutex信號量進行互斥,防止並發問題(進入臨界區) down(&(mtp->mutex)); //--------into routine in monitor-------------- // LAB7 EXERCISE1: YOUR CODE // I am hungry // try to get fork // I am hungry // 記錄下哲學家i飢餓的事實(執行phi_take_forks_condvar嘗試拿叉子,說明哲學家i進入了HUNGRY飢餓狀態) state_condvar[i]=HUNGRY; // 試圖同時得到左右兩只叉子 phi_test_condvar(i); if (state_condvar[i] != EATING) { // state_condvar[i]狀態不為EATING,說明phi_test_condvar嘗試拿左右叉子進餐失敗 cprintf("phi_take_forks_condvar: %d didn't get fork and will wait\n",i); // 等待阻塞在管程的條件變量cv[i]上 cond_wait(&mtp->cv[i]); } //--------leave routine in monitor-------------- if(mtp->next_count>0){ // 當離開管程臨界區時,如果發現存在線程等待在mtp->next上 // 在當前實驗中,執行到這里的當前線程可能是阻塞在cond_wait中被其它線程喚醒的,對應線程是通過phi_test_condvar的cond_signal操作喚醒當前線程的 // 執行cond_signal時為了保證管程臨界區內不存在並發的線程訪問,在喚醒其它線程時,會把自己阻塞在管程的next信號量上,等待此時離開臨界區的線程將其喚醒 up(&(mtp->next)); }else{ // 當離開管程臨界區時,沒有其它線程等待在mtp->next上,直接釋放管程的互斥鎖mutex即可(喚醒可能阻塞在mutex上的其它線程) up(&(mtp->mutex)); } }
phi_put_forks_condvar函數(同時放下左右叉子):
void phi_put_forks_condvar(int i) { // 放叉子時需要通過mutex信號量進行互斥,防止並發問題(進入臨界區) down(&(mtp->mutex)); //--------into routine in monitor-------------- // LAB7 EXERCISE1: YOUR CODE // I ate over // test left and right neighbors // I ate over // 哲學家進餐結束(執行phi_put_forks_condvar放下叉子,說明哲學家已經就餐完畢,重新進入THINKING思考狀態) state_condvar[i]=THINKING; // test left and right neighbors // 當哲學家i就餐結束,放下叉子時。需要判斷左、右臨近的哲學家在自己就餐的這段時間內是否也進入了飢餓狀態,卻因為自己就餐拿走了叉子而無法同時獲得左右兩個叉子。 // 為此哲學家i在放下叉子后需要嘗試着判斷在自己放下叉子后,左/右臨近的、處於飢餓的哲學家能否進行就餐,如果可以就喚醒阻塞的哲學家線程,並令其進入就餐狀態(EATING) phi_test_condvar(LEFT); // 看一下左鄰居現在是否能進餐 phi_test_condvar(RIGHT); // 看一下右鄰居現在是否能進餐 //--------leave routine in monitor-------------- // lab7的參考答案 if(mtp->next_count>0){ cprintf("execute here mtp->next_count>0 \n\n\n\n\n\n"); up(&(mtp->next)); }else{ cprintf("execute here mtp->next_count=0 \n\n\n\n\n"); up(&(mtp->mutex)); } // 個人認為放叉子和取叉子的情況並不一樣,不會出現mtp->next_count>0的情況,這里只需要釋放互斥鎖即可(如果這里理解的有問題,還請指正) // 當放叉子的線程在phi_put_forks_condvar中離開管程臨界區時,只有兩種情況 // 1. 沒有發現鄰居可以進餐,自身不會被阻塞 // 2. 發現有鄰居之前被拿不到叉子阻塞了,現在可以進餐了,phi_test_condvar中的cond_signal會暫時令自己阻塞在next信號量上 // 但是很快被自己叫醒的相鄰的哲學家線程在被喚醒后一離開臨界區就會將自己喚醒,在cond_signal被喚醒后的操作中mtp->next_count會自減,而變為0 // // 以上兩種情況下,由於管程本身最外面有一個mutex互斥信號量,所以不會出現兩個線程同時阻塞在next信號量中,因此也就不會出現參考答案中mtp->next_count>0的情況 // up(&(mtp->mutex)); }
phi_test_condvar函數(判斷哲學家i是否能拿起左右叉子開始就餐):
void phi_test_condvar (i) { // 當哲學家i處於飢餓狀態(HUNGRY),且其左右臨近的哲學家都沒有在就餐狀態(EATING) if(state_condvar[i]==HUNGRY&&state_condvar[LEFT]!=EATING &&state_condvar[RIGHT]!=EATING) { cprintf("phi_test_condvar: state_condvar[%d] will eating\n",i); // 哲學家i餓了(HUNGRY),且左右兩邊的叉子都沒人用。 // 令哲學家進入就餐狀態(EATING) state_condvar[i] = EATING ; cprintf("phi_test_condvar: signal self_cv[%d] \n",i); // 喚醒阻塞在對應信號量上的哲學家線程 cond_signal(&mtp->cv[i]) ; } }
2.6 信號量和管程的區別
信號量是一個簡單、高效的同步互斥機制,但也正是由於其過於底層,所以在編寫線程同步代碼時需要十分小心謹慎,對每一處信號量的使用仔細斟酌才能保證程序的正確性,對開發人員的心智是一個巨大的負擔。
而將管程作為一個整體的結構來看的話,會發現管程雖然將控制同步的代碼邏輯抽象為了一個固定的模板變得容易使用,但卻與要保護的臨界區業務邏輯代碼耦合的很嚴重,操作系統的開發者很難將管程控制同步的代碼植入進對應的應用程序內部。
因此操作系統通常只提供了信號量以及條件變量這種偏底層、耦合性低的同步互斥機制;而管程機制則更多的由高級語言的編譯器在語言層面實現,以簡化程序員開發復雜並發同步程序的復雜度。高級語言編譯器在編譯本地機器代碼時,可以在需要進行同步的代碼邏輯塊中利用操作系統底層提供的信號量或是條件變量機制來實現管程。
例如java中如果在方法定義時簡單的加上synchronized關鍵字就能控制多線程環境下不會並發的執行該方法。這是因為在編譯成字節碼時,相比於普通方法額外插入了一些管程Monitor相關的同步控制代碼(管程底層依賴的信號量、條件變量機制還是取決於對應的操作系統平台,只不過被jvm屏蔽掉了差異,讓java程序員感知不到)。
3. 總結
通過ucore的lab7的學習,讓我理解了等待隊列、信號量、條件變量以及管程的大致工作原理,也對平常會接觸到的java中的synchronized、AQS、Reentrantlock其底層機制有了進一步的認識。
lab7的學習使我收獲頗豐,但這對於線程同步相關領域的學習還是遠遠不夠。同屬於進程間通信IPC領域的經典問題除了哲學家就餐問題外,還有讀者/寫者問題等;對於線程安全問題,除了使用休眠/喚醒進行線程上下文切換的阻塞的方式之外,還有使用CAS等重試的方法;除了信號量、條件變量等基於單機系統內的線程同步方式外,還有基於分布式系統,通過網絡進行多機器線程同步的機制等等。雖然不夠了解的知識還有很多,但通過ucore操作系統的學習,為我學習相關的領域知識打下了基礎,也給了我相信最終能融會貫通這些知識的信心。
這篇博客的完整代碼注釋在我的github上:https://github.com/1399852153/ucore_os_lab (fork自官方倉庫)中的lab7_answer。
希望我的博客能幫助到對操作系統、ucore os感興趣的人。存在許多不足之處,還請多多指教。