生產者消費者模型Java實現


生產者消費者問題是研究多線程程序時繞不開的經典問題之一。

問題描述如下。使用一個商品的緩存池用來存放商品。當池子滿時,生產者不能往池子里加入商品;當池子空時,消費者不能從池子中取得商品。


使用Object的方法 wait() notify()/notifyAll()實現

獲取鎖和釋放鎖都是針對Object而言的,而和線程無關。試想如果和線程相關,那么一個線程就無法使用多個鎖。

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class Pool {

    private int MAX;
    private int cnt = 0;

    public Pool(int MAX) {
        this.MAX = MAX;
    }

    public synchronized void produce() throws InterruptedException {
        while (cnt == MAX) {
            wait();
        }
        System.out.println("Produce one.. Now:" + ++cnt);
        notify();
    }

    public synchronized void consume() throws InterruptedException {
        while (cnt == 0) {
            wait();
        }
        System.out.println("Consume one.. Now:" + --cnt);
        notifyAll();
    }

    public static void main(String[] args) {
        Pool pool = new Pool(6);
        Executor executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                try {
                    pool.produce();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                try {
                    pool.consume();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

使用 wait() notify()/notifyAll()的缺點在於在生產者喚醒消費者或者消費者喚醒生產者時,由於生產者和消費者使用同一個鎖,所以生產者也會將生產者喚醒,消費者也會將消費者喚醒。(這一點也被字節跳動的面試官問到啦TAT)

舉例:假設現在池子滿了,然后有3個生產者被阻塞,現在一個消費者拿走一個item,調用notify,此時一個被阻塞的生產者被喚醒了。這個生產者向池子里放入一個產品,並執行notify意圖喚醒被阻塞的消費者,此時這個notify有可能喚醒另外2個被阻塞的生產者中的一個。

Condition可以指定多個等待的條件,因此使用Condition可以解決這一問題。


使用ReentrantLock和Condition的await() signal()/signalAll()實現

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Pool {
    private int capacity;
    private Object[] queue;
    private int count;
    private Lock lock = new ReentrantLock();
    private Condition prod = lock.newCondition();
    private Condition cons = lock.newCondition();

    public Pool(int capacity) {
        this.capacity = capacity;
        queue = new Object[capacity];
        count = 0;
    }

    public void produce(Object o) throws InterruptedException {
        lock.lock();
        try {
            while (count == capacity) {
                prod.await();
            }
            queue[count++] = o;
            System.out.println(Thread.currentThread().getName()
                    + " produce " + o.toString() + ". current count: " + count);
            cons.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object consume() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                cons.await();
            }
            Object res = queue[--count];
            queue[count] = null;
            System.out.println(Thread.currentThread().getName()
                    + " consume " + res.toString() + ". current count: " + count);
            prod.signal();
            return res;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Pool pool = new Pool(10);
        ExecutorService executorService = Executors.newCachedThreadPool();
        int cnt = 20;
        while (cnt-- > 0) {
            executorService.execute(() -> {
                try {
                    pool.produce(new Object());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            executorService.execute(() -> {
                try {
                    pool.consume();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM