消息緩沖隊列通信機制其基本思想是根據“生產者——消費者”原理,利用內存中公用消息緩沖區實現進程間的信息交換。
在這種通信機制中,首先需要在內存中開辟若干空閑消息緩沖區,用以存放要通信的消息。每當一個進程需要向另一個進程發送消息時,便向系統申請一個空閑消息緩沖區,並把已准備好的消息復制到該緩沖區,然后把該消息緩沖區插入到接收進程的消息隊列中,最后通知接收進程。接收進程接收到發送進程發來的通知后,從本進程的消息隊列中摘下一消息緩沖區,取出其中的信息,然后把消息緩沖區作為空閑消息緩沖區歸還給系統。系統負責管理公用消息緩沖區以及消息的傳遞。
1 // 消息緩沖隊列 2 // 2016.1.7 3 4 #include <stdlib.h> 5 #include <dos.h> 6 #include <stdio.h> 7 8 #define GET_INDOS 0x34 /* 34H 系統功能調用 */ 9 #define GET_CRIT_ERR 0x5d06 /* 5D06H號系統功能調用 */ 10 11 #define BLANK -1 12 #define FINISHED 0 /* 終止 */ 13 #define RUNNING 1 /* 執行 */ 14 #define READY 2 /* 就緒 */ 15 #define BLOCKED 3 /* 阻塞 */ 16 #define NTCB 3 /* 系統線程的最大數 */ 17 18 #define TL 10 /* 時間片大小 */ 19 #define NBUF 2 /* 消息緩沖區數目 */ 20 #define NTEXT 50 /* 文本輸出大小 */ 21 22 23 char far* intdos_ptr=0; 24 char far* crit_err_ptr=0; 25 int timecount=0; 26 int current=-1; 27 28 typedef unsigned int UINT16; 29 30 typedef struct/* 信號量 */ 31 { 32 int value; 33 struct TCB* wq; 34 }semaphore; 35 36 semaphore mutexfb={1,NULL}; // freebuf 互斥變量 初值 1 37 semaphore sfb={2,NULL}; // 計數信號量 38 semaphore bufferSem1, bufferSem2; 39 40 // 消息緩沖區 41 // 空閑緩沖隊列 freebuf(臨界資源) 42 struct buffer 43 { 44 int sender; /*消息發送者的標識數 */ 45 int size; /* 消息長度<=NTEXT 個字節 */ 46 char text[NTEXT]; /* 消息正文 */ 47 struct buffer* next; /* 指向下一個消息緩沖區的指針 */ 48 } *freebuf; 49 50 /* 線程控制塊 */ 51 struct TCB 52 { 53 unsigned char* stack; /* 堆棧的起始地址 */ 54 unsigned ss; 55 unsigned sp; /* 堆棧段址和堆棧指針 */ 56 char state; /* 進程狀態 */ 57 char name[10]; /* 線程的外部標識符 */ 58 int value; /*優先級*/ 59 struct TCB* next; /* 指向控制快指針 */ 60 struct buffer* mq; /* 消息緩沖隊列首指針 */ 61 semaphore mutex; /* 互斥信號量 */ 62 semaphore sm; /* 消息緩沖隊列計數信號量*/ 63 }tcb[NTCB]; 64 65 /* 堆棧現場保護和恢復結構體 */ 66 struct int_regs 67 { 68 unsigned BP,DI,SI,DS,ES,DX,CX,BX,AX,IP,CS,Flags,off,seg; 69 }; 70 71 typedef int(far* codeptr)(void); 72 void interrupt(*old_int8)(void); 73 int DosBusy(void); 74 void InitIndos(void); 75 void InitTcb(); 76 void interrupt new_int8(void); 77 void interrupt swtch(); 78 void send(char *receiver,char *a,int size); 79 int receive(char *sender,char *a); 80 void p(semaphore *sem); 81 void v(semaphore *sem); 82 int Create(char* name,codeptr code,int stacklen,int prio); /* 創建線程 */ 83 void Destroy(int i); 84 85 86 // 1#線程 87 void f1() 88 { 89 90 while(1) 91 { 92 p(&bufferSem1); 93 94 send("f2","f1 send message to f2",NTEXT); 95 96 printf("f1 sending!\n"); 97 98 v(&bufferSem2); 99 } 100 } 101 102 // 2#線程 103 void f2() 104 { 105 char a[NTEXT]; 106 107 while(1) 108 { 109 p(&bufferSem2); 110 111 receive("f1",a); 112 113 printf("f2 receiving!\n"); 114 115 v(&bufferSem1); 116 } 117 } 118 119 120 void InitInDos() /* 取得INDOS標志和嚴重錯誤標志地址 */ 121 { 122 union REGS regs; 123 struct SREGS segregs; 124 125 regs.h.ah=GET_INDOS; /* 使用34H號系統功能調用 */ 126 intdosx(®s,®s,&segregs); 127 128 intdos_ptr=MK_FP(segregs.es,regs.x.bx); 129 if(_osmajor<3) 130 crit_err_ptr=intdos_ptr+1; /* 嚴重錯誤在INDOS后一字節處 */ 131 else if(_osmajor==3&&_osminor==0) 132 crit_err_ptr=intdos_ptr-1; /* 嚴重錯誤在INDOS前一字節處 */ 133 else 134 { 135 regs.x.ax=GET_CRIT_ERR; 136 intdosx(®s,®s,&segregs); 137 crit_err_ptr=MK_FP(segregs.ds,regs.x.si); 138 } 139 } 140 141 int DosBusy(void) /* 判斷DOS是否忙 */ 142 { 143 if(intdos_ptr&&crit_err_ptr) 144 return(*intdos_ptr||*crit_err_ptr); /* DOS忙,返回嚴重錯誤標志 */ 145 else 146 return(-1); /* DOS不忙 */ 147 } 148 149 void InitTcb() /* 初始化線程 */ 150 { 151 int i; 152 153 for(i=0;i<NTCB;i++) 154 { 155 tcb[i].state=BLANK; /* 初始狀態標志 */ 156 tcb[i].mq=NULL; 157 tcb[i].mutex.value=1; 158 tcb[i].mutex.wq=NULL; 159 tcb[i].sm.value=0; 160 tcb[i].sm.wq=NULL; 161 } 162 } 163 164 void Destroy(int i) 165 { 166 167 if(tcb[i].state==RUNNING) 168 { 169 disable(); 170 tcb[i].state=FINISHED; 171 strcpy(tcb[i].name,NULL); 172 free(tcb[i].stack); 173 tcb[i].ss=0; 174 tcb[i].sp=0; 175 enable(); 176 } 177 178 } 179 180 void over() 181 { 182 Destroy(current); 183 swtch(); 184 } 185 186 int Create(char *name,codeptr code,int stacklen,int value) 187 { 188 int i; 189 char *p; 190 struct int_regs *pt; 191 unsigned int *pp; 192 193 for(i=1;i<NTCB;i++) 194 { 195 if(tcb[i].state==BLANK||tcb[i].state==FINISHED) 196 break; 197 } 198 if(i==NTCB) 199 return-1; 200 201 tcb[i].value=value; 202 strcpy(tcb[i].name,name); 203 tcb[i].stack=(p=(unsigned char*)malloc(stacklen)); 204 memset(tcb[i].stack, 0xff, stacklen); 205 p=p+stacklen; 206 207 #if 0 208 pt=(struct int_regs*)p; 209 pt--; 210 pt->Flags=0x200; 211 pt->CS=FP_SEG(code); 212 pt->IP=FP_OFF(code); 213 214 pt->off=FP_OFF(over); 215 pt->seg=FP_SEG(over); 216 pt->DS=_DS; 217 pt->ES=_ES; 218 tcb[i].sp=FP_OFF(pt); 219 tcb[i].ss=FP_SEG(pt); 220 #else if 221 /* 222 pp=(UINT16 *)(p-2); 223 *(pp)=FP_SEG(over); 224 *(pp-1)=FP_OFF(over); 225 *(pp-2)=0x200; 226 *(pp-3)=FP_SEG(code); 227 *(pp-4)=FP_OFF(code); 228 229 *(pp-9)=_ES; 230 *(pp-10)=_DS; 231 tcb[i].sp=FP_OFF(pp-13); 232 tcb[i].ss=FP_SEG(pp-13); 233 */ 234 235 *(p-1)=(FP_SEG(over)&0xff00)>>8; 236 *(p-2)=FP_SEG(over)&0x00ff; 237 238 *(p-3)=(FP_OFF(over)&0xff00)>>8; 239 *(p-4)=FP_OFF(over)&0x00ff; 240 241 *(p-5)=0x02; 242 *(p-6)=0x00; 243 244 *(p-7)=(FP_SEG(code)&0xff00)>>8; 245 *(p-8)=FP_SEG(code)&0x00ff; 246 247 *(p-9)=(FP_OFF(code)&0xff00)>>8; 248 *(p-10)=FP_OFF(code)&0x00ff; 249 250 *(p-19)=(_ES&0xff00)>>8; 251 *(p-20)=_ES&0x00ff; 252 253 *(p-21)=(_DS&0xff00)>>8; 254 *(p-22)=_DS&0x00ff; 255 256 tcb[i].sp=FP_OFF((UINT16 *)(p-28)); 257 tcb[i].ss=FP_SEG((UINT16 *)(p-28)); 258 259 #endif 260 261 tcb[i].state=READY; 262 263 return i; 264 } 265 266 void tcb_state() /* 線程狀態信息 */ 267 { 268 int i; 269 270 for(i=0;i<NTCB;i++) 271 if(tcb[i].state!=BLANK) 272 { 273 switch(tcb[i].state) 274 { 275 case FINISHED: 276 printf("\ntcb[%d] is FINISHED\n",i); 277 break; 278 279 case RUNNING: 280 printf("tcb[%d] is RUNNING\n",i); 281 break; 282 case READY: 283 printf("tcb[%d] is READY\n",i); 284 break; 285 case BLOCKED: 286 printf("tcb[%d] is BLOCKED\n",i); 287 288 break; 289 } 290 } 291 } 292 293 int all_finished() 294 { 295 int i; 296 297 for(i=1;i<NTCB;i++) 298 if(tcb[i].state==RUNNING||tcb[i].state==BLOCKED||tcb[i].state==READY) 299 return 0; 300 301 return 1; 302 } 303 304 int Find() 305 { 306 int i,j; 307 i=current; 308 309 while(tcb[i=((i+1)%NTCB)].state!=READY||i==current); 310 311 return i; 312 } 313 314 void interrupt new_int8(void) /* CPU 調度*/ 315 { 316 int i; 317 318 (*old_int8)(); /* 指向原來時鍾中斷處理過程入口的中斷處理函數指針 */ 319 timecount++; 320 321 if(timecount==TL) /* 時間片是否到? */ 322 { 323 if(!DosBusy()) /* DOS是否忙? */ 324 { 325 disable(); 326 327 tcb[current].ss=_SS; /* 保存現場 */ 328 tcb[current].sp=_SP; 329 330 if(tcb[current].state==RUNNING) 331 tcb[current].state=READY; 332 333 i=Find(); 334 if(i<0) 335 return; 336 337 _SS=tcb[i].ss; 338 _SP=tcb[i].sp; 339 tcb[i].state=RUNNING; 340 current=i; 341 timecount=0; /* 重新計時 */ 342 343 enable(); 344 } 345 else 346 return; 347 } 348 else 349 return; 350 } 351 352 void interrupt swtch() /* 其他原因CPU調度 */ 353 { 354 int i; 355 356 if(tcb[current].state!=FINISHED 357 &¤t!=0&&tcb[current].state!=BLOCKED) /* 當前線程還沒結束 */ 358 return; 359 360 i=Find(); 361 if(i<0) 362 return; 363 364 disable(); 365 tcb[current].ss=_SS; 366 tcb[current].sp=_SP; 367 368 if(tcb[current].state==RUNNING) 369 tcb[current].state=READY; /* 放入就緒隊列中 */ 370 371 _SS=tcb[i].ss; 372 _SP=tcb[i].sp; /* 保存現場 */ 373 374 tcb[i].state=RUNNING; 375 current=i; 376 enable(); 377 } 378 379 void block(struct TCB **p) /* 阻塞原語 */ 380 { 381 struct TCB *pp; 382 383 tcb[current].state=BLOCKED; 384 385 if((*p)==NULL) 386 *p=&tcb[current]; /* 阻塞隊列空,直接放入 */ 387 else 388 { 389 pp=*p; 390 while(pp->next) 391 pp=pp->next; /* 找到阻塞隊列最后一個節點 */ 392 393 pp->next=&tcb[current]; /* 放入阻塞隊列 */ 394 } 395 tcb[current].next=NULL; 396 swtch(); /* 重新進行CPU調度 */ 397 } 398 399 void wakeup_first(struct TCB **p) /* 喚醒隊首線程 */ 400 { 401 struct TCB *pl; 402 403 if((*p)==NULL) 404 return; 405 406 pl=(*p); 407 (*p)=(*p)->next; /* 得到阻塞隊列隊首線程 */ 408 pl->state=READY; /* 修為就緒狀態 */ 409 pl->next=NULL; 410 } 411 412 void p(semaphore *sem) 413 { 414 struct TCB **qp; 415 416 disable(); 417 sem->value=sem->value-1; 418 419 if(sem->value<0) 420 { 421 qp=&(sem->wq); 422 block(qp); 423 } 424 enable(); 425 } 426 427 void v(semaphore*sem) 428 { 429 struct TCB **qp; 430 431 disable(); 432 qp=&(sem->wq); 433 sem->value=sem->value+1; 434 435 if(sem->value>=0) 436 wakeup_first(qp); 437 438 enable(); 439 } 440 441 /////////////////////////////////////////////////////////////////////////////// 442 // buffer 443 struct buffer*Initbuf(void) 444 { 445 struct buffer *p,*pt,*pt2; 446 int i; 447 448 pt2=pt=(struct buffer*)malloc(sizeof(struct buffer)); 449 pt->sender=-1; 450 pt->size=0; 451 strcmp(pt->text,""); 452 pt->next=NULL; 453 454 for(i=0;i<NBUF-1;i++) 455 { 456 p=(struct buffer*)malloc(sizeof(struct buffer)); 457 p->sender=-1; 458 p->size=0; 459 p->text[NTEXT]='\0'; 460 p->next=NULL; 461 pt2->next=p; 462 pt2=p; 463 } 464 465 return pt; 466 } 467 468 // 從空閑消息緩沖隊列隊頭上取下一緩空閑消息沖區 469 struct buffer* getbuf(void) 470 { 471 struct buffer *buf; 472 473 buf=freebuf; /* 取得緩沖隊列的緩沖區*/ 474 freebuf=freebuf->next; 475 476 return(buf); /* 返回指向該緩沖區的指針 */ 477 } 478 479 // 將buff所指的緩沖區插到*mq所指的緩沖隊列末尾 480 void insert(struct buffer **mq, struct buffer *buff) 481 { 482 struct buffer *temp; 483 484 if(buff==NULL) 485 return; /* buff為空 */ 486 487 buff->next=NULL; 488 if(*mq==NULL) /* *mq為空 則直接插入*/ 489 *mq=buff; 490 else 491 { 492 temp=*mq; 493 while(temp->next) /* 找到隊尾 */ 494 temp=temp->next; 495 496 temp->next=buff; 497 } 498 } 499 500 // 將地址a開始的size個字節發送給外部標識符為receiver的線程 501 void send(char *receiver,char *a, int size) 502 { 503 struct buffer *buff; 504 int i,id=-1; 505 506 disable(); 507 for(i=0;i<NTCB;i++) 508 { 509 if(strcmp(receiver,tcb[i].name)==0) 510 { 511 id=i; 512 break; 513 } 514 } 515 516 if(id==-1) 517 { 518 printf("Error:Receiver not exist!\n"); 519 enable(); 520 return; 521 } 522 523 p(&sfb); 524 525 p(&mutexfb); 526 buff=getbuf(); 527 v(&mutexfb); 528 529 buff->sender=current; 530 buff->size=size; 531 buff->next=NULL; 532 533 for(i=0;i<buff->size;i++,a++) 534 buff->text[i]=*a; 535 536 // 將要發送的消息放到接收者TCB的buffer中 537 p(&tcb[id].mutex); 538 insert(&(tcb[id].mq),buff); 539 v(&tcb[id].mutex); 540 541 // 用於同步 542 v(&tcb[id].sm); 543 enable(); 544 } 545 546 ////////////////////////////////////////////////////////////////////////////////////////////// 547 // 獲取消息緩沖區函數 548 struct buffer *remov(struct buffer **mq, int sender) 549 { 550 struct buffer *buff, *p, *q; 551 q = NULL; 552 p = *mq; 553 554 // 在消息緩沖區隊列中找到其他進程發送給自己的消息 555 while((p->next != NULL) && (p->sender != sender)) 556 { 557 q = p; 558 p = p->next; 559 } 560 561 // 獲取消息后從隊列中刪除,防止重復接收 562 if(p->sender == sender) 563 { 564 buff = p; 565 if(q == NULL) 566 *mq = buff->next; 567 else 568 q->next = buff->next; 569 570 buff->next = NULL; 571 return buff; 572 } 573 else 574 return NULL; 575 } 576 577 // 接收原語 578 int receive(char *sender, char *b) 579 { 580 int i, id = -1; 581 struct buffer *buff; 582 583 disable(); 584 585 // 尋找 sender 586 for(i = 0; i < NBUF; i++) 587 { 588 if(strcmp(sender, tcb[i].name) == 0) 589 { 590 id = i; 591 break; 592 } 593 } 594 595 if(id == -1) 596 { 597 enable(); 598 return -1; 599 } 600 601 p(&tcb[current].sm); 602 603 p(&tcb[current].mutex); 604 buff = remov(&(tcb[current].mq), id); 605 v(&tcb[current].mutex); 606 607 if(buff == NULL) 608 { 609 v(&tcb[current].sm); 610 enable(); 611 return -1; 612 } 613 // 將消息正文復制到接收區 614 strcpy(b, buff->text); 615 616 // 釋放前先把標識去掉,防止重復接收 617 buff->sender = -1; 618 // 釋放相應的消息緩沖區 619 p(&mutexfb); 620 insert(&freebuf, buff); 621 v(&mutexfb); 622 623 v(&sfb); 624 625 enable(); 626 627 return buff->size; 628 } 629 630 void main() 631 { 632 long i, j, k; 633 634 bufferSem1.value = 1; 635 bufferSem1.wq = NULL; 636 637 bufferSem2.value = 0; 638 bufferSem2.wq = NULL; 639 640 InitInDos(); 641 InitTcb(); 642 643 freebuf=Initbuf(); 644 old_int8=getvect(8); 645 646 strcpy(tcb[0].name,"main"); 647 tcb[0].state=RUNNING; 648 tcb[0].value=0; 649 current=0; 650 651 Create("f1",(codeptr)f1,1024,5); 652 Create("f2",(codeptr)f2,1024,6); 653 654 tcb_state(); 655 setvect(8,new_int8); 656 657 while(!all_finished()); 658 { 659 660 printf("running!\n"); 661 662 } 663 664 tcb[0].name[0]='\0'; 665 tcb[0].state=FINISHED; 666 setvect(8,old_int8); 667 668 tcb_state(); 669 670 printf("\n Muli_task system teminated \n"); 671 }
