MIT-6.S081-2020實驗(xv6-riscv64)八:lock


實驗文檔

概述

這次實驗主要涉及鎖在內核的應用,沒有用到什么特別的理論知識,但是編程的時候陷阱重重,要么資源競爭,要么死鎖,和實驗三差不多,非常考驗耐心和細心。

內容

Memory allocator

這個任務要求給物理內存分配程序重新設計鎖,使得等待鎖時的阻塞盡量少。可以按CPU的數量將空閑內存分組,分配內存的時候優先從當前所用CPU所管理的空閑內存中分配,如果沒有則從其他CPU的空閑內存中獲取,這樣就可以把原來的鎖拆開,每個CPU各自處理自己的空閑內存時只要鎖上自己的鎖就行了:

void
kinit()
{
  for (int i = 0; i < NCPU; i++) initlock(&kmem[i].lock, "kmem");
  freerange(end, (void*)PHYSTOP);
}
void
kfree(void *pa)
{
  struct run *r;

  if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
    panic("kfree");

  // Fill with junk to catch dangling refs.
  memset(pa, 1, PGSIZE);

  r = (struct run*)pa;

  int cpu_id; push_off(); cpu_id = cpuid(); pop_off();
  acquire(&kmem[cpu_id].lock);
  r->next = kmem[cpu_id].freelist;
  kmem[cpu_id].freelist = r;
  release(&kmem[cpu_id].lock);
}

由於kinit這個函數不是並行的,所以一開始會將所有空閑內存都交給一個CPU管理。

void *
kalloc(void)
{
  struct run *r;

  int cpu_id; push_off(); cpu_id = cpuid(); pop_off();
  acquire(&kmem[cpu_id].lock);
  r = kmem[cpu_id].freelist;
  if(r) {
      kmem[cpu_id].freelist = r->next;
      release(&kmem[cpu_id].lock);
  } else {
      release(&kmem[cpu_id].lock);
      for (int i = 0; i < NCPU; i++) if (i != cpu_id) {
          acquire(&kmem[i].lock);
          r = kmem[i].freelist;
          if (r) {
              kmem[i].freelist = r->next;
              release(&kmem[i].lock);
              break;
          } else release(&kmem[i].lock);
      }
  }
  ......

Buffer cache

這個任務要求給硬盤緩存分配程序重新設計鎖,使得等待鎖時的阻塞盡量少。但是,因為硬盤緩存包含遍歷查找操作,即查找當前硬盤塊是否已被緩存,顯然這時就不能把緩存也按CPU進行分配,加上這個任務的操作也比較復雜,因此比上個任務多了很多問題。

實驗文檔給出的分配方式是對硬盤塊號進行取模哈希,數據結構如下:

struct {
  struct spinlock lock;
  struct spinlock block[BNUM];
  struct buf buf[NBUF];

  // Linked list of all buffers, through prev/next.
  // Sorted by how recently the buffer was used.
  // head.next is most recent, head.prev is least.
  struct buf head[BNUM];
} bcache;

這里保留了原來的鎖是因為在不要求修改的bpin、bunpin函數中使用了,但要小心的是,既然用到了新定義的鎖,那么在整個實驗中所有相關的代碼都必須用新定義的鎖,不能用原來的鎖。

void
binit(void)                                                                     {
  struct buf *b;

  initlock(&bcache.lock, "bcache");

  // Create linked list of buffers
  for (int i = 0; i < BNUM; i++) {
      initlock(bcache.block + i, "bcache");
      bcache.head[i].prev = &bcache.head[i];
      bcache.head[i].next = &bcache.head[i];
  }
  for(b = bcache.buf; b < bcache.buf+NBUF; b++){
    b->next = bcache.head[0].next;
    b->prev = &bcache.head[0];
    initsleeplock(&b->lock, "buffer");
    bcache.head[0].next->prev = b;
    bcache.head[0].next = b;
  }
}

因為一開始所有的緩存對應硬盤塊號都是0,所以把它們都放到0號桶里。

void
brelse(struct buf *b)
{
  if(!holdingsleep(&b->lock))
    panic("brelse");

  releasesleep(&b->lock);

  int entry = b->blockno % BNUM;
  acquire(bcache.block + entry);
  b->refcnt--;
  if (b->refcnt == 0) b->ticks = ticks;
  release(bcache.block + entry);
}

釋放緩存時只要獲取塊對應的桶對應的鎖即可,由於原來的代碼在釋放緩存時會將緩存插在head的下一個節點,按照原來程序的思路head的下一個節點是目前最近使用的緩存,所以把查找最久未使用緩存的方式改成了查找最前時間戳后,這里也應該更新時間戳。

最麻煩的是bget,這里拆成兩部分,第一部分是待查找磁盤塊已被緩存的情況:

  int entry = blockno % BNUM;
  acquire(bcache.block + entry);

  // Is the block already cached?
  for(b = bcache.head[entry].next; b != &bcache.head[entry]; b = b->next){
    if(b->dev == dev && b->blockno == blockno){
      b->refcnt++;
      release(bcache.block + entry);
      acquiresleep(&b->lock);
      return b;
    }
  }

也是只要獲取待查找塊對應的桶對應的鎖即可,但要注意一個問題,這個鎖必須一直持有到整個函數運行結束,不能在中間釋放了再重新獲取,也就是說:

獲得鎖
查找緩存但沒找到
釋放鎖
可以並行的其他操作
獲得鎖
更新一個可用的緩存
釋放鎖

不等價於

獲得鎖
查找緩存但沒找到
可以並行的其他操作
更新一個可用的緩存
釋放鎖

而且前者是錯的,后者是錯的。理由是如果中間釋放了鎖,當前進程可能會讓出控制權執行別的進程,那么就會出現一個問題,比如A進程查找1號緩存,查不到,釋放了鎖,程序轉到B進程,它也查找1號緩存,查不到,釋放了鎖而剛好B立刻又獲得了鎖,繼續往下更新了一個可用的緩存,釋放鎖,這是A獲得鎖,繼續往下更新了一個可用的緩存,釋放鎖。現在緩存中就有兩個編號完全相同且引用數都為1的緩存了,這個是不允許的行為,可能會出現各種問題,而且這些問題的發生全靠運氣,有時不出錯,有時這個panic,有時那個panic,非常難調。

第二部分是待查找磁盤塊未被緩存的情況:

  for (int i = (entry + 1) % BNUM; i != entry; i = (i + 1) % BNUM) {
      uint minticks = 0x3fffffff; struct buf *minbuf = 0;
      acquire(bcache.block + i);
      for(b = bcache.head[i].prev; b != &bcache.head[i]; b = b->prev)
          if (b->refcnt == 0 && b->ticks < minticks) {
              minticks = b->ticks; minbuf = b;
          }
      if (minbuf != 0) {
          minbuf->dev = dev;
          minbuf->blockno = blockno;
          minbuf->valid = 0;
          minbuf->refcnt = 1;
          minbuf->next->prev = minbuf->prev;
          minbuf->prev->next = minbuf->next;
          minbuf->next = bcache.head[entry].next;
          minbuf->prev = &bcache.head[entry];
          bcache.head[entry].next->prev = minbuf;
          bcache.head[entry].next = minbuf;
          release(bcache.block + i);
          release(bcache.block + entry);
          acquiresleep(&minbuf->lock);
          return minbuf;
      }
       release(bcache.block + i);
  }
  panic("bget: no buffers");
}

一開始我尋找可更新緩存的辦法是直接遍歷整個數組,為了防止競爭,需要在遍歷前把所有的桶鎖起來,然而這樣會發生死鎖,即假設處理0號桶的進程運行到這里,把0號桶鎖了,准備獲取1號桶的鎖,與此同時處理1號桶的進程運行到這里,把1號桶鎖了,准備獲取0號桶的鎖,這樣就死鎖了。仔細分析原因,發現只要鎖桶的順序是亂序的,都可能發生死鎖,這里的解決方法是使用“資源有序分配法”,就如上面的循環,從當前桶的下一個桶往上遍歷到當前桶的前一個桶(循環遍歷),保證了順序,就不會死鎖了。另外一個需要小心的是鏈表的操作,雙向鏈表確實很容易寫錯,需要謹慎。


總結一下,這個實驗和上一個實驗相比,感覺更考驗並行思維,主要體現在鎖的應用,上個實驗的后兩個任務和這個實驗比真的是小巫見大巫了,可能設計實驗的老師主要還是想讓學生熟悉一下pthread才弄那兩個任務,畢竟pthread太常用了。目前覺得系統編程最難的就是四個問題:缺頁錯誤(這里指的是編程邏輯的錯誤導致的錯誤)、內存泄漏、資源競爭、死鎖,都是極為難以檢查難以調試的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM