操作系統實驗報告-信號量的實現和應用


實驗內容

在Linux-0.11中實現信號量,並編寫生產者-消費者程序進行檢驗。

實驗步驟

添加信號量結構體與相應的系統調用函數

在include/unistd.h中添加代碼:

#define SEM_NAME_LEN 32                /* 信號量名稱最大長度 */
typedef struct sem_t{
    char name[SEM_NAME_LEN];        /* 信號量名稱 */
    unsigned int value;                /* 信號量的值 */
    struct task_struct *s_wait;        /* 等待信號量的進程的pcb指針 */
    struct sem_t *next;                /* 用於連接信號量形成鏈表 */
}sem_t;

sem_t *sem_open(const char *name, unsigned int value);    /* 打開或新建信號量 *//
int sem_wait(sem_t *sem);            /* 等待信號量至其值大於0,將其值減1;對應P原語 */
int sem_post(sem_t *sem);            /* 喚醒在信號量上等待的進程,將信號量值加1;對應V原語 */
int sem_unlink(const char *name);    /* 銷毀信號量 */

接下來將上面定義的4個函數添加為系統調用(步驟同操作系統實驗報告-系統調用),添加kernel/sem.c實現它們:

#include <linux/kernel.h>
#include <asm/system.h>
#include <linux/sched.h>
#include <asm/segment.h>
#include <unistd.h>

sem_t *sem_head = &((sem_t *){"\0", 0, NULL, NULL});    /* 鏈表頭結點,方便統一操作 */

/* 將用戶態中的ustr復制到內核態的kstr */
static inline int str_u2k(const char *ustr, char *kstr, unsigned int length)
{
    char c;
    int i;

    for(i=0; (c=get_fs_byte(ustr++))!='\0' && i<length; i++)
        *(kstr+i)=c;
    *(kstr+i)='\0';

    return i;
}


sem_t *sys_sem_open(const char *name, unsigned int value)
{
    sem_t *sem_cur, *sem_pre;
    char pname[SEM_NAME_LEN];

    /* 將用戶態參數name指向的信號量名稱拷貝到內核態指針pname中 */
    str_u2k(name, pname, SEM_NAME_LEN);

    /* 遍歷鏈表,檢驗信號量是否已存在 */
    for(sem_pre=sem_head, sem_cur=sem_head->next; sem_cur && strcmp(pname, sem_cur->name);
            sem_pre=sem_cur, sem_cur=sem_cur->next);

    /* sem_cur為空,表明信號量不存在,分配一塊內存新建一個信號量 */
    if(!sem_cur)
    {
        printk("semaphore %s no found. created a new one. \n", pname);
        sem_cur = (sem_t *)malloc(sizeof(sem_t));
        strcpy(sem_cur->name, pname);
        sem_cur->value = value;
        sem_cur->next = NULL;
        sem_pre->next = sem_cur;
    }
    printk("pid %d opens semaphore %s(value %u) OK. \n", current->pid, pname, sem_cur->value);
    return sem_cur;
}

int sys_sem_wait(sem_t *sem)
{
    cli();    /* 關閉中斷 */
    /* 進程等待直到信號量的值大於0 */
    while(sem->value<=0)
        sleep_on(&(sem->s_wait));
    sem->value--;
    sti();    /* 開啟中斷 */
    return 0;
}

int sys_sem_post(sem_t *sem)
{
    sem->value++;
    /* 喚醒在信號量上等待的進程 */
    if(sem->s_wait)
    {
        wake_up(&(sem->s_wait));
        return 0;
    }
    return -1;
}

int sys_sem_unlink(const char *name)
{
    sem_t *sem_cur, *sem_pre;
    char pname[SEM_NAME_LEN];
    int i;

    str_u2k(name, pname, SEM_NAME_LEN);

    for(sem_pre=sem_head, sem_cur=sem_head->next; sem_cur && strcmp(pname, sem_cur->name);
            sem_pre=sem_cur, sem_cur=sem_cur->next);

    /* 找不到則返回錯誤代碼-1 */
    if(!sem_cur)
        return -1;

    /* 找到了將其從鏈表中移除,並釋放空間 */
    sem_pre->next = sem_cur->next;
    free(sem_cur);
    printk("unlink semaphore %s OK. \n", pname);
    return 0;
}

其中sys_sem_wait()和sys_sem_post()參考自kernel/blk_drv/ll_rw_blk.c:

static inline void lock_buffer(struct buffer_head * bh)
{
    cli();
    while (bh->b_lock)
        sleep_on(&bh->b_wait);
    bh->b_lock=1;
    sti();
}

static inline void unlock_buffer(struct buffer_head * bh)
{
    if (!bh->b_lock)
        printk("ll_rw_block.c: buffer not locked\n\r");
    bh->b_lock = 0;
    wake_up(&bh->b_wait);
}

其中的sleep_on()為在kernel/sched.c中實現的函數:

void sleep_on(struct task_struct **p)
{
    /* 參數p指向原等待進程pcb */

    struct task_struct *tmp;

    if (!p)
        return;
    if (current == &(init_task.task))
        panic("task[0] trying to sleep");
    tmp = *p;        /* 本地指針tmp指向原等待進程 */
    *p = current;    /* 參數p指向當前進程,使其成為下一次調用此方法的等待進程 */
    current->state = TASK_UNINTERRUPTIBLE;    /* 休眠進程 */
    schedule();        /* 執行調度 */
    /* 由於是不可中斷睡眠,不會自動就緒,只能通過調用wake_up()來喚醒。
     * 如果調度后又回到這里,說明是信號量的值已經大於0了,於是就調用了wake_up()將此進程喚醒
     */
    if (tmp)    /* 將原等待進程也喚醒 */
        tmp->state=0;
    /* 等到原等待進程拿到CPU進入運行狀態,
     * 它也會將它以前調用此函數時產生的另一個本地指針tmp指向的等待進程喚醒。
     * 就這樣遞歸喚醒,就好像遍歷喚醒了一條等待的進程隊列
     */
}

下面是《Linux內核完全注釋》里面的一張圖,形象地描述了此函數中的指針變化:

 

wake_up()也是在kernel/sched.c中實現,是一個簡單的喚醒判斷:

void wake_up(struct task_struct **p)
{
    if (p && *p) {
        (**p).state=0;
        *p=NULL;
    }
}

編寫生產者-消費者檢驗程序

生產者-消費者問題

生產者-消費者問題是互斥的一個經典例子,下面是實驗指導書給出的功能要求:

  1. 建立一個生產者進程,N個消費者進程(N>1);
  2. 用文件建立一個共享緩沖區;
  3. 生產者進程依次向緩沖區寫入整數0,1,2,...,M,M>0;
  4. 消費者進程從緩沖區讀數,每次讀一個,並將讀出的數字從緩沖區刪除,然后將本進程ID和數字輸出到標准輸出;
  5. 緩沖區同時最多只能保存10個數。

為了增加可讀性,我以句子的形式輸出信息。

生產者-消費者問題的解決算法的偽代碼描述:

Producer()
{
    生產一個產品item;
    P(Empty);
    P(Mutex);
    將item放到空閑緩存中;
    V(Mutex);
    V(Full);
}

Consumer()
{
    P(Full);  
    P(Mutex);  
    從緩存區取出一個賦值給item;
    V(Mutex);
    V(Empty);
    消費產品item;
} 

新建pc.c文件,編寫測試程序:

#define __LIBRARY__
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>

_syscall2(sem_t *,sem_open,const char *,name,unsigned int,value)
_syscall1(int,sem_wait,sem_t *,sem)
_syscall1(int,sem_post,sem_t *,sem)
_syscall1(int,sem_unlink,const char *,name)

const char *FILENAME = "/usr/root/buffer_file";    /* 消費生產的產品存放的緩沖文件的路徑 */
const int NR_CONSUMERS = 5;                        /* 消費者的數量 */
const int NR_ITEMS = 50;                        /* 產品的最大量 */
const int BUFFER_SIZE = 10;                        /* 緩沖區大小,表示可同時存在的產品數量 */
sem_t *metux, *full, *empty;                    /* 3個信號量 */
unsigned int item_pro, item_used;                /* 剛生產的產品號;剛消費的產品號 */
int fi, fo;                                        /* 供生產者寫入或消費者讀取的緩沖文件的句柄 */


int main(int argc, char *argv[])
{
    char *filename;
    int pid;
    int i;

    filename = argc > 1 ? argv[1] : FILENAME;
    /* O_TRUNC 表示:當文件以只讀或只寫打開時,若文件存在,則將其長度截為0(即清空文件)
     * 0222 和 0444 分別表示文件只寫和只讀(前面的0是八進制標識)
     */
    fi = open(filename, O_CREAT| O_TRUNC| O_WRONLY, 0222);    /* 以只寫方式打開文件給生產者寫入產品編號 */
    fo = open(filename, O_TRUNC| O_RDONLY, 0444);            /* 以只讀方式打開文件給消費者讀出產品編號 */

    metux = sem_open("METUX", 1);    /* 互斥信號量,防止生產消費同時進行 */
    full = sem_open("FULL", 0);        /* 產品剩余信號量,大於0則可消費 */
    empty = sem_open("EMPTY", BUFFER_SIZE);    /* 空信號量,它與產品剩余信號量此消彼長,大於0時生產者才能繼續生產 */

    item_pro = 0;

    if ((pid = fork()))    /* 父進程用來執行消費者動作 */
    {
        printf("pid %d:\tproducer created....\n", pid);
        /* printf()輸出的信息會先保存到輸出緩沖區,並沒有馬上輸出到標准輸出(通常為終端控制台)。
         * 為避免偶然因素的影響,我們每次printf()都調用一下stdio.h中的fflush(stdout)
         * 來確保將輸出立刻輸出到標准輸出。
         */
        fflush(stdout);

        while (item_pro <= NR_ITEMS)    /* 生產完所需產品 */
        {
            sem_wait(empty);
            sem_wait(metux);

            /* 生產完一輪產品(文件緩沖區只能容納BUFFER_SIZE個產品編號)后
             * 將緩沖文件的位置指針重新定位到文件首部。
             */
            if(!(item_pro % BUFFER_SIZE))
                lseek(fi, 0, 0);

            write(fi, (char *) &item_pro, sizeof(item_pro));        /* 寫入產品編號 */
            printf("pid %d:\tproduces item %d\n", pid, item_pro);
            fflush(stdout);
            item_pro++;

            sem_post(full);        /* 喚醒消費者進程 */
            sem_post(metux);
        }
    }
    else    /* 子進程來創建消費者 */
    {
        i = NR_CONSUMERS;
        while(i--)
        {
            if(!(pid=fork()))    /* 創建i個消費者進程 */
            {
                pid = getpid();
                printf("pid %d:\tconsumer %d created....\n", pid, NR_CONSUMERS-i);
                fflush(stdout);

                while(1)
                {
                    sem_wait(full);
                    sem_wait(metux);

                    /* read()讀到文件末尾時返回0,將文件的位置指針重新定位到文件首部 */
                    if(!read(fo, (char *)&item_used, sizeof(item_used)))
                    {
                        lseek(fo, 0, 0);
                        read(fo, (char *)&item_used, sizeof(item_used));
                    }

                    printf("pid %d:\tconsumer %d consumes item %d\n", pid, NR_CONSUMERS-i+1, item_used);
                    fflush(stdout);

                    sem_post(empty);    /* 喚醒生產者進程 */
                    sem_post(metux);

                    if(item_used == NR_ITEMS)    /* 如果已經消費完最后一個商品,則結束 */
                        goto OK;
                }
            }
        }
    }
OK:
    close(fi);
    close(fo);
    return 0;
}

我們先將虛擬硬盤掛載,將文件pc.c拷貝到虛擬硬盤下:

cd workspace/oslab/
sudo ./mount-hdc
cp pc.c hdc/usr/root/

編譯運行linux-0.11:

cd linux-0.11
make
../run

在linux-0.11中,編譯運行pc.c:

gcc -o pc pc.c
./pc > sem_output    # 這里我將輸出重定向到文件sem_output,因為輸出的內容比較多,而linux-0.11終端不能滾屏,
              # 而且輸出內容多了還會顯示錯亂(可以用Ctrl+L刷新屏幕),不能復制終端輸出的內容

一定要記得把修改的數據寫入磁盤:

sync

關閉linux-0.11,掛載虛擬磁盤,查看我們的文件(當然也可以在linux-0.11中直接查看,只是顯示內容多時會錯亂,需要反復按Ctrl+L刷新):

cd ..
sudo ./mount-hdc
sudo less hdc/usr/root/sem_output

得到輸出:

pid 20: producer created....
pid 20: produces item 0
pid 20: produces item 1
.......
pid 20: produces item 8
pid 20: produces item 9
pid 24: consumer 5 created....
pid 24: consumer 5 consumes item 0
pid 24: consumer 5 consumes item 1
pid 24: consumer 5 consumes item 2
......
pid 24: consumer 5 consumes item 7
pid 24: consumer 5 consumes item 8
pid 24: consumer 5 consumes item 9
pid 23: consumer 4 created....
......
pid 20: produces item 47
pid 20: produces item 48
pid 20: produces item 49
pid 21: consumer 2 consumes item 40
pid 21: consumer 2 consumes item 41
......
pid 21: consumer 2 consumes item 48
pid 21: consumer 2 consumes item 49
pid 20: produces item 50
pid 22: consumer 3 consumes item 50

可以看到得出正確結果。

再看一下緩沖文件:

sudo cat  hdc/usr/root/buffer_file
2^@^@^@)^@^@^@*^@^@^@+^@^@^@,^@^@^@-^@^@^@.^@^@^@/^@^@^@0^@^@^@1^@^@^@

它是一個數據文件,我們把它轉成十六進制輸出到終端:

sudo xxd hdc/usr/root/buffer_file
00000000: 3200 0000 2900 0000 2a00 0000 2b00 0000  2...)...*...+...
00000010: 2c00 0000 2d00 0000 2e00 0000 2f00 0000  ,...-......./...
00000020: 3000 0000 3100 0000                      0...1...

8個十六進制位 = 32個二進制位 = 4 byte = sizeof(unsigned int),所以上面翻譯為十進制則是:

00000000: 50 41 42 43  2...)...*...+...
00000010: 44 45 46 47  ,...-......./...
00000020: 48 49        0...1...

50是最后一輪的產品編號,覆蓋掉了上一輪的40,也是正確的。

思考

1. 在pc.c中去掉所有與信號量有關的代碼,再運行程序,執行效果有變化嗎?為什么會這樣?

刪除所有sem_*()調用,在linux-0.11中編譯運行得到的輸出為:

pid 32: producer created....
pid 32: produces item 0
pid 32: produces item 1
pid 32: produces item 2
pid 32: produces item 3
......
pid 32: produces item 49
pid 32: produces item 50
pid 38: consumer 5 created....
pid 38: consumer 5 consumes item 50
pid 37: consumer 4 created....
pid 37: consumer 4 consumes item 41
pid 37: consumer 4 consumes item 42
......
pid 37: consumer 4 consumes item 49
pid 37: consumer 4 consumes item 50
pid 36: consumer 3 created....
pid 36: consumer 3 consumes item 41
pid 36: consumer 3 consumes item 42
......
pid 36: consumer 3 consumes item 49
pid 36: consumer 3 consumes item 50
pid 35: consumer 2 created....
pid 35: consumer 2 consumes item 41
pid 35: consumer 2 consumes item 42
.......
pid 35: consumer 2 consumes item 49
pid 35: consumer 2 consumes item 50
pid 34: consumer 1 created....
pid 34: consumer 1 consumes item 41
pid 34: consumer 1 consumes item 42
......
pid 34: consumer 1 consumes item 49
pid 34: consumer 1 consumes item 50

生產者進程生產完所有的商品,消費者才開始消費商品,並且都只能消費緩存區中的最終10件商品(從輪到它們時的文件位置指針開始直到消費了第50號商品)。這是因為沒有信號量的約束,生產者不知道緩存區已經滿了,仍然繼續生產;也沒有信號量告訴它是否有消費者要訪問這塊臨界區(緩存文件),它就無所顧慮地生產完所有的商品。消費者也一樣沒有了信號量的約束,直接消費到了50號商品。

我覺得這個問題的目的在於讓我們看到沒有信號量時,消費品消費的順序很亂、重復(臟數據導致),可能我的驗證程序的設計思路與出題者的不一樣。

2. 實驗的設計者在第一次編寫生產者——消費者程序的時候,是這么做的:

Producer()
{
P(Mutex); //互斥信號量
生產一個產品item;
P(Empty); //空閑緩存資源
將item放到空閑緩存中;
V(Full); //產品資源
V(Mutex);
}

Consumer()
{
P(Mutex);
P(Full);
從緩存區取出一個賦值給item;
V(Empty);
消費產品item;
V(Mutex);
}

這樣可行嗎?如果可行,那么它和標准解法在執行效果上會有什么不同?如果不可行,那么它有什么問題使它不可行? 

不可行。

  1. 假設Producer剛生產完一件商品,釋放了Mutex,Mutex為1,此時緩存區滿了,Empty為0;
  2. 然后OS執行調度,若被Producer拿到CPU,它拿到Mutex,使Mutex為0,而Empty為0,Producer讓出CPU,等待Consumer執行V(Empty);
  3. 而Consumer拿到CPU后,卻要等待Producer執行V(Mutex);
  4. 兩者相互持有對方需要的資源,造成死鎖。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM