Xv6代碼閱讀報告之進程調度


 
 

Xv6代碼閱讀報告-Topic3

@肖劍楠 20111013223

 

 

 

1. 序

Xv6為了實現CPU多進程化需要解決一系列問題。1. 如何在進程間切換?2. 如何讓這一切換變得透明?3. 需要鎖機制來避免競爭。4. 內存、資源的自動釋放。
Xv6通過實現上下文切換(Context Switching),時間中斷處理,鎖,睡眠與喚醒等機制基本解決了上述問題。主要代碼包括swtch.S, defs.h, proc.h, proc.c, mmu.h等文件。
下面按模塊對上述文件逐一分析。

 

2. 上下文切換

 

2.1 defs.h

在一切的一切之前,我們先看一下defs.h的結構體的定義以及函數的聲明。
該文件中集中聲明了一系列函數以及結構體,對於本章節之后需要討論的部分,需要關注struct context, struct proc等結構體。
struct context在proc.h(40-56)中定義。其結構實際上是五個寄存器的值。也就是在上下文切換時,主要做的事情就是保存並更新寄存器值。同時根據慣例,調用者會保存%eax,%ecx,%edx的值。

struct context {
  uint edi;
  uint esi;
  uint ebx;
  uint ebp;
  uint eip;
};

順便將proc和pipe的結構也分析一下。
struct proc 在proc.h(60-75)中定義,通過一個結構體記錄每個進程的狀態。

struct proc {
  uint sz;                     // 進程的內存大小(以byte計)
  pde_t* pgdir;                // 進程頁路徑的線性地址。
  char *kstack;                // 進程的內核棧底
  enum procstate state;        // 進程狀態
  volatile int pid;            // 進程ID
  struct proc *parent;         // 父進程
  struct trapframe *tf;        // 當前系統調用的中斷幀
  struct context *context;     // 進程運行的入口
  int killed;                  // 當非0時,表示已結束
  struct file *ofile[NOFILE];  // 打開的文件列表
  struct inode *cwd;           // 進程當前路徑
  char name[16];               // 進程名稱
};

pipe依賴對結構體spinlock,cpu的定義,見spinlock.h及proc.h(11-24)。
spinlock的作用在於當進程請求得到一個正在被占用的鎖時,將進程處於循環檢查,等待鎖被釋放的狀態。

struct spinlock {
  uint locked;       // 鎖是否處於鎖住狀態

  // For debugging:
  char *name;        // 鎖名稱
  struct cpu *cpu;   // 占有該鎖的CPU信息
  uint pcs[10];      // 占有該鎖的指令棧
};

pipe的結構在pipe.h(12-19)中定義,

struct pipe {
  struct spinlock lock; 
  char data[PIPESIZE];  // 保存pipe的內容,PIPESIZE為512
  uint nread;     // 讀取的byte長度
  uint nwrite;    // 寫入的byte長度
  int readopen;   // 是否正在讀取
  int writeopen;  // 是否正在寫入
};
 

2.2 swtch.S

該文件的作用在於使用匯編代碼實現了swtch函數,

.globl swtch
swtch:
  # 將需要保存的context地址讀取到%esp中,新context地址讀取到%edx中
  # 4(%esp)對應的是需要保存的context
  # 8(%esp)對應的是新的context
  movl 4(%esp), %eax
  movl 8(%esp), %edx

  # 將寄存器中過期的數值壓棧
  pushl %ebp
  pushl %ebx
  pushl %esi
  pushl %edi

  # 交換棧
  movl %esp, (%eax) # 保存需要保存的context地址
  movl %edx, %esp   # 讀取新的context信息

  # 加載新的context信息
  popl %edi
  popl %esi
  popl %ebx
  popl %ebp
  ret
 

3. 進程調度

進程調度的主要函數集中在proc.c中,就讓我們從這個文件開始說起吧。
對於單個CPU來說,scheduler是最主要的函數。當CPU初始化之后,即調用scheduler(),循環從進程隊列中選擇一個進程執行;當進程結束時,將控制權通過swtch()移交給scheduler。

void
scheduler(void)
{
  struct proc *p;

  for(;;){
    // 在每次執行一個進程之前,需要調用sti()函數開啟CPU的中斷
    sti();

    // 遍歷進程表找到一個進程執行
    acquire(&ptable.lock); // 獲取進程表的鎖,避免其他CPU更改進程表
    for(p = ptable.proc; p < &ptable.proc[NPROC]; p++){
      // 如果進程的狀態為不可運行,則略過
      if(p->state != RUNNABLE)
        continue;

      // 切換到選擇的進程,釋放進程表鎖,當進程結束時,再重新獲取
      proc = p;
      switchuvm(p);
      p->state = RUNNING;
      swtch(&cpu->scheduler, proc->context);
      switchkvm();

      // Process is done running for now.
      // It should have changed its p->state before coming back.
      proc = 0;
    }
    release(&ptable.lock);
  }
}

在每次Loop之后,都要及時釋放進程表鎖,這樣可以避免當進程表中暫時沒有可以運行的程序時,進程表會一直被該CPU鎖死,其他CPU便不能訪問。其中一種情況是,當進程等待IO時,不是RUNNABLE的,而CPU處於idle狀態,一直在占有進程表鎖,IO信號無法到達。

sched()切換至CPU context,在切換context之前,進行一系列判斷,以避免出現沖突。

void
sched(void)
{
  int intena;

  // 是否獲取到了進程表鎖
  if(!holding(&ptable.lock))
    panic("sched ptable.lock");
  // 是否執行過pushcli
  if(cpu->ncli != 1)
    panic("sched locks");
  // 執行的程序應該處於結束或者睡眠狀態
  if(proc->state == RUNNING)
    panic("sched running");
  // 判斷中斷是否可以關閉
  if(readeflags()&FL_IF)
    panic("sched interruptible");

  intena = cpu->intena;
  // 上下文切換至scheduler
  swtch(&proc->context, cpu->scheduler);
  cpu->intena = intena;
}

yield()函數將CPU主動讓出一個調度周期(scheduling round),這個函數在xv6的當前版本中,僅在trap()中調用,見trap.c(100)。實際應用在於當一個進程正在使用CPU,同時中斷處於打開狀態,需要查看nlock。

void
yield(void)
{
  // 獲取進程表鎖
  acquire(&ptable.lock);
  // 將進程狀態設為可運行,以便下次遍歷時可以被喚醒
  proc->state = RUNNABLE;
  // 執行sched函數,准備將CPU切換到scheduler context
  sched();
  // 釋放進程表鎖
  release(&ptable.lock);
}

sleep和wakeup是兩個互補的函數,共同作用實現改變進程執行順序,
sleep函數有兩個參數 void *chanstruct spinlock *lk

void
sleep(void *chan, struct spinlock *lk)
{
  if(proc == 0)
    panic("sleep");

  if(lk == 0)
    panic("sleep without lk");

  // 釋放鎖lk
  if(lk != &ptable.lock){  //DOC: sleeplock0
    acquire(&ptable.lock);  //DOC: sleeplock1
    release(lk);
  }

  // 更改狀態為SLEEPING,並切換至CPU context
  proc->chan = chan;
  proc->state = SLEEPING;
  sched();

  // Tidy up.
  proc->chan = 0;

  // 重新獲得剛剛釋放的lk鎖
  if(lk != &ptable.lock){  //DOC: sleeplock2
    release(&ptable.lock);
    acquire(lk);
  }
}

值得注意的是,使進程進入睡眠需要兩個鎖,lk和ptable.lock,由於之前已經得到了ptable.lock,所以wakeup在此期間不會執行,直至進程完全進入睡眠狀態,所以lk這個鎖可以釋放。

wakeup函數的主體部分位於wakeup1函數中。

void
wakeup(void *chan)
{
  // 先獲取ptable.lock,確保sleep不會執行,避免出現missed wakeup
  acquire(&ptable.lock);
  wakeup1(chan);
  // 喚醒結束,釋放ptable.lock
  release(&ptable.lock);
}

wakeup1函數完成了喚醒的主要工作。wakeup1之所以與wakeup作為兩個獨立的函數,是因為除了被wakeup調用之外,還在exit中調用,后面會詳細講到。

static void
wakeup1(void *chan)
{
  struct proc *p;
  // 遍歷進程表,當發現有符合運行條件的程序時,將其標記為RUNNABLE
  for(p = ptable.proc; p < &ptable.proc[NPROC]; p++)
    if(p->state == SLEEPING && p->chan == chan)
      p->state = RUNNABLE;
}

wait函數用於父進程等待子進程結束,如果沒有子進程,則返回-1,否則返回已經結束的子進程的pid。

int
wait(void)
{
  struct proc *p;
  int havekids, pid;
  // 獲取進程表鎖
  acquire(&ptable.lock);
  for(;;){
    // 遍歷查找是否有處於zombie狀態的子進程
    havekids = 0;
    for(p = ptable.proc; p < &ptable.proc[NPROC]; p++){
      if(p->parent != proc)
        continue;
      // 如果發現有子進程
      havekids = 1;
      // 如果進程狀態為zombie,則將其釋放並返回該子進程的pid
      if(p->state == ZOMBIE){
        // Found one.
        pid = p->pid;
        kfree(p->kstack);
        p->kstack = 0;
        freevm(p->pgdir);
        p->state = UNUSED;
        p->pid = 0;
        p->parent = 0;
        p->name[0] = 0;
        p->killed = 0;
        release(&ptable.lock);
        return pid;
      }
    }

    // 如果沒有子進程則直接返回
    if(!havekids || proc->killed){
      release(&ptable.lock);
      return -1;
    }

    // 如果有子進程處於睡眠狀態,則將父進程置於睡眠狀態
    sleep(proc, &ptable.lock);  //DOC: wait-sleep
  }
}

其中,當仍有子進程睡眠時,並沒有釋放ptable.lock,是因為釋放操作放在了sleep函數中,且滿足了sleep函數的調用條件,事先獲得ptable.lock。

exit()完成了進程結束時的資源釋放以及子進程處理等工作。其中只進行了一次acquire操作,這樣可以使進程結束的操作原子化;同時可能存在多次的wakeup1操作,這樣減少了很多時間。結束后,沒有主動調用release,是因為sched進行context switching的時候需要獲得ptable.lock,釋放在scheduler中進行。

void
exit(void)
{
  struct proc *p;
  int fd;

  if(proc == initproc)
    panic("init exiting");

  // 關閉之前打開的文件
  for(fd = 0; fd < NOFILE; fd++){
    if(proc->ofile[fd]){
      fileclose(proc->ofile[fd]);
      proc->ofile[fd] = 0;
    }
  }
  iput(proc->cwd);
  proc->cwd = 0;

  acquire(&ptable.lock);

  // 喚醒父進程,一邊父進程將處於zombie狀態的該進程回收
  wakeup1(proc->parent);

  // 將子進程移交給initproc
  for(p = ptable.proc; p < &ptable.proc[NPROC]; p++){
    if(p->parent == proc){
      p->parent = initproc;
      // 如果子進程處於zombie狀態,則喚醒其新父親initproc來料理后事
      if(p->state == ZOMBIE)
        wakeup1(initproc);
    }
  }

  // 移交給scheduler,等待父進程處理
  proc->state = ZOMBIE;
  sched();
  panic("zombie exit");
}
 

4. 管道

xv6中實現管道的結構體pipe已經在前面關於defs.h的分析中提及。此處直接分析pipe.c。

pipealloc實現了pipe的創建,並將pipe關聯到兩個文件上f0, f1。如果創建成功,返回0;否則返回-1。

int
pipealloc(struct file **f0, struct file **f1)
{
  struct pipe *p;

  p = 0;
  *f0 = *f1 = 0;
  // 如果f0,f1不存在則返回-1
  if((*f0 = filealloc()) == 0 || (*f1 = filealloc()) == 0)
    goto bad;
  if((p = (struct pipe*)kalloc()) == 0)
    goto bad;
  // 
  // 初始化pipe
  p->readopen = 1;
  p->writeopen = 1;
  p->nwrite = 0;
  p->nread = 0;
  initlock(&p->lock, "pipe");
  (*f0)->type = FD_PIPE;
  (*f0)->readable = 1;
  (*f0)->writable = 0;
  (*f0)->pipe = p;
  (*f1)->type = FD_PIPE;
  (*f1)->readable = 0;
  (*f1)->writable = 1;
  (*f1)->pipe = p;
  return 0;

 // 如果創建失敗,則將進度回滾,釋放占用的內存、解除對文件的占有
 bad:
  if(p)
    kfree((char*)p);
  if(*f0)
    fileclose(*f0);
  if(*f1)
    fileclose(*f1);
  return -1;
}

pipeclose實現了關閉pipe的處理。

void
pipeclose(struct pipe *p, int writable)
{
  // 獲取管道鎖,避免在關閉的同時進行讀寫操作
  acquire(&p->lock);
  // 判斷是否有未被讀取的數據
  if(writable){
    // 如果存在,則喚醒pipe的讀進程;否則喚醒寫進程
    p->writeopen = 0;
    wakeup(&p->nread);
  } else {
    p->readopen = 0;
    wakeup(&p->nwrite);
  }
  // 當pipe的讀寫都已結束時,釋放資源;否則釋放pipe鎖
  if(p->readopen == 0 && p->writeopen == 0) {
    release(&p->lock);
    kfree((char*)p);
  } else
    release(&p->lock);
}

pipewrite實現了管道的寫操作。

int
pipewrite(struct pipe *p, char *addr, int n)
{
  int i;

  acquire(&p->lock);
  // 逐字節寫入
  for(i = 0; i < n; i++){
    // 如果pipe已經寫滿
    while(p->nwrite == p->nread + PIPESIZE) {  //DOC: pipewrite-full
      // 喚醒讀進程,寫進程進入睡眠,並返回-1
      if(p->readopen == 0 || proc->killed){
        release(&p->lock);
        return -1;
      }
      wakeup(&p->nread);
      sleep(&p->nwrite, &p->lock);  //DOC: pipewrite-sleep
    }
    p->data[p->nwrite++ % PIPESIZE] = addr[i];
  }
  // 寫完之后喚醒讀進程
  wakeup(&p->nread);  //DOC: pipewrite-wakeup1
  release(&p->lock);
  return n;
}

piperead實現了pipe的讀操作。

int
piperead(struct pipe *p, char *addr, int n)
{
  int i;

  acquire(&p->lock);
  // 如果pipe已經讀空,並且正在寫入,則進入睡眠狀態
  while(p->nread == p->nwrite && p->writeopen){  //DOC: pipe-empty
    if(proc->killed){
      release(&p->lock);
      return -1;
    }
    sleep(&p->nread, &p->lock); //DOC: piperead-sleep
  }
  for(i = 0; i < n; i++){  //DOC: piperead-copy
    if(p->nread == p->nwrite)
      break;
    addr[i] = p->data[p->nread++ % PIPESIZE];
  }
  // 讀取完畢,喚醒寫進程
  wakeup(&p->nwrite);  //DOC: piperead-wakeup
  release(&p->lock);
  // 返回讀取的字節長度
  return i;
}
 

5. 進程調度流程

進程切換:當CPU啟動之后,執行scheduler函數,無限循環。在每個周期里,從進程表中找到一個RUNNABLE的進程,切換為進程的上下文,此時開始執行函數。當函數運行結束時,調用return函數,此時切換為CPU的上下文,開始下一循環。
進程喚醒與睡眠:如果一個程序需要等待IO,則CPU會將其設置為睡眠狀態,此時不能被執行。當IO信號到達時,執行的進程會將IO信號對應的進程設置為RUNNABLE,即喚醒。下一個scheduler周期的時候,該進程就可能會被執行,處理IO信號。
進程表鎖:對於多處理器架構而言,需要用到進程表的時候都需要事先獲得表的鎖,當結束之后再釋放,這樣保證了對進程表操作的原子化,可以避免多處理器的競爭問題。

 

6. Pipe實現概述

Pipe的主要部分實際上是一小段規定長度的連續數據存儲,讀寫操作將其視為無限循環長度的內存塊。
初始化時,將給定的文件輸入、輸出流與該結構體關聯;關閉時,釋放內存,解除文件占用。
讀寫操作時,分別需要判斷是否超出讀寫的范圍,避免覆蓋未讀數據或者讀取已讀數據;如果寫操作未執行完,則需通過睡眠喚醒的方式來完成大段數據的讀取。

 

7. 閱讀心得

由於這部分的代碼主要由C代碼實現,因為相對來說比第一次的閱讀任務簡單一些。有兩個難點,一需要了解依賴的各結構體信息,並通過實際看代碼認清每個屬性的作用;二需要將多個函數結合着看,才能理解進程表鎖的管理機制。xv6的實現機制並不復雜,主觀腦洞大開結合着sched.pdf,就比較容易理解。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM