xv6學習筆記(5) : 鎖與管道與多cpu
1. xv6鎖結構
1. xv6操作系統要求在內核臨界區操作時中斷必須關閉。
如果此時中斷開啟,那么可能會出現以下死鎖情況:
- 進程A在內核態運行並拿下了p鎖時,觸發中斷進入中斷處理程序。
- 中斷處理程序也在內核態中請求p鎖,由於鎖在A進程手里,且只有A進程執行時才能釋放p鎖,因此中斷處理程序必須返回,p鎖才能被釋放。
- 那么此時中斷處理程序會永遠拿不到鎖,陷入無限循環,進入死鎖。
2. xv6中的自旋鎖
Xv6中實現了自旋鎖(Spinlock)用於內核臨界區訪問的同步和互斥。自旋鎖最大的特征是當進程拿不到鎖時會進入無限循環,直到拿到鎖退出循環。Xv6使用100ms一次的時鍾中斷和Round-Robin調度算法來避免陷入自旋鎖的進程一直無限循環下去。Xv6允許同時運行多個CPU核,多核CPU上的等待隊列實現相當復雜,因此使用自旋鎖是相對比較簡單且能正確執行的實現方案。
v6中鎖的定義如下
// Mutual exclusion lock.
struct spinlock {
uint locked; // Is the lock held?
// For debugging:
char *name; // Name of lock.
struct cpu *cpu; // The cpu holding the lock.
uint pcs[10]; // The call stack (an array of program counters)
// that locked the lock.
};
核心的變量只有一個locked
,當locked
為1時代表鎖已被占用,反之未被占用,初始值為0。
在調用鎖之前,必須對鎖進行初始化。
void initlock(struct spinlock *lk, char *name) {
lk->name = name;
lk->locked = 0;
lk->cpu = 0;
}
這里的需要注意的就是如何對locked
變量進行原子操作占用鎖和釋放鎖。
這兩步具體被實現為acquire()
和release()
函數。
1. acquire函數
- 首先禁止中斷防止死鎖(如果這里不禁止中斷就可能會出現上述的死鎖問題
- 然后利用
xchg
來上鎖
void
acquire(struct spinlock *lk)
{
pushcli(); // disable interrupts to avoid deadlock.
if(holding(lk))
panic("acquire");
// The xchg is atomic.
while(xchg(&lk->locked, 1) != 0)
;
// Tell the C compiler and the processor to not move loads or stores
// past this point, to ensure that the critical section's memory
// references happen after the lock is acquired.
__sync_synchronize();
// Record info about lock acquisition for debugging.
lk->cpu = mycpu();
getcallerpcs(&lk, lk->pcs);
}
xchg
這里采用內聯匯編實現
最前面的lock表示這是一條指令前綴,它保證了這條指令對總線和緩存的獨占權,也就是這條指令的執行過程中不會有其他CPU或同CPU內的指令訪問緩存和內存。
static inline uint xchg(volatile uint *addr, uint newval) {
uint result;
// The + in "+m" denotes a read-modify-write operand.
asm volatile("lock; xchgl %0, %1" :
"+m" (*addr), "=a" (result) :
"1" (newval) :
"cc");
return result;
}
最后,acquire()
函數使用__sync_synchronize
為了避免編譯器對這段代碼進行指令順序調整的話和避免CPU在這塊代碼采用亂序執行的優化。
這樣就保證了原子操作的加鎖。也就是把lk->locked
設置為1
2. release函數
// Release the lock.
void release(struct spinlock *lk) {
if(!holding(lk))
panic("release");
lk->pcs[0] = 0;
lk->cpu = 0;
// Tell the C compiler and the processor to not move loads or stores
// past this point, to ensure that all the stores in the critical
// section are visible to other cores before the lock is released.
// Both the C compiler and the hardware may re-order loads and
// stores; __sync_synchronize() tells them both not to.
__sync_synchronize();
// Release the lock, equivalent to lk->locked = 0.
// This code can't use a C assignment, since it might
// not be atomic. A real OS would use C atomics here.
asm volatile("movl $0, %0" : "+m" (lk->locked) : );
popcli();
}
release
函數為了保證設置locked為0的操作的原子性,同樣使用了內聯匯編。
2. 信號量
在xv6中沒有信號量,但是我看博客有大佬利用xv6中提供的接口實現了信號量
3. 喚醒與睡眠
1. 先看有問題的版本
struct q {
struct spinlock lock;
void *ptr;
};
void *
send(struct q *q, void *p)
{
acquire(&q->lock);
while(q->ptr != 0)
;
q->ptr = p;
wakeup(q);
release(&q->lock);
}
void*
recv(struct q *q)
{
void *p;
acquire(&q->lock);
while((p = q->ptr) == 0)
sleep(q);
q->ptr = 0;
release(&q->lock;
return p;
}
這個代碼乍一看是沒什么問題,發出消息之后,調用wakeup喚醒進程來接受。而且在發送的過程中保持鎖。但是會出現問題。在發送完成后釋放鎖。同時wakeup會讓進程從seleep狀態返回,這里的返回就會到recv函數來進行接受。
但是這里的問題在於當當 recv
帶着鎖 q->lock
進入睡眠后,發送者就會在希望獲得鎖時一直阻塞。
所以想要解決問題,我們必須要改變 sleep
的接口。sleep
必須將鎖作為一個參數,然后在進入睡眠狀態后釋放之;這樣就能避免上面提到的“遺失的喚醒”問題。一旦進程被喚醒了,sleep
在返回之前還需要重新獲得鎖。於是我們應該使用下面的代碼:
2. 上面版本的改進
struct q {
struct spinlock lock;
void *ptr;
};
void *
send(struct q *q, void *p)
{
acquire(&q->lock);
while(q->ptr != 0)
;
q->ptr = p;
wakeup(q);
release(&q->lock);
}
void*
recv(struct q *q)
{
void *p;
acquire(&q->lock);
while((p = q->ptr) == 0)
sleep(q, &q->lock);
q->ptr = 0;
release(&q->lock;
return p;
}
recv
持有 q->lock
就能防止 send
在 recv
檢查 q->ptr
與調用 sleep
之間調用 wakeup
了。當然,為了避免死鎖,接收進程最好別在睡眠時仍持有鎖。所以我們希望 sleep
能用原子操作釋放 q->lock
並讓接收進程進入休眠狀態。
完整的發送者/接收者的實現還應該讓發送者在等待接收者拿出前一個 send
放入的值時處於休眠狀態。
3. xv6的wakeup和sleep
sleep(chan)
讓進程在任意的 chan
上休眠,稱之為等待隊列(wait channel)。sleep
讓調用進程休眠,釋放所占 CPU。wakeup(chan)
則喚醒在 chan
上休眠的所有進程,讓他們的 sleep
調用返回。如果沒有進程在 chan
上等待喚醒,wakeup
就什么也不做。
總體思路是希望
sleep
將當前進程轉化為SLEEPING
狀態並調用sched
以釋放 CPU,而wakeup
則尋找一個睡眠狀態的進程並把它標記為RUNNABLE
。
1. sleep函數
- 首先sleep會做幾個檢查:必須存在當前進程、並且
sleep
必須持有鎖(2558-2559)。接着sleep
要求持有ptable.lock
- 於是該進程就會同時持有鎖
ptable.lock
和lk
了。調用者(例如recv
)是必須持有lk
的,這樣可以保證其他進程(例如一個正在運行的send
)無法調用wakeup(chan)
。而如今sleep
已經持有了ptable.lock
,那么它現在就能安全地釋放lk
了:這樣即使別的進程調用了wakeup(chan)
,wakeup
也不可能在沒有持有ptable.lock
的情況下運行,所以wakeup
必須等待sleep
讓進程睡眠后才能運行。這樣一來,wakeup
就不會錯過sleep
了。 - 這里要注意如果
lk == ptable.lock
的話則就會跳過對於if語句的判斷 - 后面做的就是把當前進程放入chan中(等待隊列)
- 隨后進行調度交出cpu
void
sleep(void *chan, struct spinlock *lk)
{
struct proc *p = myproc();
if(p == 0)
panic("sleep");
if(lk == 0)
panic("sleep without lk");
// Must acquire ptable.lock in order to
// change p->state and then call sched.
// Once we hold ptable.lock, we can be
// guaranteed that we won't miss any wakeup
// (wakeup runs with ptable.lock locked),
// so it's okay to release lk.
if(lk != &ptable.lock){ //DOC: sleeplock0
acquire(&ptable.lock); //DOC: sleeplock1
release(lk);
}
// Go to sleep.
p->chan = chan;
p->state = SLEEPING;
sched();
// Tidy up.
p->chan = 0;
// Reacquire original lock.
if(lk != &ptable.lock){ //DOC: sleeplock2
release(&ptable.lock);
acquire(lk);
}
}
2. wakeup函數
- 遍歷所有的進程表
- 判斷p->chan == chan 設置為runable
// Wake up all processes sleeping on chan.
void
wakeup(void *chan)
{
acquire(&ptable.lock);
wakeup1(chan);
release(&ptable.lock);
}
// wakeup --> wakeup1
// Wake up all processes sleeping on chan.
// The ptable lock must be held.
//
static void
wakeup1(void *chan)
{
struct proc *p;
for(p = ptable.proc; p < &ptable.proc[NPROC]; p++)
if(p->state == SLEEPING && p->chan == chan)
p->state = RUNNABLE;
}
4.管道
在xv6中管道是比較復雜的使用 sleep
/wakeup
來同步讀者寫者隊列的例子。
1. 管道結構體
struct pipe {
struct spinlock lock;
char data[PIPESIZE];
uint nread; // number of bytes read
uint nwrite; // number of bytes written
int readopen; // read fd is still open
int writeopen; // write fd is still open
};
我們從管道的一端寫入數據字節,然后數據被拷貝到內核緩沖區中,接着就能從管道的另一端讀取數據了。
中有一個鎖 lock
和內存緩沖區。其中的域 nread
和 nwrite
表示從緩沖區讀出和寫入的字節數。pipe
對緩沖區做了包裝,使得雖然計數器繼續增長,但實際上在 data[PIPESIZE - 1]
之后寫入的字節存放在 data[0]
。這樣就讓我們可以區分一個滿的緩沖區(nwrite == nread + PIPESIZE
)和一個空的緩沖區(nwrite == nread
),但這也意味着我們必須使用 buf[nread % PIPESIZE]
而不是 buf[nread]
來讀出/寫入數據。
2. Piperead()函數
- 當緩沖數據區為空的時候(並且writeopen被打開了)這個時候就要等。因為寫操作還沒完成所以主動sleep
- 否則就可以讀了
- 讀完了就喚醒寫
int
piperead(struct pipe *p, char *addr, int n)
{
int i;
acquire(&p->lock);
while(p->nread == p->nwrite && p->writeopen){ //DOC: pipe-empty
if(myproc()->killed){
release(&p->lock);
return -1;
}
sleep(&p->nread, &p->lock); //DOC: piperead-sleep
}
for(i = 0; i < n; i++){ //DOC: piperead-copy
if(p->nread == p->nwrite)
break;
addr[i] = p->data[p->nread++ % PIPESIZE];
}
wakeup(&p->nwrite); //DOC: piperead-wakeup
release(&p->lock);
return i;
}
3. pipewrite()函數
- 當緩沖區已經滿了,但是readopen被打開了,這個時候就要喚醒讀緩沖區的進程去讀操作。
- 否則就是簡單的寫操作了
- 寫完了就喚醒讀
int
pipewrite(struct pipe *p, char *addr, int n)
{
int i;
acquire(&p->lock);
for(i = 0; i < n; i++){
while(p->nwrite == p->nread + PIPESIZE){ //DOC: pipewrite-full
if(p->readopen == 0 || myproc()->killed){
release(&p->lock);
return -1;
}
wakeup(&p->nread);
sleep(&p->nwrite, &p->lock); //DOC: pipewrite-sleep
}
p->data[p->nwrite++ % PIPESIZE] = addr[i];
}
wakeup(&p->nread); //DOC: pipewrite-wakeup1
release(&p->lock);
return n;
}
5. xv6下的多cpu啟動
這里多cpu主要結合828的lab來一起分析。要是有不同的也會做一下對比
在xv6中和828中對於多cpu的支持都是SMP體系架構下的。
在SMP下所有的cpu的地位是一模一樣的,他們具有相同的權限,相同的資源。所有CPU在SMP中在功能上相同,但在引導過程中,它們可以分為兩種類型:
- 引導處理器BSP(bootstrap processor)負責初始化系統並且引導操作系統。
- 應用處理器AP(application processor)在操作系統啟動之后被BSP激活。
由哪個(些)處理器來擔任BSP的功能是由BIOS和硬件決定的,之前的所有代碼都是在BSP上實現的。
總的步驟如下參考
-
BIOS 啟動 BSP,完成讀取內核balabala的操作一直到main.c中
-
BSP 從
MP Configuration Table
中獲取多處理器的的配置信息 -
BSP 啟動 APs,通過發送
INIT-SIPI-SIPI
消息給 APs -
APs 啟動,各個 APs 處理器要像 BSP 一樣建立自己的一些機制,比如保護模式,分頁,中斷等等
關於多核啟動在網上看見了一個很好的博客我這里也是大多參考於它
1. mpinit函數
mpinit
函數負責是檢測CPU個數並將檢測到的CPU存入一個全局的數組中。
這里我們需要解析MP Configuration Table。而在xv6中這個信息使用mp結構體來存儲的。下面是要注意的點!!
這個結構只可能出現在三個位置,尋找 floating pointer 的時候就按下面的循序查找:
- EBDA(Extended BIOS Data Area)最開始的 1KB
- 系統基本內存的最后 1KB (對於 640 KB 的基本內存來說就是 639KB-640KB,對於 512KB 的基本內存來說就是 511KB-512KB)
- BIOS 的 ROM 區域,在
到
之間
-
第一步要調用
mpconfig
函數來解析出mpvoid mpinit(void) { uchar *p, *e; int ismp; struct mp *mp; struct mpconf *conf; struct mpproc *proc; struct mpioapic *ioapic; if((conf = mpconfig(&mp)) == 0) panic("Expect to run on an SMP");
而
mpconfig
函數最重要的就是利用mpserach
函數關於對照關系上面給了一個表格
這里就是完全按照上面說的三個順序進行測試的,只不過對應的地址變化(我也不會,也懶得看了。)就是知道他是依此去這三個地方find mp就行了
static struct mp* mpsearch(void) //尋找mp floating pointer 結構 { uchar *bda; uint p; struct mp *mp; bda = (uchar *) P2V(0x400); //BIOS Data Area地址 if((p = ((bda[0x0F]<<8)| bda[0x0E]) << 4)){ //在EBDA中最開始1K中尋找 if((mp = mpsearch1(p, 1024))) return mp; } else { //在基本內存的最后1K中查找 p = ((bda[0x14]<<8)|bda[0x13])*1024; if((mp = mpsearch1(p-1024, 1024))) return mp; } return mpsearch1(0xF0000, 0x10000); //在0xf0000~0xfffff中查找 }
-
下面就是根據讀到的conf來解析cpu信息
ismp = 1;
lapic = (uint*)conf->lapicaddr;
for(p=(uchar*)(conf+1), e=(uchar*)conf+conf->length; p<e; ){
switch(*p){
case MPPROC: // 如果是處理器
proc = (struct mpproc*)p;
if(ncpu < NCPU) {
cpus[ncpu].apicid = proc->apicid; // 為cpu分配apicid 也就是說apicid標識了唯一的cpu
ncpu++;
}
p += sizeof(struct mpproc);
continue;
case MPIOAPIC:
ioapic = (struct mpioapic*)p;
ioapicid = ioapic->apicno;
p += sizeof(struct mpioapic);
continue;
case MPBUS:
case MPIOINTR:
case MPLINTR:
p += 8;
continue;
default:
ismp = 0;
break;
}
}
if(!ismp)
panic("Didn't find a suitable machine");
if(mp->imcrp){
// Bochs doesn't support IMCR, so this doesn't run on Bochs.
// But it would on real hardware.
outb(0x22, 0x70); // Select IMCR
outb(0x23, inb(0x23) | 1); // Mask external interrupts.
}
}
2. lapicinit(void)
這是在啟動多個aps之間要做的一些初始化操作。
下面用得最多的lapicw
操作其實非常簡單
static void
lapicw(int index, int value)
{
lapic[index] = value;
lapic[ID]; // wait for write to finish, by reading
}
中斷流程:
- 一個 CPU 給其他 CPU 發送中斷的時候, 就在自己的 ICR 中, 放中斷向量和目標LAPIC ID, 然后通過總線發送到對應 LAPIC,
- 目標 LAPIC 根據自己的 LVT(Local Vector Table) 來對不同的中斷進行處理.
- 處理完了寫EOI 表示處理完了.
所以下面的操作其實就是在初始化LVT表
void
lapicinit(void)
{
if(!lapic)
return;
// Enable local APIC; set spurious interrupt vector.
lapicw(SVR, ENABLE | (T_IRQ0 + IRQ_SPURIOUS));
// The timer repeatedly counts down at bus frequency
// from lapic[TICR] and then issues an interrupt.
// If xv6 cared more about precise timekeeping,
// TICR would be calibrated using an external time source.
lapicw(TDCR, X1);
lapicw(TIMER, PERIODIC | (T_IRQ0 + IRQ_TIMER));
lapicw(TICR, 10000000);
// Disable logical interrupt lines.
lapicw(LINT0, MASKED);
lapicw(LINT1, MASKED);
// Disable performance counter overflow interrupts
// on machines that provide that interrupt entry.
if(((lapic[VER]>>16) & 0xFF) >= 4)
lapicw(PCINT, MASKED);
// Map error interrupt to IRQ_ERROR.
lapicw(ERROR, T_IRQ0 + IRQ_ERROR);
// Clear error status register (requires back-to-back writes).
lapicw(ESR, 0);
lapicw(ESR, 0);
// Ack any outstanding interrupts.
lapicw(EOI, 0);
// Send an Init Level De-Assert to synchronise arbitration ID's.
lapicw(ICRHI, 0);
lapicw(ICRLO, BCAST | INIT | LEVEL);
while(lapic[ICRLO] & DELIVS)
;
// Enable interrupts on the APIC (but not on the processor).
lapicw(TPR, 0);
}
3. startothers函數
尋到了有多少個 CPU,而且也有了每個 CPU 的標識信息,就可以去啟動它們了,直接來看 startothers 的代碼:
-
entryother.S 是APs啟動時要運行的代碼,鏈接器將映像放在
_binary_entryother_start
然后將其移動到0x7000 -
然后利用for循環,循環啟動APs
-
最后調用
lapicstartap()
函數來啟動 APs
static void
startothers(void)
{
extern uchar _binary_entryother_start[], _binary_entryother_size[];
uchar *code;
struct cpu *c;
char *stack;
// Write entry code to unused memory at 0x7000.
// The linker has placed the image of entryother.S in
// _binary_entryother_start.
code = P2V(0x7000);
memmove(code, _binary_entryother_start, (uint)_binary_entryother_size);
for(c = cpus; c < cpus+ncpu; c++){
if(c == mycpu()) // We've started already.
continue;
// Tell entryother.S what stack to use, where to enter, and what
// pgdir to use. We cannot use kpgdir yet, because the AP processor
// is running in low memory, so we use entrypgdir for the APs too.
stack = kalloc();
*(void**)(code-4) = stack + KSTACKSIZE;
*(void(**)(void))(code-8) = mpenter;
*(int**)(code-12) = (void *) V2P(entrypgdir);
lapicstartap(c->apicid, V2P(code));
// wait for cpu to finish mpmain()
while(c->started == 0)
;
}
}
4. lapicstartap函數
前面提到過BSP是通過發送信號給APs來啟動其他cpu的,簡單來說就是一個 CPU 通過寫 LAPIC 的 ICR 寄存器來與其他 CPU 進行通信
其實下面一大堆我覺得看不懂也沒差。。太低層了。只要知道下面這一點就行了
BSP通過向AP逐個發送中斷來啟動AP,首先發送INIT中斷來初始化AP,然后發送SIPI中斷來啟動AP,發送中斷使用的是寫ICR寄存器的方式
void
lapicstartap(uchar apicid, uint addr)
{
int i;
ushort *wrv;
// "The BSP must initialize CMOS shutdown code to 0AH
// and the warm reset vector (DWORD based at 40:67) to point at
// the AP startup code prior to the [universal startup algorithm]."
outb(CMOS_PORT, 0xF); // offset 0xF is shutdown code
outb(CMOS_PORT+1, 0x0A);
wrv = (ushort*)P2V((0x40<<4 | 0x67)); // Warm reset vector
wrv[0] = 0;
wrv[1] = addr >> 4;
// "Universal startup algorithm."
// Send INIT (level-triggered) interrupt to reset other CPU.
lapicw(ICRHI, apicid<<24);
lapicw(ICRLO, INIT | LEVEL | ASSERT);
microdelay(200);
lapicw(ICRLO, INIT | LEVEL);
microdelay(100); // should be 10ms, but too slow in Bochs!
// Send startup IPI (twice!) to enter code.
// Regular hardware is supposed to only accept a STARTUP
// when it is in the halted state due to an INIT. So the second
// should be ignored, but it is part of the official Intel algorithm.
// Bochs complains about the second one. Too bad for Bochs.
for(i = 0; i < 2; i++){
lapicw(ICRHI, apicid<<24);
lapicw(ICRLO, STARTUP | (addr>>12));
microdelay(200);
}
}
5. 看一下entryother.S
上面的代碼總共就干了兩件事,1.找到所有的cpu信息、2. 利用IPI
中斷,也就是寫ICR
寄存器來啟動其他的APS。
而啟動APs是要執行boot代碼的。對應的地址是0x7000。也就是entryother.S
的代碼
這個代碼前面干的事情和entry.S
基本一樣的。需要注意的就是下面這兩行
# Switch to the stack allocated by startothers()
movl (start-4), %esp
# Call mpenter()
call *(start-8)
// Other CPUs jump here from entryother.S.
static void
mpenter(void)
{
switchkvm(); //切換到內核頁表
seginit(); // 初始化gdt
lapicinit(); // 初始化apic
mpmain();
}
static void mpmain(void)
{
cprintf("cpu%d: starting %d\n", cpuid(), cpuid());
idtinit(); // 加載GDT
xchg(&(mycpu()->started), 1); // 將started置1表啟動完成了
scheduler(); // 開始調度進程執行程序了
}
可以看到,這里面所做的工作主要還是初始化建立環境,最后 CPU 這個結構體中的元素 started 置 1 表示這個 CPU 已經啟動好了,這里就會通知 startothers 函數,可以啟動下一個 AP 了。最后就是調用 scheduler() 可以開始調度執行程序了。
下面是startothers
中的一段while循環。
// startothers
// wait for cpu to finish mpmain()
while(c->started == 0)
;