本文為上海交大 ipads 研究所陳海波老師等人所著的《現代操作系統:原理與實現》的課程實驗(LAB)的學習筆記的第四篇:多核處理。所有章節的筆記可在此處查看:chcore | 康宇PL's Blog
踩坑總結
本着早看見少踩坑的原則,我先說一些在實踐中總結的一點玄學經驗,如果你能搞清楚這種現象的真實原因請務必在博客底端評論區賜教一下。
薛定諤的評測結果
你可能會遇到手動 make run-xxx
樣例時可以正確運行,但 make grade
有時候莫名其妙就過不了的情況。或者干脆手動 make run
時就遇到概率過不了的情況。我的猜測是因為 qemu 跑的太快導致跟時鍾中斷處理那部分相關的代碼沒有很好的運行。總之你在項目根目錄的 CMakeLists.txt 里把 CMAKE_BUILD_TYPE
改成 Debug 模式,這樣編譯的時候優化等級會低一點,就會跑的慢一點了,此時再測試大部分情況下就能正常了。考慮部分用於調試的輸出語句可能影響評測程序的運行,所以保險起見可以在同個文件中將下面代碼里的 DLOG_LEVEL
賦值為 1 。
if (CMAKE_BUILD_TYPE STREQUAL "Debug")
add_definitions("-DLOG_LEVEL=2")
記得 commit 代碼前把這些選項都還原回去。
但薛定諤的詭異之處在於它的不可預測性,當我在虛擬機里跑的時候情況又反過來了,Debug 模式容易出錯,Release 模式很少出錯。這還真是神奇吶......
2021/07/03 補記
今天得到網友 james_ling 的提醒,ChCore 里的 printf()
是線程不安全的。源代碼中也有提到這一點:
/*
* NOTE: this function is not thread safe. Use it at your own risk when multi threading
*/
void printf(char *fmt, ...);
而在我的實現中部分代碼用到了 kwarn()
這個宏,它間接調用了 printf()
,因此也是線程不安全的。原先我只是看見 ChCore 原始代碼里部分地方用到了它也學着用了。現在想來需要保證使用 kwarn()
的地方都得保證拿到大內核鎖才行。我粗暴的把所有 kwarn()
都換成 kdebug()
后再在 Release 模式下 make grade 就沒有再遇到薛定諤現象了
多線程調試
這節需要用到多線程調試,如果你還沒有了解的話請立即去搜索下“gdb 怎么調試多線程”。
多核支持
多核啟動
在源碼里看到 BSP、 AP、SMP 時我是非常疑惑的,因為整個講義里完全沒說這是啥。搜了下才知道這是說主 CPU(Boot Strap Processor)、副 CPU(Application Processor)、對稱多處理器(Symmetrical Multi-Processing)。
分析一下最新版的 boot/start.S 里的 _start
函數。可知其邏輯:
1)當前 CPU 為主 CPU:正常初始化
2)當前 CPU 為副 CPU:設置好棧后循環等待直到 secondary_boot_flag[cpuid] != 0
,跳轉到 secondary_init_c
BEGIN_FUNC(_start)
/* 當前 cpuid 放在 X8 寄存器里 */
mrs x8, mpidr_el1
and x8, x8, #0xFF
cbz x8, primary
/* Wait for bss clear */
wait_for_bss_clear:
adr x0, clear_bss_flag
ldr x1, [x0]
cmp x1, #0
bne wait_for_bss_clear
/* Turn to el1 from other exception levels. */
bl arm64_elX_to_el1
/* Prepare stack pointer and jump to C. */
/* 設置當前棧為 boot_cpu_stack[cpuid] */
mov x1, #0x1000
mul x1, x8, x1
adr x0, boot_cpu_stack
add x0, x0, x1
add x0, x0, #0x1000
mov sp, x0
wait_until_smp_enabled:
/* CPU ID should be stored in x8 from the first line */
/* while( secondary_boot_flag[cpuid] == 0 ); */
mov x1, #8
mul x2, x8, x1
ldr x1, =secondary_boot_flag
add x1, x1, x2
ldr x3, [x1]
cbz x3, wait_until_smp_enabled
/* Set CPU id */
mov x0, x8
bl secondary_init_c
primary:
/* Turn to el1 from other exception levels. */
bl arm64_elX_to_el1
/* Prepare stack pointer and jump to C. */
adr x0, boot_cpu_stack
add x0, x0, #0x1000
mov sp, x0
bl init_c
/* Should never be here */
b .
END_FUNC(_start)
主 CPU 在完成自己的初始化后調用 enable_smp_cores
,在此設置 secondary_boot_flag[cpuid] = 1
,讓副 CPU 可以繼續執行完成初始化。
為了保證並發安全,故要求副 CPU 有序的、逐個的初始化,每個副 CPU 初始化完應設置 cpu_status[cpuid] = cpu_run
,只有在上個設置好后才可以設置下個副 CPU 的 secondary_boot_flag[cpuid]
void enable_smp_cores(void *addr)
{
int i = 0;
long *secondary_boot_flag;
/* Set current cpu status */
cpu_status[smp_get_cpu_id()] = cpu_run;
secondary_boot_flag = (long *)phys_to_virt(addr);
for (i = 0; i < PLAT_CPU_NUM; i++) {
*(secondary_boot_flag + i) = 1;
while(cpu_status[i] != cpu_run);
}
kinfo("All %d CPUs are active\n", PLAT_CPU_NUM);
}
副 CPU 設置 cpu_status[cpuid]
的代碼則在 secondary_start
中。
void secondary_init_c(int cpuid)
{
el1_mmu_activate();
secondary_cpu_boot(cpuid);
}
BEGIN_FUNC(secondary_cpu_boot)
/* We store the logical cpuid in TPIDR_EL1 */
msr TPIDR_EL1, x0
mov x1, #KERNEL_STACK_SIZE
mul x2, x0, x1
ldr x3, =kernel_stack
add x2, x2, x3
add x2, x2, KERNEL_STACK_SIZE
mov sp, x2
bl secondary_start
END_FUNC(secondary_cpu_boot)
void secondary_start(void)
{
kinfo("AP %u is activated!\n", smp_get_cpu_id());
exception_init_per_cpu();
cpu_status[smp_get_cpu_id()] = cpu_run;
/* Where the AP first returns to the user mode */
sched();
eret_to_thread(switch_context());
}
排號自旋鎖
下面展示了一種排號自旋鎖的實現。排號鎖結構體包括 next
和 owner
成員,owner
表示鎖的當前持有者的序號,next
表示下一個可分發的序號。當嘗試獲取排號鎖時會通過原子操作分配一個 next
表示當前的序號,並同時將鎖的 next
自增一。拿到序號后一直等到 ower
和當前序號相等時便算是拿到了鎖,可以繼續執行,否則一直忙等待。解鎖操作則只需要將 owner++
,相當於將鎖傳給了下一個序號的等待者。
struct lock {
volatile int owner;
volatile int next;
};
void lock_init(struct lock *lock) {
lock->owner = 0;
lock->next = 0;
}
void lock(struct lock *lock) {
/* 原子操作,相當於 my_ticket = lock->next; lock->next++; */
volatile int my_ticket = atomic_FAA(&lock->next, 1);
while(lock->owner != my_ticker);
}
void unlock(struct lock *lock) {
lock->owner++;
}
int is_lock(struct lock *lock) {
return lock->owner != lock->next;
}
ChCore 里已經實現好了 lock()
函數,我們只要寫兩行 unlock()
和 is_lock()
就行。
大內核鎖
ChCore 使用最簡單的方法解決內核態間多核的並發控制問題:只要操作內核數據就得獲得大內核鎖,並在退出內核態前釋放掉。保證了同時只有一個 CPU 執行內核代碼、訪問內核數據。
大內核鎖的本體就是排號自旋鎖的簡單封裝
struct lock big_kernel_lock;
void kernel_lock_init(void) {
lock_init(&big_kernel_lock);
}
void lock_kernel(void) {
lock(&big_kernel_lock);
}
void unlock_kernel(void) {
// 解鎖前要保證鎖處於上鎖狀態,部分樣例會檢測這一點
if (is_locked(&big_kernel_lock))
unlock(&big_kernel_lock);
}
ChCore 中上鎖的地方比較零散,包括中斷處理時、系統調用時、激活副 CPU 前等等。對於涉及中斷的地方要特別考慮下該中斷是在發生自內核態還是在發生自用戶態,對於發生自內核態的(所有 EL1t
和 EL1h
后綴的)都不要加鎖,因為同處內核態不能重復加鎖。
解鎖的地方只有一處 exception_return
,因為只要想從內核態返回用戶態,都得經過此處。
調度
ChCore 中先實現協作式線程調度,再在此基礎上實現搶占式調度。
協作式調度需要線程主動使用 sys_yield()
將 CPU 控制權讓位給其他線程,搶占式調度則是由內核給每個線程分配一定的 CPU 時間片,當線程的時間片用完后由內核強制的將 CPU 控制權移交給另一個線程。
協作式調度
ChCore 中將調度算法封裝為了包含若干函數指針的結構體 sched_ops
,選用不同的調度算法時只要切換下 sched_ops
就行。
/* Indirect function call may downgrade performance */
struct sched_ops {
int (*sched_init) (void);
int (*sched) (void);
int (*sched_enqueue) (struct thread * thread);
int (*sched_dequeue) (struct thread * thread);
struct thread *(*sched_choose_thread) (void);
void (*sched_handle_timer_irq) (void);
/* Debug tools */
void (*sched_top) (void);
};
// 時間片輪轉調度策略
struct sched_ops rr = {
.sched_init = rr_sched_init,
.sched = rr_sched,
.sched_enqueue = rr_sched_enqueue,
.sched_dequeue = rr_sched_dequeue,
.sched_choose_thread = rr_sched_choose_thread,
.sched_handle_timer_irq = rr_sched_handle_timer_irq,
};
當前 Lab 中 ChCore 使用 Round Robin (時間片輪轉)調度策略。每個 CPU 都有自己的就緒隊列,表示已經准備好的、可以調度的線程。另外每個 CPU 還有一個空閑進程 idle
,用於在沒有線程就緒時上去頂位。如果不這樣做那 CPU 發現沒有能調度的線程時就會卡在內核態,而我們進入內核態時都是持有大內核鎖的,你這個 CPU 在內核態干等着不出來,那其他 CPU 就拿不到大內核鎖,無法進入內核態了。
rr_sched_init
為已經實現好的初始化函數,會將就緒隊列 rr_ready_queue
和空閑線程 idle_threads
初始化。它只在主 CPU 初始時被調用一次。
rr_sched_enqueue
表示將線程插入就緒隊列,並設置進程狀態為就緒態,關聯到當前 CPU 上。需要注意下合法性判斷(指針為空、是否為 idle 線程、是否已經處於就緒態)。
rr_sched_dequeue
為出隊列操作。彈出的線程被標記為處於 TS_INTER
這個特殊的中間態。依舊是要注意合法性檢測。
rr_sched_choose_thread
負責從就緒隊列中選擇一個線程。就緒隊列為空時就選擇 idle
線程。
rr_sched
為調度操作的核心函數。先將當前線程插入就緒隊列,然后通過 rr_sched_choose_thread
取出下一個就緒的線程,然后用 switch_to_thread
將其設為運行態,並讓當前線程指針指向它。
實際實現的時候容易在諸多的小細節上出錯,多用用 make qemu
根據最新的 BUG 信息來面向樣例編程吧。
下面代碼中涉及 budget 和 affinity的操作將在搶占式調度中講解
int rr_sched_enqueue(struct thread *thread)
{
if (thread == NULL || thread->thread_ctx == NULL)
return -1;
if (thread->thread_ctx->type == TYPE_IDLE) {
return 0;
}
if (thread->thread_ctx->state == TS_READY) {
return -1;
}
s32 aff = thread->thread_ctx->affinity;
if (aff == NO_AFF)
thread->thread_ctx->cpuid = smp_get_cpu_id();
else if (aff < PLAT_CPU_NUM)
thread->thread_ctx->cpuid = aff;
else {
//kwarn("thread->thread_ctx->affinity >= PLAT_CPU_NUM\n");
return -1;
}
thread->thread_ctx->state = TS_READY;
list_append(&thread->ready_queue_node, &rr_ready_queue[thread->thread_ctx->cpuid]);
return 0;
}
int rr_sched_dequeue(struct thread *thread)
{
if (thread == NULL
|| thread->thread_ctx == NULL
|| list_empty(&thread->ready_queue_node)
|| thread->thread_ctx->affinity >= PLAT_CPU_NUM
|| thread->thread_ctx->type == TYPE_IDLE
|| thread->thread_ctx->state != TS_READY) {
return -1;
}
thread->thread_ctx->state = TS_INTER;
list_del(&thread->ready_queue_node);
return 0;
}
struct thread *rr_sched_choose_thread(void)
{
u32 cpu_id = smp_get_cpu_id();
if (list_empty(&rr_ready_queue[cpu_id]))
goto ret_idle;
struct thread *ret = list_entry(rr_ready_queue[cpu_id].next, struct thread, ready_queue_node);
if (rr_sched_dequeue(ret)) {
goto ret_idle;
}
return ret;
ret_idle:
return &idle_threads[cpu_id];
}
int rr_sched(void)
{
if (current_thread != NULL
&& current_thread->thread_ctx->type != TYPE_IDLE) {
if (current_thread->thread_ctx->sc->budget > 0) {
return -1;
}
if (rr_sched_enqueue(current_thread)) {
return -1;
}
}
struct thread *target = rr_sched_choose_thread();
rr_sched_refill_budget(target, DEFAULT_BUDGET);
switch_to_thread(target);
return 0;
}
線程切換流程
目前 ChCore 的線程切換過程步驟有:
1)當前線程主動(協作式調度)或者被動(搶占式調度)的引發中斷,陷入內核態,在中斷程序入口處調用 exception_enter
保存上下文。
2)調用 sched
函數。切換 current_thread
3)eret_to_thread(switch_context())
,恢復 current_thread
的上下文到寄存器中。
為了用戶態能夠主動讓出 CPU,我們提供了 sys_yield
系統調用。
void sys_yield(void)
{
sched();
eret_to_thread(switch_context());
}
此時 make run-yield_single
可以觀察到在單 CPU 下兩個線程可以通過 sys_yield
來交替的運行。
Hello, I am thread 0
Hello, I am thread 1
Iteration 0, thread 0, cpu 0
Iteration 0, thread 1, cpu 0
Iteration 1, thread 0, cpu 0
Iteration 1, thread 1, cpu 0
搶占式調度
時鍾中斷與搶占
為了支持內核搶占 CPU,我們需要啟用時鍾中斷。每個一小段時間觸發一個硬件定時器中斷。
void exception_init_per_cpu(void)
{
timer_init();
set_exception_vector();
disable_irq();
}
相應的給每個線程分配一個整形變量調度預算 budget。每次觸發時鍾中斷都會讓 budget 減一。當且僅當 budget 變成零時該線程才可以被搶占。所以需要在 rr_sched
里加點驗證操作。
static inline void rr_sched_refill_budget(struct thread *target, u32 budget)
{
if(target && target->thread_ctx && target->thread_ctx->type != TYPE_IDLE)
target->thread_ctx->sc->budget = budget;
}
int rr_sched(void)
{
// ......
if (rr_sched_enqueue(current_thread)) {
return -1;
}
// ......
}
時鍾中斷發生時將沿着 handle_irq -> plat_handle_irq -> handle_timer_irq -> sched_handle_timer_irq -> rr_sched_handle_timer_irq
的順序逐級調用,最終我們要在 rr_sched_handle_timer_irq
對 budget 做一下變更操作。
void rr_sched_handle_timer_irq(void)
{
// 各種判斷不能少,部分樣例會針對這一點
if (current_thread == NULL
|| current_thread->thread_ctx->type == TYPE_IDLE)
return;
if (current_thread->thread_ctx->sc->budget > 0)
current_thread->thread_ctx->sc->budget--;
}
在時鍾中斷返回前也要檢查下能不能進行調度。
void handle_irq(int type)
{
if (type > ERROR_EL1h)
lock_kernel();
plat_handle_irq();
sched();
eret_to_thread(switch_context());
}
系統調用 sys_yield
是立刻進行切換,所以我們也要對 budget 做一下清零操作。
void sys_yield(void)
{
current_thread->thread_ctx->sc->budget = 0;
sched();
eret_to_thread(switch_context());
}
處理器親和性
目前 ChCore 創建的線程與父線程都在同一個 CPU 上,沒有辦法分發到其他的 CPU 上。為了解決這一問題我們給每個線程引入一個親和性 affinity 標識。當將線程插入就緒隊列時,如果它的 affinity 為 NO_AFF
,那就插到當前 CPU 的就緒隊列里;否則插到 affinity 號 CPU 的就緒隊列里。如此變成了將線程分發到其他 CPU 上的操作。
int rr_sched_enqueue(struct thread *thread)
{
// ......
s32 aff = thread->thread_ctx->affinity;
if (aff == NO_AFF)
thread->thread_ctx->cpuid = smp_get_cpu_id();
else if (aff < PLAT_CPU_NUM)
thread->thread_ctx->cpuid = aff;
else {
//kwarn("thread->thread_ctx->affinity >= PLAT_CPU_NUM\n");
return -1;
}
thread->thread_ctx->state = TS_READY;
list_append(&thread->ready_queue_node, &rr_ready_queue[thread->thread_ctx->cpuid]);
return 0;
}
int rr_sched_dequeue(struct thread *thread)
{
if ( // ......
|| thread->thread_ctx->affinity >= PLAT_CPU_NUM)
return -1;
// ......
}
同時我們還對用戶提供了 get 和 set 親和性的系統調用。
int sys_set_affinity(u64 thread_cap, s32 aff)
{
// ......
if (thread == NULL || thread->thread_ctx == NULL)
return -1;
thread->thread_ctx->affinity = aff;
return 0;
}
int sys_get_affinity(u64 thread_cap)
{
// ......
if (thread == NULL || thread->thread_ctx == NULL)
return -1;
return thread->thread_ctx->affinity;
}
spawn()
Linux 里使用 fork()
和 exec()
兩個步驟來執行一個程序文件。 但還存在 spawn()
這個接口實現了前兩者組合后的功能。在 Linux 下查閱 posix_spawn()
的 manpage 可知, spawn()
是為了在某些沒有 MMU 的小型機上用來在一定范圍內替代 fork()
的。
對於 ChCore 來說 spawn()
的流程與 Lab 3 里的 process_create_root()
比較相似,Lab 3 分析好了這一部分做起來還是蠻簡單的。
spawn() 中的用戶棧
先來分析下講義里的這張圖:
從中我們可以提取出的信息有:
子進程的用戶棧基地址為 MAIN_THREAD_STACK_BASE
,大小為 MAIN_THREAD_STACK_SIZE
,這幾個宏都在 def.h 有定義。
用戶棧頂部的一個 PAGE_SIZE
的空間用來存放用戶程序的各種啟動參數,程序開始運行時棧頂指針 SP 應該指向這個頁的末尾處,即 MAIN_THREAD_STACK_BASE + MAIN_THREAD_STACK_SIZE - PAGE_SIZE
。
spawn() 的流程
下面分析下 spawn()
的源碼,為了節約篇幅刪掉了異常處理的代碼。建議對比着 Lab 3 里對 process_create_root()
的分析來看。
int spawn(char *path, int *new_process_cap, int *new_thread_cap,
struct pmo_map_request *pmo_map_reqs, int nr_pmo_map_reqs, int caps[],
int nr_caps, int aff)
{
struct user_elf user_elf;
int ret;
// 將 ELF 文件整個讀入內存中
ret = readelf_from_kernel_cpio(path, &user_elf);
// 創建子進程並解析 ELF 文件
return launch_process_with_pmos_caps(&user_elf, new_process_cap,
new_thread_cap, pmo_map_reqs,
nr_pmo_map_reqs, caps, nr_caps,
aff);
}
int launch_process_with_pmos_caps(struct user_elf *user_elf,
int *child_process_cap,
int *child_main_thread_cap,
struct pmo_map_request *pmo_map_reqs,
int nr_pmo_map_reqs, int caps[], int nr_caps,
s32 aff)
{
// 創建新進程作為子進程
new_process_cap = usys_create_process();
// 將 ELF 文件的各個程序段映射到子進程的對應位置處
// 類似於 load_binary
for (i = 0; i < 2; ++i) {
p_vaddr = user_elf->user_elf_seg[i].p_vaddr;
ret = usys_map_pmo(new_process_cap,
user_elf->user_elf_seg[i].elf_pmo,
ROUND_DOWN(p_vaddr, PAGE_SIZE),
user_elf->user_elf_seg[i].flags);
}
pc = user_elf->elf_meta.entry;
// 創建一個大小為 MAIN_THREAD_STACK_SIZE 的 PMO 作為用戶棧
// PMO 類型取 PMO_DATA 原因參考 thread_create_main 中對 PMO 的處理
pmo_requests[0].size = MAIN_THREAD_STACK_SIZE;
pmo_requests[0].type = PMO_DATA;
ret = usys_create_pmos((void *)pmo_requests, 1);
// 創建好的 cap 就是用戶棧的 cap
main_stack_cap = pmo_requests[0].ret_cap;
// 為了實現共享內存機制,將父進程制定的 capability 傳遞給子進程
if (nr_caps > 0) {
/* usys_transfer_caps is used during process creation */
ret = usys_transfer_caps(new_process_cap, caps, nr_caps,
transfer_caps);
}
// 還是為了支持共享內存,把傳遞過來的 PMO 挨個映射一遍
if (nr_pmo_map_reqs) {
ret =
usys_map_pmos(new_process_cap, (void *)pmo_map_reqs,
nr_pmo_map_reqs);
}
// 類似於 thread_create_main 里的 prepare_env
// 把子進程要用的各種參數存放到用戶棧最頂上的那一個初始頁中
stack_top = MAIN_THREAD_STACK_BASE + MAIN_THREAD_STACK_SIZE;
stack_offset = MAIN_THREAD_STACK_SIZE - PAGE_SIZE;
construct_init_env(init_env, stack_top, &user_elf->elf_meta,
user_elf->path, pmo_map_reqs,
nr_pmo_map_reqs, transfer_caps, nr_caps);
// 把做好的初始頁寫到用戶棧頂部
ret = usys_write_pmo(main_stack_cap, stack_offset, init_env,
PAGE_SIZE);
// 把做好的用戶棧映射到子進程的地址空間中
pmo_map_requests[0].pmo_cap = main_stack_cap;
pmo_map_requests[0].addr = MAIN_THREAD_STACK_BASE;
pmo_map_requests[0].perm = VM_READ | VM_WRITE;
ret =
usys_map_pmos(new_process_cap, (void *)pmo_map_requests, 1);
// 創建子進程的主線程,指定 SP 的地址
stack_va = MAIN_THREAD_STACK_BASE + MAIN_THREAD_STACK_SIZE - PAGE_SIZE;
main_thread_cap =
usys_create_thread(new_process_cap, stack_va, pc,
(u64) NULL, MAIN_THREAD_PRIO, aff);
// 返回子進程和其主線程的 cap
if (child_process_cap != NULL)
*child_process_cap = new_process_cap;
if (child_main_thread_cap != NULL)
*child_main_thread_cap = main_thread_cap;
進程間通信
IPC 實例
ChCore 里的進程間通信(IPC,Inter-Process Communication)類似於網絡編程中的客戶端—服務端模型。我們直接通過講義里的例子來研究下用戶眼里的 IPC 長啥樣:
正常狀態下進程間通信分為三步:
1)服務端調用 ipc_register_server()
登記自己的消息處理函數,上圖里該函數為 ipc_dispatcher()
2)客戶端調用 ipc_register_client()
登記自己的相關信息到指定的服務端上。
3)客戶端調用 ipc_call()
向服務端發送信息,服務端在自己的消息處理函數里以函數參數的形式捕獲到消息來進行后續的邏輯處理,最后使用 ipc_return()
返回處理的結果。
那客戶端是如何標記自己要找的那個服務端的呢?下面引用一小段 ipc_data.c 中的代碼:
ret = spawn("/ipc_data_server.bin", &new_process_cap, &new_thread_cap,
pmo_map_reqs, 1, NULL, 0, 1);
// ...
ret = ipc_register_client(new_thread_cap, &client_ipc_struct);
可以發現客戶端是通過 spawn()
得到子進程的主線程的 cap,並以此來向該進程建立連接的。
IPC 原理圖
接下來的部分將逐一分析 ChCore 是如何實現 IPC 的各個系統調用的。這里先放一張講義中的圖,后面不理解流程的話可以回頭過來看看。
ipc_register_server()
ipc_register_server()
為 ChCore 提供的系統調用,用戶只需要指定一個信號處理函數即可將當前進程等記為 IPC 的服務端進程。
該函數會創建一個 ipc_vm_config
結構體,分別用宏來指定了服務端線程里的運行時棧的和緩沖區起始地址和大小。這里請留意下目前只是地址,並未分配內存。至於它們的作用后面再講。
struct ipc_vm_config vm_config = {
.stack_base_addr = SERVER_STACK_BASE,
.stack_size = SERVER_STACK_SIZE,
.buf_base_addr = SERVER_BUF_BASE,
.buf_size = SERVER_BUF_SIZE,
};
同時為了支持 IPC,每個線程的 thread
結構體都增加了 active_conn
和 server_ipc_config
這兩個條目。server_ipc_config
中的各項含義請見注釋。
#define IPC_MAX_CONN_PER_SERVER 32
struct server_ipc_config {
u64 callback; // 消息處理函數的地址
u64 max_client; // 線程支持的最大客戶數量
/* bitmap for shared buffer and stack allocation */
// conn_bmp 是一個按二進制位使用的位圖,在后面分配緩存區和棧時用到
unsigned long *conn_bmp;
struct ipc_vm_config vm_config; // 每個服務端都有一個 vm_config
};
前面用宏定義好的 vm_config
將被轉發到 register_server()
中。在這里面設置好了 server_icp_config
的每一項,並綁定到進程的 thread
結構體中。注意此時還是未給 vm_config
的棧和緩沖區分配內存。
static int register_server(struct thread *server, u64 callback, u64 max_client,
u64 vm_config_ptr)
{
int r;
struct server_ipc_config *server_ipc_config;
struct ipc_vm_config *vm_config;
BUG_ON(server == NULL);
// Create the server ipc_config
server_ipc_config = kmalloc(sizeof(struct server_ipc_config));
server->server_ipc_config = server_ipc_config;
// Init the server ipc_config
server_ipc_config->callback = callback;
server_ipc_config->max_client = max_client;
server_ipc_config->conn_bmp =
kzalloc(BITS_TO_LONGS(max_client) * sizeof(long));
// Get and check the parameter vm_config
vm_config = &server_ipc_config->vm_config;
r = copy_from_user((char *)vm_config, (char *)vm_config_ptr,
sizeof(*vm_config));
return r;
}
ipc_register_client()
ipc_register_client(int server_thread_cap, ipc_struct_t * ipc_struct);
typedef struct ipc_struct {
u64 conn_cap; // ICP 連接的 cap,類似於 Linux 下每個管道都有一個 fd
u64 shared_buf; // 共享緩沖區的地址
u64 shared_buf_len; // 長度
} ipc_struct_t;
客戶端登記時調用上面的函數,其中 server_thread_cap
可以通過 spawn()
獲得,不過這也造成了因為 cap 只能在父子進程間傳遞,所以目前 ChCore 只能在父子間通信。ipc_struct
則是用來向客戶端返回 ipc_register_client()
使用的,返回的參數見注釋。
再往深處到達 ipc_register_client
,在這里客戶端也被創建了一個 vm_config
。同服務端做下對比可發現這里並未設置棧的起始地址和長度,原因下個函數就會揭曉。
struct ipc_vm_config vm_config = {
.buf_base_addr = CLIENT_BUF_BASE,
.buf_size = CLIENT_BUF_SIZE,
};
到了 create_connection()
函數中,在這里將構建用於通信的 ipc_connection
。
struct ipc_connection {
struct thread *source; /* Source Thread */
struct thread *target; /* Target Thread */
u64 server_conn_cap; /* Conn cap in server */
u64 callback; /* Target function */
u64 server_stack_top; /* Shadow server stack top */
u64 server_stack_size;
struct shared_buf buf; /* Shared buffer */
};
上面的 source
為客戶端的當前線程,target
為我們在服務端進程下創建的一個新線程,作用將在 ipc_call()
中分析 。
static int create_connection(struct thread *source, struct thread *target,
struct ipc_vm_config *client_vm_config)
{
struct ipc_connection *conn = NULL;
// Get the ipc_connection
conn = obj_alloc(TYPE_CONNECTION, sizeof(*conn));
conn->target = create_server_thread(target);
棧的處理方法就比較特別了:我們前面知道了服務端的 stack_base_addr
和 stack_size
,還有一個位圖 conn_bmp
。接下來我們尋找到位圖中首個未使用的標志位,是第幾位就把臨時變量 idx
記為幾。
// Get the server's ipc config
server_ipc_config = target->server_ipc_config;
vm_config = &server_ipc_config->vm_config;
conn_idx = find_next_zero_bit(server_ipc_config->conn_bmp,
server_ipc_config->max_client, 0);
set_bit(conn_idx, server_ipc_config->conn_bmp);
然后通過 stack_base_addr + conn_idx * vm_config->stack_size
計算出當前這個連接使用的棧基地址,並分配一個 PMO 作為棧的內存塊,然后掛載到服務端的地址空間里。為什么只掛載到服務端我們將在 ipc_call()
中分析。
// Create the server thread's stack
server_stack_base =
vm_config->stack_base_addr + conn_idx * vm_config->stack_size;
stack_size = vm_config->stack_size;
stack_pmo = kmalloc(sizeof(struct pmobject));
pmo_init(stack_pmo, PMO_DATA, stack_size, 0);
vmspace_map_range(target->vmspace, server_stack_base, stack_size,
VMR_READ | VMR_WRITE, stack_pmo);
conn->server_stack_top = server_stack_base + stack_size;
緩沖區的處理類似於棧但又不大相同。因為緩沖區算是客戶端和服務端間的共享內存,所以要分別計算在客戶端和服務端的虛擬地址。
// Create and map the shared buffer for client and server
server_buf_base =
vm_config->buf_base_addr + conn_idx * vm_config->buf_size;
client_buf_base = client_vm_config->buf_base_addr;
buf_size = MIN(vm_config->buf_size, client_vm_config->buf_size);
client_vm_config->buf_size = buf_size;
但分配只要分配一個 PMO 就行,把它分別掛載到客戶端地址空間和服務端地址空間中。
buf_pmo = kmalloc(sizeof(struct pmobject));
pmo_init(buf_pmo, PMO_DATA, buf_size, 0);
vmspace_map_range(current_thread->vmspace, client_buf_base, buf_size,
VMR_READ | VMR_WRITE, buf_pmo);
vmspace_map_range(target->vmspace, server_buf_base, buf_size,
VMR_READ | VMR_WRITE, buf_pmo);
創建好的共享緩沖區將作為一個 shared_buf
結構體掛載到 ipc_connection
中。
/*
struct shared_buf {
u64 client_user_addr;
u64 server_user_addr;
u64 size;
};
*/
conn->buf.client_user_addr = client_buf_base;
conn->buf.server_user_addr = server_buf_base;
server_conn_cap
是將當前的 ipc_connection
掛載到服務端的 cap 列表里得出的。
conn_cap = cap_alloc(current_process, conn, 0);
server_conn_cap =
cap_copy(current_process, target->process, conn_cap, 0, 0);
conn->server_conn_cap = server_conn_cap;
return conn_cap;
out_free_stack_pmo:
}
ipc_call()
IPC 中傳遞消息的單位為 ipc_msg
。消息的本體將寫在共享緩沖區里,這里記錄的是消息的相對於緩沖區起始地址的偏移和長度。如果消息比較短,那就作為一種 data
直接寫到緩沖區里。如果消息比較長,那就申請一塊 PMO 然后把它的 cap
寫入到緩沖區里。
typedef struct ipc_msg {
u64 server_conn_cap;
u64 data_len;
u64 cap_slot_number;
u64 data_offset;
u64 cap_slots_offset;
} ipc_msg_t;
ipc_msg
的構建是在 ipc_create_msg
中完成的,這里面有個有趣的點是 ipc_msg
的本體是放在 icp_conn
的共享緩沖區開頭處的。
ipc_msg_t *ipc_create_msg(ipc_struct_t * icb, u64 data_len, u64 cap_slot_number)
{
ipc_msg_t *ipc_msg;
ipc_msg = (ipc_msg_t *) icb->shared_buf;
ipc_msg->data_len = data_len;
ipc_msg->cap_slot_number = cap_slot_number;
// ......
}
之后層層調用來到 sys_ipc_call()
,也是我們需要實現的函數。
首先要取出當前進程的活躍連接。
u64 sys_ipc_call(u32 conn_cap, ipc_msg_t * ipc_msg)
{
struct ipc_connection *conn = NULL;
u64 arg;
int r;
conn = obj_get(current_thread->process, conn_cap, TYPE_CONNECTION);
if (!conn) {
r = -ECAPBILITY;
goto out_fail;
}
直接用現成的 ipc_send_cap()
將 ipc_msg
里的 cap 都轉移到 conn->target->process
即服務端進程的 cap 列表里。
/**
* Lab4
* Here, you need to transfer all the capbiliies of client thread to
* capbilities in server thread in the ipc_msg.
*/
r = ipc_send_cap(conn, ipc_msg);
if (r < 0)
goto out_obj_put;
r = copy_to_user((char *)&ipc_msg->server_conn_cap,
(char *)&conn->server_conn_cap, sizeof(u64));
if (r < 0)
goto out_obj_put;
講義提示 arg 為 ipc_dispatcher()
的參數,查閱可知參數為 ipc_msg_t
。直接把當前函數的 ipc_msg
放進去是不可以的,因為我們這個地址是要傳給服務端的,而當前線程是在客戶端的,根本不在一個地址空間里。所以只能通過其他方法搞。想到前面提到 ipc_msg
的本體是放在 icp_conn
的共享緩沖區開頭處的,所以參數就是緩沖區在服務端的起始地址,即 conn->buf.server_user_addr
/**
* Lab4
* The arg is actually the 64-bit arg for ipc_dispatcher
* Then what value should the arg be?
* */
// void ipc_dispatcher(ipc_msg_t * ipc_msg)
arg = conn->buf.server_user_addr;
thread_migrate_to_server(conn, arg);
BUG("This function should never\n");
out_obj_put:
obj_put(conn);
out_fail:
return r;
}
接下來到了 thread_migrate_to_server()
。在這個函數中我們將從客戶端線程遷移到服務端線程,並在它的回調函數處繼續執行。
static u64 thread_migrate_to_server(struct ipc_connection *conn, u64 arg)
{
struct thread *target = conn->target;
conn->source = current_thread;
target->active_conn = conn;
current_thread->thread_ctx->state = TS_WAITING;
obj_put(conn);
這時候我們前面設置好的 server_stack_top
終於派上用場了,服務端線程的堆頂就該是它。
/**
* Lab4
* This command set the sp register, read the file to find which field
* of the ipc_connection stores the stack of the server thread?
* */
arch_set_thread_stack(target, conn->server_stack_top);
下一條指令為回調函數的地址。
/**
* Lab4
* This command set the ip register, read the file to find which field
* of the ipc_connection stores the instruction to be called when switch
* to the server?
* */
arch_set_thread_next_ip(target, conn->target->server_ipc_config->callback);
這個變量名都寫好了,直接賦值。
/**
* Lab4
* The argument set by sys_ipc_call;
*/
arch_set_thread_arg(target, arg);
最后設置下上下文,然后切換到服務端線程。
/**
* Passing the scheduling context of the current thread to thread of
* connection
*/
target->thread_ctx->sc = current_thread->thread_ctx->sc;
/**
* Switch to the server
*/
switch_to_thread(target);
eret_to_thread(switch_context());
/* Function never return */
BUG_ON(1);
return 0;
}
ipc_return()
ipc_return()
就比較簡單了。設置下返回值,然后切換到 conn
的客戶端線程,即 conn->source
void sys_ipc_return(u64 ret)
{
struct ipc_connection *conn = current_thread->active_conn;
// ...
thread_migrate_to_client(conn, ret);
// ...
}
static int thread_migrate_to_client(struct ipc_connection *conn, u64 ret_value)
{
struct thread *source = conn->source;
current_thread->active_conn = NULL;
/**
* Lab4
* The return value returned by server thread;
*/
arch_set_thread_return(source, ret_value);
/**
* Switch to the client
*/
switch_to_thread(source);
eret_to_thread(switch_context());
/* Function never return */
BUG_ON(1);
return 0;
}
ipc_reg_call()
這里我們需要實現一個更加輕量級的 ipc_call()
,我們不需要向服務端的消息處理函數傳遞 ipc_msg
的地址,只需要傳遞一個 64 位的數值作為消息即可。照着 ipc_call()
改一改傳給 thread_migrate_to_server
的參數就出來了。
u64 sys_ipc_reg_call(u32 conn_cap, u64 arg0)
{
struct ipc_connection *conn = NULL;
int r;
conn = obj_get(current_thread->process, conn_cap, TYPE_CONNECTION);
if (!conn) {
r = -ECAPBILITY;
goto out_fail;
}
thread_migrate_to_server(conn, arg0);
BUG("This function should never\n");
out_obj_put:
obj_put(conn);
out_fail:
return r;
}
實現好后還需要在系統調用表里登記一下
const void *syscall_table[NR_SYSCALL] = {
[0 ... NR_SYSCALL - 1] = sys_debug,
// .....
/*
* Lab4
* Add syscall
*/
[SYS_ipc_reg_call] = sys_ipc_reg_call,
// ....
參考資料
05lover/chcore: 2020 os - github
a-daydream/chcore-lab-mospi-2020 - gitee
后記
本來以為爆肝一天就能把這個 Lab 干出來,沒想到在寫調度模塊時竟然陷進了 bug 的泥潭。自己懟了半天找不出錯來,還是用網友的代碼挨個替換后運行才發現的問題,一整天的時間只做完了多核支持和調度兩大部分。這種自學的東西最怕出錯誤了,又沒人溝通可能自己掉進了牛角尖里都不知道,耗掉大量的時間。所以我把自己搗鼓的過程和思路寫成博客發出來也是為了方便后人掉了坑里后能有個大體的參考,最起碼來說能向我一樣用蠻力替換的方法定位出自己的代碼錯在哪了。