概述
這次實驗主要涉及鎖在內核的應用,沒有用到什么特別的理論知識,但是編程的時候陷阱重重,要么資源競爭,要么死鎖,和實驗三差不多,非常考驗耐心和細心。
內容
Memory allocator
這個任務要求給物理內存分配程序重新設計鎖,使得等待鎖時的阻塞盡量少。可以按CPU的數量將空閑內存分組,分配內存的時候優先從當前所用CPU所管理的空閑內存中分配,如果沒有則從其他CPU的空閑內存中獲取,這樣就可以把原來的鎖拆開,每個CPU各自處理自己的空閑內存時只要鎖上自己的鎖就行了:
void
kinit()
{
for (int i = 0; i < NCPU; i++) initlock(&kmem[i].lock, "kmem");
freerange(end, (void*)PHYSTOP);
}
void
kfree(void *pa)
{
struct run *r;
if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
panic("kfree");
// Fill with junk to catch dangling refs.
memset(pa, 1, PGSIZE);
r = (struct run*)pa;
int cpu_id; push_off(); cpu_id = cpuid(); pop_off();
acquire(&kmem[cpu_id].lock);
r->next = kmem[cpu_id].freelist;
kmem[cpu_id].freelist = r;
release(&kmem[cpu_id].lock);
}
由於kinit這個函數不是並行的,所以一開始會將所有空閑內存都交給一個CPU管理。
void *
kalloc(void)
{
struct run *r;
int cpu_id; push_off(); cpu_id = cpuid(); pop_off();
acquire(&kmem[cpu_id].lock);
r = kmem[cpu_id].freelist;
if(r) {
kmem[cpu_id].freelist = r->next;
release(&kmem[cpu_id].lock);
} else {
release(&kmem[cpu_id].lock);
for (int i = 0; i < NCPU; i++) if (i != cpu_id) {
acquire(&kmem[i].lock);
r = kmem[i].freelist;
if (r) {
kmem[i].freelist = r->next;
release(&kmem[i].lock);
break;
} else release(&kmem[i].lock);
}
}
......
Buffer cache
這個任務要求給硬盤緩存分配程序重新設計鎖,使得等待鎖時的阻塞盡量少。但是,因為硬盤緩存包含遍歷查找操作,即查找當前硬盤塊是否已被緩存,顯然這時就不能把緩存也按CPU進行分配,加上這個任務的操作也比較復雜,因此比上個任務多了很多問題。
實驗文檔給出的分配方式是對硬盤塊號進行取模哈希,數據結構如下:
struct {
struct spinlock lock;
struct spinlock block[BNUM];
struct buf buf[NBUF];
// Linked list of all buffers, through prev/next.
// Sorted by how recently the buffer was used.
// head.next is most recent, head.prev is least.
struct buf head[BNUM];
} bcache;
這里保留了原來的鎖是因為在不要求修改的bpin、bunpin函數中使用了,但要小心的是,既然用到了新定義的鎖,那么在整個實驗中所有相關的代碼都必須用新定義的鎖,不能用原來的鎖。
void
binit(void) {
struct buf *b;
initlock(&bcache.lock, "bcache");
// Create linked list of buffers
for (int i = 0; i < BNUM; i++) {
initlock(bcache.block + i, "bcache");
bcache.head[i].prev = &bcache.head[i];
bcache.head[i].next = &bcache.head[i];
}
for(b = bcache.buf; b < bcache.buf+NBUF; b++){
b->next = bcache.head[0].next;
b->prev = &bcache.head[0];
initsleeplock(&b->lock, "buffer");
bcache.head[0].next->prev = b;
bcache.head[0].next = b;
}
}
因為一開始所有的緩存對應硬盤塊號都是0,所以把它們都放到0號桶里。
void
brelse(struct buf *b)
{
if(!holdingsleep(&b->lock))
panic("brelse");
releasesleep(&b->lock);
int entry = b->blockno % BNUM;
acquire(bcache.block + entry);
b->refcnt--;
if (b->refcnt == 0) b->ticks = ticks;
release(bcache.block + entry);
}
釋放緩存時只要獲取塊對應的桶對應的鎖即可,由於原來的代碼在釋放緩存時會將緩存插在head的下一個節點,按照原來程序的思路head的下一個節點是目前最近使用的緩存,所以把查找最久未使用緩存的方式改成了查找最前時間戳后,這里也應該更新時間戳。
最麻煩的是bget,這里拆成兩部分,第一部分是待查找磁盤塊已被緩存的情況:
int entry = blockno % BNUM;
acquire(bcache.block + entry);
// Is the block already cached?
for(b = bcache.head[entry].next; b != &bcache.head[entry]; b = b->next){
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(bcache.block + entry);
acquiresleep(&b->lock);
return b;
}
}
也是只要獲取待查找塊對應的桶對應的鎖即可,但要注意一個問題,這個鎖必須一直持有到整個函數運行結束,不能在中間釋放了再重新獲取,也就是說:
獲得鎖
查找緩存但沒找到
釋放鎖
可以並行的其他操作
獲得鎖
更新一個可用的緩存
釋放鎖
不等價於
獲得鎖
查找緩存但沒找到
可以並行的其他操作
更新一個可用的緩存
釋放鎖
而且前者是錯的,后者是錯的。理由是如果中間釋放了鎖,當前進程可能會讓出控制權執行別的進程,那么就會出現一個問題,比如A進程查找1號緩存,查不到,釋放了鎖,程序轉到B進程,它也查找1號緩存,查不到,釋放了鎖而剛好B立刻又獲得了鎖,繼續往下更新了一個可用的緩存,釋放鎖,這是A獲得鎖,繼續往下更新了一個可用的緩存,釋放鎖。現在緩存中就有兩個編號完全相同且引用數都為1的緩存了,這個是不允許的行為,可能會出現各種問題,而且這些問題的發生全靠運氣,有時不出錯,有時這個panic,有時那個panic,非常難調。
第二部分是待查找磁盤塊未被緩存的情況:
for (int i = (entry + 1) % BNUM; i != entry; i = (i + 1) % BNUM) {
uint minticks = 0x3fffffff; struct buf *minbuf = 0;
acquire(bcache.block + i);
for(b = bcache.head[i].prev; b != &bcache.head[i]; b = b->prev)
if (b->refcnt == 0 && b->ticks < minticks) {
minticks = b->ticks; minbuf = b;
}
if (minbuf != 0) {
minbuf->dev = dev;
minbuf->blockno = blockno;
minbuf->valid = 0;
minbuf->refcnt = 1;
minbuf->next->prev = minbuf->prev;
minbuf->prev->next = minbuf->next;
minbuf->next = bcache.head[entry].next;
minbuf->prev = &bcache.head[entry];
bcache.head[entry].next->prev = minbuf;
bcache.head[entry].next = minbuf;
release(bcache.block + i);
release(bcache.block + entry);
acquiresleep(&minbuf->lock);
return minbuf;
}
release(bcache.block + i);
}
panic("bget: no buffers");
}
一開始我尋找可更新緩存的辦法是直接遍歷整個數組,為了防止競爭,需要在遍歷前把所有的桶鎖起來,然而這樣會發生死鎖,即假設處理0號桶的進程運行到這里,把0號桶鎖了,准備獲取1號桶的鎖,與此同時處理1號桶的進程運行到這里,把1號桶鎖了,准備獲取0號桶的鎖,這樣就死鎖了。仔細分析原因,發現只要鎖桶的順序是亂序的,都可能發生死鎖,這里的解決方法是使用“資源有序分配法”,就如上面的循環,從當前桶的下一個桶往上遍歷到當前桶的前一個桶(循環遍歷),保證了順序,就不會死鎖了。另外一個需要小心的是鏈表的操作,雙向鏈表確實很容易寫錯,需要謹慎。
總結一下,這個實驗和上一個實驗相比,感覺更考驗並行思維,主要體現在鎖的應用,上個實驗的后兩個任務和這個實驗比真的是小巫見大巫了,可能設計實驗的老師主要還是想讓學生熟悉一下pthread才弄那兩個任務,畢竟pthread太常用了。目前覺得系統編程最難的就是四個問題:缺頁錯誤(這里指的是編程邏輯的錯誤導致的錯誤)、內存泄漏、資源競爭、死鎖,都是極為難以檢查難以調試的。