生產者消費者問題Java三種實現


生產者-消費者Java實現

2017-07-27

1 概述


 

生產者消費者問題是多線程的一個經典問題,它描述是有一塊緩沖區作為倉庫,生產者可以將產品放入倉庫,消費者則可以從倉庫中取走產品。

解決生產者/消費者問題的方法可分為兩類:

  • 采用某種機制保護生產者和消費者之間的同步;
  • 在生產者和消費者之間建立一個管道。

第一種方式有較高的效率,並且易於實現,代碼的可控制性較好,屬於常用的模式。第二種管道緩沖區不易控制,被傳輸數據對象不易於封裝等,實用性不強。

在Java中有四種方法支持同步,其中前三個是同步方法,一個是管道方法。

  • wait() / notify()方法
  • await() / signal()方法
  • BlockingQueue阻塞隊列方法
  • PipedInputStream / PipedOutputStream

本文只介紹前三種。

2 實現


 

2.1 wait() / notify()方法

wait() / nofity()方法是基類Object的兩個方法:

  • wait()方法:當緩沖區已滿/空時,生產者/消費者線程停止自己的執行,放棄鎖,使自己處於等等狀態,讓其他線程執行。
  • notify()方法:當生產者/消費者向緩沖區放入/取出一個產品時,向其他等待的線程發出可執行的通知,同時放棄鎖,使自己處於等待狀態。

緩沖區Storage.java代碼如下:

import java.util.LinkedList;

public class Storage
{
    // 倉庫最大存儲量
    private final int MAX_SIZE = 100;

    // 倉庫存儲的載體
    private LinkedList<Object> list = new LinkedList<Object>();

    // 生產產品
    public void produce(String producer)
    {
        synchronized (list)
        {
            // 如果倉庫已滿
            while (list.size() == MAX_SIZE)
            {
                System.out.println("倉庫已滿,【"+producer+"】: 暫時不能執行生產任務!");
                try
                {
                    // 由於條件不滿足,生產阻塞
                    list.wait();
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }

            // 生產產品            
            list.add(new Object());            

            System.out.println("【"+producer+"】:生產了一個產品\t【現倉儲量為】:" + list.size());

            list.notifyAll();
        }
    }

    // 消費產品
    public void consume(String consumer)
    {
        synchronized (list)
        {
            //如果倉庫存儲量不足
            while (list.size()==0)
            {
                System.out.println("倉庫已空,【"+consumer+"】: 暫時不能執行消費任務!");
                try
                {
                    // 由於條件不滿足,消費阻塞
                    list.wait();
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
            
            list.remove();
            System.out.println("【"+consumer+"】:消費了一個產品\t【現倉儲量為】:" + list.size());
            list.notifyAll();
        }
    }

    public LinkedList<Object> getList()
    {
        return list;
    }

    public void setList(LinkedList<Object> list)
    {
        this.list = list;
    }

    public int getMAX_SIZE()
    {
        return MAX_SIZE;
    }
}
View Code

Test.java

public class Test {
    public static void main(String[] args)
    {
        Storage storage=new Storage();

        for(int i=1;i<6;i++)
        {
            int finalI = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    storage.produce(String.format("生成者%d:", finalI));
                }
            }).start();
        }

        for(int i=1;i<4;i++)
        {
            int finalI = i;
            new Thread(()-> storage.consume(String.format("消費者%d:", finalI))).start();
        }
    }
}
View Code

結果如下:

倉庫已空,【消費者1】: 暫時不能執行消費任務!
【生產者3】:生產了一個產品    【現倉儲量為】:1
【消費者2】:消費了一個產品    【現倉儲量為】:0
倉庫已空,【消費者3】: 暫時不能執行消費任務!
【生產者1】:生產了一個產品    【現倉儲量為】:1
【生產者4】:生產了一個產品    【現倉儲量為】:2
【生產者2】:生產了一個產品    【現倉儲量為】:3
【生產者5】:生產了一個產品    【現倉儲量為】:4
【消費者1】:消費了一個產品    【現倉儲量為】:3
【消費者3】:消費了一個產品    【現倉儲量為】:2

2.2 await() / signal()方法

await()和signal()的功能基本上和wait() / nofity()相同,完全可以取代它們,但是它們和新引入的鎖定機制Lock直接掛鈎,具有更大的靈活性。通過在Lock對象上調用newCondition()方法,將條件變量和一個鎖對象進行綁定,進而控制並發程序訪問競爭資源的安全。

緩沖區Storage.java代碼如下:

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Storage {
    // 倉庫最大存儲量
    private final int MAX_SIZE = 100;

    // 倉庫存儲的載體
    private LinkedList<Object> list = new LinkedList<Object>();
    //
    private final Lock lock = new ReentrantLock();

    // 倉庫滿的條件變量
    private final Condition full = lock.newCondition();

    // 倉庫空的條件變量
    private final Condition empty = lock.newCondition();

    // 生產產品
    public void produce(String producer) {
        lock.lock();
        // 如果倉庫已滿
        while (list.size() == MAX_SIZE) {
            System.out.println("倉庫已滿,【" + producer + "】: 暫時不能執行生產任務!");
            try {
                // 由於條件不滿足,生產阻塞
                full.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 生產產品
        list.add(new Object());

        System.out.println("【" + producer + "】:生產了一個產品\t【現倉儲量為】:" + list.size());

        empty.signalAll();

        // 釋放鎖
        lock.unlock();

    }

    // 消費產品
    public void consume(String consumer) {
        // 獲得鎖
        lock.lock();

        // 如果倉庫存儲量不足
        while (list.size() == 0) {
            System.out.println("倉庫已空,【" + consumer + "】: 暫時不能執行消費任務!");
            try {
                // 由於條件不滿足,消費阻塞
                empty.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        list.remove();
        System.out.println("【" + consumer + "】:消費了一個產品\t【現倉儲量為】:" + list.size());
        full.signalAll();
        
        // 釋放鎖
        lock.unlock();

    }

    public LinkedList<Object> getList() {
        return list;
    }

    public void setList(LinkedList<Object> list) {
        this.list = list;
    }

    public int getMAX_SIZE() {
        return MAX_SIZE;
    }
}
View Code

2.3 BlockingQueue

它是一個已經在內部實現了同步的隊列,實現方式采用的是我們第2種await() / signal()方法。它可以在生成對象時指定容量大小。它用於阻塞操作的是put()和take()方法:

put()方法:類似於我們上面的生產者線程,容量達到最大時,自動阻塞。

take()方法:類似於我們上面的消費者線程,容量為0時,自動阻塞。

import java.util.concurrent.LinkedBlockingQueue;

public class Storage {
    // 倉庫最大存儲量
    private final int MAX_SIZE = 100;

    // 倉庫存儲的載體
    private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<Object>(100);  

    // 生產產品
    public void produce(String producer) {
        // 如果倉庫已滿
        if (list.size() == MAX_SIZE) {
            System.out.println("倉庫已滿,【" + producer + "】: 暫時不能執行生產任務!");            
        }

        // 生產產品
        try {
            list.put(new Object());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        System.out.println("【" + producer + "】:生產了一個產品\t【現倉儲量為】:" + list.size());
    }

    // 消費產品
    public void consume(String consumer) {
        // 如果倉庫存儲量不足
        if (list.size() == 0) {
            System.out.println("倉庫已空,【" + consumer + "】: 暫時不能執行消費任務!");            
        }

        try {
            list.take();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("【" + consumer + "】:消費了一個產品\t【現倉儲量為】:" + list.size());        

    }

    public LinkedBlockingQueue<Object> getList() {
        return list;
    }

    public void setList(LinkedBlockingQueue<Object> list) {
        this.list = list;
    }
    public int getMAX_SIZE() {
        return MAX_SIZE;
    }
}
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM