ChCore Lab4 多核處理 實驗筆記


本文為上海交大 ipads 研究所陳海波老師等人所著的《現代操作系統:原理與實現》的課程實驗(LAB)的學習筆記的第四篇:多核處理。所有章節的筆記可在此處查看:chcore | 康宇PL's Blog

踩坑總結

本着早看見少踩坑的原則,我先說一些在實踐中總結的一點玄學經驗,如果你能搞清楚這種現象的真實原因請務必在博客底端評論區賜教一下。

薛定諤的評測結果

你可能會遇到手動 make run-xxx 樣例時可以正確運行,但 make grade 有時候莫名其妙就過不了的情況。或者干脆手動 make run 時就遇到概率過不了的情況。我的猜測是因為 qemu 跑的太快導致跟時鍾中斷處理那部分相關的代碼沒有很好的運行。總之你在項目根目錄的 CMakeLists.txt 里把 CMAKE_BUILD_TYPE 改成 Debug 模式,這樣編譯的時候優化等級會低一點,就會跑的慢一點了,此時再測試大部分情況下就能正常了。考慮部分用於調試的輸出語句可能影響評測程序的運行,所以保險起見可以在同個文件中將下面代碼里的 DLOG_LEVEL 賦值為 1 。

if (CMAKE_BUILD_TYPE STREQUAL "Debug")
    add_definitions("-DLOG_LEVEL=2")

記得 commit 代碼前把這些選項都還原回去。

但薛定諤的詭異之處在於它的不可預測性,當我在虛擬機里跑的時候情況又反過來了,Debug 模式容易出錯,Release 模式很少出錯。這還真是神奇吶......

2021/07/03 補記

今天得到網友 james_ling 的提醒,ChCore 里的 printf() 是線程不安全的。源代碼中也有提到這一點:

/*
 * NOTE: this function is not thread safe. Use it at your own risk when multi threading
 */
void printf(char *fmt, ...);

而在我的實現中部分代碼用到了 kwarn() 這個宏,它間接調用了 printf(),因此也是線程不安全的。原先我只是看見 ChCore 原始代碼里部分地方用到了它也學着用了。現在想來需要保證使用 kwarn() 的地方都得保證拿到大內核鎖才行。我粗暴的把所有 kwarn() 都換成 kdebug() 后再在 Release 模式下 make grade 就沒有再遇到薛定諤現象了

多線程調試

這節需要用到多線程調試,如果你還沒有了解的話請立即去搜索下“gdb 怎么調試多線程”。

多核支持

多核啟動

在源碼里看到 BSP、 AP、SMP 時我是非常疑惑的,因為整個講義里完全沒說這是啥。搜了下才知道這是說主 CPU(Boot Strap Processor)、副 CPU(Application Processor)、對稱多處理器(Symmetrical Multi-Processing)。

分析一下最新版的 boot/start.S 里的 _start 函數。可知其邏輯:

1)當前 CPU 為主 CPU:正常初始化

2)當前 CPU 為副 CPU:設置好棧后循環等待直到 secondary_boot_flag[cpuid] != 0,跳轉到 secondary_init_c

BEGIN_FUNC(_start)
    /* 當前 cpuid 放在 X8 寄存器里 */
	mrs	x8, mpidr_el1
	and	x8, x8,	#0xFF
	cbz	x8, primary

	/* Wait for bss clear */
wait_for_bss_clear:
	adr	x0, clear_bss_flag
	ldr	x1, [x0]
	cmp     x1, #0
	bne	wait_for_bss_clear

	/* Turn to el1 from other exception levels. */
	bl 	arm64_elX_to_el1

	/* Prepare stack pointer and jump to C. */
    /* 設置當前棧為 boot_cpu_stack[cpuid] */
	mov	x1, #0x1000
	mul	x1, x8, x1
	adr 	x0, boot_cpu_stack
	add	x0, x0, x1
	add	x0, x0, #0x1000
        mov	sp, x0

wait_until_smp_enabled:
	/* CPU ID should be stored in x8 from the first line */
	/* while( secondary_boot_flag[cpuid] == 0 ); */
	mov	x1, #8
	mul	x2, x8, x1
	ldr	x1, =secondary_boot_flag
	add	x1, x1, x2
	ldr	x3, [x1]
	cbz	x3, wait_until_smp_enabled

	/* Set CPU id */
	mov	x0, x8
	bl 	secondary_init_c

primary:

	/* Turn to el1 from other exception levels. */
	bl 	arm64_elX_to_el1

	/* Prepare stack pointer and jump to C. */
	adr 	x0, boot_cpu_stack
	add 	x0, x0, #0x1000
	mov 	sp, x0

	bl 	init_c

	/* Should never be here */
	b	.
END_FUNC(_start)

主 CPU 在完成自己的初始化后調用 enable_smp_cores,在此設置 secondary_boot_flag[cpuid] = 1,讓副 CPU 可以繼續執行完成初始化。

為了保證並發安全,故要求副 CPU 有序的、逐個的初始化,每個副 CPU 初始化完應設置 cpu_status[cpuid] = cpu_run,只有在上個設置好后才可以設置下個副 CPU 的 secondary_boot_flag[cpuid]

void enable_smp_cores(void *addr)
{
	int i = 0;
	long *secondary_boot_flag;

	/* Set current cpu status */
	cpu_status[smp_get_cpu_id()] = cpu_run;
	secondary_boot_flag = (long *)phys_to_virt(addr);
	for (i = 0; i < PLAT_CPU_NUM; i++) {
		*(secondary_boot_flag + i) = 1;
		while(cpu_status[i] != cpu_run);
	}

	kinfo("All %d CPUs are active\n", PLAT_CPU_NUM);
}

副 CPU 設置 cpu_status[cpuid] 的代碼則在 secondary_start 中。

void secondary_init_c(int cpuid)
{
	el1_mmu_activate();
	secondary_cpu_boot(cpuid);
}

BEGIN_FUNC(secondary_cpu_boot)
    /* We store the logical cpuid in TPIDR_EL1 */
    msr     TPIDR_EL1, x0

    mov     x1, #KERNEL_STACK_SIZE
    mul     x2, x0, x1
    ldr     x3, =kernel_stack
    add     x2, x2, x3
    add     x2, x2, KERNEL_STACK_SIZE
    mov     sp, x2
    bl      secondary_start
END_FUNC(secondary_cpu_boot)
    
void secondary_start(void)
{
	kinfo("AP %u is activated!\n", smp_get_cpu_id());
	exception_init_per_cpu();

	cpu_status[smp_get_cpu_id()] = cpu_run;

	/* Where the AP first returns to the user mode */
	sched();
	eret_to_thread(switch_context());
}

排號自旋鎖

下面展示了一種排號自旋鎖的實現。排號鎖結構體包括 nextowner 成員,owner 表示鎖的當前持有者的序號,next 表示下一個可分發的序號。當嘗試獲取排號鎖時會通過原子操作分配一個 next 表示當前的序號,並同時將鎖的 next 自增一。拿到序號后一直等到 ower 和當前序號相等時便算是拿到了鎖,可以繼續執行,否則一直忙等待。解鎖操作則只需要將 owner++,相當於將鎖傳給了下一個序號的等待者。

struct lock {
	volatile int owner;
    volatile int next;
};

void lock_init(struct lock *lock) {
    lock->owner = 0;
    lock->next = 0;
}

void lock(struct lock *lock) {
    /* 原子操作,相當於 my_ticket = lock->next; lock->next++; */
    volatile int my_ticket = atomic_FAA(&lock->next, 1);
    while(lock->owner != my_ticker);
}

void unlock(struct lock *lock) {
    lock->owner++;
}

int is_lock(struct lock *lock) {
    return lock->owner != lock->next;
}

ChCore 里已經實現好了 lock() 函數,我們只要寫兩行 unlock()is_lock() 就行。

大內核鎖

ChCore 使用最簡單的方法解決內核態間多核的並發控制問題:只要操作內核數據就得獲得大內核鎖,並在退出內核態前釋放掉。保證了同時只有一個 CPU 執行內核代碼、訪問內核數據。

大內核鎖的本體就是排號自旋鎖的簡單封裝

struct lock big_kernel_lock;
void kernel_lock_init(void) {
	lock_init(&big_kernel_lock);
}
void lock_kernel(void) {
	lock(&big_kernel_lock);
}
void unlock_kernel(void) {
    // 解鎖前要保證鎖處於上鎖狀態,部分樣例會檢測這一點
    if (is_locked(&big_kernel_lock))
		unlock(&big_kernel_lock);
}

ChCore 中上鎖的地方比較零散,包括中斷處理時、系統調用時、激活副 CPU 前等等。對於涉及中斷的地方要特別考慮下該中斷是在發生自內核態還是在發生自用戶態,對於發生自內核態的(所有 EL1tEL1h 后綴的)都不要加鎖,因為同處內核態不能重復加鎖。

解鎖的地方只有一處 exception_return,因為只要想從內核態返回用戶態,都得經過此處。

調度

ChCore 中先實現協作式線程調度,再在此基礎上實現搶占式調度。

協作式調度需要線程主動使用 sys_yield() 將 CPU 控制權讓位給其他線程,搶占式調度則是由內核給每個線程分配一定的 CPU 時間片,當線程的時間片用完后由內核強制的將 CPU 控制權移交給另一個線程。

協作式調度

ChCore 中將調度算法封裝為了包含若干函數指針的結構體 sched_ops,選用不同的調度算法時只要切換下 sched_ops 就行。

/* Indirect function call may downgrade performance */
struct sched_ops {
	int (*sched_init) (void);
	int (*sched) (void);
	int (*sched_enqueue) (struct thread * thread);
	int (*sched_dequeue) (struct thread * thread);
	struct thread *(*sched_choose_thread) (void);
	void (*sched_handle_timer_irq) (void);
	/* Debug tools */
	void (*sched_top) (void);
};

// 時間片輪轉調度策略
struct sched_ops rr = {
	.sched_init = rr_sched_init,
	.sched = rr_sched,
	.sched_enqueue = rr_sched_enqueue,
	.sched_dequeue = rr_sched_dequeue,
	.sched_choose_thread = rr_sched_choose_thread,
	.sched_handle_timer_irq = rr_sched_handle_timer_irq,
};

當前 Lab 中 ChCore 使用 Round Robin (時間片輪轉)調度策略。每個 CPU 都有自己的就緒隊列,表示已經准備好的、可以調度的線程。另外每個 CPU 還有一個空閑進程 idle,用於在沒有線程就緒時上去頂位。如果不這樣做那 CPU 發現沒有能調度的線程時就會卡在內核態,而我們進入內核態時都是持有大內核鎖的,你這個 CPU 在內核態干等着不出來,那其他 CPU 就拿不到大內核鎖,無法進入內核態了。

rr_sched_init 為已經實現好的初始化函數,會將就緒隊列 rr_ready_queue 和空閑線程 idle_threads 初始化。它只在主 CPU 初始時被調用一次。

rr_sched_enqueue 表示將線程插入就緒隊列,並設置進程狀態為就緒態,關聯到當前 CPU 上。需要注意下合法性判斷(指針為空、是否為 idle 線程、是否已經處於就緒態)。

rr_sched_dequeue 為出隊列操作。彈出的線程被標記為處於 TS_INTER 這個特殊的中間態。依舊是要注意合法性檢測。

rr_sched_choose_thread 負責從就緒隊列中選擇一個線程。就緒隊列為空時就選擇 idle 線程。

rr_sched 為調度操作的核心函數。先將當前線程插入就緒隊列,然后通過 rr_sched_choose_thread 取出下一個就緒的線程,然后用 switch_to_thread 將其設為運行態,並讓當前線程指針指向它。

實際實現的時候容易在諸多的小細節上出錯,多用用 make qemu 根據最新的 BUG 信息來面向樣例編程吧。

下面代碼中涉及 budget 和 affinity的操作將在搶占式調度中講解

int rr_sched_enqueue(struct thread *thread)
{
	if (thread == NULL || thread->thread_ctx == NULL) 
		return -1;
	if (thread->thread_ctx->type == TYPE_IDLE) {
		return 0;
	}
	if (thread->thread_ctx->state == TS_READY) {
		return -1;
	}
	s32 aff = thread->thread_ctx->affinity;
	if (aff == NO_AFF)
		thread->thread_ctx->cpuid = smp_get_cpu_id();
	else if (aff < PLAT_CPU_NUM)
		thread->thread_ctx->cpuid = aff;
	else {
		//kwarn("thread->thread_ctx->affinity >= PLAT_CPU_NUM\n");
		return -1;
	}
	thread->thread_ctx->state = TS_READY;
	list_append(&thread->ready_queue_node, &rr_ready_queue[thread->thread_ctx->cpuid]);
	return 0;
}

int rr_sched_dequeue(struct thread *thread)
{
	if (thread == NULL
		|| thread->thread_ctx == NULL
		|| list_empty(&thread->ready_queue_node)
		|| thread->thread_ctx->affinity >= PLAT_CPU_NUM
		|| thread->thread_ctx->type == TYPE_IDLE
		|| thread->thread_ctx->state != TS_READY) {
		return -1;
	}
	thread->thread_ctx->state = TS_INTER;
	list_del(&thread->ready_queue_node);
	return 0;
}

struct thread *rr_sched_choose_thread(void)
{
	u32 cpu_id = smp_get_cpu_id();
	if (list_empty(&rr_ready_queue[cpu_id]))
		goto ret_idle;
	struct thread *ret = list_entry(rr_ready_queue[cpu_id].next, struct thread, ready_queue_node);
	if (rr_sched_dequeue(ret)) {
		goto ret_idle;
	}
	return ret;
ret_idle:
	return &idle_threads[cpu_id];
}

int rr_sched(void)
{
	if (current_thread != NULL
		&& current_thread->thread_ctx->type != TYPE_IDLE) {
		if (current_thread->thread_ctx->sc->budget > 0) {
			return -1;
		}
		if (rr_sched_enqueue(current_thread)) {
			return -1;
		}
	}
	struct thread *target = rr_sched_choose_thread();
	rr_sched_refill_budget(target, DEFAULT_BUDGET);
	switch_to_thread(target);
	return 0;
}

線程切換流程

目前 ChCore 的線程切換過程步驟有:

1)當前線程主動(協作式調度)或者被動(搶占式調度)的引發中斷,陷入內核態,在中斷程序入口處調用 exception_enter 保存上下文。

2)調用 sched 函數。切換 current_thread

3)eret_to_thread(switch_context()),恢復 current_thread 的上下文到寄存器中。

為了用戶態能夠主動讓出 CPU,我們提供了 sys_yield 系統調用。

void sys_yield(void)
{
	sched();
	eret_to_thread(switch_context());
}

此時 make run-yield_single 可以觀察到在單 CPU 下兩個線程可以通過 sys_yield 來交替的運行。

Hello, I am thread 0
Hello, I am thread 1
Iteration 0, thread 0, cpu 0
Iteration 0, thread 1, cpu 0
Iteration 1, thread 0, cpu 0
Iteration 1, thread 1, cpu 0

搶占式調度

時鍾中斷與搶占

為了支持內核搶占 CPU,我們需要啟用時鍾中斷。每個一小段時間觸發一個硬件定時器中斷。

void exception_init_per_cpu(void)
{
	timer_init();
	set_exception_vector();
	disable_irq();
}

相應的給每個線程分配一個整形變量調度預算 budget。每次觸發時鍾中斷都會讓 budget 減一。當且僅當 budget 變成零時該線程才可以被搶占。所以需要在 rr_sched 里加點驗證操作。

static inline void rr_sched_refill_budget(struct thread *target, u32 budget)
{
	if(target && target->thread_ctx && target->thread_ctx->type != TYPE_IDLE)
		target->thread_ctx->sc->budget = budget;
}

int rr_sched(void)
{
// ......
		if (rr_sched_enqueue(current_thread)) {
			return -1;
		}
// ......
}

時鍾中斷發生時將沿着 handle_irq -> plat_handle_irq -> handle_timer_irq -> sched_handle_timer_irq -> rr_sched_handle_timer_irq 的順序逐級調用,最終我們要在 rr_sched_handle_timer_irq 對 budget 做一下變更操作。

void rr_sched_handle_timer_irq(void)
{
    // 各種判斷不能少,部分樣例會針對這一點
	if (current_thread == NULL
		|| current_thread->thread_ctx->type == TYPE_IDLE)
		return;
	if (current_thread->thread_ctx->sc->budget > 0)
		current_thread->thread_ctx->sc->budget--;
}

在時鍾中斷返回前也要檢查下能不能進行調度。

void handle_irq(int type)
{
	if (type > ERROR_EL1h)
		lock_kernel();
	plat_handle_irq();
	sched();
	eret_to_thread(switch_context());
}

系統調用 sys_yield 是立刻進行切換,所以我們也要對 budget 做一下清零操作。

void sys_yield(void)
{
	current_thread->thread_ctx->sc->budget = 0;
	sched();
	eret_to_thread(switch_context());
}

處理器親和性

目前 ChCore 創建的線程與父線程都在同一個 CPU 上,沒有辦法分發到其他的 CPU 上。為了解決這一問題我們給每個線程引入一個親和性 affinity 標識。當將線程插入就緒隊列時,如果它的 affinity 為 NO_AFF,那就插到當前 CPU 的就緒隊列里;否則插到 affinity 號 CPU 的就緒隊列里。如此變成了將線程分發到其他 CPU 上的操作。

int rr_sched_enqueue(struct thread *thread)
{
	// ......
	s32 aff = thread->thread_ctx->affinity;
	if (aff == NO_AFF)
		thread->thread_ctx->cpuid = smp_get_cpu_id();
	else if (aff < PLAT_CPU_NUM)
		thread->thread_ctx->cpuid = aff;
	else {
		//kwarn("thread->thread_ctx->affinity >= PLAT_CPU_NUM\n");
		return -1;
	}
	thread->thread_ctx->state = TS_READY;
	list_append(&thread->ready_queue_node, &rr_ready_queue[thread->thread_ctx->cpuid]);
	return 0;
}

int rr_sched_dequeue(struct thread *thread)
{
	if ( // ......
		|| thread->thread_ctx->affinity >= PLAT_CPU_NUM)
		return -1;
    // ......
}

同時我們還對用戶提供了 get 和 set 親和性的系統調用。

int sys_set_affinity(u64 thread_cap, s32 aff)
{
	// ......
	if (thread == NULL || thread->thread_ctx == NULL)
		return -1;
	thread->thread_ctx->affinity = aff;
	return 0;
}

int sys_get_affinity(u64 thread_cap)
{
	// ......
	if (thread == NULL || thread->thread_ctx == NULL)
		return -1;
	return thread->thread_ctx->affinity;
}

spawn()

Linux 里使用 fork()exec() 兩個步驟來執行一個程序文件。 但還存在 spawn() 這個接口實現了前兩者組合后的功能。在 Linux 下查閱 posix_spawn() 的 manpage 可知, spawn() 是為了在某些沒有 MMU 的小型機上用來在一定范圍內替代 fork() 的。

對於 ChCore 來說 spawn() 的流程與 Lab 3 里的 process_create_root() 比較相似,Lab 3 分析好了這一部分做起來還是蠻簡單的。

spawn() 中的用戶棧

先來分析下講義里的這張圖:

從中我們可以提取出的信息有:

子進程的用戶棧基地址為 MAIN_THREAD_STACK_BASE,大小為 MAIN_THREAD_STACK_SIZE,這幾個宏都在 def.h 有定義。

用戶棧頂部的一個 PAGE_SIZE 的空間用來存放用戶程序的各種啟動參數,程序開始運行時棧頂指針 SP 應該指向這個頁的末尾處,即 MAIN_THREAD_STACK_BASE + MAIN_THREAD_STACK_SIZE - PAGE_SIZE

spawn() 的流程

下面分析下 spawn() 的源碼,為了節約篇幅刪掉了異常處理的代碼。建議對比着 Lab 3 里對 process_create_root() 的分析來看。

int spawn(char *path, int *new_process_cap, int *new_thread_cap,
	  struct pmo_map_request *pmo_map_reqs, int nr_pmo_map_reqs, int caps[],
	  int nr_caps, int aff)
{
	struct user_elf user_elf;
	int ret;
	
    // 將 ELF 文件整個讀入內存中
	ret = readelf_from_kernel_cpio(path, &user_elf);
    
    // 創建子進程並解析 ELF 文件
	return launch_process_with_pmos_caps(&user_elf, new_process_cap,
					     new_thread_cap, pmo_map_reqs,
					     nr_pmo_map_reqs, caps, nr_caps,
					     aff);
}

int launch_process_with_pmos_caps(struct user_elf *user_elf,
				  int *child_process_cap,
				  int *child_main_thread_cap,
				  struct pmo_map_request *pmo_map_reqs,
				  int nr_pmo_map_reqs, int caps[], int nr_caps,
				  s32 aff)
{
    	// 創建新進程作為子進程
		new_process_cap = usys_create_process();
    	
		// 將 ELF 文件的各個程序段映射到子進程的對應位置處
		// 類似於 load_binary
		for (i = 0; i < 2; ++i) {
			p_vaddr = user_elf->user_elf_seg[i].p_vaddr;
			ret = usys_map_pmo(new_process_cap,
					   user_elf->user_elf_seg[i].elf_pmo,
					   ROUND_DOWN(p_vaddr, PAGE_SIZE),
					   user_elf->user_elf_seg[i].flags);
		}
		pc = user_elf->elf_meta.entry;
    
		// 創建一個大小為 MAIN_THREAD_STACK_SIZE 的 PMO 作為用戶棧
		// PMO 類型取 PMO_DATA 原因參考 thread_create_main 中對 PMO 的處理
		pmo_requests[0].size = MAIN_THREAD_STACK_SIZE;
		pmo_requests[0].type = PMO_DATA;
		ret = usys_create_pmos((void *)pmo_requests, 1);

		// 創建好的 cap 就是用戶棧的 cap
		main_stack_cap = pmo_requests[0].ret_cap;
    
    	// 為了實現共享內存機制,將父進程制定的 capability 傳遞給子進程
		if (nr_caps > 0) {
			/* usys_transfer_caps is used during process creation */
			ret = usys_transfer_caps(new_process_cap, caps, nr_caps,
						 transfer_caps);
		}
    
		// 還是為了支持共享內存,把傳遞過來的 PMO 挨個映射一遍
		if (nr_pmo_map_reqs) {
			ret =
			    usys_map_pmos(new_process_cap, (void *)pmo_map_reqs,
					  nr_pmo_map_reqs);
		}
    
		// 類似於 thread_create_main 里的 prepare_env
		// 把子進程要用的各種參數存放到用戶棧最頂上的那一個初始頁中
		stack_top = MAIN_THREAD_STACK_BASE + MAIN_THREAD_STACK_SIZE;
		stack_offset = MAIN_THREAD_STACK_SIZE - PAGE_SIZE;
		construct_init_env(init_env, stack_top, &user_elf->elf_meta,
				   user_elf->path, pmo_map_reqs,
				   nr_pmo_map_reqs, transfer_caps, nr_caps);
    
    	// 把做好的初始頁寫到用戶棧頂部
		ret = usys_write_pmo(main_stack_cap, stack_offset, init_env,
				     PAGE_SIZE);
    
    	// 把做好的用戶棧映射到子進程的地址空間中
		pmo_map_requests[0].pmo_cap = main_stack_cap;
		pmo_map_requests[0].addr = MAIN_THREAD_STACK_BASE;
		pmo_map_requests[0].perm = VM_READ | VM_WRITE;

		ret =
		    usys_map_pmos(new_process_cap, (void *)pmo_map_requests, 1);
    
    	// 創建子進程的主線程,指定 SP 的地址
		stack_va = MAIN_THREAD_STACK_BASE + MAIN_THREAD_STACK_SIZE - PAGE_SIZE;
		main_thread_cap =
		    usys_create_thread(new_process_cap, stack_va, pc,
				       (u64) NULL, MAIN_THREAD_PRIO, aff);
    
    	// 返回子進程和其主線程的 cap
		if (child_process_cap != NULL)
			*child_process_cap = new_process_cap;
		if (child_main_thread_cap != NULL)
			*child_main_thread_cap = main_thread_cap;

進程間通信

IPC 實例

ChCore 里的進程間通信(IPC,Inter-Process Communication)類似於網絡編程中的客戶端—服務端模型。我們直接通過講義里的例子來研究下用戶眼里的 IPC 長啥樣:

正常狀態下進程間通信分為三步:

1)服務端調用 ipc_register_server() 登記自己的消息處理函數,上圖里該函數為 ipc_dispatcher()

2)客戶端調用 ipc_register_client() 登記自己的相關信息到指定的服務端上。

3)客戶端調用 ipc_call() 向服務端發送信息,服務端在自己的消息處理函數里以函數參數的形式捕獲到消息來進行后續的邏輯處理,最后使用 ipc_return() 返回處理的結果。

那客戶端是如何標記自己要找的那個服務端的呢?下面引用一小段 ipc_data.c 中的代碼:

ret = spawn("/ipc_data_server.bin", &new_process_cap, &new_thread_cap,
		    pmo_map_reqs, 1, NULL, 0, 1);
// ...

ret = ipc_register_client(new_thread_cap, &client_ipc_struct);

可以發現客戶端是通過 spawn() 得到子進程的主線程的 cap,並以此來向該進程建立連接的。

IPC 原理圖

接下來的部分將逐一分析 ChCore 是如何實現 IPC 的各個系統調用的。這里先放一張講義中的圖,后面不理解流程的話可以回頭過來看看。

ipc_register_server()

ipc_register_server() 為 ChCore 提供的系統調用,用戶只需要指定一個信號處理函數即可將當前進程等記為 IPC 的服務端進程。

該函數會創建一個 ipc_vm_config 結構體,分別用宏來指定了服務端線程里的運行時棧的和緩沖區起始地址和大小。這里請留意下目前只是地址,並未分配內存。至於它們的作用后面再講。

struct ipc_vm_config vm_config = {
    .stack_base_addr = SERVER_STACK_BASE,
    .stack_size = SERVER_STACK_SIZE,
    .buf_base_addr = SERVER_BUF_BASE,
    .buf_size = SERVER_BUF_SIZE,
};

同時為了支持 IPC,每個線程的 thread 結構體都增加了 active_connserver_ipc_config 這兩個條目。server_ipc_config 中的各項含義請見注釋。

#define IPC_MAX_CONN_PER_SERVER 32
struct server_ipc_config {
	u64 callback;	// 消息處理函數的地址
	u64 max_client;	// 線程支持的最大客戶數量
	/* bitmap for shared buffer and stack allocation */
    // conn_bmp 是一個按二進制位使用的位圖,在后面分配緩存區和棧時用到
	unsigned long *conn_bmp; 
	struct ipc_vm_config vm_config; // 每個服務端都有一個 vm_config
};

前面用宏定義好的 vm_config 將被轉發到 register_server() 中。在這里面設置好了 server_icp_config 的每一項,並綁定到進程的 thread 結構體中。注意此時還是未給 vm_config 的棧和緩沖區分配內存。

static int register_server(struct thread *server, u64 callback, u64 max_client,
			   u64 vm_config_ptr)
{
	int r;
	struct server_ipc_config *server_ipc_config;
	struct ipc_vm_config *vm_config;
	BUG_ON(server == NULL);

	// Create the server ipc_config
	server_ipc_config = kmalloc(sizeof(struct server_ipc_config));
	server->server_ipc_config = server_ipc_config;

	// Init the server ipc_config
	server_ipc_config->callback = callback;
	server_ipc_config->max_client = max_client;
	server_ipc_config->conn_bmp =
	    kzalloc(BITS_TO_LONGS(max_client) * sizeof(long));
		
	// Get and check the parameter vm_config
	vm_config = &server_ipc_config->vm_config;
	r = copy_from_user((char *)vm_config, (char *)vm_config_ptr,
			   sizeof(*vm_config));

	return r;
}

ipc_register_client()

ipc_register_client(int server_thread_cap, ipc_struct_t * ipc_struct);

typedef struct ipc_struct {
	u64 conn_cap; // ICP 連接的 cap,類似於 Linux 下每個管道都有一個 fd
	u64 shared_buf; // 共享緩沖區的地址
	u64 shared_buf_len; // 長度
} ipc_struct_t;

客戶端登記時調用上面的函數,其中 server_thread_cap 可以通過 spawn() 獲得,不過這也造成了因為 cap 只能在父子進程間傳遞,所以目前 ChCore 只能在父子間通信。ipc_struct 則是用來向客戶端返回 ipc_register_client() 使用的,返回的參數見注釋。

再往深處到達 ipc_register_client,在這里客戶端也被創建了一個 vm_config。同服務端做下對比可發現這里並未設置棧的起始地址和長度,原因下個函數就會揭曉。

struct ipc_vm_config vm_config = {
    .buf_base_addr = CLIENT_BUF_BASE, 
    .buf_size = CLIENT_BUF_SIZE,
};

到了 create_connection() 函數中,在這里將構建用於通信的 ipc_connection

struct ipc_connection {
	struct thread *source; /* Source Thread */
	struct thread *target; /* Target Thread */
	u64 server_conn_cap; /* Conn cap in server */
	u64 callback; /* Target function */
	u64 server_stack_top; /* Shadow server stack top */
	u64 server_stack_size;
	struct shared_buf buf; /* Shared buffer */
};

上面的 source 為客戶端的當前線程,target 為我們在服務端進程下創建的一個新線程,作用將在 ipc_call() 中分析 。

static int create_connection(struct thread *source, struct thread *target,
			     struct ipc_vm_config *client_vm_config)
{
	struct ipc_connection *conn = NULL;
	// Get the ipc_connection
	conn = obj_alloc(TYPE_CONNECTION, sizeof(*conn));
	conn->target = create_server_thread(target);

棧的處理方法就比較特別了:我們前面知道了服務端的 stack_base_addrstack_size,還有一個位圖 conn_bmp 。接下來我們尋找到位圖中首個未使用的標志位,是第幾位就把臨時變量 idx 記為幾。

	// Get the server's ipc config
	server_ipc_config = target->server_ipc_config;
	vm_config = &server_ipc_config->vm_config;
	conn_idx = find_next_zero_bit(server_ipc_config->conn_bmp,
				      server_ipc_config->max_client, 0);
	set_bit(conn_idx, server_ipc_config->conn_bmp);

然后通過 stack_base_addr + conn_idx * vm_config->stack_size 計算出當前這個連接使用的棧基地址,並分配一個 PMO 作為棧的內存塊,然后掛載到服務端的地址空間里。為什么只掛載到服務端我們將在 ipc_call() 中分析。

	// Create the server thread's stack
	server_stack_base =
	    vm_config->stack_base_addr + conn_idx * vm_config->stack_size;
	stack_size = vm_config->stack_size;
	stack_pmo = kmalloc(sizeof(struct pmobject));
	pmo_init(stack_pmo, PMO_DATA, stack_size, 0);
	vmspace_map_range(target->vmspace, server_stack_base, stack_size,
			  VMR_READ | VMR_WRITE, stack_pmo);
	conn->server_stack_top = server_stack_base + stack_size;

緩沖區的處理類似於棧但又不大相同。因為緩沖區算是客戶端和服務端間的共享內存,所以要分別計算在客戶端和服務端的虛擬地址。

	// Create and map the shared buffer for client and server
	server_buf_base =
	    vm_config->buf_base_addr + conn_idx * vm_config->buf_size;
	client_buf_base = client_vm_config->buf_base_addr;
	buf_size = MIN(vm_config->buf_size, client_vm_config->buf_size);
	client_vm_config->buf_size = buf_size;

但分配只要分配一個 PMO 就行,把它分別掛載到客戶端地址空間和服務端地址空間中。

	buf_pmo = kmalloc(sizeof(struct pmobject));
	pmo_init(buf_pmo, PMO_DATA, buf_size, 0);
	vmspace_map_range(current_thread->vmspace, client_buf_base, buf_size,
			  VMR_READ | VMR_WRITE, buf_pmo);
	vmspace_map_range(target->vmspace, server_buf_base, buf_size,
			  VMR_READ | VMR_WRITE, buf_pmo);

創建好的共享緩沖區將作為一個 shared_buf 結構體掛載到 ipc_connection 中。

/*
struct shared_buf {
	u64 client_user_addr;
	u64 server_user_addr;
	u64 size;
};
*/
	conn->buf.client_user_addr = client_buf_base;
	conn->buf.server_user_addr = server_buf_base;

server_conn_cap 是將當前的 ipc_connection 掛載到服務端的 cap 列表里得出的。

	conn_cap = cap_alloc(current_process, conn, 0);

	server_conn_cap =
	    cap_copy(current_process, target->process, conn_cap, 0, 0);
	conn->server_conn_cap = server_conn_cap;

	return conn_cap;
 out_free_stack_pmo:
}

ipc_call()

IPC 中傳遞消息的單位為 ipc_msg。消息的本體將寫在共享緩沖區里,這里記錄的是消息的相對於緩沖區起始地址的偏移和長度。如果消息比較短,那就作為一種 data 直接寫到緩沖區里。如果消息比較長,那就申請一塊 PMO 然后把它的 cap 寫入到緩沖區里。

typedef struct ipc_msg {
	u64 server_conn_cap;
	u64 data_len;
	u64 cap_slot_number;
	u64 data_offset;
	u64 cap_slots_offset;
} ipc_msg_t;

ipc_msg 的構建是在 ipc_create_msg 中完成的,這里面有個有趣的點是 ipc_msg 的本體是放在 icp_conn 的共享緩沖區開頭處的。

ipc_msg_t *ipc_create_msg(ipc_struct_t * icb, u64 data_len, u64 cap_slot_number)
{
	ipc_msg_t *ipc_msg;
	ipc_msg = (ipc_msg_t *) icb->shared_buf;
	ipc_msg->data_len = data_len;
	ipc_msg->cap_slot_number = cap_slot_number;
	// ......
}

之后層層調用來到 sys_ipc_call(),也是我們需要實現的函數。

首先要取出當前進程的活躍連接。

u64 sys_ipc_call(u32 conn_cap, ipc_msg_t * ipc_msg)
{
	struct ipc_connection *conn = NULL;
	u64 arg;
	int r;

	conn = obj_get(current_thread->process, conn_cap, TYPE_CONNECTION);
	if (!conn) {
		r = -ECAPBILITY;
		goto out_fail;
	}

直接用現成的 ipc_send_cap()ipc_msg 里的 cap 都轉移到 conn->target->process 即服務端進程的 cap 列表里。

	/**
	 * Lab4
	 * Here, you need to transfer all the capbiliies of client thread to
	 * capbilities in server thread in the ipc_msg.
	 */
	r = ipc_send_cap(conn, ipc_msg);
	if (r < 0)
		goto out_obj_put;
	
	r = copy_to_user((char *)&ipc_msg->server_conn_cap,
			 (char *)&conn->server_conn_cap, sizeof(u64));
	if (r < 0)
		goto out_obj_put;

講義提示 arg 為 ipc_dispatcher() 的參數,查閱可知參數為 ipc_msg_t。直接把當前函數的 ipc_msg 放進去是不可以的,因為我們這個地址是要傳給服務端的,而當前線程是在客戶端的,根本不在一個地址空間里。所以只能通過其他方法搞。想到前面提到 ipc_msg 的本體是放在 icp_conn 的共享緩沖區開頭處的,所以參數就是緩沖區在服務端的起始地址,即 conn->buf.server_user_addr

	/**
	 * Lab4
	 * The arg is actually the 64-bit arg for ipc_dispatcher
	 * Then what value should the arg be?
	 * */

	// void ipc_dispatcher(ipc_msg_t * ipc_msg)
	arg = conn->buf.server_user_addr;
	thread_migrate_to_server(conn, arg);

	BUG("This function should never\n");
 out_obj_put:
	obj_put(conn);
 out_fail:
	return r;
}

接下來到了 thread_migrate_to_server()。在這個函數中我們將從客戶端線程遷移到服務端線程,並在它的回調函數處繼續執行。

static u64 thread_migrate_to_server(struct ipc_connection *conn, u64 arg)
{
	struct thread *target = conn->target;

	conn->source = current_thread;
	target->active_conn = conn;
	current_thread->thread_ctx->state = TS_WAITING;
	obj_put(conn);

這時候我們前面設置好的 server_stack_top 終於派上用場了,服務端線程的堆頂就該是它。

	/**
	 * Lab4
	 * This command set the sp register, read the file to find which field
	 * of the ipc_connection stores the stack of the server thread?
	 * */
	arch_set_thread_stack(target, conn->server_stack_top);

下一條指令為回調函數的地址。

	/**
	 * Lab4
	 * This command set the ip register, read the file to find which field
	 * of the ipc_connection stores the instruction to be called when switch
	 * to the server?
	 * */
	arch_set_thread_next_ip(target, conn->target->server_ipc_config->callback);

這個變量名都寫好了,直接賦值。

	/**
	 * Lab4
	 * The argument set by sys_ipc_call;
	 */
	arch_set_thread_arg(target, arg);

最后設置下上下文,然后切換到服務端線程。

	/**
	 * Passing the scheduling context of the current thread to thread of
	 * connection
	 */
	target->thread_ctx->sc = current_thread->thread_ctx->sc;

	/**
	 * Switch to the server
	 */
	switch_to_thread(target);
	eret_to_thread(switch_context());

	/* Function never return */
	BUG_ON(1);
	return 0;
}

ipc_return()

ipc_return() 就比較簡單了。設置下返回值,然后切換到 conn 的客戶端線程,即 conn->source

void sys_ipc_return(u64 ret)
{
	struct ipc_connection *conn = current_thread->active_conn;
    // ...
	thread_migrate_to_client(conn, ret);
    // ...
}

static int thread_migrate_to_client(struct ipc_connection *conn, u64 ret_value)
{
	struct thread *source = conn->source;
	current_thread->active_conn = NULL;

	/**
	 * Lab4
	 * The return value returned by server thread;
	 */
	arch_set_thread_return(source, ret_value);
	/**
	 * Switch to the client
	 */
	switch_to_thread(source);
	eret_to_thread(switch_context());

	/* Function never return */
	BUG_ON(1);
	return 0;
}

ipc_reg_call()

這里我們需要實現一個更加輕量級的 ipc_call(),我們不需要向服務端的消息處理函數傳遞 ipc_msg 的地址,只需要傳遞一個 64 位的數值作為消息即可。照着 ipc_call() 改一改傳給 thread_migrate_to_server 的參數就出來了。

u64 sys_ipc_reg_call(u32 conn_cap, u64 arg0)
{
	struct ipc_connection *conn = NULL;
	int r;

	conn = obj_get(current_thread->process, conn_cap, TYPE_CONNECTION);
	if (!conn) {
		r = -ECAPBILITY;
		goto out_fail;
	}

	thread_migrate_to_server(conn, arg0);

	BUG("This function should never\n");
 out_obj_put:
	obj_put(conn);
 out_fail:
	return r;
}

實現好后還需要在系統調用表里登記一下

const void *syscall_table[NR_SYSCALL] = {
	[0 ... NR_SYSCALL - 1] = sys_debug,
// .....
	/* 
	 * Lab4
	 * Add syscall
	 */
	[SYS_ipc_reg_call] = sys_ipc_reg_call,
// ....

參考資料

隨手寫:此BSP非彼BSP - 艾弗18F的文章 - 知乎

05lover/chcore: 2020 os - github

a-daydream/chcore-lab-mospi-2020 - gitee

后記

本來以為爆肝一天就能把這個 Lab 干出來,沒想到在寫調度模塊時竟然陷進了 bug 的泥潭。自己懟了半天找不出錯來,還是用網友的代碼挨個替換后運行才發現的問題,一整天的時間只做完了多核支持和調度兩大部分。這種自學的東西最怕出錯誤了,又沒人溝通可能自己掉進了牛角尖里都不知道,耗掉大量的時間。所以我把自己搗鼓的過程和思路寫成博客發出來也是為了方便后人掉了坑里后能有個大體的參考,最起碼來說能向我一樣用蠻力替換的方法定位出自己的代碼錯在哪了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM