XV6操作系統代碼閱讀心得(三):鎖


鎖是操作系統中實現進程同步的重要機制。

基本概念

臨界區(Critical Section)是指對共享數據進行訪問與操作的代碼區域。所謂共享數據,就是可能有多個代碼執行流並發地執行,並在執行中可能會同時訪問的數據。

同步(Synchronization)是指讓兩個或多個進程/線程能夠按照程序員期望的方式來協調執行的順序。比如,讓A進程必須完成某個操作后,B進程才能執行。互斥(Mutual Exclusion)則是指讓多個線程不能夠同時訪問某些數據,必須要一個進程訪問完后,另一個進程才能訪問。

當多個進程/線程並發地執行並且訪問一塊數據,並且進程/線程的執行結果依賴於它們的執行順序,我們就稱這種情況為競爭狀態(Race Condition)。

Xv6操作系統要求在內核臨界區操作時中斷必須關閉。如果此時中斷開啟,那么可能會出現以下死鎖情況:A進程在內核態運行並拿下了p鎖時,觸發中斷進入中斷處理程序,中斷處理程序也在內核態中請求p鎖,由於鎖在A進程手里,且只有A進程執行時才能釋放p鎖,因此中斷處理程序必須返回,p鎖才能被釋放。那么此時中斷處理程序會永遠拿不到鎖,陷入無限循環,進入死鎖。

Xv6中實現了自旋鎖(Spinlock)用於內核臨界區訪問的同步和互斥。自旋鎖最大的特征是當進程拿不到鎖時會進入無限循環,直到拿到鎖退出循環。顯然,自旋鎖看上去效率很低,我們很容易想到更加高效的基於等待隊列的方法,讓等待進程陷入阻塞而不是無限循環。然而,Xv6允許同時運行多個CPU核,多核CPU上的等待隊列實現相當復雜,因此使用自旋鎖是相對比較簡單且能正確執行的實現方案。

Xv6的Spinlock

Xv6中鎖的定義如下

// Mutual exclusion lock.
struct spinlock {
  uint locked;       // Is the lock held?

  // For debugging:
  char *name;        // Name of lock.
  struct cpu *cpu;   // The cpu holding the lock.
  uint pcs[10];      // The call stack (an array of program counters)
                     // that locked the lock.
};

核心的變量只有一個locked,當locked為1時代表鎖已被占用,反之未被占用,初始值為0。

在調用鎖之前,必須對鎖進行初始化。

void initlock(struct spinlock *lk, char *name) {
  lk->name = name;
  lk->locked = 0;
  lk->cpu = 0;
}

最困難的地方是如何對locked變量進行原子操作占用鎖和釋放鎖。這兩步具體被實現為acquire()release()函數。(注意v7版本和v11版本的實現略有不同,本文使用的是v11版本)

acquire()函數

// Acquire the lock.
// Loops (spins) until the lock is acquired.
// Holding a lock for a long time may cause
// other CPUs to waste time spinning to acquire it.
void acquire(struct spinlock *lk) {
  pushcli(); // disable interrupts to avoid deadlock.
  if(holding(lk))
    panic("acquire");

  // The xchg is atomic.
  while(xchg(&lk->locked, 1) != 0)
    ;

  // Tell the C compiler and the processor to not move loads or stores
  // past this point, to ensure that the critical section's memory
  // references happen after the lock is acquired.
  __sync_synchronize();

  // Record info about lock acquisition for debugging.
  lk->cpu = mycpu();
  getcallerpcs(&lk, lk->pcs);
}

acquire()函數首先禁止了中斷,並且使用專門的pushcli()函數,這個函數保證了如果有兩個acquire()禁止了中斷,那么也必須調用兩次release()中的popcli()后中斷才會被允許。然后,acquire()函數采用xchg指令來實現在設置locked為1的同時獲得其原來的值的操作。這里的C代碼中封裝了一個xchg()函數,在xchg()函數中采用GCC的內聯匯編特性,實現如下

static inline uint xchg(volatile uint *addr, uint newval) {
  uint result;
  // The + in "+m" denotes a read-modify-write operand.
  asm volatile("lock; xchgl %0, %1" :
               "+m" (*addr), "=a" (result) :
               "1" (newval) :
               "cc");
  return result;
}

其中,volatile標志用於避免gcc對其進行一些優化;第一個冒號后的"+m" (*addr), "=a" (result)是這個匯編指令的兩個輸出值;newval是這個匯編指令的輸入值。假設newval位於eax寄存器中,addr位於rax寄存器中,那么gcc會翻譯得到如下匯編指令

 lock; xchgl (%rdx), %eax

由於xchg函數是inline的,它會被直接嵌入調用xchg函數的代碼中,使用的寄存器可能會有所不同。

下面我們來分析一下上面的指令的語義。·lock是一個指令前綴,它保證了這條指令對總線和緩存的獨占權,也就是這條指令的執行過程中不會有其他CPU或同CPU內的指令訪問緩存和內存。由於現代CPU一般是多發射流水線+亂序執行的,因此一般情況下並不能保證這一點。xchgl指令是一條古老的x86指令,作用是交換兩個寄存器或者內存地址里的4字節值,兩個值不能都是內存地址,他不會設置條件碼。

那么,仔細思考一下就能發現,以上一條xchg指令就同時做到了交換locked和1的值,並且在之后通過檢查eax寄存器就能知道locked的值是否為0。並且,以上操作是原子的,這就保證了有且只有一個進程能夠拿到locked的0值並且進入臨界區。

最后,acquire()函數使用__sync_synchronize為了避免編譯器對這段代碼進行指令順序調整的話和避免CPU在這塊代碼采用亂序執行的優化。

release()函數

// Release the lock.
void release(struct spinlock *lk) {
  if(!holding(lk))
    panic("release");

  lk->pcs[0] = 0;
  lk->cpu = 0;

  // Tell the C compiler and the processor to not move loads or stores
  // past this point, to ensure that all the stores in the critical
  // section are visible to other cores before the lock is released.
  // Both the C compiler and the hardware may re-order loads and
  // stores; __sync_synchronize() tells them both not to.
  __sync_synchronize();

  // Release the lock, equivalent to lk->locked = 0.
  // This code can't use a C assignment, since it might
  // not be atomic. A real OS would use C atomics here.
  asm volatile("movl $0, %0" : "+m" (lk->locked) : );

  popcli();
}

release函數為了保證設置locked為0的操作的原子性,同樣使用了內聯匯編。最后,使用popcli()來允許中斷(或者彈出一個cli,但因為其他鎖未釋放使得中斷依然被禁止)。

在Xv6中實現信號量

struct semaphore {
  int value;
  struct spinlock lock;
  struct proc *queue[NPROC];
  int end;
  int start;
};

void sem_init(struct semaphore *s, int value) {
  s->value = value;
  initlock(&s->lock, "semaphore_lock");
  end = start = 0;
}

void sem_wait(struct semaphore *s) {
  acquire(&s->lock);
  s->value--;
  if (s->value < 0) {
    s->queue[s->end] = myproc();
    s->end = (s->end + 1) % NPROC;
    sleep(myproc(), &s->lock)
  }
  release(&s->lock);
}

void sem_signal(struct semaphore *s) {
  acquire(&s->lock);
  s->value++;
  if (s->value <= 0) {
    wakeup(s->queue[s->start]);
    s->queue[s->start] = 0;
    s->start = (s->start + 1) % NPROC;
  }
  release(&s->lock);
}

上面的代碼使用Xv6提供的接口實現了信號量,格式和命名與POSIX標准類似。這個信號量的實現采用等待隊列的方式。當一個進程因信號量陷入阻塞時,會將自己放進等待隊列並睡眠(18-22行)。當一個進程釋放信號量時,會從等待隊列中取出一個進程繼續執行(29-33行)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM