Java實現PV操作 | 生產者與消費者


  • 導語

在學習操作系統的過程中,PV操作是很重要的一個環節。然而面對書本上枯燥的代碼,每一個愛好技術的人總是想能親自去實現。現在我要推出一個專題,專門講述如何用Java實現PV操作,讓操作系統背后的邏輯躍然屏上。

如有錯誤,請廣大網友斧正,感激不盡!


經典問題1、生產者與消費者

 

  • PV操作數據結構的構建

 

在書本上,我們給出了一種數據結構,叫做信號量。這種信號量有兩個元素:

  一個是count,如果是正值則表示當前資源的個數,如果是0,表示有一個進程在執行臨界區的代碼(也就是說這個進程位於臨界區);並且沒有進程處於阻塞隊列中。如果是負值,這個值的絕對值(abs(count))表示阻塞隊列中進程的個數。

  一個是queue,即為阻塞進程隊列。當進程不能申請相應的資源是,則使用P操作,將自己插入阻塞隊列中。當運行的進程執行完臨界區代碼時,就執行V操作,喚醒一個阻塞隊列中的進程。

 

現在我們定義一個PV操作類:syn。在這個類中我們可以通過構造函數設置count的值。可以看到這個類中並沒有阻塞進程所在的queue,我是通過java的this.wait()this.notifyAll()來實現的。

並且,通過關鍵字【synchronized】,保證了PV操作是一條【原語】,即在運行過程中,占有完整的一個時間片,不可分割。

 1 class syn{//PV操作類
 2     int count=0;//信號量
 3     syn(){}
 4     syn(int a){count=a;}
 5     public synchronized void Wait(){ //關鍵字 synchronized 保證了此操作是一條【原語】
 6         count--;
 7         if(count<0){//等於0 :有一個進程進入了臨界區
 8             try {         //小於0:abs(count)=阻塞的進程數目
 9                 this.wait();
10             } catch (InterruptedException e) {
11                 e.printStackTrace();  
12             }  
13         }  
14     }  
15     public synchronized void Signal(){   //關鍵字 synchronized 保證了此操作是一條【原語】
16         count++;
17         if(count<=0){//如果有進程阻塞
18             this.notify(); 19         }
20     }  
21 }

 注1:P操作wait中,應為if(count<0)而不是while(count<0) 。在后續的編寫中,發現如果引入了多個生產者與消費者,用語句while(count<0) 就會出錯。

 注2:V操作signal中,應為this.notify()而不是this.notifyAll()。也就是說只需要從阻塞隊列中喚醒一個進程。


 

  • 單個生產者與消費者的模型構建

我們構建實現Runnable接口的生產者類和消費者類,使用empty(表示空緩沖區的數目)和full(表示滿緩沖區的數目)兩個信號量來實現進程的同步。(不同類型的進程共享某一資源為同步關系

代碼如下:

 

 1 public class Main {
 2 
 3     public static void main(String[] args) {
 4         Producer p=new Producer();
 5         Consumer c=new Consumer();
 6         Thread pp=new Thread(p);
 7         Thread cp=new Thread(c);
 8         pp.start();
 9         cp.start();
10     }
11 }
12 
13 class Global{
14     static syn empty=new syn(8);
15     static syn full=new syn(0);
16     static int buffer []=new int[8];//緩沖區
17 }
18 
19 //生產者類
20 class Producer implements Runnable{
21     int count=0;
22     public void run(){
23         while(count<20){
24             Global.empty.Wait();
25             //臨界區
26             int index=count%8;
27             Global.buffer[index]=count;
28             System.out.println("生產者在緩沖區"+index+"中生產了物品"+count);
29             count++;
30             try {
31                 Thread.sleep(10);
32             } catch (InterruptedException e) {
33                 // TODO Auto-generated catch block
34                 e.printStackTrace();
35             }
36             // end of 臨界區
37             Global.full.Signal();
38         }
39     }
40 }
41 
42 //消費者類
43 class Consumer implements Runnable{
44     int count=0;
45     public void run(){
46         while(count<20){
47             Global.full.Wait();
48             //臨界區
49             int index=count%8;
50             int value=Global.buffer[index];
51             System.out.println("消費者在緩沖區"+index+"中消費了物品"+value);
52             count++;
53             try {
54                 Thread.sleep(10);
55             } catch (InterruptedException e) {
56                 // TODO Auto-generated catch block
57                 e.printStackTrace();
58             }
59             // end of 臨界區
60             Global.empty.Signal();
61         }
62     }
63 }

 

運行結果:

 

 可以看出進程嚴格按照先生產再消費的順序,完美運行。


 

  • 引入多個生產者與消費者

為保證同一類進程在進行訪問時能保證互斥互斥是同類進程共享某一資源時的方式)我們引入mutex信號量。

    static syn pMutex=new syn(1);//保證生產者之間互斥
    static syn cMutex=new syn(1);//保證消費者之間互斥

 運行結果:

 

可見程序完美運行。


完整Java代碼:

  1 public class Main {
  2 
  3     public static void main(String[] args) {
  4         Producer p[]=new Producer[3];//3個生產者
  5         Consumer c[]=new Consumer[3];
  6         int i;
  7         
  8         for(i=0;i<3;i++){
  9             p[i]=new Producer(i+1);
 10         }
 11         for(i=0;i<3;i++){
 12             c[i]=new Consumer(i+1);
 13         }
 14         
 15         Thread pp[]=new Thread[3];
 16         Thread cp[]=new Thread[3];
 17         
 18         for(i=0;i<3;i++){
 19             pp[i]=new Thread(p[i]);
 20         }
 21         for(i=0;i<3;i++){
 22             cp[i]=new Thread(c[i]);
 23         }
 24         
 25         for(i=0;i<3;i++){
 26             pp[i].start();
 27         }
 28         for(i=0;i<3;i++){
 29             cp[i].start();
 30         }
 31 
 32     }
 33 }
 34 
 35 class Global{
 36     static syn empty=new syn(8);
 37     static syn full=new syn(0);
 38     static syn pMutex=new syn(1);//保證生產者之間互斥
 39     static syn cMutex=new syn(1);//保證消費者之間互斥
 40     static int buffer []=new int[8];//緩沖區
 41     static int pCount=0;
 42     static int cCount=0;
 43 }
 44 
 45 //生產者類
 46 class Producer implements Runnable{
 47     int ID=0;
 48     Producer(){}
 49     Producer(int id){ID=id;}
 50     public void run(){
 51         while(Global.pCount<20){
 52             Global.empty.Wait();
 53             Global.pMutex.Wait();
 54             //臨界區
 55             int index=Global.pCount%8;
 56             Global.buffer[index]=Global.pCount;
 57             System.out.println("生產者"+ID+"在緩沖區"+index+"中生產了物品"+Global.pCount);
 58             Global.pCount++;
 59             try {
 60                 Thread.sleep(10);
 61             } catch (InterruptedException e) {
 62                 // TODO Auto-generated catch block
 63                 e.printStackTrace();
 64             }
 65             // end of 臨界區
 66             Global.pMutex.Signal();
 67             Global.full.Signal();
 68         }
 69     }
 70 }
 71 
 72 //消費者類
 73 class Consumer implements Runnable{
 74     int ID=0;
 75     Consumer(){}
 76     Consumer(int id){ID=id;}
 77     public void run(){
 78         while(Global.cCount<20){
 79             Global.full.Wait();
 80             Global.cMutex.Wait();
 81             //臨界區
 82             int index=Global.cCount%8;
 83             int value=Global.buffer[index];
 84             System.out.println("消費者"+ID+"在緩沖區"+index+"中消費了物品"+value);
 85             Global.cCount++;
 86             try {
 87                 Thread.sleep(10);
 88             } catch (InterruptedException e) {
 89                 // TODO Auto-generated catch block
 90                 e.printStackTrace();
 91             }
 92             // end of 臨界區
 93             Global.cMutex.Signal();
 94             Global.empty.Signal();
 95         }
 96     }
 97 }
 98 
 99 class syn{//PV操作類
100     int count=0;//信號量
101     syn(){}
102     syn(int a){count=a;}
103     public synchronized void Wait(){ //關鍵字 synchronized 保證了此操作是一條【原語】
104         count--;
105         if(count<0){//等於0 :有一個進程進入了臨界區
106             try {         //小於0:abs(count)=阻塞的進程數目
107                 this.wait();
108             } catch (InterruptedException e) {
109                 e.printStackTrace();  
110             }  
111         }  
112     }  
113     public synchronized void Signal(){   //關鍵字 synchronized 保證了此操作是一條【原語】
114         count++;
115         if(count<=0){//如果有進程阻塞
116             this.notify();//All
117         }
118     }  
119 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM