操作系統課程設計 消息緩沖隊列通信


消息緩沖隊列通信機制其基本思想是根據“生產者——消費者”原理,利用內存中公用消息緩沖區實現進程間的信息交換。

在這種通信機制中,首先需要在內存中開辟若干空閑消息緩沖區,用以存放要通信的消息。每當一個進程需要向另一個進程發送消息時,便向系統申請一個空閑消息緩沖區,並把已准備好的消息復制到該緩沖區,然后把該消息緩沖區插入到接收進程的消息隊列中,最后通知接收進程。接收進程接收到發送進程發來的通知后,從本進程的消息隊列中摘下一消息緩沖區,取出其中的信息,然后把消息緩沖區作為空閑消息緩沖區歸還給系統。系統負責管理公用消息緩沖區以及消息的傳遞。

  1 // 消息緩沖隊列
  2 // 2016.1.7
  3 
  4 #include <stdlib.h>
  5 #include <dos.h>
  6 #include <stdio.h>
  7 
  8 #define GET_INDOS 0x34         /* 34H 系統功能調用    */
  9 #define GET_CRIT_ERR 0x5d06    /* 5D06H號系統功能調用 */
 10 
 11 #define BLANK -1
 12 #define FINISHED 0        /*     終止    */
 13 #define RUNNING 1         /*   執行  */
 14 #define READY 2           /*   就緒  */
 15 #define BLOCKED 3         /*   阻塞  */
 16 #define NTCB 3            /* 系統線程的最大數 */
 17 
 18 #define TL 10             /* 時間片大小  */
 19 #define NBUF 2            /* 消息緩沖區數目 */
 20 #define NTEXT 50          /* 文本輸出大小  */
 21 
 22 
 23 char far* intdos_ptr=0;
 24 char far* crit_err_ptr=0;
 25 int timecount=0;
 26 int current=-1;
 27 
 28 typedef unsigned int UINT16;
 29 
 30 typedef struct/* 信號量 */
 31 {
 32   int value;
 33   struct TCB* wq;
 34 }semaphore;
 35 
 36 semaphore mutexfb={1,NULL};        // freebuf 互斥變量     初值 1
 37 semaphore sfb={2,NULL};            // 計數信號量     
 38 semaphore bufferSem1, bufferSem2;    
 39 
 40 // 消息緩沖區 
 41 // 空閑緩沖隊列 freebuf(臨界資源)
 42 struct buffer
 43 {
 44   int sender;             /*消息發送者的標識數 */
 45   int size;               /* 消息長度<=NTEXT 個字節   */
 46   char text[NTEXT];       /* 消息正文   */
 47   struct buffer* next;    /* 指向下一個消息緩沖區的指針 */
 48 } *freebuf;
 49 
 50 /* 線程控制塊  */
 51 struct TCB
 52 {           
 53   unsigned char* stack;          /* 堆棧的起始地址  */
 54   unsigned ss;
 55   unsigned sp;            /* 堆棧段址和堆棧指針 */
 56   char state;             /* 進程狀態   */
 57   char name[10];          /* 線程的外部標識符  */
 58   int value;              /*優先級*/
 59   struct TCB* next;       /* 指向控制快指針  */
 60   struct buffer* mq;      /* 消息緩沖隊列首指針  */
 61   semaphore mutex;        /* 互斥信號量   */
 62   semaphore sm;           /* 消息緩沖隊列計數信號量*/
 63 }tcb[NTCB];
 64 
 65 /* 堆棧現場保護和恢復結構體 */
 66 struct int_regs
 67 {          
 68   unsigned BP,DI,SI,DS,ES,DX,CX,BX,AX,IP,CS,Flags,off,seg;
 69 };
 70 
 71 typedef int(far* codeptr)(void);
 72 void interrupt(*old_int8)(void);
 73 int DosBusy(void);
 74 void InitIndos(void);
 75 void InitTcb();
 76 void interrupt new_int8(void);
 77 void interrupt swtch();
 78 void send(char *receiver,char *a,int size);
 79 int receive(char *sender,char *a);
 80 void p(semaphore *sem);
 81 void v(semaphore *sem);
 82 int Create(char* name,codeptr code,int stacklen,int prio);  /* 創建線程 */
 83 void Destroy(int i);
 84 
 85 
 86 // 1#線程
 87 void f1()       
 88 {
 89 
 90   while(1)
 91   {
 92         p(&bufferSem1);
 93 
 94       send("f2","f1 send message to f2",NTEXT);
 95 
 96       printf("f1 sending!\n");
 97 
 98       v(&bufferSem2);
 99   }
100 }
101 
102 // 2#線程  
103 void f2()           
104 {
105   char a[NTEXT];
106 
107   while(1)
108   {
109         p(&bufferSem2);
110 
111       receive("f1",a);
112 
113       printf("f2 receiving!\n");
114 
115       v(&bufferSem1);
116   }
117 }
118 
119 
120 void InitInDos()      /* 取得INDOS標志和嚴重錯誤標志地址 */
121 {
122   union REGS regs;
123   struct SREGS segregs;
124 
125   regs.h.ah=GET_INDOS;      /* 使用34H號系統功能調用 */
126   intdosx(&regs,&regs,&segregs);
127 
128   intdos_ptr=MK_FP(segregs.es,regs.x.bx);
129   if(_osmajor<3)
130     crit_err_ptr=intdos_ptr+1;      /* 嚴重錯誤在INDOS后一字節處 */
131   else if(_osmajor==3&&_osminor==0)
132     crit_err_ptr=intdos_ptr-1;      /* 嚴重錯誤在INDOS前一字節處 */
133   else
134     {
135      regs.x.ax=GET_CRIT_ERR;
136      intdosx(&regs,&regs,&segregs);
137      crit_err_ptr=MK_FP(segregs.ds,regs.x.si);
138    }
139 }
140 
141 int DosBusy(void)            /* 判斷DOS是否忙 */
142 {
143   if(intdos_ptr&&crit_err_ptr)
144     return(*intdos_ptr||*crit_err_ptr);  /* DOS忙,返回嚴重錯誤標志 */
145    else
146     return(-1);         /* DOS不忙 */
147 }
148 
149 void InitTcb()           /* 初始化線程 */
150 {
151   int i;
152 
153   for(i=0;i<NTCB;i++)
154    {
155     tcb[i].state=BLANK;       /* 初始狀態標志   */
156     tcb[i].mq=NULL;
157     tcb[i].mutex.value=1;
158     tcb[i].mutex.wq=NULL;
159     tcb[i].sm.value=0;
160     tcb[i].sm.wq=NULL;
161    }
162 }
163 
164 void Destroy(int i)
165 {
166 
167  if(tcb[i].state==RUNNING)
168  {
169    disable();
170    tcb[i].state=FINISHED;
171    strcpy(tcb[i].name,NULL);
172    free(tcb[i].stack);
173    tcb[i].ss=0;
174    tcb[i].sp=0;
175    enable();
176   }
177 
178 }
179 
180 void over()
181 {
182    Destroy(current);
183    swtch();
184 }
185 
186 int Create(char *name,codeptr code,int stacklen,int value)
187 {
188    int i;
189    char *p;
190    struct int_regs *pt;
191    unsigned int *pp;
192 
193    for(i=1;i<NTCB;i++)
194    {
195      if(tcb[i].state==BLANK||tcb[i].state==FINISHED)
196          break;
197    }
198    if(i==NTCB)
199      return-1;
200 
201    tcb[i].value=value;
202    strcpy(tcb[i].name,name);
203    tcb[i].stack=(p=(unsigned char*)malloc(stacklen));
204    memset(tcb[i].stack, 0xff, stacklen);
205    p=p+stacklen;
206 
207 #if 0
208    pt=(struct int_regs*)p;
209    pt--;
210    pt->Flags=0x200;
211    pt->CS=FP_SEG(code);
212    pt->IP=FP_OFF(code);
213 
214    pt->off=FP_OFF(over);
215    pt->seg=FP_SEG(over);
216    pt->DS=_DS;
217    pt->ES=_ES;
218    tcb[i].sp=FP_OFF(pt);
219    tcb[i].ss=FP_SEG(pt);
220 #else if
221    /*
222    pp=(UINT16 *)(p-2);
223    *(pp)=FP_SEG(over);
224    *(pp-1)=FP_OFF(over);
225    *(pp-2)=0x200;
226    *(pp-3)=FP_SEG(code);
227    *(pp-4)=FP_OFF(code);
228 
229    *(pp-9)=_ES;
230    *(pp-10)=_DS;
231    tcb[i].sp=FP_OFF(pp-13);
232    tcb[i].ss=FP_SEG(pp-13);
233    */
234 
235    *(p-1)=(FP_SEG(over)&0xff00)>>8;
236    *(p-2)=FP_SEG(over)&0x00ff;
237 
238    *(p-3)=(FP_OFF(over)&0xff00)>>8;
239    *(p-4)=FP_OFF(over)&0x00ff;
240 
241    *(p-5)=0x02;
242    *(p-6)=0x00;
243 
244    *(p-7)=(FP_SEG(code)&0xff00)>>8;
245    *(p-8)=FP_SEG(code)&0x00ff;
246 
247    *(p-9)=(FP_OFF(code)&0xff00)>>8;
248    *(p-10)=FP_OFF(code)&0x00ff;
249 
250    *(p-19)=(_ES&0xff00)>>8;
251    *(p-20)=_ES&0x00ff;
252 
253    *(p-21)=(_DS&0xff00)>>8;
254    *(p-22)=_DS&0x00ff;
255 
256    tcb[i].sp=FP_OFF((UINT16 *)(p-28));
257    tcb[i].ss=FP_SEG((UINT16 *)(p-28));
258 
259 #endif
260 
261    tcb[i].state=READY;
262 
263    return i;
264 }
265 
266 void tcb_state()        /* 線程狀態信息 */
267 {
268   int i;
269 
270   for(i=0;i<NTCB;i++)
271     if(tcb[i].state!=BLANK)
272     {
273         switch(tcb[i].state)
274         {
275             case FINISHED:
276             printf("\ntcb[%d] is FINISHED\n",i);
277             break;
278 
279             case RUNNING:
280             printf("tcb[%d] is RUNNING\n",i);
281             break;
282             case READY:
283             printf("tcb[%d] is READY\n",i);
284             break;
285             case BLOCKED:
286             printf("tcb[%d] is BLOCKED\n",i);
287 
288             break;
289         }
290     }
291 }
292 
293 int all_finished()
294 {
295     int i;
296 
297     for(i=1;i<NTCB;i++)
298         if(tcb[i].state==RUNNING||tcb[i].state==BLOCKED||tcb[i].state==READY)
299             return 0;
300 
301     return 1;
302 }
303 
304 int Find()
305 {
306     int i,j;
307     i=current;
308 
309     while(tcb[i=((i+1)%NTCB)].state!=READY||i==current);
310 
311     return i;
312 }
313 
314 void interrupt new_int8(void)       /* CPU 調度*/
315 {
316     int i;
317 
318     (*old_int8)();      /* 指向原來時鍾中斷處理過程入口的中斷處理函數指針 */
319     timecount++;
320 
321     if(timecount==TL)        /* 時間片是否到? */
322     {
323         if(!DosBusy())     /* DOS是否忙? */
324         {
325             disable();
326 
327             tcb[current].ss=_SS;     /* 保存現場 */
328             tcb[current].sp=_SP;
329 
330             if(tcb[current].state==RUNNING)
331                 tcb[current].state=READY;
332 
333             i=Find();
334             if(i<0)
335                 return;
336 
337             _SS=tcb[i].ss;
338             _SP=tcb[i].sp;
339             tcb[i].state=RUNNING;
340             current=i;
341             timecount=0;      /* 重新計時 */
342 
343             enable();
344         }
345         else
346             return;
347     }
348     else
349         return;
350 }
351 
352 void interrupt swtch()            /* 其他原因CPU調度  */
353 {
354     int i;
355 
356     if(tcb[current].state!=FINISHED
357         &&current!=0&&tcb[current].state!=BLOCKED) /* 當前線程還沒結束 */
358         return;
359 
360     i=Find();
361     if(i<0)
362         return;
363 
364     disable();
365     tcb[current].ss=_SS;
366     tcb[current].sp=_SP;
367 
368     if(tcb[current].state==RUNNING)
369         tcb[current].state=READY;      /* 放入就緒隊列中 */
370 
371     _SS=tcb[i].ss;
372     _SP=tcb[i].sp;        /* 保存現場 */
373 
374     tcb[i].state=RUNNING;
375     current=i;
376     enable();
377 }
378 
379 void block(struct TCB **p)         /* 阻塞原語 */
380 {
381     struct TCB *pp;
382 
383     tcb[current].state=BLOCKED;
384 
385     if((*p)==NULL)
386         *p=&tcb[current];    /* 阻塞隊列空,直接放入 */
387     else
388     {
389         pp=*p;
390         while(pp->next)
391             pp=pp->next;         /* 找到阻塞隊列最后一個節點 */
392 
393         pp->next=&tcb[current];      /* 放入阻塞隊列 */
394     }
395     tcb[current].next=NULL;
396     swtch();       /* 重新進行CPU調度 */
397 }
398 
399 void wakeup_first(struct TCB **p)    /* 喚醒隊首線程 */
400 {
401   struct TCB *pl;
402 
403   if((*p)==NULL)
404       return;
405 
406   pl=(*p);
407   (*p)=(*p)->next;     /* 得到阻塞隊列隊首線程 */
408   pl->state=READY;        /* 修為就緒狀態 */
409   pl->next=NULL;
410 }
411 
412 void p(semaphore *sem)
413 {
414     struct TCB **qp;
415 
416     disable();
417     sem->value=sem->value-1;
418 
419     if(sem->value<0)
420     {
421         qp=&(sem->wq);
422         block(qp);
423     }
424     enable();
425 }
426 
427 void v(semaphore*sem)
428 {
429     struct TCB **qp;
430 
431     disable();
432     qp=&(sem->wq);
433     sem->value=sem->value+1;
434 
435     if(sem->value>=0)
436         wakeup_first(qp);
437 
438     enable();
439 }
440 
441 ///////////////////////////////////////////////////////////////////////////////
442 // buffer
443 struct buffer*Initbuf(void)
444 {
445   struct buffer *p,*pt,*pt2;
446   int i;
447 
448   pt2=pt=(struct buffer*)malloc(sizeof(struct buffer));
449   pt->sender=-1;
450   pt->size=0;
451   strcmp(pt->text,"");
452   pt->next=NULL;
453 
454   for(i=0;i<NBUF-1;i++)
455   {
456    p=(struct buffer*)malloc(sizeof(struct buffer));
457    p->sender=-1;
458    p->size=0;
459    p->text[NTEXT]='\0';
460    p->next=NULL;
461    pt2->next=p;
462    pt2=p;
463   }
464 
465   return pt;
466 }
467 
468 // 從空閑消息緩沖隊列隊頭上取下一緩空閑消息沖區
469 struct buffer* getbuf(void)  
470 {
471   struct buffer *buf;
472 
473   buf=freebuf;        /* 取得緩沖隊列的緩沖區*/
474   freebuf=freebuf->next;
475 
476   return(buf);        /* 返回指向該緩沖區的指針 */
477 }
478 
479 // 將buff所指的緩沖區插到*mq所指的緩沖隊列末尾
480 void insert(struct buffer **mq, struct buffer *buff)
481 {
482   struct buffer *temp;
483 
484   if(buff==NULL)
485       return;       /* buff為空 */
486 
487   buff->next=NULL;
488   if(*mq==NULL)     /* *mq為空 則直接插入*/
489     *mq=buff;
490   else
491   {
492     temp=*mq;
493     while(temp->next)      /* 找到隊尾 */
494         temp=temp->next;
495 
496     temp->next=buff;
497   }
498 }
499 
500 // 將地址a開始的size個字節發送給外部標識符為receiver的線程
501 void send(char *receiver,char *a, int size)
502 {
503     struct buffer *buff;
504     int i,id=-1;
505 
506     disable();
507     for(i=0;i<NTCB;i++)
508     {
509         if(strcmp(receiver,tcb[i].name)==0)
510         {
511             id=i;
512             break;
513         }
514     }
515 
516     if(id==-1)
517     {
518         printf("Error:Receiver not exist!\n");
519         enable();
520         return;
521     }
522 
523     p(&sfb);             
524 
525     p(&mutexfb);         
526     buff=getbuf();
527     v(&mutexfb);
528 
529     buff->sender=current;
530     buff->size=size;
531     buff->next=NULL;
532 
533     for(i=0;i<buff->size;i++,a++)
534         buff->text[i]=*a;
535 
536 // 將要發送的消息放到接收者TCB的buffer中
537     p(&tcb[id].mutex);
538     insert(&(tcb[id].mq),buff);
539     v(&tcb[id].mutex);
540 
541   // 用於同步
542     v(&tcb[id].sm);              
543     enable();
544 }
545 
546 //////////////////////////////////////////////////////////////////////////////////////////////
547 // 獲取消息緩沖區函數
548 struct buffer *remov(struct buffer **mq, int sender)   
549 {   
550   struct buffer *buff, *p, *q;   
551   q = NULL;   
552   p = *mq;   
553 
554   // 在消息緩沖區隊列中找到其他進程發送給自己的消息
555   while((p->next != NULL) && (p->sender != sender))   
556   {   
557     q = p;   
558     p = p->next;   
559   }  
560 
561   // 獲取消息后從隊列中刪除,防止重復接收
562   if(p->sender == sender)
563   {   
564     buff = p;   
565     if(q == NULL)   
566       *mq = buff->next;   
567     else   
568       q->next = buff->next;   
569 
570     buff->next = NULL;   
571     return buff;   
572   }   
573   else   
574       return NULL;   
575 }   
576 
577 // 接收原語
578 int receive(char *sender, char *b)   
579 {   
580   int i, id = -1;   
581   struct buffer *buff;   
582 
583   disable();   
584 
585   // 尋找 sender
586   for(i = 0; i < NBUF; i++)   
587   {   
588     if(strcmp(sender, tcb[i].name) == 0)
589     {   
590       id = i;   
591       break;   
592     }   
593   }   
594 
595   if(id == -1)   
596   {   
597     enable();   
598     return -1;       
599   }   
600 
601   p(&tcb[current].sm); 
602 
603   p(&tcb[current].mutex);   
604   buff = remov(&(tcb[current].mq), id);   
605   v(&tcb[current].mutex);   
606 
607   if(buff == NULL)
608   {   
609     v(&tcb[current].sm);   
610     enable();   
611     return -1;   
612   }   
613   // 將消息正文復制到接收區
614   strcpy(b, buff->text);  
615 
616   // 釋放前先把標識去掉,防止重復接收
617   buff->sender = -1;
618   // 釋放相應的消息緩沖區
619   p(&mutexfb);   
620   insert(&freebuf, buff);   
621   v(&mutexfb);  
622 
623   v(&sfb);   
624 
625   enable();   
626 
627   return buff->size;   
628 }   
629 
630 void main()
631 {
632   long i, j, k;
633 
634   bufferSem1.value = 1;
635   bufferSem1.wq = NULL;
636 
637   bufferSem2.value = 0;
638   bufferSem2.wq = NULL;
639 
640   InitInDos();
641   InitTcb();
642 
643   freebuf=Initbuf();
644   old_int8=getvect(8);
645 
646   strcpy(tcb[0].name,"main");
647   tcb[0].state=RUNNING;
648   tcb[0].value=0;
649   current=0;
650 
651   Create("f1",(codeptr)f1,1024,5);
652   Create("f2",(codeptr)f2,1024,6);
653 
654   tcb_state();
655   setvect(8,new_int8);
656 
657   while(!all_finished());
658   {
659 
660       printf("running!\n");
661 
662   }
663 
664   tcb[0].name[0]='\0';
665   tcb[0].state=FINISHED;
666   setvect(8,old_int8);
667 
668   tcb_state();
669 
670   printf("\n Muli_task system teminated \n");
671 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM